39 Commits

Author SHA1 Message Date
mr 3a66b42c01 test 2026-06-23 09:40:33 +02:00
mr 58e97fbe74 lib 2026-06-22 07:50:01 +02:00
mr 1425a31494 Orga + Consent 2026-06-05 15:56:50 +02:00
mr 6ee169f444 can delete 2026-06-05 09:59:14 +02:00
mr 5be3c0a10a wf 2026-06-04 13:36:55 +02:00
mr d9723e6431 oc lib test 2026-06-04 12:04:04 +02:00
mr c726361deb Get Exploit 2026-06-04 11:31:03 +02:00
mr d19ff1f8b2 CreatorID 2026-06-04 09:00:31 +02:00
mr 69244163b4 workflow bson missing 2026-06-03 15:33:39 +02:00
mr 842364d145 deep merge 2026-06-03 11:23:41 +02:00
mr 9ab374b720 oc nano 2026-06-03 08:03:11 +02:00
mr aa2bca48ef test 2026-06-02 16:43:55 +02:00
mr 322ea38bb4 nil map 2026-06-02 15:57:33 +02:00
mr c1490a7746 proper 2026-06-02 15:49:03 +02:00
mr 49f60d9416 test 2026-06-02 15:34:54 +02:00
mr 548ed84b13 prospect 2026-06-02 15:11:58 +02:00
mr 178cd48314 prospect 2026-06-02 14:45:09 +02:00
mr b31df8cfed adjust allowed instances for type behaviors 2026-06-02 14:16:32 +02:00
mr a0a53f0477 Panic recovered FiltersFromFlatMap 2026-06-02 14:04:33 +02:00
mr dffaa6326f still prospecting 2026-06-02 14:03:30 +02:00
mr 8155f4b17a prospect 2026-06-02 13:47:41 +02:00
mr 51307bb067 trace debug dynamic 2026-06-02 13:33:47 +02:00
mr 6ac788a8ff test 2026-06-02 12:48:33 +02:00
mr a7d0c1208b full filter interpretation 2026-06-02 11:35:19 +02:00
mr 3924fca289 Kick name malformed 2026-06-02 10:50:42 +02:00
mr 797df972ac plantuml duplication behavior 2026-06-02 08:42:20 +02:00
mr 71ae0d2cfc inout change vars regime 2026-06-01 16:45:05 +02:00
mr 5806bdd3d2 Correct link 2026-06-01 15:01:45 +02:00
mr 99fbe82a51 Update Auto Outputs on sourced. 2026-06-01 08:45:50 +02:00
mr 7d8bec9a78 Container can be sourced 2026-06-01 08:26:48 +02:00
mr afd8a2d97c conditionnal is_draft 2026-05-29 14:31:44 +02:00
mr 82a4708f46 Is draft 2026-05-29 14:12:40 +02:00
mr a3bca24982 isdraft pb 2026-05-29 13:43:41 +02:00
mr 41706949fd isDraft Update dafuck 2026-05-29 12:51:04 +02:00
mr b1429596bb not proper enum compararison 2026-05-29 10:38:45 +02:00
mr ce110ee634 inspect comparision 2026-05-29 10:22:07 +02:00
mr 7e5b69b1d2 Live resource failed 2026-05-29 09:12:52 +02:00
mr 26948da3c1 relation peer mismatch 2026-05-28 16:29:36 +02:00
mr 4e1b1164cc relation mismatched 2026-05-28 16:28:51 +02:00
31 changed files with 1760 additions and 166 deletions
+53 -14
View File
@@ -3,6 +3,7 @@ package dbs
import (
"fmt"
"reflect"
"regexp"
"runtime/debug"
"strings"
@@ -160,11 +161,16 @@ type Filter struct {
// Keys inside "and"/"or" are json tag names; the function resolves each to its
// full dotted BSON path using the target struct. Unknown keys are kept as-is.
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Filters {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic recovered FiltersFromFlatMap: %v\n", r)
}
}()
filters := &Filters{
And: make(map[string][]Filter),
Or: make(map[string][]Filter),
}
paths := jsonToBsonPaths(reflect.TypeOf(target), "")
paths := jsonToBsonPaths(reflect.TypeOf(target), "", "")
resolve := func(jsonKey string) string {
if p, ok := paths[jsonKey]; ok {
return p
@@ -179,11 +185,12 @@ func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Fil
}
for jsonKey, val := range m {
bsonKey := resolve(jsonKey)
items, ok := val.([]interface{})
//items, ok := val.([]interface{})
fmt.Println(jsonKey, val, bsonKey)
if !ok {
continue
}
for _, item := range items {
for _, item := range val.([]interface{}) {
entry, ok := item.(map[string]interface{})
if !ok {
continue
@@ -214,11 +221,22 @@ func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Fil
//
// Anonymous embedded fields without any tag follow the BSON convention of this
// codebase: they are stored as a nested sub-document whose key is the lowercased
// struct type name (e.g. utils.AbstractObject → "abstractobject").
func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string {
// struct type name (e.g. utils.AbstractObject → "abstractobject"). Their JSON
// fields are promoted (flat), so bsonPrefix advances but jsonPrefix does not.
//
// For fields inside slices or maps, both the leaf json name and the full dotted
// json path (e.g. "instances.access_protocol") are registered as keys so callers
// can use either form unambiguously.
func jsonToBsonPaths(t reflect.Type, bsonPrefix string, jsonPrefix string) map[string]string {
for t.Kind() == reflect.Ptr || t.Kind() == reflect.Slice {
t = t.Elem()
}
if t.Kind() == reflect.Map {
t = t.Elem()
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
}
result := make(map[string]string)
if t.Kind() != reflect.Struct {
return result
@@ -232,17 +250,21 @@ func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string {
bsonName := strings.Split(bsonTag, ",")[0]
// Anonymous embedded struct with no tags: use lowercase type name as BSON prefix.
// JSON fields are promoted so jsonPrefix stays the same.
if field.Anonymous && jsonName == "" && bsonName == "" {
ft := field.Type
for ft.Kind() == reflect.Ptr {
ft = ft.Elem()
}
if ft.Kind() == reflect.Struct {
embedPrefix := strings.ToLower(ft.Name())
if prefix != "" {
embedPrefix = prefix + "." + embedPrefix
embedBsonPrefix := strings.ToLower(ft.Name())
re := regexp.MustCompile(`\[[^\]]*\]`)
embedBsonPrefix = re.ReplaceAllString(embedBsonPrefix, "")
embedBsonPrefix = strings.ReplaceAll(embedBsonPrefix, "*", "")
if bsonPrefix != "" {
embedBsonPrefix = bsonPrefix + "." + embedBsonPrefix
}
for k, v := range jsonToBsonPaths(ft, embedPrefix) {
for k, v := range jsonToBsonPaths(ft, embedBsonPrefix, jsonPrefix) {
if _, exists := result[k]; !exists {
result[k] = v
}
@@ -258,19 +280,36 @@ func jsonToBsonPaths(t reflect.Type, prefix string) map[string]string {
bsonName = jsonName
}
fullPath := bsonName
if prefix != "" {
fullPath = prefix + "." + bsonName
fullBsonPath := bsonName
if bsonPrefix != "" {
fullBsonPath = bsonPrefix + "." + bsonName
}
fullJsonPath := jsonName
if jsonPrefix != "" {
fullJsonPath = jsonPrefix + "." + jsonName
}
result[jsonName] = fullPath
result[jsonName] = fullBsonPath
// Also register the full dotted JSON path so callers can use
// "instances.access_protocol" instead of just "access_protocol".
if fullJsonPath != jsonName {
if _, exists := result[fullJsonPath]; !exists {
result[fullJsonPath] = fullBsonPath
}
}
ft := field.Type
for ft.Kind() == reflect.Ptr || ft.Kind() == reflect.Slice {
ft = ft.Elem()
}
if ft.Kind() == reflect.Map {
ft = ft.Elem()
for ft.Kind() == reflect.Ptr {
ft = ft.Elem()
}
}
if ft.Kind() == reflect.Struct {
for k, v := range jsonToBsonPaths(ft, fullPath) {
for k, v := range jsonToBsonPaths(ft, fullBsonPath, fullJsonPath) {
if _, exists := result[k]; !exists {
result[k] = v
}
+1
View File
@@ -77,6 +77,7 @@ const (
REFUND = tools.REFUND
DISCOUNT = tools.DISCOUNT
SUBSCRIPTION = tools.SUBSCRIPTION
POLICY = tools.POLICY
)
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
+2 -3
View File
@@ -10,14 +10,13 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
* Booking is a struct that represents a booking
*/
type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
FromNano string `json:"from_nano,omitempty" bson:"priced_item,omitempty"`
FromNano string `json:"from_nano,omitempty" bson:"from_nano,omitempty"`
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
@@ -147,5 +146,5 @@ func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
}
func (r *Booking) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
return true // only draft bookings can be deleted
}
+10
View File
@@ -1,5 +1,7 @@
package enum
import "fmt"
type InfrastructureType int
const (
@@ -18,3 +20,11 @@ func (t InfrastructureType) String() string {
func InfrastructureList() []InfrastructureType {
return []InfrastructureType{DOCKER, KUBERNETES, SLURM, HW, CONDOR}
}
func (d InfrastructureType) Compare(indexStr interface{}) bool {
return fmt.Sprintf("%v", indexStr) == fmt.Sprintf("%v", d.EnumIndex()) || fmt.Sprintf("%v", indexStr) == d.String()
}
func (d InfrastructureType) EnumIndex() int {
return int(d)
}
+10
View File
@@ -1,5 +1,7 @@
package enum
import "fmt"
type StorageSize int
// StorageType - Enum that defines the type of storage
@@ -54,3 +56,11 @@ func (t StorageType) String() string {
func TypeList() []StorageType {
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS}
}
func (d StorageType) Compare(indexStr interface{}) bool {
return fmt.Sprintf("%v", indexStr) == fmt.Sprintf("%v", d.EnumIndex()) || fmt.Sprintf("%v", indexStr) == d.String()
}
func (d StorageType) EnumIndex() int {
return int(d)
}
+61 -6
View File
@@ -1,18 +1,73 @@
package models
import "sort"
// SortedArgValues returns arg Values: readonly args first (sorted by Index), then non-readonly in order.
func SortedArgValues(args []Arg) []string {
var ro, nro []Arg
for _, a := range args {
if a.IsReadonly {
ro = append(ro, a)
} else {
nro = append(nro, a)
}
}
sort.Slice(ro, func(i, j int) bool { return ro[i].Index < ro[j].Index })
out := make([]string, 0, len(args))
for _, a := range ro {
out = append(out, a.Value)
}
for _, a := range nro {
out = append(out, a.Value)
}
return out
}
// ReadonlyArgValues returns only the readonly arg values sorted by Index.
func ReadonlyArgValues(args []Arg) []string {
var ro []Arg
for _, a := range args {
if a.IsReadonly {
ro = append(ro, a)
}
}
sort.Slice(ro, func(i, j int) bool { return ro[i].Index < ro[j].Index })
out := make([]string, 0, len(ro))
for _, a := range ro {
out = append(out, a.Value)
}
return out
}
// NonReadonlyArgValues returns only the non-readonly arg values in their order.
func NonReadonlyArgValues(args []Arg) []string {
out := make([]string, 0)
for _, a := range args {
if !a.IsReadonly {
out = append(out, a.Value)
}
}
return out
}
type Arg struct {
Value string `json:"value,omitempty" bson:"value,omitempty"` // Image is the container image TEMPO
Index int `json:"index,omitempty" bson:"index,omitempty"`
IsReadonly bool `json:"is_readonly,omitempty" bson:"is_readonly,omitempty"`
}
type PathSource struct {
Source string `json:"source,omitempty" bson:"source,omitempty"` // Image is the container image TEMPO
IsReachable bool `json:"is_reachable,omitempty" bson:"is_reachable,omitempty"`
Args string `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments
Args []Arg `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments
Volumes map[string]string `json:"volumes,omitempty" bson:"volumes,omitempty"` // Volumes is the container volumes
}
type Container struct {
Image string `json:"image,omitempty" bson:"image,omitempty"` // Image is the container image TEMPO
Command string `json:"command,omitempty" bson:"command,omitempty"` // Command is the container command
Args string `json:"args,omitempty" bson:"args,omitempty"` // Args is the container arguments
Env map[string]string `json:"env,omitempty" bson:"env,omitempty"` // Env is the container environment variables
Volumes map[string]string `json:"volumes,omitempty" bson:"volumes,omitempty"` // Volumes is the container volumes
PathSource
Image string `json:"image,omitempty" bson:"image,omitempty"` // Image is the container image TEMPO
Command string `json:"command,omitempty" bson:"command,omitempty"` // Command is the container command
}
type Expose struct {
+6 -6
View File
@@ -7,12 +7,12 @@ type Artifact struct {
}
type Param struct {
Name string `json:"name" bson:"name" validate:"required"`
Attr string `json:"attr,omitempty" bson:"attr,omitempty"`
Value string `json:"value,omitempty" bson:"value,omitempty"`
Origin string `json:"origin,omitempty" bson:"origin,omitempty"`
Readonly bool `json:"readonly" bson:"readonly" default:"true"`
Optionnal bool `json:"optionnal" bson:"optionnal" default:"true"`
Name string `json:"name" bson:"name" validate:"required"`
Attr string `json:"attr,omitempty" bson:"attr,omitempty"`
Value string `json:"value,omitempty" bson:"value,omitempty"`
Origin string `json:"origin,omitempty" bson:"origin,omitempty"`
Readonly bool `json:"readonly" bson:"readonly" default:"true"`
Required bool `json:"required" bson:"required" default:"true"`
}
type InOutputs struct {
+4 -1
View File
@@ -1,6 +1,8 @@
package live
import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/utils"
@@ -36,7 +38,8 @@ type LiveDatacenter struct {
}
func (r *LiveDatacenter) IsCompatible(service map[string]interface{}) bool {
return service["infrastructure"] == r.Infrastructure && service["architecture"] == r.Architecture
fmt.Println("COMPARE <", r.Infrastructure.Compare(service["infrastructure"]), "> AND <", service["architecture"], "> <", r.Architecture, ">")
return r.Infrastructure.Compare(service["infrastructure"]) && service["architecture"] == r.Architecture
}
func (d *LiveDatacenter) GetAccessor(request *tools.APIRequest) utils.Accessor {
+10 -6
View File
@@ -1,6 +1,8 @@
package live
import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -26,11 +28,11 @@ func (p ServiceProtocol) String() string {
// rather than trusted from the ServiceResource, which may be stale.
type LiveService struct {
AbstractLive
MaxConcurrent int `json:"max_concurrent" bson:"max_concurrent"`
Protocol ServiceProtocol `json:"protocol" bson:"protocol" default:"0"`
EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"`
HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"`
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
MaxConcurrent int `json:"max_concurrent" bson:"max_concurrent"`
Protocol ServiceProtocol `json:"protocol" bson:"protocol" default:"0"`
EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"`
HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"`
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
}
func (d *LiveService) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -38,5 +40,7 @@ func (d *LiveService) GetAccessor(request *tools.APIRequest) utils.Accessor {
}
func (r *LiveService) IsCompatible(service map[string]interface{}) bool {
return service["infrastructure"] == r.Infrastructure
fmt.Println("COMPARE <", service["infrastructure"], "> <", r.Infrastructure, ">")
return r.Infrastructure.Compare(service["infrastructure"])
}
+5 -1
View File
@@ -1,6 +1,8 @@
package live
import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -26,7 +28,9 @@ type LiveStorage struct {
}
func (r *LiveStorage) IsCompatible(service map[string]interface{}) bool {
return service["storage_type"] == r.StorageType
fmt.Println("COMPARE <", r.StorageType.Compare(service["storage_type"]), ">")
return r.StorageType.Compare(service["storage_type"])
}
func (d *LiveStorage) GetAccessor(request *tools.APIRequest) utils.Accessor {
+2
View File
@@ -11,6 +11,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer/policy"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -58,6 +59,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.SUBSCRIPTION.String(): func() utils.DBObject { return &subscription.Subscription{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
tools.POLICY.String(): func() utils.DBObject { return &policy.Policy{} },
}
// Model returns the model object based on the model type
+11
View File
@@ -0,0 +1,11 @@
package organization
// Organization holds descriptive data about a peer's organization.
// It is optional — a peer without an organization has a nil Organization field.
type Organization struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Description string `json:"description,omitempty" bson:"description,omitempty"`
Website string `json:"website,omitempty" bson:"website,omitempty"`
Sector string `json:"sector,omitempty" bson:"sector,omitempty"`
Country string `json:"country,omitempty" bson:"country,omitempty"`
}
+32 -1
View File
@@ -5,6 +5,7 @@ import (
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/organization"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
@@ -30,12 +31,23 @@ const (
NANO
PENDING_NANO
PENDING_MASTER
ORGANIZATION_MASTER
ORGANIZATION_MEMBER
ORGANIZATION_PARTNER
ORGANIZATION_MASTER_PENDING
ORGANIZATION_MEMBER_PENDING
)
var path = []string{"known", "self", "partner", "blacklist", "partner", "pending_partner", "master", "nano", "pending_nano", "pending_master"}
var path = []string{
"known", "self", "partner", "blacklist", "pending_partner",
"master", "nano", "pending_nano", "pending_master",
"organization_master", "organization_member", "organization_partner",
"organization_master_pending", "organization_member_pending",
}
func GetRelationPath(str string) int {
for i, p := range path {
fmt.Println("GetRelationPath", i, p)
if str == p {
return i
}
@@ -120,6 +132,20 @@ type Peer struct {
// When oc-discovery fails to reach a NANO, it routes the booking to MasterID instead.
MasterID string `json:"master_id,omitempty" bson:"master_id,omitempty"`
// OrganizationMasterID is the MongoDB _id of the peer acting as this node's
// organization master. Set automatically when an ORGANIZATION_MASTER relation
// is validated (equivalent of MasterID for the Nano/Master hierarchy).
OrganizationMasterID string `json:"organization_master_id,omitempty" bson:"organization_master_id,omitempty"`
// Organization holds optional descriptive data about the peer's organization.
// Null when the peer has not registered any organization data.
Organization *organization.Organization `json:"organization,omitempty" bson:"organization,omitempty"`
// PolicyID references the Policy document that governs which inbound
// libp2p streams are authorized for this peer.
// When empty, all non-vital streams are denied by default.
PolicyID string `json:"policy_id,omitempty" bson:"policy_id,omitempty"`
// Volatile connectivity state — never persisted to DB (bson:"-").
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
@@ -136,6 +162,11 @@ func (ri *Peer) Extend(typ ...string) map[string][]tools.DataType {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.PEER)
case "policy":
if _, ok := ext[t]; !ok {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.POLICY)
}
}
return ext
+30
View File
@@ -0,0 +1,30 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// Policy defines which inbound libp2p streams are authorized for a peer.
// Vital streams (planner, considers, minio/admiralty config, source-presign,
// verify, observe, heartbeat) are always allowed regardless of policy.
type Policy struct {
utils.AbstractObject
// Resource CRUD
AllowSearch bool `json:"allow_search" bson:"allow_search"`
AllowCreate bool `json:"allow_create" bson:"allow_create"`
AllowUpdate bool `json:"allow_update" bson:"allow_update"`
AllowDelete bool `json:"allow_delete" bson:"allow_delete"`
// Resource freshness tracking
AllowRegisterWatcher bool `json:"allow_register_watcher" bson:"allow_register_watcher"`
AllowUnregisterWatcher bool `json:"allow_unregister_watcher" bson:"allow_unregister_watcher"`
// Organization partner confirmation
AllowOrgPartnerConfirm bool `json:"allow_org_partner_confirm" bson:"allow_org_partner_confirm"`
}
func (p *Policy) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request)
}
@@ -0,0 +1,31 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type policyMongoAccessor struct {
utils.AbstractAccessor[*Policy]
}
func NewAccessor(request *tools.APIRequest) *policyMongoAccessor {
return &policyMongoAccessor{
AbstractAccessor: utils.AbstractAccessor[*Policy]{
Logger: logs.CreateLogger(tools.POLICY.String()),
Request: request,
Type: tools.POLICY,
New: func() *Policy { return &Policy{} },
},
}
}
func (a *policyMongoAccessor) GetObjectFilters(search string) *dbs.Filters {
return &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}
+14
View File
@@ -0,0 +1,14 @@
package resources
// Consent represents a consent request attached to a resource.
// ConsentString is the question displayed to the user.
// Optional, when true, means the user may decline without blocking scheduling.
// A nil Optional is treated as required (false).
type Consent struct {
ConsentString string `json:"consent_string" bson:"consent_string"`
Optional *bool `json:"optional,omitempty" bson:"optional,omitempty"`
}
func (c Consent) IsOptional() bool {
return c.Optional != nil && *c.Optional
}
+240 -33
View File
@@ -20,9 +20,9 @@ import (
*/
type DynamicResource struct {
AbstractResource
Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"`
Filters map[string]interface{} `bson:"filters,omitempty" json:"filters,omitempty"`
SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"`
Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"`
Filters dbs.Filters `bson:"filters,omitempty" json:"filters,omitempty"`
SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"`
PeerIds map[int]string `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"`
ResourceIds map[int]string `bson:"resource_ids,omitempty" json:"resource_ids,omitempty"`
@@ -37,44 +37,249 @@ type DynamicResource struct {
WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"`
}
// WorkspaceCandidatesProvider can be set by the workspace package to supply
// contextual workspace resources for a given DataType and request without
// creating a circular import (workspace → resources → workspace).
// When set, SetAllowedInstances uses workspace-scoped resources instead of
// the full catalog for requests that carry a username.
var WorkspaceCandidatesProvider func(dt tools.DataType, request *tools.APIRequest) []ResourceInterface
func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return nil
}
func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{}
for k, v := range map[tools.DataType]ResourceInterface{
tools.COMPUTE_RESOURCE: &ComputeResource{},
tools.DATA_RESOURCE: &DataResource{},
tools.STORAGE_RESOURCE: &StorageResource{},
tools.PROCESSING_RESOURCE: &ProcessingResource{},
tools.WORKFLOW_RESOURCE: &WorkflowResource{}} {
if d.Type != k {
continue
}
access := NewAccessor[*DynamicResource](k, request)
a, _, _ := access.Search(dbs.FiltersFromFlatMap(d.Filters, v), "", false, 0, 100000)
d.PeerIds = map[int]string{}
d.ResourceIds = map[int]string{}
for _, res := range a {
for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) {
d.PeerIds[len(d.Instances)] = res.GetCreatorID()
d.ResourceIds[len(d.Instances)] = res.GetID()
d.Instances = append(d.Instances, i)
}
}
break
if WorkspaceCandidatesProvider != nil {
candidates := WorkspaceCandidatesProvider(d.Type, request)
return d.SetAllowedInstancesFromSet(candidates, request, instance_id...)
}
sorted := make([]ResourceInstanceITF, len(d.Instances))
copy(sorted, d.Instances)
slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
d.SortRules["partnerships"] = "%v not contains 2"
return d.compareByRules(a, b, d.SortRules)
})
d.WatchedDynamicResource = []string{}
d.Instances = []ResourceInstanceITF{}
d.sortAndResetInstances()
return d.Instances
}
// SetAllowedInstancesFromSet fills d.Instances from a pre-loaded workspace resource set
// instead of querying the catalog. Filters are applied in-memory against the candidates.
// Called by WorkspaceResourceSet.Fill so dynamic resources only see workspace-scoped resources.
func (d *DynamicResource) SetAllowedInstancesFromSet(candidates []ResourceInterface, request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{}
d.PeerIds = map[int]string{}
d.ResourceIds = map[int]string{}
for _, res := range candidates {
if !d.matchesFilters(res) {
continue
}
for _, i := range res.SetAllowedInstances(request, instance_id...) {
d.PeerIds[len(d.Instances)] = res.GetCreatorID()
d.ResourceIds[len(d.Instances)] = res.GetID()
d.Instances = append(d.Instances, i)
}
}
d.sortAndResetInstances()
return d.Instances
}
func (d *DynamicResource) sortAndResetInstances() {
if d.SortRules != nil {
sorted := make([]ResourceInstanceITF, len(d.Instances))
copy(sorted, d.Instances)
slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
d.SortRules["partnerships"] = "%v not contains 2"
return d.compareByRules(a, b, d.SortRules)
})
d.Instances = sorted
}
d.WatchedDynamicResource = []string{}
}
// matchesFilters applies d.Filters in-memory against a serialized resource.
// Keys in d.Filters are JSON tag names; Serialize returns JSON tag names — no bson conversion needed.
func (d *DynamicResource) matchesFilters(res ResourceInterface) bool {
if len(d.Filters.And) == 0 && len(d.Filters.Or) == 0 {
return true
}
m := res.Serialize(res)
for field, fs := range d.Filters.And {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if !anyMatchesOp(vals, f) {
return false
}
}
}
if len(d.Filters.Or) > 0 {
matched := false
for field, fs := range d.Filters.Or {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if anyMatchesOp(vals, f) {
matched = true
break
}
}
if matched {
break
}
}
if !matched {
return false
}
}
return true
}
// nestedVals navigates a dot-path into m and collects all leaf values.
// Arrays at any level are expanded: each element is recursed into.
func nestedVals(m map[string]interface{}, path []string) []interface{} {
if len(path) == 0 || m == nil {
return nil
}
val, ok := m[path[0]]
if !ok {
return nil
}
if len(path) == 1 {
if arr, ok := val.([]interface{}); ok {
return arr
}
return []interface{}{val}
}
rest := path[1:]
switch v := val.(type) {
case map[string]interface{}:
return nestedVals(v, rest)
case []interface{}:
var out []interface{}
for _, elem := range v {
if em, ok := elem.(map[string]interface{}); ok {
out = append(out, nestedVals(em, rest)...)
}
}
return out
}
return nil
}
// anyMatchesOp returns true if at least one value in vals satisfies filter f.
func anyMatchesOp(vals []interface{}, f dbs.Filter) bool {
if f.Operator == dbs.EXISTS.String() {
exists := len(vals) > 0 && vals[0] != nil
want := true
if b, ok := f.Value.(bool); ok {
want = b
}
return exists == want
}
if f.Operator == dbs.IN.String() {
list, ok := f.Value.([]interface{})
if !ok {
return false
}
for _, v := range vals {
sv := fmt.Sprintf("%v", v)
for _, item := range list {
if sv == fmt.Sprintf("%v", item) {
return true
}
}
}
return false
}
for _, v := range vals {
if opMatches(v, f) {
return true
}
}
return false
}
func opMatches(val interface{}, f dbs.Filter) bool {
switch f.Operator {
case dbs.EQUAL.String():
return fmt.Sprintf("%v", val) == fmt.Sprintf("%v", f.Value)
case dbs.NOT.String():
return fmt.Sprintf("%v", val) != fmt.Sprintf("%v", f.Value)
case dbs.LIKE.String():
return strings.Contains(strings.ToLower(fmt.Sprintf("%v", val)), strings.ToLower(fmt.Sprintf("%v", f.Value)))
case dbs.GT.String(), dbs.GTE.String(), dbs.LT.String(), dbs.LTE.String():
return numericCmp(val, f.Value, f.Operator)
case dbs.ELEMMATCH.String():
arr, ok := val.([]interface{})
if !ok {
return false
}
sub, ok := f.Value.(map[string]interface{})
if !ok {
return false
}
for _, elem := range arr {
em, ok := elem.(map[string]interface{})
if !ok {
continue
}
allOk := true
for k, sv := range sub {
if fmt.Sprintf("%v", em[k]) != fmt.Sprintf("%v", sv) {
allOk = false
break
}
}
if allOk {
return true
}
}
return false
}
return false
}
func numericCmp(a, b interface{}, op string) bool {
fa, aOk := toFloat64(a)
fb, bOk := toFloat64(b)
if !aOk || !bOk {
sa, sb := fmt.Sprintf("%v", a), fmt.Sprintf("%v", b)
switch op {
case dbs.GT.String():
return sa > sb
case dbs.GTE.String():
return sa >= sb
case dbs.LT.String():
return sa < sb
case dbs.LTE.String():
return sa <= sb
}
return false
}
switch op {
case dbs.GT.String():
return fa > fb
case dbs.GTE.String():
return fa >= fb
case dbs.LT.String():
return fa < fb
case dbs.LTE.String():
return fa <= fb
}
return false
}
func toFloat64(v interface{}) (float64, bool) {
switch n := v.(type) {
case float64:
return n, true
case float32:
return float64(n), true
case int:
return float64(n), true
case int32:
return float64(n), true
case int64:
return float64(n), true
}
return 0, false
}
func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) {
d.Instances = append(d.Instances, instance)
}
@@ -91,13 +296,15 @@ func (d *DynamicResource) GetSelectedInstance(index *int) ResourceInstanceITF {
d.SelectedIndex = i
for i := range inst.GetPartnerships() {
fmt.Println(inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy), d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy)
if inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil {
d.SelectedPartnershipIndex = &i
break
}
}
if d.SelectedPartnershipIndex == nil {
continue
i := 0
d.SelectedPartnershipIndex = &i
}
return inst
}
+2
View File
@@ -29,6 +29,8 @@ type ResourceInterface interface {
GetEnv() []models.Param
GetInputs() []models.Param
GetOutputs() []models.Param
GetExploitationAuthorizations() []ExploitationAuthorization
GetConsents() []Consent
}
type ResourceInstanceITF interface {
+125 -1
View File
@@ -15,7 +15,8 @@ type ResourceSet struct {
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
// DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances.
// Runtime-only resource objects — not persisted. Populated by Fill() from the ID lists above.
// Use WorkspaceResourceSet when full object persistence is needed (workspace fluid catalog).
DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
@@ -26,6 +27,129 @@ type ResourceSet struct {
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
}
// WorkspaceResourceSet mirrors ResourceSet but persists complete resource objects to MongoDB.
// Use this in workspace documents where the workspace acts as a fluid resource catalog.
// The *Resource fields are loaded from bson on read; Fill() skips catalog lookup when they are
// already populated.
type WorkspaceResourceSet struct {
Datas []string `bson:"datas,omitempty" json:"datas,omitempty"`
Storages []string `bson:"storages,omitempty" json:"storages,omitempty"`
Processings []string `bson:"processings,omitempty" json:"processings,omitempty"`
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
DynamicResources []*DynamicResource `bson:"dynamic_resources,omitempty" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"data_resources,omitempty" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"storage_resources,omitempty" json:"storage_resources,omitempty"`
ProcessingResources []*ProcessingResource `bson:"processing_resources,omitempty" json:"processing_resources,omitempty"`
ComputeResources []*ComputeResource `bson:"compute_resources,omitempty" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"workflow_resources,omitempty" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"service_resources,omitempty" json:"service_resources,omitempty"`
}
func (r *WorkspaceResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
r.DynamicResources = nil
r.NativeTools = nil
}
// Fill populates *Resource fields from their ID lists. When a field is already non-nil
// (loaded from the workspace MongoDB document), the catalog lookup is skipped for that type.
func (r *WorkspaceResourceSet) Fill(request *tools.APIRequest) {
if r.DataResources == nil {
for _, id := range r.Datas {
if d, _, e := (&DataResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DataResources = append(r.DataResources, d.(*DataResource))
}
}
}
if r.ComputeResources == nil {
for _, id := range r.Computes {
if d, _, e := (&ComputeResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ComputeResources = append(r.ComputeResources, d.(*ComputeResource))
}
}
}
if r.StorageResources == nil {
for _, id := range r.Storages {
if d, _, e := (&StorageResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.StorageResources = append(r.StorageResources, d.(*StorageResource))
}
}
}
if r.ProcessingResources == nil {
for _, id := range r.Processings {
if d, _, e := (&ProcessingResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
}
}
}
if r.WorkflowResources == nil {
for _, id := range r.Workflows {
if d, _, e := (&WorkflowResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
}
}
}
if r.ServiceResources == nil {
for _, id := range r.Services {
if d, _, e := (&ServiceResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
}
}
}
if r.DynamicResources == nil {
for _, id := range r.Dynamics {
if d, _, e := (&DynamicResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DynamicResources = append(r.DynamicResources, d.(*DynamicResource))
}
}
}
for _, d := range r.DynamicResources {
var candidates []ResourceInterface
switch d.Type {
case tools.COMPUTE_RESOURCE:
for _, c := range r.ComputeResources {
candidates = append(candidates, c)
}
case tools.DATA_RESOURCE:
for _, c := range r.DataResources {
candidates = append(candidates, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range r.StorageResources {
candidates = append(candidates, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range r.ProcessingResources {
candidates = append(candidates, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range r.WorkflowResources {
candidates = append(candidates, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range r.ServiceResources {
candidates = append(candidates, c)
}
}
if len(candidates) > 0 {
d.SetAllowedInstancesFromSet(candidates, request)
} else {
d.SetAllowedInstances(request)
}
}
}
func (r *ResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
+12 -1
View File
@@ -3,6 +3,7 @@ package resources
import (
"encoding/json"
"errors"
"fmt"
"slices"
"time"
@@ -49,6 +50,10 @@ type AbstractResource struct {
// NOT in a separate collection.
// Visibility-filtered per requesting peer before any response is sent.
ExploitationAuthorizations []ExploitationAuthorization `json:"exploitation_authorizations,omitempty" bson:"exploitation_authorizations,omitempty"`
// Consents lists the consent questions the user must acknowledge before
// scheduling this resource. Consents with Optional=true may be skipped.
Consents []Consent `json:"consents,omitempty" bson:"consents,omitempty"`
}
func (ri *AbstractResource) Extend(typ ...string) map[string][]tools.DataType {
@@ -99,6 +104,11 @@ func (r *AbstractResource) GetExploitationAuthorizations() []ExploitationAuthori
return r.ExploitationAuthorizations
}
// GetConsents returns the consent questions declared by this resource.
func (r *AbstractResource) GetConsents() []Consent {
return r.Consents
}
// FilterExploitationAuthorizations removes AEs that are not visible to peerID.
// Must be called before serializing the resource for a consumer peer.
// The resource owner (CreatorID) always sees all AEs unfiltered.
@@ -144,7 +154,8 @@ func (r *AbstractResource) StoreDraftDefault() {
}
func (r *AbstractResource) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
return r.IsDraft, set
fmt.Println("IsDrafted", r.IsDraft, set.IsDrafted())
return r.IsDraft || set.IsDrafted(), set
}
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
+201 -2
View File
@@ -1,12 +1,15 @@
package resources
import (
"encoding/json"
"errors"
"fmt"
"slices"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -16,6 +19,48 @@ type ResourceMongoAccessor[T ResourceInterface] struct {
utils.AbstractAccessor[ResourceInterface] // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
func sourceFromAccess(access *ResourceAccess) string {
if access == nil {
return ""
}
if access.Container != nil && access.Container.Source != "" {
return access.Container.Source
}
if access.Source != nil && access.Source.Source != "" {
return access.Source.Source
}
return ""
}
func upsertSourceParam(outputs []models.Param, source string) []models.Param {
for i, p := range outputs {
if p.Attr == "source" {
outputs[i].Value = source
return outputs
}
}
return append(outputs, models.Param{Attr: "source", Value: source, Readonly: true})
}
func applyAccessSourceOutput(data utils.DBObject) {
switch r := data.(type) {
case *ProcessingResource:
for _, inst := range r.Instances {
if src := sourceFromAccess(inst.Access); src != "" {
r.Outputs = upsertSourceParam(r.Outputs, src)
return
}
}
case *DataResource:
for _, inst := range r.Instances {
if src := sourceFromAccess(inst.Access); src != "" {
r.Outputs = upsertSourceParam(r.Outputs, src)
return
}
}
}
}
// New creates a new instance of the computeMongoAccessor
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] {
if !slices.Contains([]tools.DataType{
@@ -67,6 +112,40 @@ func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, er
return data, code, err
}
var workspaceResourceTypes = []tools.DataType{
tools.COMPUTE_RESOURCE,
tools.DATA_RESOURCE,
tools.PROCESSING_RESOURCE,
tools.STORAGE_RESOURCE,
tools.WORKFLOW_RESOURCE,
tools.SERVICE_RESOURCE,
}
func emitResourceNATS(method tools.NATSMethod, dt tools.DataType, payload []byte) {
if !slices.Contains(workspaceResourceTypes, dt) {
return
}
tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
FromApp: config.GetAppName(),
Datatype: dt,
Method: int(method),
Payload: payload,
})
}
func (dca *ResourceMongoAccessor[T]) DeleteOne(id string) (utils.DBObject, int, error) {
data, code, err := dca.AbstractAccessor.LoadOne(id)
if err != nil {
return data, code, err
}
res, code, err := dca.AbstractAccessor.DeleteOne(id)
if err == nil && data != nil {
b, _ := json.Marshal(data)
go emitResourceNATS(tools.REMOVE_RESOURCE, dca.GetType(), b)
}
return res, code, err
}
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE {
delete(set, "architecture")
@@ -76,6 +155,14 @@ func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id st
} else if dca.GetType() == tools.STORAGE_RESOURCE {
delete(set, "storage_type")
}
if dca.GetType() == tools.PROCESSING_RESOURCE || dca.GetType() == tools.DATA_RESOURCE {
if merged, _, _, err := utils.ModelGenericUpdateOne(set, id, dca); err == nil {
applyAccessSourceOutput(merged)
if serialized := merged.Serialize(merged); serialized != nil {
set["outputs"] = serialized["outputs"]
}
}
}
return utils.GenericUpdateOne(set, id, dca)
}
@@ -94,6 +181,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
}
a = live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID())
fmt.Println(res, r.Instances[0].GetID())
if res == nil {
return nil, 404, errors.New("can't create a non existing computing units resource not reported onto compute units catalog")
}
@@ -110,7 +198,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
a = live.NewAccessor[*live.LiveService](tools.LIVE_SERVICE, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil {
return nil, 404, errors.New("can't create a non existing service resource not reported onto compute units catalog")
return nil, 404, errors.New("can't create a non existing service resource not reported onto service catalog")
}
if !res.(*live.LiveService).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live service target is not compatible")
@@ -125,7 +213,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
a = live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, &tools.APIRequest{Admin: true})
res, _, _ := a.LoadOne(r.Instances[0].GetID())
if res == nil {
return nil, 404, errors.New("can't create a non existing storage resource not reported onto compute units catalog")
return nil, 404, errors.New("can't create a non existing storage resource not reported onto storage catalog")
}
if !res.(*live.LiveStorage).IsCompatible(data.Serialize(data)) {
return nil, 404, errors.New("live storage target is not compatible")
@@ -133,6 +221,7 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
i = res.GetID()
idsToUpdate = res.(*live.LiveStorage).ResourcesID
}
applyAccessSourceOutput(data)
res, code, err := utils.GenericStoreOne(data, dca)
if res != nil && i != "" {
idsToUpdate = append(idsToUpdate, res.GetID())
@@ -140,9 +229,119 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
"resources_id": idsToUpdate,
}, i)
}
if err == nil && res != nil {
b, _ := json.Marshal(res)
go emitResourceNATS(tools.CREATE_RESOURCE, dca.GetType(), b)
}
return res, code, err
}
// PurgedResourcePayload holds a silently-deleted resource's type and serialized payload.
type PurgedResourcePayload struct {
DT tools.DataType
Payload []byte
}
// purgeByType searches and silently deletes all resources of type T created by creatorID.
// Uses AbstractAccessor.DeleteOne directly to bypass the NATS-emitting override.
func purgeByType[T ResourceInterface](dt tools.DataType, creatorID string) []PurgedResourcePayload {
a := NewAccessor[T](dt, nil)
if a == nil {
return nil
}
res, _, _ := a.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 10000)
var result []PurgedResourcePayload
for _, item := range res {
b, err := json.Marshal(item)
if err != nil {
continue
}
a.AbstractAccessor.DeleteOne(item.GetID())
result = append(result, PurgedResourcePayload{DT: dt, Payload: b})
}
return result
}
// PurgeCreatorResources deletes all catalog resources created by creatorPeerID from
// the DB without emitting NATS. Used for non-blacklist peer privilege downgrades where
// workspace state should be left untouched.
func PurgeCreatorResources(creatorPeerID string) []PurgedResourcePayload {
var result []PurgedResourcePayload
result = append(result, purgeByType[*ComputeResource](tools.COMPUTE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*DataResource](tools.DATA_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ProcessingResource](tools.PROCESSING_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*StorageResource](tools.STORAGE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*WorkflowResource](tools.WORKFLOW_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ServiceResource](tools.SERVICE_RESOURCE, creatorPeerID)...)
return result
}
// FilterMapFromResourcePayload deserializes a resource payload by DataType, zeros out
// the AbstractInstanciatedResource (and its AbstractResource / Instances sub-fields),
// then marshals back to get only the concrete type's own JSON fields.
// Returns nil for WORKFLOW_RESOURCE and unknown types.
// JSON keys only — not BSON paths.
func FilterMapFromResourcePayload(dt tools.DataType, payload []byte) map[string]interface{} {
var m map[string]interface{}
switch dt {
case tools.COMPUTE_RESOURCE:
var r ComputeResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ComputeResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.DATA_RESOURCE:
var r DataResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*DataInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.PROCESSING_RESOURCE:
var r ProcessingResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ProcessingInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.STORAGE_RESOURCE:
var r StorageResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*StorageResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.SERVICE_RESOURCE:
var r ServiceResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ServiceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.WORKFLOW_RESOURCE:
var r WorkflowResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractResource = AbstractResource{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
default:
return nil
}
return m
}
func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return dca.StoreOne(data)
}
+5 -1
View File
@@ -98,6 +98,10 @@ func (r *AbstractObject) DeepCopy() *AbstractObject {
return &obj
}
func (r *AbstractObject) SetDraft(draft bool) {
r.IsDraft = draft
}
func (r *AbstractObject) SetName(name string) {
r.Name = name
}
@@ -146,7 +150,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
ao.UpdateDate = time.Now()
ao.UpdaterID = peer
ao.UserUpdaterID = user
if create && ao.CreatorID != "" {
if create && ao.CreatorID == "" {
ao.CreationDate = time.Now()
ao.CreatorID = peer
ao.UserCreatorID = user
+86 -4
View File
@@ -109,6 +109,9 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
obj := a.NewObj()
b, _ := json.Marshal(r)
json.Unmarshal(b, obj)
if change["is_draft"] == true {
obj.SetDraft(change["is_draft"] == true)
}
if !a.GetRequest().Admin {
var ok bool
ok, r = r.CanUpdate(obj)
@@ -125,10 +128,8 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
r.Sign()
}
loaded := r.Serialize(r) // get the loaded object
for k, v := range change { // apply the changes, with a flatten method
loaded[k] = v
}
loaded := r.Serialize(r) // get the loaded object
deepMerge(loaded, change)
newObj := a.NewObj()
b, err = json.Marshal(loaded)
if err != nil {
@@ -252,6 +253,87 @@ func IsMySelf(peerID string, wfa Accessor) (bool, string) {
return peerID == pp.GetID(), pp.GetID()
}
// deepMerge overlays patch values onto base, preserving base values for keys
// absent from patch, nil patch values, and empty strings when base is non-empty.
// This prevents partial frontend payloads from silently erasing server-managed
// fields (source, env, country, owners, creator_id, creation_date, …).
func deepMerge(base, patch map[string]interface{}) {
for k, pv := range patch {
bv := base[k]
switch pvTyped := pv.(type) {
case map[string]interface{}:
if bvMap, ok := bv.(map[string]interface{}); ok {
deepMerge(bvMap, pvTyped)
} else {
base[k] = pv
}
case []interface{}:
if bvSlice, ok := bv.([]interface{}); ok {
base[k] = mergeSlices(bvSlice, pvTyped)
} else {
base[k] = pv
}
case string:
// Don't overwrite a non-empty base value with an empty string.
if pvTyped != "" {
base[k] = pv
}
default:
if pv != nil {
base[k] = pv
}
}
}
}
// mergeSlices merges two slices element-wise.
// For slices of maps it matches elements by their "id" field when available;
// falls back to positional matching. An empty patch slice leaves base intact.
func mergeSlices(base, patch []interface{}) []interface{} {
if len(patch) == 0 {
return base
}
for _, e := range patch {
if _, ok := e.(map[string]interface{}); !ok {
return patch // non-map elements: replace wholesale
}
}
baseByID := map[string]map[string]interface{}{}
for _, e := range base {
if em, ok := e.(map[string]interface{}); ok {
if id, ok := em["id"].(string); ok && id != "" {
baseByID[id] = em
}
}
}
result := make([]interface{}, 0, len(patch))
for i, pe := range patch {
pm, _ := pe.(map[string]interface{})
if pm == nil {
result = append(result, pe)
continue
}
var baseElem map[string]interface{}
if id, ok := pm["id"].(string); ok && id != "" {
baseElem = baseByID[id]
}
if baseElem == nil && i < len(base) {
baseElem, _ = base[i].(map[string]interface{})
}
if baseElem != nil {
merged := make(map[string]interface{}, len(baseElem))
for k, v := range baseElem {
merged[k] = v
}
deepMerge(merged, pm)
result = append(result, merged)
} else {
result = append(result, pe)
}
}
return result
}
func GenerateNodeID() (string, error) {
folderStatic := "/var/lib/opencloud-node"
if _, err := os.Stat(folderStatic); err == nil {
+1
View File
@@ -26,6 +26,7 @@ type DBObject interface {
GetID() string
GetName() string
SetName(name string)
SetDraft(draft bool)
IsDrafted() bool
CanDelete() bool
StoreDraftDefault()
+490 -66
View File
@@ -11,6 +11,7 @@ import (
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
@@ -53,7 +54,11 @@ type Workflow struct {
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty"`
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workflow's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
}
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -108,9 +113,9 @@ func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface
return itf
}
func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.APIRequest) (*Workflow, error) {
func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.APIRequest) (*Workflow, []string, error) {
if plantUML == nil {
return d, errors.New("no file available to export")
return d, nil, errors.New("no file available to export")
}
defer plantUML.Close()
@@ -189,9 +194,11 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
lines = append(lines, scanner.Text())
}
if err := scanner.Err(); err != nil {
return d, err
return d, nil, err
}
var warnings []string
for i, line := range lines {
trimmed := strings.TrimSpace(line)
@@ -215,42 +222,41 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
}
}
for n, new := range resourceCatalog {
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") { // exclude macro declarations
newRes := new()
// Handle links outside the catalog loop: each link line must be processed
// exactly once (the catalog loop would otherwise call extractLink once per
// catalog entry, producing N×7 duplicate links in the graph).
if strings.Contains(line, "-->") {
if err := d.extractLink(parseLine, graphVarName, "-->", false); err != nil {
fmt.Println(err)
}
continue
}
if strings.Contains(line, "<--") {
if err := d.extractLink(parseLine, graphVarName, "<--", true); err != nil {
fmt.Println(err)
}
continue
}
if strings.Contains(line, "--") {
if err := d.extractLink(parseLine, graphVarName, "--", false); err != nil {
fmt.Println(err)
}
continue
}
for n, newFn := range resourceCatalog {
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") {
newRes := newFn()
newRes.SetID(uuid.New().String())
varName, graphItem, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID)
varName, graphItem, warns, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID, request)
if err != nil {
return d, err
return d, warnings, err
}
warnings = append(warnings, warns...)
if graphItem != nil {
graphVarName[varName] = *graphItem
}
continue
} else if strings.Contains(line, "-->") {
err := d.extractLink(parseLine, graphVarName, "-->", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, "<--") {
err := d.extractLink(parseLine, graphVarName, "<--", true)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, "--") {
err := d.extractLink(parseLine, graphVarName, "--", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, "-") {
err := d.extractLink(parseLine, graphVarName, "-", false)
if err != nil {
fmt.Println(err)
continue
}
break
}
}
}
@@ -260,10 +266,9 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.DYNAMIC_RESOURCE), request)
d.Graph.Items = graphVarName
return d, nil
return d, warnings, nil
}
func (d *Workflow) generateResource(datas []resources.ResourceInterface, request *tools.APIRequest) error {
@@ -355,14 +360,37 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphI
if len(splitted) < 2 {
return errors.New("links elements not found")
}
// Source: trim surrounding whitespace.
srcVar := strings.TrimSpace(splitted[0])
// Destination: trim whitespace then stop at the first space or apostrophe
// (the rest may be a trailing comment produced by ToPlantUML look-ahead).
dstTokens := strings.FieldsFunc(strings.TrimSpace(splitted[1]), func(r rune) bool {
return r == ' ' || r == '\t' || r == '\''
})
if len(dstTokens) == 0 {
return errors.New("link destination var name not found")
}
dstVar := dstTokens[0]
srcItem, srcOk := graphVarName[srcVar]
dstItem, dstOk := graphVarName[dstVar]
if !srcOk || srcItem.ID == "" {
return fmt.Errorf("link source %q not declared", srcVar)
}
if !dstOk || dstItem.ID == "" {
return fmt.Errorf("link destination %q not declared", dstVar)
}
link := &graph.GraphLink{
Source: graph.Position{
ID: graphVarName[splitted[0]].ID,
ID: srcItem.ID,
X: 0,
Y: 0,
},
Destination: graph.Position{
ID: graphVarName[splitted[1]].ID,
ID: dstItem.ID,
X: 0,
Y: 0,
},
@@ -381,35 +409,46 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphI
return nil
}
func (d *Workflow) extractResourcePlantUML(line string, resource resources.ResourceInterface, dataName string, peerID string) (string, *graph.GraphItem, error) {
func (d *Workflow) extractResourcePlantUML(line string, resource resources.ResourceInterface, dataName string, peerID string, request *tools.APIRequest) (string, *graph.GraphItem, []string, error) {
splittedFunc := strings.Split(line, "(")
if len(splittedFunc) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no func")
return "", nil, nil, errors.New("Can't deserialize Object, there's no func")
}
splittedParams := strings.Split(splittedFunc[1], ",")
if len(splittedParams) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no params")
return "", nil, nil, errors.New("Can't deserialize Object, there's no params")
}
varName := splittedParams[0]
splitted := strings.Split(splittedParams[1], "\"")
if len(splitted) <= 1 {
return "", nil, errors.New("Can't deserialize Object, there's no name")
return "", nil, nil, errors.New("Can't deserialize Object, there's no name")
}
resource.SetName(strings.ReplaceAll(splitted[1], "\\n", " "))
name := strings.ReplaceAll(splitted[1], "\\n", " ")
// Resources with instances get a default one seeded from the parent resource,
// then overridden by any explicit comment attributes.
// Event (NativeTool) has no instance: getNewInstance returns nil and is skipped.
instance := d.getNewInstance(dataName, splitted[1], peerID)
// Extract comment text (if present) for metadata parsing.
comment := ""
if parts := strings.Split(line, "'"); len(parts) > 1 {
comment = strings.ReplaceAll(parts[1], "'", "")
}
var warns []string
// Try to resolve an existing catalog resource (by id, then by name).
if existing, warn := d.resolveExistingResource(resource, dataName, name, comment, request); existing != nil {
warns = append(warns, warn)
item := d.addExistingGraphItem(dataName, existing)
return varName, item, warns, nil
}
// No existing resource — create new.
resource.SetName(name)
instance := d.getNewInstance(dataName, name, peerID)
if instance != nil {
if b, err := json.Marshal(resource); err == nil {
json.Unmarshal(b, instance)
}
splittedComments := strings.Split(line, "'")
if len(splittedComments) > 1 {
comment := strings.ReplaceAll(splittedComments[1], "'", "")
if comment != "" {
json.Unmarshal(parseHumanFriendlyAttrs(comment), instance)
}
resource.AddInstances(instance)
@@ -420,7 +459,91 @@ func (d *Workflow) extractResourcePlantUML(line string, resource resources.Resou
d.Graph.Items[item.ID] = *item
}
return varName, item, nil
return varName, item, warns, nil
}
// resolveExistingResource tries to find an existing catalog resource matching
// the given name or the id embedded in the PlantUML comment.
// Returns (resource, warning message) or (nil, "") if none found.
func (d *Workflow) resolveExistingResource(
proto resources.ResourceInterface,
dataName, name, comment string,
request *tools.APIRequest,
) (resources.ResourceInterface, string) {
accessor := proto.GetAccessor(request)
// 1. Try lookup by id from comment ("id: <uuid>").
if comment != "" {
attrs := map[string]any{}
json.Unmarshal(parseHumanFriendlyAttrs(comment), &attrs)
if id, ok := attrs["id"].(string); ok && id != "" {
if dbObj, _, err := accessor.LoadOne(id); err == nil && dbObj != nil {
if ri, ok := dbObj.(resources.ResourceInterface); ok {
return ri, fmt.Sprintf(`[import warning] %s "%s": existing resource retrieved by id %s`, dataName, name, id)
}
}
}
}
// 2. Try search by exact name.
filter := &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.EQUAL.String(), Value: name}},
},
}
if results, _, err := accessor.Search(filter, "", false, 0, 10); err == nil {
for _, r := range results {
if r.GetName() == name {
if dbObj, _, err2 := accessor.LoadOne(r.GetID()); err2 == nil && dbObj != nil {
if ri, ok := dbObj.(resources.ResourceInterface); ok {
return ri, fmt.Sprintf(`[import warning] %s "%s": existing resource found by name and retrieved instead of creating a new one`, dataName, name)
}
}
}
}
}
return nil, ""
}
// addExistingGraphItem registers an already-existing resource in the workflow's
// ID lists and graph, without adding it to the resource lists (so generateResource
// won't try to store it again).
func (d *Workflow) addExistingGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
graphItem := &graph.GraphItem{
ID: uuid.New().String(),
ItemResource: &resources.ItemResource{},
}
switch dataName {
case "Data":
d.Datas = append(d.Datas, resource.GetID())
if r, ok := resource.(*resources.DataResource); ok {
graphItem.Data = r
}
case "Processing":
d.Processings = append(d.Processings, resource.GetID())
if r, ok := resource.(*resources.ProcessingResource); ok {
graphItem.Processing = r
}
case "Service":
d.Services = append(d.Services, resource.GetID())
if r, ok := resource.(*resources.ServiceResource); ok {
graphItem.Service = r
}
case "Storage":
d.Storages = append(d.Storages, resource.GetID())
if r, ok := resource.(*resources.StorageResource); ok {
graphItem.Storage = r
}
case "ComputeUnit":
d.Computes = append(d.Computes, resource.GetID())
if r, ok := resource.(*resources.ComputeResource); ok {
graphItem.Compute = r
}
default:
return nil
}
return graphItem
}
func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
@@ -938,14 +1061,23 @@ const (
ViolationVariableNotFound ViolationType = "variable_not_found"
ViolationMissingComputeUnit ViolationType = "missing_compute_unit"
ViolationCycle ViolationType = "cycle"
ViolationMissingDataStorage ViolationType = "missing_data_storage"
ViolationMissingDataStorage ViolationType = "missing_data_storage"
ViolationRequiredOutputMissing ViolationType = "required_output_missing"
// Warnings — non-blocking, reported for UX
ViolationInvertedArrow ViolationType = "inverted_arrow"
ViolationIsolatedProcessing ViolationType = "isolated_processing"
ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
ViolationDynamicNotConfigured ViolationType = "dynamic_not_configured"
ViolationAnchorOnNonStorage ViolationType = "anchor_on_non_storage"
)
// ocAnchorAccess is the magic value used by oc-monitord to inject storage
// access credentials into a workflow step at runtime. It must only appear as
// an output of a storage (or dynamic-storage) element.
const ocAnchorAccess = "§oc:access§"
// IntegrityViolation describes a single structural or semantic problem
// found in the workflow graph.
type IntegrityViolation struct {
@@ -976,12 +1108,15 @@ func (v IntegrityViolation) IsWarning() bool { return v.Severity == SeverityWarn
func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
var violations []IntegrityViolation
violations = append(violations, w.validateVariables()...)
violations = append(violations, w.validateRequiredInputs()...)
violations = append(violations, w.validateComputeLinks()...)
violations = append(violations, w.detectCycles()...)
violations = append(violations, w.validateDataStorageLinks()...)
violations = append(violations, w.detectInvertedArrows()...)
violations = append(violations, w.detectIsolatedProcessings()...)
violations = append(violations, w.detectOrphanedStorages()...)
violations = append(violations, w.validateDynamicFilters()...)
violations = append(violations, w.validateAnchorOutputs()...)
return violations
}
@@ -1059,14 +1194,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
var name string
switch {
case w.Graph.IsProcessing(item) && item.Processing != nil:
// IsService processings are long-running services and don't need a Compute booking.
if item.Processing.IsService {
continue
}
needsCompute = true
name = item.Processing.GetName()
case w.Graph.IsService(item) && item.Service != nil:
// HOSTED services use an existing endpoint — no Compute booking needed.
inst := item.Service.GetSelectedInstance(nil)
if inst != nil {
if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
@@ -1075,6 +1208,9 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
}
needsCompute = true
name = item.Service.GetName()
case w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.PROCESSING_RESOURCE:
needsCompute = true
name = w.itemName(id)
}
if !needsCompute {
continue
@@ -1089,7 +1225,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
} else {
continue
}
if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) {
other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
// A concrete compute OR a dynamic node typed as compute satisfies the requirement.
if w.Graph.IsCompute(other) || (w.Graph.IsDynamic(other) && other.Dynamic.Type == tools.COMPUTE_RESOURCE) {
hasCompute = true
break
}
@@ -1180,12 +1321,41 @@ func (w *Workflow) detectCycles() []IntegrityViolation {
// validateDataStorageLinks checks that every Data item with a non-empty Source
// has at least one Storage linked — the builder needs this to inject the
// download step (curl or NATS/Minio protocol).
// isStorageEquivalent returns true for items that can satisfy a Data node's
// storage requirement: concrete storage, dynamic-storage, or a compute that
// has at least one embedded storage available on one of its instances.
func (w *Workflow) isStorageEquivalent(item graph.GraphItem) bool {
if w.Graph.IsStorage(item) {
return true
}
if w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE {
return true
}
if w.Graph.IsCompute(item) && item.Compute != nil {
for _, inst := range item.Compute.Instances {
if len(inst.AvailableStorages) > 0 {
return true
}
}
}
return false
}
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
var violations []IntegrityViolation
dataStorageLinks := w.Graph.GetDataStorageLinks()
// Build the set of data item IDs that have a valid storage-equivalent link.
linkedStorage := map[string]struct{}{}
for _, dsl := range dataStorageLinks {
linkedStorage[dsl.DataItemID] = struct{}{}
for _, link := range w.Graph.Links {
srcItem, srcOk := w.Graph.Items[link.Source.ID]
dstItem, dstOk := w.Graph.Items[link.Destination.ID]
if !srcOk || !dstOk {
continue
}
if w.Graph.IsData(srcItem) && w.isStorageEquivalent(dstItem) {
linkedStorage[link.Source.ID] = struct{}{}
} else if w.isStorageEquivalent(srcItem) && w.Graph.IsData(dstItem) {
linkedStorage[link.Destination.ID] = struct{}{}
}
}
for id, item := range w.Graph.Items {
if !w.Graph.IsData(item) || item.Data == nil {
@@ -1213,6 +1383,97 @@ func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
return violations
}
// validateRequiredInputs checks that for each processing node with a required
// input, every immediate predecessor outputs a parameter with that name.
// Mirrors the requiredOutputMissing check in oc-front's checkTopology().
func (w *Workflow) validateRequiredInputs() []IntegrityViolation {
var violations []IntegrityViolation
procIDs := map[string]struct{}{}
for id, item := range w.Graph.Items {
if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
procIDs[id] = struct{}{}
}
}
// Build direct predecessors map.
predecessors := map[string][]string{}
for id := range procIDs {
predecessors[id] = []string{}
}
for _, link := range w.Graph.Links {
src, dst := link.Source.ID, link.Destination.ID
_, srcIsProc := procIDs[src]
_, dstIsProc := procIDs[dst]
if !srcIsProc || !dstIsProc {
continue
}
dir := int64(0)
if link.Style != nil {
dir = link.Style.ArrowDirection
}
if dir == arrowDirectionBackward {
predecessors[src] = append(predecessors[src], dst)
} else {
predecessors[dst] = append(predecessors[dst], src)
}
}
for id, reqInputs := range w.Inputs {
if _, isProc := procIDs[id]; !isProc {
continue
}
for _, inp := range reqInputs {
if !inp.Required || inp.Name == "" {
continue
}
for _, predID := range predecessors[id] {
if !w.nodeHasOutput(predID, inp.Name) {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationRequiredOutputMissing,
ItemIDs: []string{id, predID},
Message: fmt.Sprintf(
`"%s" requires input "%s" but "%s" does not output it`,
w.itemName(id), inp.Name, w.itemName(predID),
),
})
}
}
}
}
return violations
}
// nodeHasOutput returns true if the given node outputs a parameter named name,
// either via workflow-level outputs or its resource's own outputs.
func (w *Workflow) nodeHasOutput(nodeID, name string) bool {
for _, p := range w.Outputs[nodeID] {
if p.Name == name {
return true
}
}
item, ok := w.Graph.Items[nodeID]
if !ok {
return false
}
var res resources.ResourceInterface
switch {
case item.Processing != nil:
res = item.Processing
case item.Service != nil:
res = item.Service
}
if res != nil {
for _, p := range res.GetOutputs() {
if p.Name == name {
return true
}
}
}
return false
}
// detectInvertedArrows warns when a link between two processing nodes uses a
// backward arrow direction — mirroring the invertedArrow warning in oc-front.
func (w *Workflow) detectInvertedArrows() []IntegrityViolation {
@@ -1280,12 +1541,162 @@ func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
return violations
}
// ---------------------------------------------------------------------------
// AE validation helpers — centralised so both oc-scheduler and oc-schedulerd
// share the same logic without code duplication.
// ---------------------------------------------------------------------------
// BuildResourceIDSet constructs the per-type resource-ID map and the flat
// coupling-membership set used by ValidateWorkflowAE.
//
// selectedEmbeddedStorages and selectedInstances come from the scheduling
// request (WorkflowSchedule) or from the WorkflowExecution at launch time.
// Embedded storages are NOT stored in Workflow.Storages (they are inside
// ComputeResourceInstance.AvailableStorages), so they must be resolved here
// to make them visible to the AE coupling check.
func (w *Workflow) BuildResourceIDSet(
selectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection,
selectedInstances ConfigItem,
) (map[tools.DataType][]string, map[string]struct{}) {
resourcesByType := map[tools.DataType][]string{
tools.DATA_RESOURCE: w.Datas,
tools.PROCESSING_RESOURCE: w.Processings,
tools.STORAGE_RESOURCE: append([]string{}, w.Storages...),
tools.COMPUTE_RESOURCE: w.Computes,
tools.WORKFLOW_RESOURCE: w.Workflows,
tools.SERVICE_RESOURCE: w.Services,
}
idSet := map[string]struct{}{}
for _, ids := range resourcesByType {
for _, id := range ids {
idSet[id] = struct{}{}
}
}
for graphItemID, sel := range selectedEmbeddedStorages {
if sel == nil {
continue
}
c, ok := w.Graph.Items[graphItemID]
if !ok {
continue
}
_, computeRes := c.GetResource()
computeResource, ok := computeRes.(*resources.ComputeResource)
if !ok {
continue
}
computeIdx := 0
if d := selectedInstances.Get(computeResource.GetID()); d != nil {
computeIdx = *d
}
if computeIdx >= len(computeResource.Instances) {
continue
}
computeInst := computeResource.Instances[computeIdx]
if sel.StorageIndex >= len(computeInst.AvailableStorages) {
continue
}
storageID := computeInst.AvailableStorages[sel.StorageIndex].GetID()
if storageID == "" {
continue
}
idSet[storageID] = struct{}{}
resourcesByType[tools.STORAGE_RESOURCE] = append(resourcesByType[tools.STORAGE_RESOURCE], storageID)
}
return resourcesByType, idSet
}
// ValidateWorkflowAE checks the ExploitationAuthorizations of every resource
// referenced in resourcesByType against the coupling/peer/workflow constraints.
//
// loadResource is injected by the caller to avoid a circular import
// (oc-lib/models/resources → oclib → oc-lib/models → resources).
// A nil return from loadResource means "resource not found — skip".
func (w *Workflow) ValidateWorkflowAE(
workflowID, consumerPeerID string,
resourcesByType map[tools.DataType][]string,
idSet map[string]struct{},
loadResource func(tools.DataType, string) resources.ResourceInterface,
) []resources.AEViolation {
now := time.Now().UTC()
var violations []resources.AEViolation
for dt, ids := range resourcesByType {
for _, id := range ids {
res := loadResource(dt, id)
if res == nil {
continue
}
for _, ae := range res.GetExploitationAuthorizations() {
violations = append(violations, ae.CheckAE(id, workflowID, consumerPeerID, idSet, now)...)
}
}
}
return violations
}
// detectOrphanedStorages warns when a storage node is not linked to any
// processing node — it contributes no data flow to the workflow.
// validateDynamicFilters mirrors oc-front's dynamic-not-configured check:
// a dynamic node with no filters cannot be resolved by the scheduler.
func (w *Workflow) validateDynamicFilters() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsDynamic(item) {
continue
}
f := item.Dynamic.Filters
if len(f.And) == 0 && len(f.Or) == 0 {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationDynamicNotConfigured,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" is a dynamic %s with no filters — at least one filter is required for the scheduler to resolve it`,
w.itemName(id), item.Dynamic.Type,
),
})
}
}
return violations
}
// validateAnchorOutputs mirrors oc-front's anchor-on-non-storage check:
// the §oc:access§ magic value must only appear as an output of a storage,
// dynamic-storage, or compute node that has an active embedded storage selected.
func (w *Workflow) validateAnchorOutputs() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if w.Graph.IsStorage(item) || (w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue
}
if w.Graph.IsCompute(item) {
if _, ok := w.SelectedEmbeddedStorages[id]; ok {
continue
}
}
for _, p := range w.Outputs[id] {
if p.Value == ocAnchorAccess {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationAnchorOnNonStorage,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" has an access anchor (%s) as output — access anchors may only be outputs of storage elements`,
w.itemName(id), ocAnchorAccess,
),
})
break
}
}
}
return violations
}
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsStorage(item) {
// Check both concrete storage and dynamic-storage nodes.
if !w.Graph.IsStorage(item) && !(w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue
}
linkedTopics := map[string]struct{}{}
@@ -1298,15 +1709,28 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
} else {
continue
}
if other, ok := w.Graph.Items[otherID]; ok {
switch {
case w.Graph.IsProcessing(other):
other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
switch {
case w.Graph.IsProcessing(other):
linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other):
linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other):
linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other):
linkedTopics["service"] = struct{}{}
case w.Graph.IsDynamic(other):
switch other.Dynamic.Type {
case tools.PROCESSING_RESOURCE:
linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other):
case tools.COMPUTE_RESOURCE:
linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other):
case tools.DATA_RESOURCE:
linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other):
case tools.SERVICE_RESOURCE:
linkedTopics["service"] = struct{}{}
}
}
+2 -2
View File
@@ -160,7 +160,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
if err == nil && len(resource) > 0 { // if the workspace already exists, update it
w := &workspace.Workspace{
Active: active,
ResourceSet: resources.ResourceSet{
WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas,
Processings: workflow.Processings,
Storages: workflow.Storages,
@@ -173,7 +173,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
a.workspaceAccessor.StoreOne(&workspace.Workspace{
Active: active,
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
ResourceSet: resources.ResourceSet{
WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas,
Processings: workflow.Processings,
Storages: workflow.Storages,
@@ -48,6 +48,10 @@ type WorkflowExecution struct {
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
// ResourceConsents records which consent strings the user acknowledged per resource
// (resource_id → list of acknowledged ConsentString values) at scheduling time.
ResourceConsents map[string][]string `json:"resource_consents,omitempty" bson:"resource_consents,omitempty"`
// Graph is a lightweight, real-time summary of the workflow execution graph.
// Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event.
// Consumed by oc-front to render the live execution panel via websocket updates.
@@ -97,7 +101,7 @@ func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject)
}
func (r *WorkflowExecution) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
return true // only draft bookings can be deleted
}
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
+176 -6
View File
@@ -1,23 +1,49 @@
package workspace
import (
"fmt"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// trustedRelations holds peer relations that yield TrustMap = true for a resource.
var trustedRelations = map[peer.PeerRelation]bool{
peer.PARTNER: true,
peer.MASTER: true,
peer.NANO: true,
peer.ORGANIZATION_MASTER: true,
peer.ORGANIZATION_MEMBER: true,
peer.ORGANIZATION_PARTNER: true,
}
// Workspace is a struct that represents a workspace
type Workspace struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, compute, processing, storage, workflow)
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual
Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active
Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.WorkspaceResourceSet // WorkspaceResourceSet persists both IDs and complete resource objects
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"`
Active bool `json:"active" bson:"active" default:"false"`
Shared string `json:"shared,omitempty" bson:"shared,omitempty"`
// Notifications accumulates strings for auto-modifications (e.g. resource removed after peer blacklist).
// Cleared by the owner via the notifications update endpoint.
Notifications []string `json:"notifications,omitempty" bson:"notifications,omitempty"`
// TrustMap maps resource ID → trust bool based on the creator peer's relation.
// Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean.
TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workspace's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
}
func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
return NewAccessor(request)
}
func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool {
@@ -30,3 +56,147 @@ func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool
}
return ao.AbstractObject.VerifyAuth(callName, request)
}
// ComputeTrustAndClean populates TrustMap for all resources embedded in this workspace,
// removes resources whose creator peer is blacklisted, and appends a deletion notification
// for each removal. Returns true when at least one resource was removed (caller should persist).
func (w *Workspace) ComputeTrustAndClean() bool {
w.TrustMap = map[string]bool{}
selfPeer, _ := utils.GetMySelf(peer.NewShallowAccessor())
var selfPeerID string
if selfPeer != nil {
if p, ok := selfPeer.(*peer.Peer); ok {
selfPeerID = p.PeerID
}
}
// Cache peer relations to avoid redundant DB lookups per workspace load.
cache := map[string]peer.PeerRelation{}
relation := func(creatorID string) peer.PeerRelation {
if r, ok := cache[creatorID]; ok {
return r
}
if creatorID == selfPeerID {
cache[creatorID] = peer.SELF
return peer.SELF
}
results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 1)
rel := peer.NONE
if len(results) > 0 {
if p, ok := results[0].(*peer.Peer); ok {
rel = p.Relation
}
}
cache[creatorID] = rel
return rel
}
setTrust := func(id, creatorID string, rel peer.PeerRelation) {
w.TrustMap[id] = (creatorID == selfPeerID) || trustedRelations[rel]
}
changed := false
var keptData []*resources.DataResource
for _, r := range w.DataResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptData = append(keptData, r)
}
}
w.DataResources = keptData
var keptCompute []*resources.ComputeResource
for _, r := range w.ComputeResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptCompute = append(keptCompute, r)
}
}
w.ComputeResources = keptCompute
var keptStorage []*resources.StorageResource
for _, r := range w.StorageResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptStorage = append(keptStorage, r)
}
}
w.StorageResources = keptStorage
var keptProcessing []*resources.ProcessingResource
for _, r := range w.ProcessingResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptProcessing = append(keptProcessing, r)
}
}
w.ProcessingResources = keptProcessing
var keptWorkflow []*resources.WorkflowResource
for _, r := range w.WorkflowResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptWorkflow = append(keptWorkflow, r)
}
}
w.WorkflowResources = keptWorkflow
var keptService []*resources.ServiceResource
for _, r := range w.ServiceResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptService = append(keptService, r)
}
}
w.ServiceResources = keptService
var keptDynamic []*resources.DynamicResource
for _, r := range w.DynamicResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptDynamic = append(keptDynamic, r)
}
}
w.DynamicResources = keptDynamic
var keptNative []*resources.NativeTool
for _, r := range w.NativeTools {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptNative = append(keptNative, r)
}
}
w.NativeTools = keptNative
return changed
}
+70 -6
View File
@@ -7,10 +7,60 @@ import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
func init() {
resources.WorkspaceCandidatesProvider = func(dt tools.DataType, request *tools.APIRequest) []resources.ResourceInterface {
res, _, _ := NewAccessor(request).Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"user_creator_id": {{Operator: dbs.EQUAL.String(), Value: request.Username}},
"active": {{Operator: dbs.EQUAL.String(), Value: true}},
},
}, "", false, 0, 1)
if len(res) == 0 {
return []resources.ResourceInterface{}
}
ws, ok := res[0].(*Workspace)
if !ok {
return []resources.ResourceInterface{}
}
// ws.Fill was already called by Search — typed slices are populated.
// Return an empty non-nil slice when the workspace exists but has no
// resources of the requested type: caller must not fall back to catalog.
out := []resources.ResourceInterface{}
switch dt {
case tools.COMPUTE_RESOURCE:
for _, c := range ws.ComputeResources {
out = append(out, c)
}
case tools.DATA_RESOURCE:
for _, c := range ws.DataResources {
out = append(out, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range ws.StorageResources {
out = append(out, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range ws.ProcessingResources {
out = append(out, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range ws.WorkflowResources {
out = append(out, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range ws.ServiceResources {
out = append(out, c)
}
}
return out
}
}
// Workspace is a struct that represents a workspace
type workspaceMongoAccessor struct {
utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller)
@@ -88,25 +138,39 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
d.(*Workspace).Fill(a.GetRequest())
return d, 200, nil
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w, 200, nil
}, a)
}
func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit)
}
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit)
}
// applyTrustAndClean calls ComputeTrustAndClean and, when resources were removed due to
// blacklisted peers, persists the cleaned workspace back to the database.
func (a *workspaceMongoAccessor) applyTrustAndClean(w *Workspace) {
if changed := w.ComputeTrustAndClean(); changed {
utils.GenericUpdateOne(w.Serialize(w), w.GetID(), a)
}
}
/*
This function is used to share the workspace with the peers
*/
+52 -3
View File
@@ -40,6 +40,7 @@ const (
REFUND
DISCOUNT
SUBSCRIPTION
POLICY
)
var NOAPI = func() string {
@@ -104,6 +105,7 @@ var InnerDefaultAPI = [...]func() string{
NOAPI,
NOAPI,
NOAPI,
PEERSAPI,
}
// Bind the standard data name to the data type
@@ -138,6 +140,7 @@ var Str = [...]string{
"refund",
"discount",
"subscription",
"policy",
}
func FromString(comp string) int {
@@ -174,7 +177,7 @@ func DataTypeList() []DataType {
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE,
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION}
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION, POLICY}
}
type PropalgationMessage struct {
@@ -208,6 +211,40 @@ const (
// for a private source resource (isReachable=false, Phase 4).
// oc-discovery routes it to the resource owner peer via ProtocolSourcePresignResource.
PB_SOURCE_PRESIGN
// PB_ORG_PARTNER is propagated via PB_PROPAGATE through oc-discovery to the
// organization master's oc-discovery, which notifies its oc-peer via
// ORG_PARTNER_EVENT. The master's oc-peer confirms or rejects by emitting a
// PROPALGATION_EVENT back, which oc-discovery routes to the originating
// oc-discovery, which in turn notifies our oc-peer via ORG_PARTNER_EVENT to
// finalize the relation.
PB_ORG_PARTNER
// PB_WATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// stored in a workspace. oc-discovery contacts the creator peer to register
// the watching peerID in the creator's watcher cache so it receives future
// CREATE/DELETE events for that resource.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_WATCH_RESOURCE
// PB_UNWATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// removed from all workspaces. oc-discovery contacts the creator peer to
// deregister the watching peerID from the creator's watcher cache.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_UNWATCH_RESOURCE
// PB_BOOKING_SYNC is emitted by master every 24 h to each known NANO.
// Payload: {"peer_id": nano.PeerID, "booking_sync_ids": ["id1", "id2", ...]}
// Nano compares the list against its own confirmed bookings and calls
// SendBookingToMaster for any it has that master is missing.
PB_BOOKING_SYNC
// PB_VERIFY_RESOURCE is emitted by oc-workspace or oc-workflow on workspace
// activation / workflow opening to verify that an embedded non-self resource
// is still current. oc-discovery forwards the request to the creator peer via
// ProtocolVerifyResource; the result comes back as a VERIFY_RESOURCE NATS event.
// Payload: { "creator_peer_id": "…", "data_type": N, "resource_payload": {…} }
PB_VERIFY_RESOURCE
)
func GetActionString(ss string) PubSubAction {
@@ -242,6 +279,14 @@ func GetActionString(ss string) PubSubAction {
return PB_PROPAGATE
case "source_presign":
return PB_SOURCE_PRESIGN
case "org_partner":
return PB_ORG_PARTNER
case "watch_resource":
return PB_WATCH_RESOURCE
case "unwatch_resource":
return PB_UNWATCH_RESOURCE
case "booking_sync":
return PB_BOOKING_SYNC
default:
return NONE
}
@@ -264,8 +309,12 @@ var path = []string{
"none", // 12 NONE
"observe", // 13 PB_OBSERVE
"observe_close", // 14 PB_OBSERVE_CLOSE
"propagate", // 15 PB_PROPAGATE
"source_presign", // 16 PB_SOURCE_PRESIGN
"propagate", // 15 PB_PROPAGATE
"source_presign", // 16 PB_SOURCE_PRESIGN
"org_partner", // 17 PB_ORG_PARTNER
"watch_resource", // 18 PB_WATCH_RESOURCE
"unwatch_resource", // 19 PB_UNWATCH_RESOURCE
"booking_sync", // 20 PB_BOOKING_SYNC
}
func (m PubSubAction) String() string {
+11 -2
View File
@@ -32,7 +32,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
"considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event",
"peer behavior event", "peer observe response event", "peer observe event",
"source presign event",
"source presign event", "org partner event", "verify resource event",
}
const (
@@ -85,6 +85,15 @@ const (
// oc-datacenter listens to it to generate a pre-signed Minio URL and reply
// via PB_CONSIDERS (Phase 4 — isReachable=false).
SOURCE_PRESIGN_EVENT
// ORG_PARTNER_EVENT is emitted by a peer to its OrganizationMaster to ask:
// "is peer X one of your members?". The master replies via the same channel.
ORG_PARTNER_EVENT
// VERIFY_RESOURCE is emitted by oc-discovery when it receives a verify response
// from a remote peer via ProtocolVerifyResource. oc-workspace and oc-workflow
// listen to this event to update their stale caches.
VERIFY_RESOURCE
)
func (n NATSMethod) String() string {
@@ -98,7 +107,7 @@ func NameToMethod(name string) NATSMethod {
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT,
SOURCE_PRESIGN_EVENT} {
SOURCE_PRESIGN_EVENT, ORG_PARTNER_EVENT, VERIFY_RESOURCE} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}