39 Commits

Author SHA1 Message Date
mr 3a66b42c01 test 2026-06-23 09:40:33 +02:00
mr 58e97fbe74 lib 2026-06-22 07:50:01 +02:00
mr 1425a31494 Orga + Consent 2026-06-05 15:56:50 +02:00
mr 6ee169f444 can delete 2026-06-05 09:59:14 +02:00
mr 5be3c0a10a wf 2026-06-04 13:36:55 +02:00
mr d9723e6431 oc lib test 2026-06-04 12:04:04 +02:00
mr c726361deb Get Exploit 2026-06-04 11:31:03 +02:00
mr d19ff1f8b2 CreatorID 2026-06-04 09:00:31 +02:00
mr 69244163b4 workflow bson missing 2026-06-03 15:33:39 +02:00
mr 842364d145 deep merge 2026-06-03 11:23:41 +02:00
mr 9ab374b720 oc nano 2026-06-03 08:03:11 +02:00
mr aa2bca48ef test 2026-06-02 16:43:55 +02:00
mr 322ea38bb4 nil map 2026-06-02 15:57:33 +02:00
mr c1490a7746 proper 2026-06-02 15:49:03 +02:00
mr 49f60d9416 test 2026-06-02 15:34:54 +02:00
mr 548ed84b13 prospect 2026-06-02 15:11:58 +02:00
mr 178cd48314 prospect 2026-06-02 14:45:09 +02:00
mr b31df8cfed adjust allowed instances for type behaviors 2026-06-02 14:16:32 +02:00
mr a0a53f0477 Panic recovered FiltersFromFlatMap 2026-06-02 14:04:33 +02:00
mr dffaa6326f still prospecting 2026-06-02 14:03:30 +02:00
mr 8155f4b17a prospect 2026-06-02 13:47:41 +02:00
mr 51307bb067 trace debug dynamic 2026-06-02 13:33:47 +02:00
mr 6ac788a8ff test 2026-06-02 12:48:33 +02:00
mr a7d0c1208b full filter interpretation 2026-06-02 11:35:19 +02:00
mr 3924fca289 Kick name malformed 2026-06-02 10:50:42 +02:00
mr 797df972ac plantuml duplication behavior 2026-06-02 08:42:20 +02:00
mr 71ae0d2cfc inout change vars regime 2026-06-01 16:45:05 +02:00
mr 5806bdd3d2 Correct link 2026-06-01 15:01:45 +02:00
mr 99fbe82a51 Update Auto Outputs on sourced. 2026-06-01 08:45:50 +02:00
mr 7d8bec9a78 Container can be sourced 2026-06-01 08:26:48 +02:00
mr afd8a2d97c conditionnal is_draft 2026-05-29 14:31:44 +02:00
mr 82a4708f46 Is draft 2026-05-29 14:12:40 +02:00
mr a3bca24982 isdraft pb 2026-05-29 13:43:41 +02:00
mr 41706949fd isDraft Update dafuck 2026-05-29 12:51:04 +02:00
mr b1429596bb not proper enum compararison 2026-05-29 10:38:45 +02:00
mr ce110ee634 inspect comparision 2026-05-29 10:22:07 +02:00
mr 7e5b69b1d2 Live resource failed 2026-05-29 09:12:52 +02:00
mr 26948da3c1 relation peer mismatch 2026-05-28 16:29:36 +02:00
mr 4e1b1164cc relation mismatched 2026-05-28 16:28:51 +02:00
31 changed files with 1760 additions and 166 deletions
+53 -14
View File
@@ -3,6 +3,7 @@ package dbs
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"regexp"
"runtime/debug" "runtime/debug"
"strings" "strings"
@@ -160,11 +161,16 @@ type Filter struct {
// Keys inside "and"/"or" are json tag names; the function resolves each to its // Keys inside "and"/"or" are json tag names; the function resolves each to its
// full dotted BSON path using the target struct. Unknown keys are kept as-is. // full dotted BSON path using the target struct. Unknown keys are kept as-is.
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Filters { func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Filters {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic recovered FiltersFromFlatMap: %v\n", r)
}
}()
filters := &Filters{ filters := &Filters{
And: make(map[string][]Filter), And: make(map[string][]Filter),
Or: make(map[string][]Filter), Or: make(map[string][]Filter),
} }
paths := jsonToBsonPaths(reflect.TypeOf(target), "") paths := jsonToBsonPaths(reflect.TypeOf(target), "", "")
resolve := func(jsonKey string) string { resolve := func(jsonKey string) string {
if p, ok := paths[jsonKey]; ok { if p, ok := paths[jsonKey]; ok {
return p return p
@@ -179,11 +185,12 @@ func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Fil
} }
for jsonKey, val := range m { for jsonKey, val := range m {
bsonKey := resolve(jsonKey) bsonKey := resolve(jsonKey)
items, ok := val.([]interface{}) //items, ok := val.([]interface{})
fmt.Println(jsonKey, val, bsonKey)
if !ok { if !ok {
continue continue
} }
for _, item := range items { for _, item := range val.([]interface{}) {
entry, ok := item.(map[string]interface{}) entry, ok := item.(map[string]interface{})
if !ok { if !ok {
continue continue
@@ -214,11 +221,22 @@ func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Fil
// //
// Anonymous embedded fields without any tag follow the BSON convention of this // Anonymous embedded fields without any tag follow the BSON convention of this
// codebase: they are stored as a nested sub-document whose key is the lowercased // codebase: they are stored as a nested sub-document whose key is the lowercased
// struct type name (e.g. utils.AbstractObject → "abstractobject"). // struct type name (e.g. utils.AbstractObject → "abstractobject"). Their JSON
func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string { // fields are promoted (flat), so bsonPrefix advances but jsonPrefix does not.
//
// For fields inside slices or maps, both the leaf json name and the full dotted
// json path (e.g. "instances.access_protocol") are registered as keys so callers
// can use either form unambiguously.
func jsonToBsonPaths(t reflect.Type, bsonPrefix string, jsonPrefix string) map[string]string {
for t.Kind() == reflect.Ptr || t.Kind() == reflect.Slice { for t.Kind() == reflect.Ptr || t.Kind() == reflect.Slice {
t = t.Elem() t = t.Elem()
} }
if t.Kind() == reflect.Map {
t = t.Elem()
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
}
result := make(map[string]string) result := make(map[string]string)
if t.Kind() != reflect.Struct { if t.Kind() != reflect.Struct {
return result return result
@@ -232,17 +250,21 @@ func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string {
bsonName := strings.Split(bsonTag, ",")[0] bsonName := strings.Split(bsonTag, ",")[0]
// Anonymous embedded struct with no tags: use lowercase type name as BSON prefix. // Anonymous embedded struct with no tags: use lowercase type name as BSON prefix.
// JSON fields are promoted so jsonPrefix stays the same.
if field.Anonymous && jsonName == "" && bsonName == "" { if field.Anonymous && jsonName == "" && bsonName == "" {
ft := field.Type ft := field.Type
for ft.Kind() == reflect.Ptr { for ft.Kind() == reflect.Ptr {
ft = ft.Elem() ft = ft.Elem()
} }
if ft.Kind() == reflect.Struct { if ft.Kind() == reflect.Struct {
embedPrefix := strings.ToLower(ft.Name()) embedBsonPrefix := strings.ToLower(ft.Name())
if prefix != "" { re := regexp.MustCompile(`\[[^\]]*\]`)
embedPrefix = prefix + "." + embedPrefix embedBsonPrefix = re.ReplaceAllString(embedBsonPrefix, "")
embedBsonPrefix = strings.ReplaceAll(embedBsonPrefix, "*", "")
if bsonPrefix != "" {
embedBsonPrefix = bsonPrefix + "." + embedBsonPrefix
} }
for k, v := range jsonToBsonPaths(ft, embedPrefix) { for k, v := range jsonToBsonPaths(ft, embedBsonPrefix, jsonPrefix) {
if _, exists := result[k]; !exists { if _, exists := result[k]; !exists {
result[k] = v result[k] = v
} }
@@ -258,19 +280,36 @@ func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string {
bsonName = jsonName bsonName = jsonName
} }
fullPath := bsonName fullBsonPath := bsonName
if prefix != "" { if bsonPrefix != "" {
fullPath = prefix + "." + bsonName fullBsonPath = bsonPrefix + "." + bsonName
}
fullJsonPath := jsonName
if jsonPrefix != "" {
fullJsonPath = jsonPrefix + "." + jsonName
} }
result[jsonName] = fullPath result[jsonName] = fullBsonPath
// Also register the full dotted JSON path so callers can use
// "instances.access_protocol" instead of just "access_protocol".
if fullJsonPath != jsonName {
if _, exists := result[fullJsonPath]; !exists {
result[fullJsonPath] = fullBsonPath
}
}
ft := field.Type ft := field.Type
for ft.Kind() == reflect.Ptr || ft.Kind() == reflect.Slice { for ft.Kind() == reflect.Ptr || ft.Kind() == reflect.Slice {
ft = ft.Elem() ft = ft.Elem()
} }
if ft.Kind() == reflect.Map {
ft = ft.Elem()
for ft.Kind() == reflect.Ptr {
ft = ft.Elem()
}
}
if ft.Kind() == reflect.Struct { if ft.Kind() == reflect.Struct {
for k, v := range jsonToBsonPaths(ft, fullPath) { for k, v := range jsonToBsonPaths(ft, fullBsonPath, fullJsonPath) {
if _, exists := result[k]; !exists { if _, exists := result[k]; !exists {
result[k] = v result[k] = v
} }
+1
View File
@@ -77,6 +77,7 @@ const (
REFUND = tools.REFUND REFUND = tools.REFUND
DISCOUNT = tools.DISCOUNT DISCOUNT = tools.DISCOUNT
SUBSCRIPTION = tools.SUBSCRIPTION SUBSCRIPTION = tools.SUBSCRIPTION
POLICY = tools.POLICY
) )
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters { func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
+2 -3
View File
@@ -10,14 +10,13 @@ import (
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
) )
/* /*
* Booking is a struct that represents a booking * Booking is a struct that represents a booking
*/ */
type Booking struct { type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
FromNano string `json:"from_nano,omitempty" bson:"priced_item,omitempty"` FromNano string `json:"from_nano,omitempty" bson:"from_nano,omitempty"`
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"` ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
@@ -147,5 +146,5 @@ func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
} }
func (r *Booking) CanDelete() bool { func (r *Booking) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted return true // only draft bookings can be deleted
} }
+10
View File
@@ -1,5 +1,7 @@
package enum package enum
import "fmt"
type InfrastructureType int type InfrastructureType int
const ( const (
@@ -18,3 +20,11 @@ func (t InfrastructureType) String() string {
func InfrastructureList() []InfrastructureType { func InfrastructureList() []InfrastructureType {
return []InfrastructureType{DOCKER, KUBERNETES, SLURM, HW, CONDOR} return []InfrastructureType{DOCKER, KUBERNETES, SLURM, HW, CONDOR}
} }
func (d InfrastructureType) Compare(indexStr interface{}) bool {
return fmt.Sprintf("%v", indexStr) == fmt.Sprintf("%v", d.EnumIndex()) || fmt.Sprintf("%v", indexStr) == d.String()
}
func (d InfrastructureType) EnumIndex() int {
return int(d)
}
+10
View File
@@ -1,5 +1,7 @@
package enum package enum
import "fmt"
type StorageSize int type StorageSize int
// StorageType - Enum that defines the type of storage // StorageType - Enum that defines the type of storage
@@ -54,3 +56,11 @@ func (t StorageType) String() string {
func TypeList() []StorageType { func TypeList() []StorageType {
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS} return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS}
} }
func (d StorageType) Compare(indexStr interface{}) bool {
return fmt.Sprintf("%v", indexStr) == fmt.Sprintf("%v", d.EnumIndex()) || fmt.Sprintf("%v", indexStr) == d.String()
}
func (d StorageType) EnumIndex() int {
return int(d)
}
+59 -4
View File
@@ -1,18 +1,73 @@
package models package models
import "sort"
// SortedArgValues returns arg Values: readonly args first (sorted by Index), then non-readonly in order.
func SortedArgValues(args []Arg) []string {
var ro, nro []Arg
for _, a := range args {
if a.IsReadonly {
ro = append(ro, a)
} else {
nro = append(nro, a)
}
}
sort.Slice(ro, func(i, j int) bool { return ro[i].Index < ro[j].Index })
out := make([]string, 0, len(args))
for _, a := range ro {
out = append(out, a.Value)
}
for _, a := range nro {
out = append(out, a.Value)
}
return out
}
// ReadonlyArgValues returns only the readonly arg values sorted by Index.
func ReadonlyArgValues(args []Arg) []string {
var ro []Arg
for _, a := range args {
if a.IsReadonly {
ro = append(ro, a)
}
}
sort.Slice(ro, func(i, j int) bool { return ro[i].Index < ro[j].Index })
out := make([]string, 0, len(ro))
for _, a := range ro {
out = append(out, a.Value)
}
return out
}
// NonReadonlyArgValues returns only the non-readonly arg values in their order.
func NonReadonlyArgValues(args []Arg) []string {
out := make([]string, 0)
for _, a := range args {
if !a.IsReadonly {
out = append(out, a.Value)
}
}
return out
}
type Arg struct {
Value string `json:"value,omitempty" bson:"value,omitempty"` // Image is the container image TEMPO
Index int `json:"index,omitempty" bson:"index,omitempty"`
IsReadonly bool `json:"is_readonly,omitempty" bson:"is_readonly,omitempty"`
}
type PathSource struct { type PathSource struct {
Source string `json:"source,omitempty" bson:"source,omitempty"` // Image is the container image TEMPO Source string `json:"source,omitempty" bson:"source,omitempty"` // Image is the container image TEMPO
IsReachable bool `json:"is_reachable,omitempty" bson:"is_reachable,omitempty"` IsReachable bool `json:"is_reachable,omitempty" bson:"is_reachable,omitempty"`
Args string `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments Args []Arg `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments
Volumes map[string]string `json:"volumes,omitempty" bson:"volumes,omitempty"` // Volumes is the container volumes Volumes map[string]string `json:"volumes,omitempty" bson:"volumes,omitempty"` // Volumes is the container volumes
} }
type Container struct { type Container struct {
PathSource
Image string `json:"image,omitempty" bson:"image,omitempty"` // Image is the container image TEMPO Image string `json:"image,omitempty" bson:"image,omitempty"` // Image is the container image TEMPO
Command string `json:"command,omitempty" bson:"command,omitempty"` // Command is the container command Command string `json:"command,omitempty" bson:"command,omitempty"` // Command is the container command
Args string `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments
Env map[string]string `json:"env,omitempty" bson:"env,omitempty"` // Env is the container environment variables
Volumes map[string]string `json:"volumes,omitempty" bson:"volumes,omitempty"` // Volumes is the container volumes
} }
type Expose struct { type Expose struct {
+1 -1
View File
@@ -12,7 +12,7 @@ type Param struct {
Value string `json:"value,omitempty" bson:"value,omitempty"` Value string `json:"value,omitempty" bson:"value,omitempty"`
Origin string `json:"origin,omitempty" bson:"origin,omitempty"` Origin string `json:"origin,omitempty" bson:"origin,omitempty"`
Readonly bool `json:"readonly" bson:"readonly" default:"true"` Readonly bool `json:"readonly" bson:"readonly" default:"true"`
Optionnal bool `json:"optionnal" bson:"optionnal" default:"true"` Required bool `json:"required" bson:"required" default:"true"`
} }
type InOutputs struct { type InOutputs struct {
+4 -1
View File
@@ -1,6 +1,8 @@
package live package live
import ( import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
@@ -36,7 +38,8 @@ type LiveDatacenter struct {
} }
func (r *LiveDatacenter) IsCompatible(service map[string]interface{}) bool { func (r *LiveDatacenter) IsCompatible(service map[string]interface{}) bool {
return service["infrastructure"] == r.Infrastructure && service["architecture"] == r.Architecture fmt.Println("COMPARE <", r.Infrastructure.Compare(service["infrastructure"]), "> AND <", service["architecture"], "> <", r.Architecture, ">")
return r.Infrastructure.Compare(service["infrastructure"]) && service["architecture"] == r.Architecture
} }
func (d *LiveDatacenter) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *LiveDatacenter) GetAccessor(request *tools.APIRequest) utils.Accessor {
+5 -1
View File
@@ -1,6 +1,8 @@
package live package live
import ( import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@@ -38,5 +40,7 @@ func (d *LiveService) GetAccessor(request *tools.APIRequest) utils.Accessor {
} }
func (r *LiveService) IsCompatible(service map[string]interface{}) bool { func (r *LiveService) IsCompatible(service map[string]interface{}) bool {
return service["infrastructure"] == r.Infrastructure fmt.Println("COMPARE <", service["infrastructure"], "> <", r.Infrastructure, ">")
return r.Infrastructure.Compare(service["infrastructure"])
} }
+5 -1
View File
@@ -1,6 +1,8 @@
package live package live
import ( import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@@ -26,7 +28,9 @@ type LiveStorage struct {
} }
func (r *LiveStorage) IsCompatible(service map[string]interface{}) bool { func (r *LiveStorage) IsCompatible(service map[string]interface{}) bool {
return service["storage_type"] == r.StorageType fmt.Println("COMPARE <", r.StorageType.Compare(service["storage_type"]), ">")
return r.StorageType.Compare(service["storage_type"])
} }
func (d *LiveStorage) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *LiveStorage) GetAccessor(request *tools.APIRequest) utils.Accessor {
+2
View File
@@ -11,6 +11,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/execution_verification" "cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live" "cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order" "cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer/policy"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@@ -58,6 +59,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.SUBSCRIPTION.String(): func() utils.DBObject { return &subscription.Subscription{} }, tools.SUBSCRIPTION.String(): func() utils.DBObject { return &subscription.Subscription{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} }, tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} }, tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
tools.POLICY.String(): func() utils.DBObject { return &policy.Policy{} },
} }
// Model returns the model object based on the model type // Model returns the model object based on the model type
+11
View File
@@ -0,0 +1,11 @@
package organization
// Organization holds descriptive data about a peer's organization.
// It is optional — a peer without an organization has a nil Organization field.
type Organization struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Description string `json:"description,omitempty" bson:"description,omitempty"`
Website string `json:"website,omitempty" bson:"website,omitempty"`
Sector string `json:"sector,omitempty" bson:"sector,omitempty"`
Country string `json:"country,omitempty" bson:"country,omitempty"`
}
+32 -1
View File
@@ -5,6 +5,7 @@ import (
"strings" "strings"
"time" "time"
"cloud.o-forge.io/core/oc-lib/models/organization"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries" "github.com/biter777/countries"
@@ -30,12 +31,23 @@ const (
NANO NANO
PENDING_NANO PENDING_NANO
PENDING_MASTER PENDING_MASTER
ORGANIZATION_MASTER
ORGANIZATION_MEMBER
ORGANIZATION_PARTNER
ORGANIZATION_MASTER_PENDING
ORGANIZATION_MEMBER_PENDING
) )
var path = []string{"known", "self", "partner", "blacklist", "partner", "pending_partner", "master", "nano", "pending_nano", "pending_master"} var path = []string{
"known", "self", "partner", "blacklist", "pending_partner",
"master", "nano", "pending_nano", "pending_master",
"organization_master", "organization_member", "organization_partner",
"organization_master_pending", "organization_member_pending",
}
func GetRelationPath(str string) int { func GetRelationPath(str string) int {
for i, p := range path { for i, p := range path {
fmt.Println("GetRelationPath", i, p)
if str == p { if str == p {
return i return i
} }
@@ -120,6 +132,20 @@ type Peer struct {
// When oc-discovery fails to reach a NANO, it routes the booking to MasterID instead. // When oc-discovery fails to reach a NANO, it routes the booking to MasterID instead.
MasterID string `json:"master_id,omitempty" bson:"master_id,omitempty"` MasterID string `json:"master_id,omitempty" bson:"master_id,omitempty"`
// OrganizationMasterID is the MongoDB _id of the peer acting as this node's
// organization master. Set automatically when an ORGANIZATION_MASTER relation
// is validated (equivalent of MasterID for the Nano/Master hierarchy).
OrganizationMasterID string `json:"organization_master_id,omitempty" bson:"organization_master_id,omitempty"`
// Organization holds optional descriptive data about the peer's organization.
// Null when the peer has not registered any organization data.
Organization *organization.Organization `json:"organization,omitempty" bson:"organization,omitempty"`
// PolicyID references the Policy document that governs which inbound
// libp2p streams are authorized for this peer.
// When empty, all non-vital streams are denied by default.
PolicyID string `json:"policy_id,omitempty" bson:"policy_id,omitempty"`
// Volatile connectivity state — never persisted to DB (bson:"-"). // Volatile connectivity state — never persisted to DB (bson:"-").
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT. // Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace). // Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
@@ -136,6 +162,11 @@ func (ri *Peer) Extend(typ ...string) map[string][]tools.DataType {
ext[t] = []tools.DataType{} ext[t] = []tools.DataType{}
} }
ext[t] = append(ext[t], tools.PEER) ext[t] = append(ext[t], tools.PEER)
case "policy":
if _, ok := ext[t]; !ok {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.POLICY)
} }
} }
return ext return ext
+30
View File
@@ -0,0 +1,30 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// Policy defines which inbound libp2p streams are authorized for a peer.
// Vital streams (planner, considers, minio/admiralty config, source-presign,
// verify, observe, heartbeat) are always allowed regardless of policy.
type Policy struct {
utils.AbstractObject
// Resource CRUD
AllowSearch bool `json:"allow_search" bson:"allow_search"`
AllowCreate bool `json:"allow_create" bson:"allow_create"`
AllowUpdate bool `json:"allow_update" bson:"allow_update"`
AllowDelete bool `json:"allow_delete" bson:"allow_delete"`
// Resource freshness tracking
AllowRegisterWatcher bool `json:"allow_register_watcher" bson:"allow_register_watcher"`
AllowUnregisterWatcher bool `json:"allow_unregister_watcher" bson:"allow_unregister_watcher"`
// Organization partner confirmation
AllowOrgPartnerConfirm bool `json:"allow_org_partner_confirm" bson:"allow_org_partner_confirm"`
}
func (p *Policy) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request)
}
@@ -0,0 +1,31 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type policyMongoAccessor struct {
utils.AbstractAccessor[*Policy]
}
func NewAccessor(request *tools.APIRequest) *policyMongoAccessor {
return &policyMongoAccessor{
AbstractAccessor: utils.AbstractAccessor[*Policy]{
Logger: logs.CreateLogger(tools.POLICY.String()),
Request: request,
Type: tools.POLICY,
New: func() *Policy { return &Policy{} },
},
}
}
func (a *policyMongoAccessor) GetObjectFilters(search string) *dbs.Filters {
return &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}
+14
View File
@@ -0,0 +1,14 @@
package resources
// Consent represents a consent request attached to a resource.
// ConsentString is the question displayed to the user.
// Optional, when true, means the user may decline without blocking scheduling.
// A nil Optional is treated as required (false).
type Consent struct {
ConsentString string `json:"consent_string" bson:"consent_string"`
Optional *bool `json:"optional,omitempty" bson:"optional,omitempty"`
}
func (c Consent) IsOptional() bool {
return c.Optional != nil && *c.Optional
}
+225 -18
View File
@@ -21,7 +21,7 @@ import (
type DynamicResource struct { type DynamicResource struct {
AbstractResource AbstractResource
Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"` Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"`
Filters map[string]interface{} `bson:"filters,omitempty" json:"filters,omitempty"` Filters dbs.Filters `bson:"filters,omitempty" json:"filters,omitempty"`
SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"` SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"`
PeerIds map[int]string `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"` PeerIds map[int]string `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"`
@@ -37,43 +37,248 @@ type DynamicResource struct {
WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"` WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"`
} }
// WorkspaceCandidatesProvider can be set by the workspace package to supply
// contextual workspace resources for a given DataType and request without
// creating a circular import (workspace → resources → workspace).
// When set, SetAllowedInstances uses workspace-scoped resources instead of
// the full catalog for requests that carry a username.
var WorkspaceCandidatesProvider func(dt tools.DataType, request *tools.APIRequest) []ResourceInterface
func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return nil return nil
} }
func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF { func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{} if WorkspaceCandidatesProvider != nil {
for k, v := range map[tools.DataType]ResourceInterface{ candidates := WorkspaceCandidatesProvider(d.Type, request)
tools.COMPUTE_RESOURCE: &ComputeResource{}, return d.SetAllowedInstancesFromSet(candidates, request, instance_id...)
tools.DATA_RESOURCE: &DataResource{},
tools.STORAGE_RESOURCE: &StorageResource{},
tools.PROCESSING_RESOURCE: &ProcessingResource{},
tools.WORKFLOW_RESOURCE: &WorkflowResource{}} {
if d.Type != k {
continue
} }
access := NewAccessor[*DynamicResource](k, request) d.Instances = []ResourceInstanceITF{}
a, _, _ := access.Search(dbs.FiltersFromFlatMap(d.Filters, v), "", false, 0, 100000) d.sortAndResetInstances()
return d.Instances
}
// SetAllowedInstancesFromSet fills d.Instances from a pre-loaded workspace resource set
// instead of querying the catalog. Filters are applied in-memory against the candidates.
// Called by WorkspaceResourceSet.Fill so dynamic resources only see workspace-scoped resources.
func (d *DynamicResource) SetAllowedInstancesFromSet(candidates []ResourceInterface, request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{}
d.PeerIds = map[int]string{} d.PeerIds = map[int]string{}
d.ResourceIds = map[int]string{} d.ResourceIds = map[int]string{}
for _, res := range a { for _, res := range candidates {
for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) { if !d.matchesFilters(res) {
continue
}
for _, i := range res.SetAllowedInstances(request, instance_id...) {
d.PeerIds[len(d.Instances)] = res.GetCreatorID() d.PeerIds[len(d.Instances)] = res.GetCreatorID()
d.ResourceIds[len(d.Instances)] = res.GetID() d.ResourceIds[len(d.Instances)] = res.GetID()
d.Instances = append(d.Instances, i) d.Instances = append(d.Instances, i)
} }
} }
break d.sortAndResetInstances()
return d.Instances
} }
func (d *DynamicResource) sortAndResetInstances() {
if d.SortRules != nil {
sorted := make([]ResourceInstanceITF, len(d.Instances)) sorted := make([]ResourceInstanceITF, len(d.Instances))
copy(sorted, d.Instances) copy(sorted, d.Instances)
slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int { slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
d.SortRules["partnerships"] = "%v not contains 2" d.SortRules["partnerships"] = "%v not contains 2"
return d.compareByRules(a, b, d.SortRules) return d.compareByRules(a, b, d.SortRules)
}) })
d.WatchedDynamicResource = []string{} d.Instances = sorted
return d.Instances
} }
d.WatchedDynamicResource = []string{}
}
// matchesFilters applies d.Filters in-memory against a serialized resource.
// Keys in d.Filters are JSON tag names; Serialize returns JSON tag names — no bson conversion needed.
func (d *DynamicResource) matchesFilters(res ResourceInterface) bool {
if len(d.Filters.And) == 0 && len(d.Filters.Or) == 0 {
return true
}
m := res.Serialize(res)
for field, fs := range d.Filters.And {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if !anyMatchesOp(vals, f) {
return false
}
}
}
if len(d.Filters.Or) > 0 {
matched := false
for field, fs := range d.Filters.Or {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if anyMatchesOp(vals, f) {
matched = true
break
}
}
if matched {
break
}
}
if !matched {
return false
}
}
return true
}
// nestedVals navigates a dot-path into m and collects all leaf values.
// Arrays at any level are expanded: each element is recursed into.
func nestedVals(m map[string]interface{}, path []string) []interface{} {
if len(path) == 0 || m == nil {
return nil
}
val, ok := m[path[0]]
if !ok {
return nil
}
if len(path) == 1 {
if arr, ok := val.([]interface{}); ok {
return arr
}
return []interface{}{val}
}
rest := path[1:]
switch v := val.(type) {
case map[string]interface{}:
return nestedVals(v, rest)
case []interface{}:
var out []interface{}
for _, elem := range v {
if em, ok := elem.(map[string]interface{}); ok {
out = append(out, nestedVals(em, rest)...)
}
}
return out
}
return nil
}
// anyMatchesOp returns true if at least one value in vals satisfies filter f.
func anyMatchesOp(vals []interface{}, f dbs.Filter) bool {
if f.Operator == dbs.EXISTS.String() {
exists := len(vals) > 0 && vals[0] != nil
want := true
if b, ok := f.Value.(bool); ok {
want = b
}
return exists == want
}
if f.Operator == dbs.IN.String() {
list, ok := f.Value.([]interface{})
if !ok {
return false
}
for _, v := range vals {
sv := fmt.Sprintf("%v", v)
for _, item := range list {
if sv == fmt.Sprintf("%v", item) {
return true
}
}
}
return false
}
for _, v := range vals {
if opMatches(v, f) {
return true
}
}
return false
}
func opMatches(val interface{}, f dbs.Filter) bool {
switch f.Operator {
case dbs.EQUAL.String():
return fmt.Sprintf("%v", val) == fmt.Sprintf("%v", f.Value)
case dbs.NOT.String():
return fmt.Sprintf("%v", val) != fmt.Sprintf("%v", f.Value)
case dbs.LIKE.String():
return strings.Contains(strings.ToLower(fmt.Sprintf("%v", val)), strings.ToLower(fmt.Sprintf("%v", f.Value)))
case dbs.GT.String(), dbs.GTE.String(), dbs.LT.String(), dbs.LTE.String():
return numericCmp(val, f.Value, f.Operator)
case dbs.ELEMMATCH.String():
arr, ok := val.([]interface{})
if !ok {
return false
}
sub, ok := f.Value.(map[string]interface{})
if !ok {
return false
}
for _, elem := range arr {
em, ok := elem.(map[string]interface{})
if !ok {
continue
}
allOk := true
for k, sv := range sub {
if fmt.Sprintf("%v", em[k]) != fmt.Sprintf("%v", sv) {
allOk = false
break
}
}
if allOk {
return true
}
}
return false
}
return false
}
func numericCmp(a, b interface{}, op string) bool {
fa, aOk := toFloat64(a)
fb, bOk := toFloat64(b)
if !aOk || !bOk {
sa, sb := fmt.Sprintf("%v", a), fmt.Sprintf("%v", b)
switch op {
case dbs.GT.String():
return sa > sb
case dbs.GTE.String():
return sa >= sb
case dbs.LT.String():
return sa < sb
case dbs.LTE.String():
return sa <= sb
}
return false
}
switch op {
case dbs.GT.String():
return fa > fb
case dbs.GTE.String():
return fa >= fb
case dbs.LT.String():
return fa < fb
case dbs.LTE.String():
return fa <= fb
}
return false
}
func toFloat64(v interface{}) (float64, bool) {
switch n := v.(type) {
case float64:
return n, true
case float32:
return float64(n), true
case int:
return float64(n), true
case int32:
return float64(n), true
case int64:
return float64(n), true
}
return 0, false
}
func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) { func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) {
d.Instances = append(d.Instances, instance) d.Instances = append(d.Instances, instance)
@@ -91,13 +296,15 @@ func (d *DynamicResource) GetSelectedInstance(index *int) ResourceInstanceITF {
d.SelectedIndex = i d.SelectedIndex = i
for i := range inst.GetPartnerships() { for i := range inst.GetPartnerships() {
fmt.Println(inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy), d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy)
if inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil { if inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil {
d.SelectedPartnershipIndex = &i d.SelectedPartnershipIndex = &i
break break
} }
} }
if d.SelectedPartnershipIndex == nil { if d.SelectedPartnershipIndex == nil {
continue i := 0
d.SelectedPartnershipIndex = &i
} }
return inst return inst
} }
+2
View File
@@ -29,6 +29,8 @@ type ResourceInterface interface {
GetEnv() []models.Param GetEnv() []models.Param
GetInputs() []models.Param GetInputs() []models.Param
GetOutputs() []models.Param GetOutputs() []models.Param
GetExploitationAuthorizations() []ExploitationAuthorization
GetConsents() []Consent
} }
type ResourceInstanceITF interface { type ResourceInstanceITF interface {
+125 -1
View File
@@ -15,7 +15,8 @@ type ResourceSet struct {
Services []string `bson:"services,omitempty" json:"services,omitempty"` Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"` Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
// DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances. // Runtime-only resource objects — not persisted. Populated by Fill() from the ID lists above.
// Use WorkspaceResourceSet when full object persistence is needed (workspace fluid catalog).
DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"` DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"` DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"` StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
@@ -26,6 +27,129 @@ type ResourceSet struct {
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"` ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
} }
// WorkspaceResourceSet mirrors ResourceSet but persists complete resource objects to MongoDB.
// Use this in workspace documents where the workspace acts as a fluid resource catalog.
// The *Resource fields are loaded from bson on read; Fill() skips catalog lookup when they are
// already populated.
type WorkspaceResourceSet struct {
Datas []string `bson:"datas,omitempty" json:"datas,omitempty"`
Storages []string `bson:"storages,omitempty" json:"storages,omitempty"`
Processings []string `bson:"processings,omitempty" json:"processings,omitempty"`
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
DynamicResources []*DynamicResource `bson:"dynamic_resources,omitempty" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"data_resources,omitempty" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"storage_resources,omitempty" json:"storage_resources,omitempty"`
ProcessingResources []*ProcessingResource `bson:"processing_resources,omitempty" json:"processing_resources,omitempty"`
ComputeResources []*ComputeResource `bson:"compute_resources,omitempty" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"workflow_resources,omitempty" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"service_resources,omitempty" json:"service_resources,omitempty"`
}
func (r *WorkspaceResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
r.DynamicResources = nil
r.NativeTools = nil
}
// Fill populates *Resource fields from their ID lists. When a field is already non-nil
// (loaded from the workspace MongoDB document), the catalog lookup is skipped for that type.
func (r *WorkspaceResourceSet) Fill(request *tools.APIRequest) {
if r.DataResources == nil {
for _, id := range r.Datas {
if d, _, e := (&DataResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DataResources = append(r.DataResources, d.(*DataResource))
}
}
}
if r.ComputeResources == nil {
for _, id := range r.Computes {
if d, _, e := (&ComputeResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ComputeResources = append(r.ComputeResources, d.(*ComputeResource))
}
}
}
if r.StorageResources == nil {
for _, id := range r.Storages {
if d, _, e := (&StorageResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.StorageResources = append(r.StorageResources, d.(*StorageResource))
}
}
}
if r.ProcessingResources == nil {
for _, id := range r.Processings {
if d, _, e := (&ProcessingResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
}
}
}
if r.WorkflowResources == nil {
for _, id := range r.Workflows {
if d, _, e := (&WorkflowResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
}
}
}
if r.ServiceResources == nil {
for _, id := range r.Services {
if d, _, e := (&ServiceResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
}
}
}
if r.DynamicResources == nil {
for _, id := range r.Dynamics {
if d, _, e := (&DynamicResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DynamicResources = append(r.DynamicResources, d.(*DynamicResource))
}
}
}
for _, d := range r.DynamicResources {
var candidates []ResourceInterface
switch d.Type {
case tools.COMPUTE_RESOURCE:
for _, c := range r.ComputeResources {
candidates = append(candidates, c)
}
case tools.DATA_RESOURCE:
for _, c := range r.DataResources {
candidates = append(candidates, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range r.StorageResources {
candidates = append(candidates, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range r.ProcessingResources {
candidates = append(candidates, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range r.WorkflowResources {
candidates = append(candidates, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range r.ServiceResources {
candidates = append(candidates, c)
}
}
if len(candidates) > 0 {
d.SetAllowedInstancesFromSet(candidates, request)
} else {
d.SetAllowedInstances(request)
}
}
}
func (r *ResourceSet) Clear() { func (r *ResourceSet) Clear() {
r.DataResources = nil r.DataResources = nil
r.StorageResources = nil r.StorageResources = nil
+12 -1
View File
@@ -3,6 +3,7 @@ package resources
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"slices" "slices"
"time" "time"
@@ -49,6 +50,10 @@ type AbstractResource struct {
// NOT in a separate collection. // NOT in a separate collection.
// Visibility-filtered per requesting peer before any response is sent. // Visibility-filtered per requesting peer before any response is sent.
ExploitationAuthorizations []ExploitationAuthorization `json:"exploitation_authorizations,omitempty" bson:"exploitation_authorizations,omitempty"` ExploitationAuthorizations []ExploitationAuthorization `json:"exploitation_authorizations,omitempty" bson:"exploitation_authorizations,omitempty"`
// Consents lists the consent questions the user must acknowledge before
// scheduling this resource. Consents with Optional=true may be skipped.
Consents []Consent `json:"consents,omitempty" bson:"consents,omitempty"`
} }
func (ri *AbstractResource) Extend(typ ...string) map[string][]tools.DataType { func (ri *AbstractResource) Extend(typ ...string) map[string][]tools.DataType {
@@ -99,6 +104,11 @@ func (r *AbstractResource) GetExploitationAuthorizations() []ExploitationAuthori
return r.ExploitationAuthorizations return r.ExploitationAuthorizations
} }
// GetConsents returns the consent questions declared by this resource.
func (r *AbstractResource) GetConsents() []Consent {
return r.Consents
}
// FilterExploitationAuthorizations removes AEs that are not visible to peerID. // FilterExploitationAuthorizations removes AEs that are not visible to peerID.
// Must be called before serializing the resource for a consumer peer. // Must be called before serializing the resource for a consumer peer.
// The resource owner (CreatorID) always sees all AEs unfiltered. // The resource owner (CreatorID) always sees all AEs unfiltered.
@@ -144,7 +154,8 @@ func (r *AbstractResource) StoreDraftDefault() {
} }
func (r *AbstractResource) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { func (r *AbstractResource) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
return r.IsDraft, set fmt.Println("IsDrafted", r.IsDraft, set.IsDrafted())
return r.IsDraft || set.IsDrafted(), set
} }
type AbstractInstanciatedResource[T ResourceInstanceITF] struct { type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
+201 -2
View File
@@ -1,12 +1,15 @@
package resources package resources
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"slices" "slices"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/live" "cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@@ -16,6 +19,48 @@ type ResourceMongoAccessor[T ResourceInterface] struct {
utils.AbstractAccessor[ResourceInterface] // AbstractAccessor contains the basic fields of an accessor (model, caller) utils.AbstractAccessor[ResourceInterface] // AbstractAccessor contains the basic fields of an accessor (model, caller)
} }
func sourceFromAccess(access *ResourceAccess) string {
if access == nil {
return ""
}
if access.Container != nil && access.Container.Source != "" {
return access.Container.Source
}
if access.Source != nil && access.Source.Source != "" {
return access.Source.Source
}
return ""
}
func upsertSourceParam(outputs []models.Param, source string) []models.Param {
for i, p := range outputs {
if p.Attr == "source" {
outputs[i].Value = source
return outputs
}
}
return append(outputs, models.Param{Attr: "source", Value: source, Readonly: true})
}
func applyAccessSourceOutput(data utils.DBObject) {
switch r := data.(type) {
case *ProcessingResource:
for _, inst := range r.Instances {
if src := sourceFromAccess(inst.Access); src != "" {
r.Outputs = upsertSourceParam(r.Outputs, src)
return
}
}
case *DataResource:
for _, inst := range r.Instances {
if src := sourceFromAccess(inst.Access); src != "" {
r.Outputs = upsertSourceParam(r.Outputs, src)
return
}
}
}
}
// New creates a new instance of the computeMongoAccessor // New creates a new instance of the computeMongoAccessor
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] { func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] {
if !slices.Contains([]tools.DataType{ if !slices.Contains([]tools.DataType{
@@ -67,6 +112,40 @@ func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, er
return data, code, err return data, code, err
} }
var workspaceResourceTypes = []tools.DataType{
tools.COMPUTE_RESOURCE,
tools.DATA_RESOURCE,
tools.PROCESSING_RESOURCE,
tools.STORAGE_RESOURCE,
tools.WORKFLOW_RESOURCE,
tools.SERVICE_RESOURCE,
}
func emitResourceNATS(method tools.NATSMethod, dt tools.DataType, payload []byte) {
if !slices.Contains(workspaceResourceTypes, dt) {
return
}
tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
FromApp: config.GetAppName(),
Datatype: dt,
Method: int(method),
Payload: payload,
})
}
func (dca *ResourceMongoAccessor[T]) DeleteOne(id string) (utils.DBObject, int, error) {
data, code, err := dca.AbstractAccessor.LoadOne(id)
if err != nil {
return data, code, err
}
res, code, err := dca.AbstractAccessor.DeleteOne(id)
if err == nil && data != nil {
b, _ := json.Marshal(data)
go emitResourceNATS(tools.REMOVE_RESOURCE, dca.GetType(), b)
}
return res, code, err
}
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) { func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE { if dca.GetType() == tools.COMPUTE_RESOURCE {
delete(set, "architecture") delete(set, "architecture")
@@ -76,6 +155,14 @@ func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id st
} else if dca.GetType() == tools.STORAGE_RESOURCE { } else if dca.GetType() == tools.STORAGE_RESOURCE {
delete(set, "storage_type") delete(set, "storage_type")
} }
if dca.GetType() == tools.PROCESSING_RESOURCE || dca.GetType() == tools.DATA_RESOURCE {
if merged, _, _, err := utils.ModelGenericUpdateOne(set, id, dca); err == nil {
applyAccessSourceOutput(merged)
if serialized := merged.Serialize(merged); serialized != nil {
set["outputs"] = serialized["outputs"]
}
}
}
return utils.GenericUpdateOne(set, id, dca) return utils.GenericUpdateOne(set, id, dca)
} }
@@ -94,6 +181,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
} }
a = live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, &tools.APIRequest{Admin: true}) a = live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID()) res, _, _ := a.LoadOne(r.Instances[0].GetID())
fmt.Println(res, r.Instances[0].GetID())
if res == nil { if res == nil {
return nil, 404, errors.New("can't create a non existing computing units resource not reported onto compute units catalog") return nil, 404, errors.New("can't create a non existing computing units resource not reported onto compute units catalog")
} }
@@ -110,7 +198,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
a = live.NewAccessor[*live.LiveService](tools.LIVE_SERVICE, &tools.APIRequest{Admin: true}) a = live.NewAccessor[*live.LiveService](tools.LIVE_SERVICE, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID()) res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil { if res == nil {
return nil, 404, errors.New("can't create a non existing service resource not reported onto compute units catalog") return nil, 404, errors.New("can't create a non existing service resource not reported onto service catalog")
} }
if !res.(*live.LiveService).IsCompatible(data.Serialize(data)) { if !res.(*live.LiveService).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live service target is not compatible") return nil, 404, errors.New("live service target is not compatible")
@@ -125,7 +213,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
a = live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, &tools.APIRequest{Admin: true}) a = live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID()) res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil { if res == nil {
return nil, 404, errors.New("can't create a non existing storage resource not reported onto compute units catalog") return nil, 404, errors.New("can't create a non existing storage resource not reported onto storage catalog")
} }
if !res.(*live.LiveStorage).IsCompatible(data.Serialize(data)) { if !res.(*live.LiveStorage).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live storage target is not compatible") return nil, 404, errors.New("live storage target is not compatible")
@@ -133,6 +221,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
i = res.GetID() i = res.GetID()
idsToUpdate = res.(*live.LiveStorage).ResourcesID idsToUpdate = res.(*live.LiveStorage).ResourcesID
} }
applyAccessSourceOutput(data)
res, code, err := utils.GenericStoreOne(data, dca) res, code, err := utils.GenericStoreOne(data, dca)
if res != nil && i != "" { if res != nil && i != "" {
idsToUpdate = append(idsToUpdate, res.GetID()) idsToUpdate = append(idsToUpdate, res.GetID())
@@ -140,9 +229,119 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
"resources_id": idsToUpdate, "resources_id": idsToUpdate,
}, i) }, i)
} }
if err == nil && res != nil {
b, _ := json.Marshal(res)
go emitResourceNATS(tools.CREATE_RESOURCE, dca.GetType(), b)
}
return res, code, err return res, code, err
} }
// PurgedResourcePayload holds a silently-deleted resource's type and serialized payload.
type PurgedResourcePayload struct {
DT tools.DataType
Payload []byte
}
// purgeByType searches and silently deletes all resources of type T created by creatorID.
// Uses AbstractAccessor.DeleteOne directly to bypass the NATS-emitting override.
func purgeByType[T ResourceInterface](dt tools.DataType, creatorID string) []PurgedResourcePayload {
a := NewAccessor[T](dt, nil)
if a == nil {
return nil
}
res, _, _ := a.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 10000)
var result []PurgedResourcePayload
for _, item := range res {
b, err := json.Marshal(item)
if err != nil {
continue
}
a.AbstractAccessor.DeleteOne(item.GetID())
result = append(result, PurgedResourcePayload{DT: dt, Payload: b})
}
return result
}
// PurgeCreatorResources deletes all catalog resources created by creatorPeerID from
// the DB without emitting NATS. Used for non-blacklist peer privilege downgrades where
// workspace state should be left untouched.
func PurgeCreatorResources(creatorPeerID string) []PurgedResourcePayload {
var result []PurgedResourcePayload
result = append(result, purgeByType[*ComputeResource](tools.COMPUTE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*DataResource](tools.DATA_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ProcessingResource](tools.PROCESSING_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*StorageResource](tools.STORAGE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*WorkflowResource](tools.WORKFLOW_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ServiceResource](tools.SERVICE_RESOURCE, creatorPeerID)...)
return result
}
// FilterMapFromResourcePayload deserializes a resource payload by DataType, zeros out
// the AbstractInstanciatedResource (and its AbstractResource / Instances sub-fields),
// then marshals back to get only the concrete type's own JSON fields.
// Returns nil for WORKFLOW_RESOURCE and unknown types.
// JSON keys only — not BSON paths.
func FilterMapFromResourcePayload(dt tools.DataType, payload []byte) map[string]interface{} {
var m map[string]interface{}
switch dt {
case tools.COMPUTE_RESOURCE:
var r ComputeResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ComputeResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.DATA_RESOURCE:
var r DataResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*DataInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.PROCESSING_RESOURCE:
var r ProcessingResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ProcessingInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.STORAGE_RESOURCE:
var r StorageResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*StorageResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.SERVICE_RESOURCE:
var r ServiceResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ServiceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.WORKFLOW_RESOURCE:
var r WorkflowResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractResource = AbstractResource{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
default:
return nil
}
return m
}
func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return dca.StoreOne(data) return dca.StoreOne(data)
} }
+5 -1
View File
@@ -98,6 +98,10 @@ func (r *AbstractObject) DeepCopy() *AbstractObject {
return &obj return &obj
} }
func (r *AbstractObject) SetDraft(draft bool) {
r.IsDraft = draft
}
func (r *AbstractObject) SetName(name string) { func (r *AbstractObject) SetName(name string) {
r.Name = name r.Name = name
} }
@@ -146,7 +150,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
ao.UpdateDate = time.Now() ao.UpdateDate = time.Now()
ao.UpdaterID = peer ao.UpdaterID = peer
ao.UserUpdaterID = user ao.UserUpdaterID = user
if create && ao.CreatorID != "" { if create && ao.CreatorID == "" {
ao.CreationDate = time.Now() ao.CreationDate = time.Now()
ao.CreatorID = peer ao.CreatorID = peer
ao.UserCreatorID = user ao.UserCreatorID = user
+85 -3
View File
@@ -109,6 +109,9 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
obj := a.NewObj() obj := a.NewObj()
b, _ := json.Marshal(r) b, _ := json.Marshal(r)
json.Unmarshal(b, obj) json.Unmarshal(b, obj)
if change["is_draft"] == true {
obj.SetDraft(change["is_draft"] == true)
}
if !a.GetRequest().Admin { if !a.GetRequest().Admin {
var ok bool var ok bool
ok, r = r.CanUpdate(obj) ok, r = r.CanUpdate(obj)
@@ -126,9 +129,7 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
} }
loaded := r.Serialize(r) // get the loaded object loaded := r.Serialize(r) // get the loaded object
for k, v := range change { // apply the changes, with a flatten method deepMerge(loaded, change)
loaded[k] = v
}
newObj := a.NewObj() newObj := a.NewObj()
b, err = json.Marshal(loaded) b, err = json.Marshal(loaded)
if err != nil { if err != nil {
@@ -252,6 +253,87 @@ func IsMySelf(peerID string, wfa Accessor) (bool, string) {
return peerID == pp.GetID(), pp.GetID() return peerID == pp.GetID(), pp.GetID()
} }
// deepMerge overlays patch values onto base, preserving base values for keys
// absent from patch, nil patch values, and empty strings when base is non-empty.
// This prevents partial frontend payloads from silently erasing server-managed
// fields (source, env, country, owners, creator_id, creation_date, …).
func deepMerge(base, patch map[string]interface{}) {
for k, pv := range patch {
bv := base[k]
switch pvTyped := pv.(type) {
case map[string]interface{}:
if bvMap, ok := bv.(map[string]interface{}); ok {
deepMerge(bvMap, pvTyped)
} else {
base[k] = pv
}
case []interface{}:
if bvSlice, ok := bv.([]interface{}); ok {
base[k] = mergeSlices(bvSlice, pvTyped)
} else {
base[k] = pv
}
case string:
// Don't overwrite a non-empty base value with an empty string.
if pvTyped != "" {
base[k] = pv
}
default:
if pv != nil {
base[k] = pv
}
}
}
}
// mergeSlices merges two slices element-wise.
// For slices of maps it matches elements by their "id" field when available;
// falls back to positional matching. An empty patch slice leaves base intact.
func mergeSlices(base, patch []interface{}) []interface{} {
if len(patch) == 0 {
return base
}
for _, e := range patch {
if _, ok := e.(map[string]interface{}); !ok {
return patch // non-map elements: replace wholesale
}
}
baseByID := map[string]map[string]interface{}{}
for _, e := range base {
if em, ok := e.(map[string]interface{}); ok {
if id, ok := em["id"].(string); ok && id != "" {
baseByID[id] = em
}
}
}
result := make([]interface{}, 0, len(patch))
for i, pe := range patch {
pm, _ := pe.(map[string]interface{})
if pm == nil {
result = append(result, pe)
continue
}
var baseElem map[string]interface{}
if id, ok := pm["id"].(string); ok && id != "" {
baseElem = baseByID[id]
}
if baseElem == nil && i < len(base) {
baseElem, _ = base[i].(map[string]interface{})
}
if baseElem != nil {
merged := make(map[string]interface{}, len(baseElem))
for k, v := range baseElem {
merged[k] = v
}
deepMerge(merged, pm)
result = append(result, merged)
} else {
result = append(result, pe)
}
}
return result
}
func GenerateNodeID() (string, error) { func GenerateNodeID() (string, error) {
folderStatic := "/var/lib/opencloud-node" folderStatic := "/var/lib/opencloud-node"
if _, err := os.Stat(folderStatic); err == nil { if _, err := os.Stat(folderStatic); err == nil {
+1
View File
@@ -26,6 +26,7 @@ type DBObject interface {
GetID() string GetID() string
GetName() string GetName() string
SetName(name string) SetName(name string)
SetDraft(draft bool)
IsDrafted() bool IsDrafted() bool
CanDelete() bool CanDelete() bool
StoreDraftDefault() StoreDraftDefault()
+486 -62
View File
@@ -11,6 +11,7 @@ import (
"strings" "strings"
"time" "time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner" "cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
@@ -53,7 +54,11 @@ type Workflow struct {
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"` Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"` Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty"` SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workflow's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
} }
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -108,9 +113,9 @@ func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface
return itf return itf
} }
func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.APIRequest) (*Workflow, error) { func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.APIRequest) (*Workflow, []string, error) {
if plantUML == nil { if plantUML == nil {
return d, errors.New("no file available to export") return d, nil, errors.New("no file available to export")
} }
defer plantUML.Close() defer plantUML.Close()
@@ -189,9 +194,11 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
lines = append(lines, scanner.Text()) lines = append(lines, scanner.Text())
} }
if err := scanner.Err(); err != nil { if err := scanner.Err(); err != nil {
return d, err return d, nil, err
} }
var warnings []string
for i, line := range lines { for i, line := range lines {
trimmed := strings.TrimSpace(line) trimmed := strings.TrimSpace(line)
@@ -215,42 +222,41 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
} }
} }
for n, new := range resourceCatalog { // Handle links outside the catalog loop: each link line must be processed
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") { // exclude macro declarations // exactly once (the catalog loop would otherwise call extractLink once per
newRes := new() // catalog entry, producing N×7 duplicate links in the graph).
newRes.SetID(uuid.New().String()) if strings.Contains(line, "-->") {
varName, graphItem, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID) if err := d.extractLink(parseLine, graphVarName, "-->", false); err != nil {
if err != nil { fmt.Println(err)
return d, err
} }
continue
}
if strings.Contains(line, "<--") {
if err := d.extractLink(parseLine, graphVarName, "<--", true); err != nil {
fmt.Println(err)
}
continue
}
if strings.Contains(line, "--") {
if err := d.extractLink(parseLine, graphVarName, "--", false); err != nil {
fmt.Println(err)
}
continue
}
for n, newFn := range resourceCatalog {
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") {
newRes := newFn()
newRes.SetID(uuid.New().String())
varName, graphItem, warns, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID, request)
if err != nil {
return d, warnings, err
}
warnings = append(warnings, warns...)
if graphItem != nil { if graphItem != nil {
graphVarName[varName] = *graphItem graphVarName[varName] = *graphItem
} }
continue break
} else if strings.Contains(line, "-->") {
err := d.extractLink(parseLine, graphVarName, "-->", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, "<--") {
err := d.extractLink(parseLine, graphVarName, "<--", true)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, "--") {
err := d.extractLink(parseLine, graphVarName, "--", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, "-") {
err := d.extractLink(parseLine, graphVarName, "-", false)
if err != nil {
fmt.Println(err)
continue
}
} }
} }
} }
@@ -260,10 +266,9 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request) d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request) d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request) d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.DYNAMIC_RESOURCE), request) d.generateResource(d.GetResources(tools.DYNAMIC_RESOURCE), request)
d.Graph.Items = graphVarName d.Graph.Items = graphVarName
return d, nil return d, warnings, nil
} }
func (d *Workflow) generateResource(datas []resources.ResourceInterface, request *tools.APIRequest) error { func (d *Workflow) generateResource(datas []resources.ResourceInterface, request *tools.APIRequest) error {
@@ -355,14 +360,37 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphI
if len(splitted) < 2 { if len(splitted) < 2 {
return errors.New("links elements not found") return errors.New("links elements not found")
} }
// Source: trim surrounding whitespace.
srcVar := strings.TrimSpace(splitted[0])
// Destination: trim whitespace then stop at the first space or apostrophe
// (the rest may be a trailing comment produced by ToPlantUML look-ahead).
dstTokens := strings.FieldsFunc(strings.TrimSpace(splitted[1]), func(r rune) bool {
return r == ' ' || r == '\t' || r == '\''
})
if len(dstTokens) == 0 {
return errors.New("link destination var name not found")
}
dstVar := dstTokens[0]
srcItem, srcOk := graphVarName[srcVar]
dstItem, dstOk := graphVarName[dstVar]
if !srcOk || srcItem.ID == "" {
return fmt.Errorf("link source %q not declared", srcVar)
}
if !dstOk || dstItem.ID == "" {
return fmt.Errorf("link destination %q not declared", dstVar)
}
link := &graph.GraphLink{ link := &graph.GraphLink{
Source: graph.Position{ Source: graph.Position{
ID: graphVarName[splitted[0]].ID, ID: srcItem.ID,
X: 0, X: 0,
Y: 0, Y: 0,
}, },
Destination: graph.Position{ Destination: graph.Position{
ID: graphVarName[splitted[1]].ID, ID: dstItem.ID,
X: 0, X: 0,
Y: 0, Y: 0,
}, },
@@ -381,35 +409,46 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphI
return nil return nil
} }
func (d *Workflow) extractResourcePlantUML(line string, resource resources.ResourceInterface, dataName string, peerID string) (string, *graph.GraphItem, error) { func (d *Workflow) extractResourcePlantUML(line string, resource resources.ResourceInterface, dataName string, peerID string, request *tools.APIRequest) (string, *graph.GraphItem, []string, error) {
splittedFunc := strings.Split(line, "(") splittedFunc := strings.Split(line, "(")
if len(splittedFunc) <= 1 { if len(splittedFunc) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no func") return "", nil, nil, errors.New("Can't deserialize Object, there's no func")
} }
splittedParams := strings.Split(splittedFunc[1], ",") splittedParams := strings.Split(splittedFunc[1], ",")
if len(splittedParams) <= 1 { if len(splittedParams) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no params") return "", nil, nil, errors.New("Can't deserialize Object, there's no params")
} }
varName := splittedParams[0] varName := splittedParams[0]
splitted := strings.Split(splittedParams[1], "\"") splitted := strings.Split(splittedParams[1], "\"")
if len(splitted) <= 1 { if len(splitted) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no name") return "", nil, nil, errors.New("Can't deserialize Object, there's no name")
} }
resource.SetName(strings.ReplaceAll(splitted[1], "\\n", " ")) name := strings.ReplaceAll(splitted[1], "\\n", " ")
// Resources with instances get a default one seeded from the parent resource, // Extract comment text (if present) for metadata parsing.
// then overridden by any explicit comment attributes. comment := ""
// Event (NativeTool) has no instance: getNewInstance returns nil and is skipped. if parts := strings.Split(line, "'"); len(parts) > 1 {
instance := d.getNewInstance(dataName, splitted[1], peerID) comment = strings.ReplaceAll(parts[1], "'", "")
}
var warns []string
// Try to resolve an existing catalog resource (by id, then by name).
if existing, warn := d.resolveExistingResource(resource, dataName, name, comment, request); existing != nil {
warns = append(warns, warn)
item := d.addExistingGraphItem(dataName, existing)
return varName, item, warns, nil
}
// No existing resource — create new.
resource.SetName(name)
instance := d.getNewInstance(dataName, name, peerID)
if instance != nil { if instance != nil {
if b, err := json.Marshal(resource); err == nil { if b, err := json.Marshal(resource); err == nil {
json.Unmarshal(b, instance) json.Unmarshal(b, instance)
} }
splittedComments := strings.Split(line, "'") if comment != "" {
if len(splittedComments) > 1 {
comment := strings.ReplaceAll(splittedComments[1], "'", "")
json.Unmarshal(parseHumanFriendlyAttrs(comment), instance) json.Unmarshal(parseHumanFriendlyAttrs(comment), instance)
} }
resource.AddInstances(instance) resource.AddInstances(instance)
@@ -420,7 +459,91 @@ func (d *Workflow) extractResourcePlantUML(line string, resource resources.Resou
d.Graph.Items[item.ID] = *item d.Graph.Items[item.ID] = *item
} }
return varName, item, nil return varName, item, warns, nil
}
// resolveExistingResource tries to find an existing catalog resource matching
// the given name or the id embedded in the PlantUML comment.
// Returns (resource, warning message) or (nil, "") if none found.
func (d *Workflow) resolveExistingResource(
proto resources.ResourceInterface,
dataName, name, comment string,
request *tools.APIRequest,
) (resources.ResourceInterface, string) {
accessor := proto.GetAccessor(request)
// 1. Try lookup by id from comment ("id: <uuid>").
if comment != "" {
attrs := map[string]any{}
json.Unmarshal(parseHumanFriendlyAttrs(comment), &attrs)
if id, ok := attrs["id"].(string); ok && id != "" {
if dbObj, _, err := accessor.LoadOne(id); err == nil && dbObj != nil {
if ri, ok := dbObj.(resources.ResourceInterface); ok {
return ri, fmt.Sprintf(`[import warning] %s "%s": existing resource retrieved by id %s`, dataName, name, id)
}
}
}
}
// 2. Try search by exact name.
filter := &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.EQUAL.String(), Value: name}},
},
}
if results, _, err := accessor.Search(filter, "", false, 0, 10); err == nil {
for _, r := range results {
if r.GetName() == name {
if dbObj, _, err2 := accessor.LoadOne(r.GetID()); err2 == nil && dbObj != nil {
if ri, ok := dbObj.(resources.ResourceInterface); ok {
return ri, fmt.Sprintf(`[import warning] %s "%s": existing resource found by name and retrieved instead of creating a new one`, dataName, name)
}
}
}
}
}
return nil, ""
}
// addExistingGraphItem registers an already-existing resource in the workflow's
// ID lists and graph, without adding it to the resource lists (so generateResource
// won't try to store it again).
func (d *Workflow) addExistingGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
graphItem := &graph.GraphItem{
ID: uuid.New().String(),
ItemResource: &resources.ItemResource{},
}
switch dataName {
case "Data":
d.Datas = append(d.Datas, resource.GetID())
if r, ok := resource.(*resources.DataResource); ok {
graphItem.Data = r
}
case "Processing":
d.Processings = append(d.Processings, resource.GetID())
if r, ok := resource.(*resources.ProcessingResource); ok {
graphItem.Processing = r
}
case "Service":
d.Services = append(d.Services, resource.GetID())
if r, ok := resource.(*resources.ServiceResource); ok {
graphItem.Service = r
}
case "Storage":
d.Storages = append(d.Storages, resource.GetID())
if r, ok := resource.(*resources.StorageResource); ok {
graphItem.Storage = r
}
case "ComputeUnit":
d.Computes = append(d.Computes, resource.GetID())
if r, ok := resource.(*resources.ComputeResource); ok {
graphItem.Compute = r
}
default:
return nil
}
return graphItem
} }
func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem { func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
@@ -939,13 +1062,22 @@ const (
ViolationMissingComputeUnit ViolationType = "missing_compute_unit" ViolationMissingComputeUnit ViolationType = "missing_compute_unit"
ViolationCycle ViolationType = "cycle" ViolationCycle ViolationType = "cycle"
ViolationMissingDataStorage ViolationType = "missing_data_storage" ViolationMissingDataStorage ViolationType = "missing_data_storage"
ViolationRequiredOutputMissing ViolationType = "required_output_missing"
// Warnings — non-blocking, reported for UX // Warnings — non-blocking, reported for UX
ViolationInvertedArrow ViolationType = "inverted_arrow" ViolationInvertedArrow ViolationType = "inverted_arrow"
ViolationIsolatedProcessing ViolationType = "isolated_processing" ViolationIsolatedProcessing ViolationType = "isolated_processing"
ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing" ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
ViolationDynamicNotConfigured ViolationType = "dynamic_not_configured"
ViolationAnchorOnNonStorage ViolationType = "anchor_on_non_storage"
) )
// ocAnchorAccess is the magic value used by oc-monitord to inject storage
// access credentials into a workflow step at runtime. It must only appear as
// an output of a storage (or dynamic-storage) element.
const ocAnchorAccess = "§oc:access§"
// IntegrityViolation describes a single structural or semantic problem // IntegrityViolation describes a single structural or semantic problem
// found in the workflow graph. // found in the workflow graph.
type IntegrityViolation struct { type IntegrityViolation struct {
@@ -976,12 +1108,15 @@ func (v IntegrityViolation) IsWarning() bool { return v.Severity == SeverityWarn
func (w *Workflow) ValidateIntegrity() []IntegrityViolation { func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
var violations []IntegrityViolation var violations []IntegrityViolation
violations = append(violations, w.validateVariables()...) violations = append(violations, w.validateVariables()...)
violations = append(violations, w.validateRequiredInputs()...)
violations = append(violations, w.validateComputeLinks()...) violations = append(violations, w.validateComputeLinks()...)
violations = append(violations, w.detectCycles()...) violations = append(violations, w.detectCycles()...)
violations = append(violations, w.validateDataStorageLinks()...) violations = append(violations, w.validateDataStorageLinks()...)
violations = append(violations, w.detectInvertedArrows()...) violations = append(violations, w.detectInvertedArrows()...)
violations = append(violations, w.detectIsolatedProcessings()...) violations = append(violations, w.detectIsolatedProcessings()...)
violations = append(violations, w.detectOrphanedStorages()...) violations = append(violations, w.detectOrphanedStorages()...)
violations = append(violations, w.validateDynamicFilters()...)
violations = append(violations, w.validateAnchorOutputs()...)
return violations return violations
} }
@@ -1059,14 +1194,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
var name string var name string
switch { switch {
case w.Graph.IsProcessing(item) && item.Processing != nil: case w.Graph.IsProcessing(item) && item.Processing != nil:
// IsService processings are long-running services and don't need a Compute booking.
if item.Processing.IsService { if item.Processing.IsService {
continue continue
} }
needsCompute = true needsCompute = true
name = item.Processing.GetName() name = item.Processing.GetName()
case w.Graph.IsService(item) && item.Service != nil: case w.Graph.IsService(item) && item.Service != nil:
// HOSTED services use an existing endpoint — no Compute booking needed.
inst := item.Service.GetSelectedInstance(nil) inst := item.Service.GetSelectedInstance(nil)
if inst != nil { if inst != nil {
if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED { if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
@@ -1075,6 +1208,9 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
} }
needsCompute = true needsCompute = true
name = item.Service.GetName() name = item.Service.GetName()
case w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.PROCESSING_RESOURCE:
needsCompute = true
name = w.itemName(id)
} }
if !needsCompute { if !needsCompute {
continue continue
@@ -1089,7 +1225,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
} else { } else {
continue continue
} }
if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) { other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
// A concrete compute OR a dynamic node typed as compute satisfies the requirement.
if w.Graph.IsCompute(other) || (w.Graph.IsDynamic(other) && other.Dynamic.Type == tools.COMPUTE_RESOURCE) {
hasCompute = true hasCompute = true
break break
} }
@@ -1180,12 +1321,41 @@ func (w *Workflow) detectCycles() []IntegrityViolation {
// validateDataStorageLinks checks that every Data item with a non-empty Source // validateDataStorageLinks checks that every Data item with a non-empty Source
// has at least one Storage linked — the builder needs this to inject the // has at least one Storage linked — the builder needs this to inject the
// download step (curl or NATS/Minio protocol). // download step (curl or NATS/Minio protocol).
// isStorageEquivalent returns true for items that can satisfy a Data node's
// storage requirement: concrete storage, dynamic-storage, or a compute that
// has at least one embedded storage available on one of its instances.
func (w *Workflow) isStorageEquivalent(item graph.GraphItem) bool {
if w.Graph.IsStorage(item) {
return true
}
if w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE {
return true
}
if w.Graph.IsCompute(item) && item.Compute != nil {
for _, inst := range item.Compute.Instances {
if len(inst.AvailableStorages) > 0 {
return true
}
}
}
return false
}
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation { func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
var violations []IntegrityViolation var violations []IntegrityViolation
dataStorageLinks := w.Graph.GetDataStorageLinks() // Build the set of data item IDs that have a valid storage-equivalent link.
linkedStorage := map[string]struct{}{} linkedStorage := map[string]struct{}{}
for _, dsl := range dataStorageLinks { for _, link := range w.Graph.Links {
linkedStorage[dsl.DataItemID] = struct{}{} srcItem, srcOk := w.Graph.Items[link.Source.ID]
dstItem, dstOk := w.Graph.Items[link.Destination.ID]
if !srcOk || !dstOk {
continue
}
if w.Graph.IsData(srcItem) && w.isStorageEquivalent(dstItem) {
linkedStorage[link.Source.ID] = struct{}{}
} else if w.isStorageEquivalent(srcItem) && w.Graph.IsData(dstItem) {
linkedStorage[link.Destination.ID] = struct{}{}
}
} }
for id, item := range w.Graph.Items { for id, item := range w.Graph.Items {
if !w.Graph.IsData(item) || item.Data == nil { if !w.Graph.IsData(item) || item.Data == nil {
@@ -1213,6 +1383,97 @@ func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
return violations return violations
} }
// validateRequiredInputs checks that for each processing node with a required
// input, every immediate predecessor outputs a parameter with that name.
// Mirrors the requiredOutputMissing check in oc-front's checkTopology().
func (w *Workflow) validateRequiredInputs() []IntegrityViolation {
var violations []IntegrityViolation
procIDs := map[string]struct{}{}
for id, item := range w.Graph.Items {
if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
procIDs[id] = struct{}{}
}
}
// Build direct predecessors map.
predecessors := map[string][]string{}
for id := range procIDs {
predecessors[id] = []string{}
}
for _, link := range w.Graph.Links {
src, dst := link.Source.ID, link.Destination.ID
_, srcIsProc := procIDs[src]
_, dstIsProc := procIDs[dst]
if !srcIsProc || !dstIsProc {
continue
}
dir := int64(0)
if link.Style != nil {
dir = link.Style.ArrowDirection
}
if dir == arrowDirectionBackward {
predecessors[src] = append(predecessors[src], dst)
} else {
predecessors[dst] = append(predecessors[dst], src)
}
}
for id, reqInputs := range w.Inputs {
if _, isProc := procIDs[id]; !isProc {
continue
}
for _, inp := range reqInputs {
if !inp.Required || inp.Name == "" {
continue
}
for _, predID := range predecessors[id] {
if !w.nodeHasOutput(predID, inp.Name) {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationRequiredOutputMissing,
ItemIDs: []string{id, predID},
Message: fmt.Sprintf(
`"%s" requires input "%s" but "%s" does not output it`,
w.itemName(id), inp.Name, w.itemName(predID),
),
})
}
}
}
}
return violations
}
// nodeHasOutput returns true if the given node outputs a parameter named name,
// either via workflow-level outputs or its resource's own outputs.
func (w *Workflow) nodeHasOutput(nodeID, name string) bool {
for _, p := range w.Outputs[nodeID] {
if p.Name == name {
return true
}
}
item, ok := w.Graph.Items[nodeID]
if !ok {
return false
}
var res resources.ResourceInterface
switch {
case item.Processing != nil:
res = item.Processing
case item.Service != nil:
res = item.Service
}
if res != nil {
for _, p := range res.GetOutputs() {
if p.Name == name {
return true
}
}
}
return false
}
// detectInvertedArrows warns when a link between two processing nodes uses a // detectInvertedArrows warns when a link between two processing nodes uses a
// backward arrow direction — mirroring the invertedArrow warning in oc-front. // backward arrow direction — mirroring the invertedArrow warning in oc-front.
func (w *Workflow) detectInvertedArrows() []IntegrityViolation { func (w *Workflow) detectInvertedArrows() []IntegrityViolation {
@@ -1280,12 +1541,162 @@ func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
return violations return violations
} }
// ---------------------------------------------------------------------------
// AE validation helpers — centralised so both oc-scheduler and oc-schedulerd
// share the same logic without code duplication.
// ---------------------------------------------------------------------------
// BuildResourceIDSet constructs the per-type resource-ID map and the flat
// coupling-membership set used by ValidateWorkflowAE.
//
// selectedEmbeddedStorages and selectedInstances come from the scheduling
// request (WorkflowSchedule) or from the WorkflowExecution at launch time.
// Embedded storages are NOT stored in Workflow.Storages (they are inside
// ComputeResourceInstance.AvailableStorages), so they must be resolved here
// to make them visible to the AE coupling check.
func (w *Workflow) BuildResourceIDSet(
selectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection,
selectedInstances ConfigItem,
) (map[tools.DataType][]string, map[string]struct{}) {
resourcesByType := map[tools.DataType][]string{
tools.DATA_RESOURCE: w.Datas,
tools.PROCESSING_RESOURCE: w.Processings,
tools.STORAGE_RESOURCE: append([]string{}, w.Storages...),
tools.COMPUTE_RESOURCE: w.Computes,
tools.WORKFLOW_RESOURCE: w.Workflows,
tools.SERVICE_RESOURCE: w.Services,
}
idSet := map[string]struct{}{}
for _, ids := range resourcesByType {
for _, id := range ids {
idSet[id] = struct{}{}
}
}
for graphItemID, sel := range selectedEmbeddedStorages {
if sel == nil {
continue
}
c, ok := w.Graph.Items[graphItemID]
if !ok {
continue
}
_, computeRes := c.GetResource()
computeResource, ok := computeRes.(*resources.ComputeResource)
if !ok {
continue
}
computeIdx := 0
if d := selectedInstances.Get(computeResource.GetID()); d != nil {
computeIdx = *d
}
if computeIdx >= len(computeResource.Instances) {
continue
}
computeInst := computeResource.Instances[computeIdx]
if sel.StorageIndex >= len(computeInst.AvailableStorages) {
continue
}
storageID := computeInst.AvailableStorages[sel.StorageIndex].GetID()
if storageID == "" {
continue
}
idSet[storageID] = struct{}{}
resourcesByType[tools.STORAGE_RESOURCE] = append(resourcesByType[tools.STORAGE_RESOURCE], storageID)
}
return resourcesByType, idSet
}
// ValidateWorkflowAE checks the ExploitationAuthorizations of every resource
// referenced in resourcesByType against the coupling/peer/workflow constraints.
//
// loadResource is injected by the caller to avoid a circular import
// (oc-lib/models/resources → oclib → oc-lib/models → resources).
// A nil return from loadResource means "resource not found — skip".
func (w *Workflow) ValidateWorkflowAE(
workflowID, consumerPeerID string,
resourcesByType map[tools.DataType][]string,
idSet map[string]struct{},
loadResource func(tools.DataType, string) resources.ResourceInterface,
) []resources.AEViolation {
now := time.Now().UTC()
var violations []resources.AEViolation
for dt, ids := range resourcesByType {
for _, id := range ids {
res := loadResource(dt, id)
if res == nil {
continue
}
for _, ae := range res.GetExploitationAuthorizations() {
violations = append(violations, ae.CheckAE(id, workflowID, consumerPeerID, idSet, now)...)
}
}
}
return violations
}
// detectOrphanedStorages warns when a storage node is not linked to any // detectOrphanedStorages warns when a storage node is not linked to any
// processing node — it contributes no data flow to the workflow. // processing node — it contributes no data flow to the workflow.
// validateDynamicFilters mirrors oc-front's dynamic-not-configured check:
// a dynamic node with no filters cannot be resolved by the scheduler.
func (w *Workflow) validateDynamicFilters() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsDynamic(item) {
continue
}
f := item.Dynamic.Filters
if len(f.And) == 0 && len(f.Or) == 0 {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationDynamicNotConfigured,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" is a dynamic %s with no filters — at least one filter is required for the scheduler to resolve it`,
w.itemName(id), item.Dynamic.Type,
),
})
}
}
return violations
}
// validateAnchorOutputs mirrors oc-front's anchor-on-non-storage check:
// the §oc:access§ magic value must only appear as an output of a storage,
// dynamic-storage, or compute node that has an active embedded storage selected.
func (w *Workflow) validateAnchorOutputs() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if w.Graph.IsStorage(item) || (w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue
}
if w.Graph.IsCompute(item) {
if _, ok := w.SelectedEmbeddedStorages[id]; ok {
continue
}
}
for _, p := range w.Outputs[id] {
if p.Value == ocAnchorAccess {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationAnchorOnNonStorage,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" has an access anchor (%s) as output — access anchors may only be outputs of storage elements`,
w.itemName(id), ocAnchorAccess,
),
})
break
}
}
}
return violations
}
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation { func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
var violations []IntegrityViolation var violations []IntegrityViolation
for id, item := range w.Graph.Items { for id, item := range w.Graph.Items {
if !w.Graph.IsStorage(item) { // Check both concrete storage and dynamic-storage nodes.
if !w.Graph.IsStorage(item) && !(w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue continue
} }
linkedTopics := map[string]struct{}{} linkedTopics := map[string]struct{}{}
@@ -1298,7 +1709,10 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
} else { } else {
continue continue
} }
if other, ok := w.Graph.Items[otherID]; ok { other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
switch { switch {
case w.Graph.IsProcessing(other): case w.Graph.IsProcessing(other):
linkedTopics["processing"] = struct{}{} linkedTopics["processing"] = struct{}{}
@@ -1308,6 +1722,16 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
linkedTopics["data"] = struct{}{} linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other): case w.Graph.IsService(other):
linkedTopics["service"] = struct{}{} linkedTopics["service"] = struct{}{}
case w.Graph.IsDynamic(other):
switch other.Dynamic.Type {
case tools.PROCESSING_RESOURCE:
linkedTopics["processing"] = struct{}{}
case tools.COMPUTE_RESOURCE:
linkedTopics["compute"] = struct{}{}
case tools.DATA_RESOURCE:
linkedTopics["data"] = struct{}{}
case tools.SERVICE_RESOURCE:
linkedTopics["service"] = struct{}{}
} }
} }
} }
+2 -2
View File
@@ -160,7 +160,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
if err == nil && len(resource) > 0 { // if the workspace already exists, update it if err == nil && len(resource) > 0 { // if the workspace already exists, update it
w := &workspace.Workspace{ w := &workspace.Workspace{
Active: active, Active: active,
ResourceSet: resources.ResourceSet{ WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas, Datas: workflow.Datas,
Processings: workflow.Processings, Processings: workflow.Processings,
Storages: workflow.Storages, Storages: workflow.Storages,
@@ -173,7 +173,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
a.workspaceAccessor.StoreOne(&workspace.Workspace{ a.workspaceAccessor.StoreOne(&workspace.Workspace{
Active: active, Active: active,
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"}, AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
ResourceSet: resources.ResourceSet{ WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas, Datas: workflow.Datas,
Processings: workflow.Processings, Processings: workflow.Processings,
Storages: workflow.Storages, Storages: workflow.Storages,
@@ -48,6 +48,10 @@ type WorkflowExecution struct {
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
// ResourceConsents records which consent strings the user acknowledged per resource
// (resource_id → list of acknowledged ConsentString values) at scheduling time.
ResourceConsents map[string][]string `json:"resource_consents,omitempty" bson:"resource_consents,omitempty"`
// Graph is a lightweight, real-time summary of the workflow execution graph. // Graph is a lightweight, real-time summary of the workflow execution graph.
// Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event. // Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event.
// Consumed by oc-front to render the live execution panel via websocket updates. // Consumed by oc-front to render the live execution panel via websocket updates.
@@ -97,7 +101,7 @@ func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject)
} }
func (r *WorkflowExecution) CanDelete() bool { func (r *WorkflowExecution) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted return true // only draft bookings can be deleted
} }
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool { func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
+175 -5
View File
@@ -1,23 +1,49 @@
package workspace package workspace
import ( import (
"fmt"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
) )
// trustedRelations holds peer relations that yield TrustMap = true for a resource.
var trustedRelations = map[peer.PeerRelation]bool{
peer.PARTNER: true,
peer.MASTER: true,
peer.NANO: true,
peer.ORGANIZATION_MASTER: true,
peer.ORGANIZATION_MEMBER: true,
peer.ORGANIZATION_PARTNER: true,
}
// Workspace is a struct that represents a workspace // Workspace is a struct that represents a workspace
type Workspace struct { type Workspace struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, compute, processing, storage, workflow) resources.WorkspaceResourceSet // WorkspaceResourceSet persists both IDs and complete resource objects
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"`
Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active Active bool `json:"active" bson:"active" default:"false"`
Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace Shared string `json:"shared,omitempty" bson:"shared,omitempty"`
// Notifications accumulates strings for auto-modifications (e.g. resource removed after peer blacklist).
// Cleared by the owner via the notifications update endpoint.
Notifications []string `json:"notifications,omitempty" bson:"notifications,omitempty"`
// TrustMap maps resource ID → trust bool based on the creator peer's relation.
// Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean.
TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workspace's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
} }
func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor return NewAccessor(request)
} }
func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool { func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool {
@@ -30,3 +56,147 @@ func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool
} }
return ao.AbstractObject.VerifyAuth(callName, request) return ao.AbstractObject.VerifyAuth(callName, request)
} }
// ComputeTrustAndClean populates TrustMap for all resources embedded in this workspace,
// removes resources whose creator peer is blacklisted, and appends a deletion notification
// for each removal. Returns true when at least one resource was removed (caller should persist).
func (w *Workspace) ComputeTrustAndClean() bool {
w.TrustMap = map[string]bool{}
selfPeer, _ := utils.GetMySelf(peer.NewShallowAccessor())
var selfPeerID string
if selfPeer != nil {
if p, ok := selfPeer.(*peer.Peer); ok {
selfPeerID = p.PeerID
}
}
// Cache peer relations to avoid redundant DB lookups per workspace load.
cache := map[string]peer.PeerRelation{}
relation := func(creatorID string) peer.PeerRelation {
if r, ok := cache[creatorID]; ok {
return r
}
if creatorID == selfPeerID {
cache[creatorID] = peer.SELF
return peer.SELF
}
results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 1)
rel := peer.NONE
if len(results) > 0 {
if p, ok := results[0].(*peer.Peer); ok {
rel = p.Relation
}
}
cache[creatorID] = rel
return rel
}
setTrust := func(id, creatorID string, rel peer.PeerRelation) {
w.TrustMap[id] = (creatorID == selfPeerID) || trustedRelations[rel]
}
changed := false
var keptData []*resources.DataResource
for _, r := range w.DataResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptData = append(keptData, r)
}
}
w.DataResources = keptData
var keptCompute []*resources.ComputeResource
for _, r := range w.ComputeResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptCompute = append(keptCompute, r)
}
}
w.ComputeResources = keptCompute
var keptStorage []*resources.StorageResource
for _, r := range w.StorageResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptStorage = append(keptStorage, r)
}
}
w.StorageResources = keptStorage
var keptProcessing []*resources.ProcessingResource
for _, r := range w.ProcessingResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptProcessing = append(keptProcessing, r)
}
}
w.ProcessingResources = keptProcessing
var keptWorkflow []*resources.WorkflowResource
for _, r := range w.WorkflowResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptWorkflow = append(keptWorkflow, r)
}
}
w.WorkflowResources = keptWorkflow
var keptService []*resources.ServiceResource
for _, r := range w.ServiceResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptService = append(keptService, r)
}
}
w.ServiceResources = keptService
var keptDynamic []*resources.DynamicResource
for _, r := range w.DynamicResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptDynamic = append(keptDynamic, r)
}
}
w.DynamicResources = keptDynamic
var keptNative []*resources.NativeTool
for _, r := range w.NativeTools {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptNative = append(keptNative, r)
}
}
w.NativeTools = keptNative
return changed
}
+70 -6
View File
@@ -7,10 +7,60 @@ import (
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
) )
func init() {
resources.WorkspaceCandidatesProvider = func(dt tools.DataType, request *tools.APIRequest) []resources.ResourceInterface {
res, _, _ := NewAccessor(request).Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"user_creator_id": {{Operator: dbs.EQUAL.String(), Value: request.Username}},
"active": {{Operator: dbs.EQUAL.String(), Value: true}},
},
}, "", false, 0, 1)
if len(res) == 0 {
return []resources.ResourceInterface{}
}
ws, ok := res[0].(*Workspace)
if !ok {
return []resources.ResourceInterface{}
}
// ws.Fill was already called by Search — typed slices are populated.
// Return an empty non-nil slice when the workspace exists but has no
// resources of the requested type: caller must not fall back to catalog.
out := []resources.ResourceInterface{}
switch dt {
case tools.COMPUTE_RESOURCE:
for _, c := range ws.ComputeResources {
out = append(out, c)
}
case tools.DATA_RESOURCE:
for _, c := range ws.DataResources {
out = append(out, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range ws.StorageResources {
out = append(out, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range ws.ProcessingResources {
out = append(out, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range ws.WorkflowResources {
out = append(out, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range ws.ServiceResources {
out = append(out, c)
}
}
return out
}
}
// Workspace is a struct that represents a workspace // Workspace is a struct that represents a workspace
type workspaceMongoAccessor struct { type workspaceMongoAccessor struct {
utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller) utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller)
@@ -88,25 +138,39 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) { return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
d.(*Workspace).Fill(a.GetRequest()) w := d.(*Workspace)
return d, 200, nil w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w, 200, nil
}, a) }, a)
} }
func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) { func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject { return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest()) w := d.(*Workspace)
return d w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit) }, isDraft, a, offset, limit)
} }
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) { func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject { return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest()) w := d.(*Workspace)
return d w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit) }, isDraft, a, offset, limit)
} }
// applyTrustAndClean calls ComputeTrustAndClean and, when resources were removed due to
// blacklisted peers, persists the cleaned workspace back to the database.
func (a *workspaceMongoAccessor) applyTrustAndClean(w *Workspace) {
if changed := w.ComputeTrustAndClean(); changed {
utils.GenericUpdateOne(w.Serialize(w), w.GetID(), a)
}
}
/* /*
This function is used to share the workspace with the peers This function is used to share the workspace with the peers
*/ */
+50 -1
View File
@@ -40,6 +40,7 @@ const (
REFUND REFUND
DISCOUNT DISCOUNT
SUBSCRIPTION SUBSCRIPTION
POLICY
) )
var NOAPI = func() string { var NOAPI = func() string {
@@ -104,6 +105,7 @@ var InnerDefaultAPI = [...]func() string{
NOAPI, NOAPI,
NOAPI, NOAPI,
NOAPI, NOAPI,
PEERSAPI,
} }
// Bind the standard data name to the data type // Bind the standard data name to the data type
@@ -138,6 +140,7 @@ var Str = [...]string{
"refund", "refund",
"discount", "discount",
"subscription", "subscription",
"policy",
} }
func FromString(comp string) int { func FromString(comp string) int {
@@ -174,7 +177,7 @@ func DataTypeList() []DataType {
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE, ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE, LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE,
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION} PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION, POLICY}
} }
type PropalgationMessage struct { type PropalgationMessage struct {
@@ -208,6 +211,40 @@ const (
// for a private source resource (isReachable=false, Phase 4). // for a private source resource (isReachable=false, Phase 4).
// oc-discovery routes it to the resource owner peer via ProtocolSourcePresignResource. // oc-discovery routes it to the resource owner peer via ProtocolSourcePresignResource.
PB_SOURCE_PRESIGN PB_SOURCE_PRESIGN
// PB_ORG_PARTNER is propagated via PB_PROPAGATE through oc-discovery to the
// organization master's oc-discovery, which notifies its oc-peer via
// ORG_PARTNER_EVENT. The master's oc-peer confirms or rejects by emitting a
// PROPALGATION_EVENT back, which oc-discovery routes to the originating
// oc-discovery, which in turn notifies our oc-peer via ORG_PARTNER_EVENT to
// finalize the relation.
PB_ORG_PARTNER
// PB_WATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// stored in a workspace. oc-discovery contacts the creator peer to register
// the watching peerID in the creator's watcher cache so it receives future
// CREATE/DELETE events for that resource.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_WATCH_RESOURCE
// PB_UNWATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// removed from all workspaces. oc-discovery contacts the creator peer to
// deregister the watching peerID from the creator's watcher cache.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_UNWATCH_RESOURCE
// PB_BOOKING_SYNC is emitted by master every 24 h to each known NANO.
// Payload: {"peer_id": nano.PeerID, "booking_sync_ids": ["id1", "id2", ...]}
// Nano compares the list against its own confirmed bookings and calls
// SendBookingToMaster for any it has that master is missing.
PB_BOOKING_SYNC
// PB_VERIFY_RESOURCE is emitted by oc-workspace or oc-workflow on workspace
// activation / workflow opening to verify that an embedded non-self resource
// is still current. oc-discovery forwards the request to the creator peer via
// ProtocolVerifyResource; the result comes back as a VERIFY_RESOURCE NATS event.
// Payload: { "creator_peer_id": "…", "data_type": N, "resource_payload": {…} }
PB_VERIFY_RESOURCE
) )
func GetActionString(ss string) PubSubAction { func GetActionString(ss string) PubSubAction {
@@ -242,6 +279,14 @@ func GetActionString(ss string) PubSubAction {
return PB_PROPAGATE return PB_PROPAGATE
case "source_presign": case "source_presign":
return PB_SOURCE_PRESIGN return PB_SOURCE_PRESIGN
case "org_partner":
return PB_ORG_PARTNER
case "watch_resource":
return PB_WATCH_RESOURCE
case "unwatch_resource":
return PB_UNWATCH_RESOURCE
case "booking_sync":
return PB_BOOKING_SYNC
default: default:
return NONE return NONE
} }
@@ -266,6 +311,10 @@ var path = []string{
"observe_close", // 14 PB_OBSERVE_CLOSE "observe_close", // 14 PB_OBSERVE_CLOSE
"propagate", // 15 PB_PROPAGATE "propagate", // 15 PB_PROPAGATE
"source_presign", // 16 PB_SOURCE_PRESIGN "source_presign", // 16 PB_SOURCE_PRESIGN
"org_partner", // 17 PB_ORG_PARTNER
"watch_resource", // 18 PB_WATCH_RESOURCE
"unwatch_resource", // 19 PB_UNWATCH_RESOURCE
"booking_sync", // 20 PB_BOOKING_SYNC
} }
func (m PubSubAction) String() string { func (m PubSubAction) String() string {
+11 -2
View File
@@ -32,7 +32,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
"considers event", "admiralty config event", "minio config event", "pvc config event", "considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event", "workflow started event", "workflow step done event", "workflow done event",
"peer behavior event", "peer observe response event", "peer observe event", "peer behavior event", "peer observe response event", "peer observe event",
"source presign event", "source presign event", "org partner event", "verify resource event",
} }
const ( const (
@@ -85,6 +85,15 @@ const (
// oc-datacenter listens to it to generate a pre-signed Minio URL and reply // oc-datacenter listens to it to generate a pre-signed Minio URL and reply
// via PB_CONSIDERS (Phase 4 — isReachable=false). // via PB_CONSIDERS (Phase 4 — isReachable=false).
SOURCE_PRESIGN_EVENT SOURCE_PRESIGN_EVENT
// ORG_PARTNER_EVENT is emitted by a peer to its OrganizationMaster to ask:
// "is peer X one of your members?". The master replies via the same channel.
ORG_PARTNER_EVENT
// VERIFY_RESOURCE is emitted by oc-discovery when it receives a verify response
// from a remote peer via ProtocolVerifyResource. oc-workspace and oc-workflow
// listen to this event to update their stale caches.
VERIFY_RESOURCE
) )
func (n NATSMethod) String() string { func (n NATSMethod) String() string {
@@ -98,7 +107,7 @@ func NameToMethod(name string) NATSMethod {
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT, CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT, WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT, PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT,
SOURCE_PRESIGN_EVENT} { SOURCE_PRESIGN_EVENT, ORG_PARTNER_EVENT, VERIFY_RESOURCE} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v return v
} }