20 Commits

Author SHA1 Message Date
mr 3a66b42c01 test 2026-06-23 09:40:33 +02:00
mr 58e97fbe74 lib 2026-06-22 07:50:01 +02:00
mr 1425a31494 Orga + Consent 2026-06-05 15:56:50 +02:00
mr 6ee169f444 can delete 2026-06-05 09:59:14 +02:00
mr 5be3c0a10a wf 2026-06-04 13:36:55 +02:00
mr d9723e6431 oc lib test 2026-06-04 12:04:04 +02:00
mr c726361deb Get Exploit 2026-06-04 11:31:03 +02:00
mr d19ff1f8b2 CreatorID 2026-06-04 09:00:31 +02:00
mr 69244163b4 workflow bson missing 2026-06-03 15:33:39 +02:00
mr 842364d145 deep merge 2026-06-03 11:23:41 +02:00
mr 9ab374b720 oc nano 2026-06-03 08:03:11 +02:00
mr aa2bca48ef test 2026-06-02 16:43:55 +02:00
mr 322ea38bb4 nil map 2026-06-02 15:57:33 +02:00
mr c1490a7746 proper 2026-06-02 15:49:03 +02:00
mr 49f60d9416 test 2026-06-02 15:34:54 +02:00
mr 548ed84b13 prospect 2026-06-02 15:11:58 +02:00
mr 178cd48314 prospect 2026-06-02 14:45:09 +02:00
mr b31df8cfed adjust allowed instances for type behaviors 2026-06-02 14:16:32 +02:00
mr a0a53f0477 Panic recovered FiltersFromFlatMap 2026-06-02 14:04:33 +02:00
mr dffaa6326f still prospecting 2026-06-02 14:03:30 +02:00
23 changed files with 1278 additions and 81 deletions
+8 -3
View File
@@ -161,6 +161,11 @@ type Filter struct {
// Keys inside "and"/"or" are json tag names; the function resolves each to its
// full dotted BSON path using the target struct. Unknown keys are kept as-is.
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Filters {
defer func() {
if r := recover(); r != nil {
fmt.Printf("Panic recovered FiltersFromFlatMap: %v\n", r)
}
}()
filters := &Filters{
And: make(map[string][]Filter),
Or: make(map[string][]Filter),
@@ -180,12 +185,12 @@ func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *Fil
}
for jsonKey, val := range m {
bsonKey := resolve(jsonKey)
items, ok := val.([]interface{})
fmt.Println(jsonKey, val, ok, bsonKey)
//items, ok := val.([]interface{})
fmt.Println(jsonKey, val, bsonKey)
if !ok {
continue
}
for _, item := range items {
for _, item := range val.([]interface{}) {
entry, ok := item.(map[string]interface{})
if !ok {
continue
+1
View File
@@ -77,6 +77,7 @@ const (
REFUND = tools.REFUND
DISCOUNT = tools.DISCOUNT
SUBSCRIPTION = tools.SUBSCRIPTION
POLICY = tools.POLICY
)
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
+2 -3
View File
@@ -10,14 +10,13 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
* Booking is a struct that represents a booking
*/
type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
FromNano string `json:"from_nano,omitempty" bson:"priced_item,omitempty"`
FromNano string `json:"from_nano,omitempty" bson:"from_nano,omitempty"`
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
@@ -147,5 +146,5 @@ func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
}
func (r *Booking) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
return true // only draft bookings can be deleted
}
+2
View File
@@ -11,6 +11,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer/policy"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -58,6 +59,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.SUBSCRIPTION.String(): func() utils.DBObject { return &subscription.Subscription{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
tools.POLICY.String(): func() utils.DBObject { return &policy.Policy{} },
}
// Model returns the model object based on the model type
+11
View File
@@ -0,0 +1,11 @@
package organization
// Organization holds descriptive data about a peer's organization.
// It is optional — a peer without an organization has a nil Organization field.
type Organization struct {
Name string `json:"name,omitempty" bson:"name,omitempty"`
Description string `json:"description,omitempty" bson:"description,omitempty"`
Website string `json:"website,omitempty" bson:"website,omitempty"`
Sector string `json:"sector,omitempty" bson:"sector,omitempty"`
Country string `json:"country,omitempty" bson:"country,omitempty"`
}
+31 -1
View File
@@ -5,6 +5,7 @@ import (
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/organization"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
@@ -30,9 +31,19 @@ const (
NANO
PENDING_NANO
PENDING_MASTER
ORGANIZATION_MASTER
ORGANIZATION_MEMBER
ORGANIZATION_PARTNER
ORGANIZATION_MASTER_PENDING
ORGANIZATION_MEMBER_PENDING
)
var path = []string{"known", "self", "partner", "blacklist", "pending_partner", "master", "nano", "pending_nano", "pending_master"}
var path = []string{
"known", "self", "partner", "blacklist", "pending_partner",
"master", "nano", "pending_nano", "pending_master",
"organization_master", "organization_member", "organization_partner",
"organization_master_pending", "organization_member_pending",
}
func GetRelationPath(str string) int {
for i, p := range path {
@@ -121,6 +132,20 @@ type Peer struct {
// When oc-discovery fails to reach a NANO, it routes the booking to MasterID instead.
MasterID string `json:"master_id,omitempty" bson:"master_id,omitempty"`
// OrganizationMasterID is the MongoDB _id of the peer acting as this node's
// organization master. Set automatically when an ORGANIZATION_MASTER relation
// is validated (equivalent of MasterID for the Nano/Master hierarchy).
OrganizationMasterID string `json:"organization_master_id,omitempty" bson:"organization_master_id,omitempty"`
// Organization holds optional descriptive data about the peer's organization.
// Null when the peer has not registered any organization data.
Organization *organization.Organization `json:"organization,omitempty" bson:"organization,omitempty"`
// PolicyID references the Policy document that governs which inbound
// libp2p streams are authorized for this peer.
// When empty, all non-vital streams are denied by default.
PolicyID string `json:"policy_id,omitempty" bson:"policy_id,omitempty"`
// Volatile connectivity state — never persisted to DB (bson:"-").
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
@@ -137,6 +162,11 @@ func (ri *Peer) Extend(typ ...string) map[string][]tools.DataType {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.PEER)
case "policy":
if _, ok := ext[t]; !ok {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.POLICY)
}
}
return ext
+30
View File
@@ -0,0 +1,30 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// Policy defines which inbound libp2p streams are authorized for a peer.
// Vital streams (planner, considers, minio/admiralty config, source-presign,
// verify, observe, heartbeat) are always allowed regardless of policy.
type Policy struct {
utils.AbstractObject
// Resource CRUD
AllowSearch bool `json:"allow_search" bson:"allow_search"`
AllowCreate bool `json:"allow_create" bson:"allow_create"`
AllowUpdate bool `json:"allow_update" bson:"allow_update"`
AllowDelete bool `json:"allow_delete" bson:"allow_delete"`
// Resource freshness tracking
AllowRegisterWatcher bool `json:"allow_register_watcher" bson:"allow_register_watcher"`
AllowUnregisterWatcher bool `json:"allow_unregister_watcher" bson:"allow_unregister_watcher"`
// Organization partner confirmation
AllowOrgPartnerConfirm bool `json:"allow_org_partner_confirm" bson:"allow_org_partner_confirm"`
}
func (p *Policy) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request)
}
@@ -0,0 +1,31 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type policyMongoAccessor struct {
utils.AbstractAccessor[*Policy]
}
func NewAccessor(request *tools.APIRequest) *policyMongoAccessor {
return &policyMongoAccessor{
AbstractAccessor: utils.AbstractAccessor[*Policy]{
Logger: logs.CreateLogger(tools.POLICY.String()),
Request: request,
Type: tools.POLICY,
New: func() *Policy { return &Policy{} },
},
}
}
func (a *policyMongoAccessor) GetObjectFilters(search string) *dbs.Filters {
return &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}
+14
View File
@@ -0,0 +1,14 @@
package resources
// Consent represents a consent request attached to a resource.
// ConsentString is the question displayed to the user.
// Optional, when true, means the user may decline without blocking scheduling.
// A nil Optional is treated as required (false).
type Consent struct {
ConsentString string `json:"consent_string" bson:"consent_string"`
Optional *bool `json:"optional,omitempty" bson:"optional,omitempty"`
}
func (c Consent) IsOptional() bool {
return c.Optional != nil && *c.Optional
}
+240 -34
View File
@@ -20,9 +20,9 @@ import (
*/
type DynamicResource struct {
AbstractResource
Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"`
Filters map[string]interface{} `bson:"filters,omitempty" json:"filters,omitempty"`
SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"`
Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"`
Filters dbs.Filters `bson:"filters,omitempty" json:"filters,omitempty"`
SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"`
PeerIds map[int]string `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"`
ResourceIds map[int]string `bson:"resource_ids,omitempty" json:"resource_ids,omitempty"`
@@ -37,45 +37,249 @@ type DynamicResource struct {
WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"`
}
// WorkspaceCandidatesProvider can be set by the workspace package to supply
// contextual workspace resources for a given DataType and request without
// creating a circular import (workspace → resources → workspace).
// When set, SetAllowedInstances uses workspace-scoped resources instead of
// the full catalog for requests that carry a username.
var WorkspaceCandidatesProvider func(dt tools.DataType, request *tools.APIRequest) []ResourceInterface
func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return nil
}
func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{}
for k, v := range map[tools.DataType]ResourceInterface{
tools.COMPUTE_RESOURCE: &ComputeResource{},
tools.DATA_RESOURCE: &DataResource{},
tools.STORAGE_RESOURCE: &StorageResource{},
tools.PROCESSING_RESOURCE: &ProcessingResource{},
tools.WORKFLOW_RESOURCE: &WorkflowResource{}} {
if d.Type != k {
continue
}
access := NewAccessor[*DynamicResource](k, request)
a, _, _ := access.Search(dbs.FiltersFromFlatMap(d.Filters, v), "", false, 0, 100000)
fmt.Println(a, dbs.FiltersFromFlatMap(d.Filters, v), d.Filters, v)
d.PeerIds = map[int]string{}
d.ResourceIds = map[int]string{}
for _, res := range a {
for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) {
d.PeerIds[len(d.Instances)] = res.GetCreatorID()
d.ResourceIds[len(d.Instances)] = res.GetID()
d.Instances = append(d.Instances, i)
}
}
break
if WorkspaceCandidatesProvider != nil {
candidates := WorkspaceCandidatesProvider(d.Type, request)
return d.SetAllowedInstancesFromSet(candidates, request, instance_id...)
}
sorted := make([]ResourceInstanceITF, len(d.Instances))
copy(sorted, d.Instances)
slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
d.SortRules["partnerships"] = "%v not contains 2"
return d.compareByRules(a, b, d.SortRules)
})
d.WatchedDynamicResource = []string{}
d.Instances = []ResourceInstanceITF{}
d.sortAndResetInstances()
return d.Instances
}
// SetAllowedInstancesFromSet fills d.Instances from a pre-loaded workspace resource set
// instead of querying the catalog. Filters are applied in-memory against the candidates.
// Called by WorkspaceResourceSet.Fill so dynamic resources only see workspace-scoped resources.
func (d *DynamicResource) SetAllowedInstancesFromSet(candidates []ResourceInterface, request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{}
d.PeerIds = map[int]string{}
d.ResourceIds = map[int]string{}
for _, res := range candidates {
if !d.matchesFilters(res) {
continue
}
for _, i := range res.SetAllowedInstances(request, instance_id...) {
d.PeerIds[len(d.Instances)] = res.GetCreatorID()
d.ResourceIds[len(d.Instances)] = res.GetID()
d.Instances = append(d.Instances, i)
}
}
d.sortAndResetInstances()
return d.Instances
}
func (d *DynamicResource) sortAndResetInstances() {
if d.SortRules != nil {
sorted := make([]ResourceInstanceITF, len(d.Instances))
copy(sorted, d.Instances)
slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
d.SortRules["partnerships"] = "%v not contains 2"
return d.compareByRules(a, b, d.SortRules)
})
d.Instances = sorted
}
d.WatchedDynamicResource = []string{}
}
// matchesFilters applies d.Filters in-memory against a serialized resource.
// Keys in d.Filters are JSON tag names; Serialize returns JSON tag names — no bson conversion needed.
func (d *DynamicResource) matchesFilters(res ResourceInterface) bool {
if len(d.Filters.And) == 0 && len(d.Filters.Or) == 0 {
return true
}
m := res.Serialize(res)
for field, fs := range d.Filters.And {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if !anyMatchesOp(vals, f) {
return false
}
}
}
if len(d.Filters.Or) > 0 {
matched := false
for field, fs := range d.Filters.Or {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if anyMatchesOp(vals, f) {
matched = true
break
}
}
if matched {
break
}
}
if !matched {
return false
}
}
return true
}
// nestedVals navigates a dot-path into m and collects all leaf values.
// Arrays at any level are expanded: each element is recursed into.
func nestedVals(m map[string]interface{}, path []string) []interface{} {
if len(path) == 0 || m == nil {
return nil
}
val, ok := m[path[0]]
if !ok {
return nil
}
if len(path) == 1 {
if arr, ok := val.([]interface{}); ok {
return arr
}
return []interface{}{val}
}
rest := path[1:]
switch v := val.(type) {
case map[string]interface{}:
return nestedVals(v, rest)
case []interface{}:
var out []interface{}
for _, elem := range v {
if em, ok := elem.(map[string]interface{}); ok {
out = append(out, nestedVals(em, rest)...)
}
}
return out
}
return nil
}
// anyMatchesOp returns true if at least one value in vals satisfies filter f.
func anyMatchesOp(vals []interface{}, f dbs.Filter) bool {
if f.Operator == dbs.EXISTS.String() {
exists := len(vals) > 0 && vals[0] != nil
want := true
if b, ok := f.Value.(bool); ok {
want = b
}
return exists == want
}
if f.Operator == dbs.IN.String() {
list, ok := f.Value.([]interface{})
if !ok {
return false
}
for _, v := range vals {
sv := fmt.Sprintf("%v", v)
for _, item := range list {
if sv == fmt.Sprintf("%v", item) {
return true
}
}
}
return false
}
for _, v := range vals {
if opMatches(v, f) {
return true
}
}
return false
}
func opMatches(val interface{}, f dbs.Filter) bool {
switch f.Operator {
case dbs.EQUAL.String():
return fmt.Sprintf("%v", val) == fmt.Sprintf("%v", f.Value)
case dbs.NOT.String():
return fmt.Sprintf("%v", val) != fmt.Sprintf("%v", f.Value)
case dbs.LIKE.String():
return strings.Contains(strings.ToLower(fmt.Sprintf("%v", val)), strings.ToLower(fmt.Sprintf("%v", f.Value)))
case dbs.GT.String(), dbs.GTE.String(), dbs.LT.String(), dbs.LTE.String():
return numericCmp(val, f.Value, f.Operator)
case dbs.ELEMMATCH.String():
arr, ok := val.([]interface{})
if !ok {
return false
}
sub, ok := f.Value.(map[string]interface{})
if !ok {
return false
}
for _, elem := range arr {
em, ok := elem.(map[string]interface{})
if !ok {
continue
}
allOk := true
for k, sv := range sub {
if fmt.Sprintf("%v", em[k]) != fmt.Sprintf("%v", sv) {
allOk = false
break
}
}
if allOk {
return true
}
}
return false
}
return false
}
func numericCmp(a, b interface{}, op string) bool {
fa, aOk := toFloat64(a)
fb, bOk := toFloat64(b)
if !aOk || !bOk {
sa, sb := fmt.Sprintf("%v", a), fmt.Sprintf("%v", b)
switch op {
case dbs.GT.String():
return sa > sb
case dbs.GTE.String():
return sa >= sb
case dbs.LT.String():
return sa < sb
case dbs.LTE.String():
return sa <= sb
}
return false
}
switch op {
case dbs.GT.String():
return fa > fb
case dbs.GTE.String():
return fa >= fb
case dbs.LT.String():
return fa < fb
case dbs.LTE.String():
return fa <= fb
}
return false
}
func toFloat64(v interface{}) (float64, bool) {
switch n := v.(type) {
case float64:
return n, true
case float32:
return float64(n), true
case int:
return float64(n), true
case int32:
return float64(n), true
case int64:
return float64(n), true
}
return 0, false
}
func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) {
d.Instances = append(d.Instances, instance)
}
@@ -92,13 +296,15 @@ func (d *DynamicResource) GetSelectedInstance(index *int) ResourceInstanceITF {
d.SelectedIndex = i
for i := range inst.GetPartnerships() {
fmt.Println(inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy), d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy)
if inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil {
d.SelectedPartnershipIndex = &i
break
}
}
if d.SelectedPartnershipIndex == nil {
continue
i := 0
d.SelectedPartnershipIndex = &i
}
return inst
}
+2
View File
@@ -29,6 +29,8 @@ type ResourceInterface interface {
GetEnv() []models.Param
GetInputs() []models.Param
GetOutputs() []models.Param
GetExploitationAuthorizations() []ExploitationAuthorization
GetConsents() []Consent
}
type ResourceInstanceITF interface {
+125 -1
View File
@@ -15,7 +15,8 @@ type ResourceSet struct {
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
// DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances.
// Runtime-only resource objects — not persisted. Populated by Fill() from the ID lists above.
// Use WorkspaceResourceSet when full object persistence is needed (workspace fluid catalog).
DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
@@ -26,6 +27,129 @@ type ResourceSet struct {
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
}
// WorkspaceResourceSet mirrors ResourceSet but persists complete resource objects to MongoDB.
// Use this in workspace documents where the workspace acts as a fluid resource catalog.
// The *Resource fields are loaded from bson on read; Fill() skips catalog lookup when they are
// already populated.
type WorkspaceResourceSet struct {
Datas []string `bson:"datas,omitempty" json:"datas,omitempty"`
Storages []string `bson:"storages,omitempty" json:"storages,omitempty"`
Processings []string `bson:"processings,omitempty" json:"processings,omitempty"`
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
DynamicResources []*DynamicResource `bson:"dynamic_resources,omitempty" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"data_resources,omitempty" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"storage_resources,omitempty" json:"storage_resources,omitempty"`
ProcessingResources []*ProcessingResource `bson:"processing_resources,omitempty" json:"processing_resources,omitempty"`
ComputeResources []*ComputeResource `bson:"compute_resources,omitempty" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"workflow_resources,omitempty" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"service_resources,omitempty" json:"service_resources,omitempty"`
}
func (r *WorkspaceResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
r.DynamicResources = nil
r.NativeTools = nil
}
// Fill populates *Resource fields from their ID lists. When a field is already non-nil
// (loaded from the workspace MongoDB document), the catalog lookup is skipped for that type.
func (r *WorkspaceResourceSet) Fill(request *tools.APIRequest) {
if r.DataResources == nil {
for _, id := range r.Datas {
if d, _, e := (&DataResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DataResources = append(r.DataResources, d.(*DataResource))
}
}
}
if r.ComputeResources == nil {
for _, id := range r.Computes {
if d, _, e := (&ComputeResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ComputeResources = append(r.ComputeResources, d.(*ComputeResource))
}
}
}
if r.StorageResources == nil {
for _, id := range r.Storages {
if d, _, e := (&StorageResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.StorageResources = append(r.StorageResources, d.(*StorageResource))
}
}
}
if r.ProcessingResources == nil {
for _, id := range r.Processings {
if d, _, e := (&ProcessingResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
}
}
}
if r.WorkflowResources == nil {
for _, id := range r.Workflows {
if d, _, e := (&WorkflowResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
}
}
}
if r.ServiceResources == nil {
for _, id := range r.Services {
if d, _, e := (&ServiceResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
}
}
}
if r.DynamicResources == nil {
for _, id := range r.Dynamics {
if d, _, e := (&DynamicResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DynamicResources = append(r.DynamicResources, d.(*DynamicResource))
}
}
}
for _, d := range r.DynamicResources {
var candidates []ResourceInterface
switch d.Type {
case tools.COMPUTE_RESOURCE:
for _, c := range r.ComputeResources {
candidates = append(candidates, c)
}
case tools.DATA_RESOURCE:
for _, c := range r.DataResources {
candidates = append(candidates, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range r.StorageResources {
candidates = append(candidates, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range r.ProcessingResources {
candidates = append(candidates, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range r.WorkflowResources {
candidates = append(candidates, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range r.ServiceResources {
candidates = append(candidates, c)
}
}
if len(candidates) > 0 {
d.SetAllowedInstancesFromSet(candidates, request)
} else {
d.SetAllowedInstances(request)
}
}
}
func (r *ResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
+9
View File
@@ -50,6 +50,10 @@ type AbstractResource struct {
// NOT in a separate collection.
// Visibility-filtered per requesting peer before any response is sent.
ExploitationAuthorizations []ExploitationAuthorization `json:"exploitation_authorizations,omitempty" bson:"exploitation_authorizations,omitempty"`
// Consents lists the consent questions the user must acknowledge before
// scheduling this resource. Consents with Optional=true may be skipped.
Consents []Consent `json:"consents,omitempty" bson:"consents,omitempty"`
}
func (ri *AbstractResource) Extend(typ ...string) map[string][]tools.DataType {
@@ -100,6 +104,11 @@ func (r *AbstractResource) GetExploitationAuthorizations() []ExploitationAuthori
return r.ExploitationAuthorizations
}
// GetConsents returns the consent questions declared by this resource.
func (r *AbstractResource) GetConsents() []Consent {
return r.Consents
}
// FilterExploitationAuthorizations removes AEs that are not visible to peerID.
// Must be called before serializing the resource for a consumer peer.
// The resource owner (CreatorID) always sees all AEs unfiltered.
+146
View File
@@ -1,10 +1,12 @@
package resources
import (
"encoding/json"
"errors"
"fmt"
"slices"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/models"
@@ -110,6 +112,40 @@ func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, er
return data, code, err
}
var workspaceResourceTypes = []tools.DataType{
tools.COMPUTE_RESOURCE,
tools.DATA_RESOURCE,
tools.PROCESSING_RESOURCE,
tools.STORAGE_RESOURCE,
tools.WORKFLOW_RESOURCE,
tools.SERVICE_RESOURCE,
}
func emitResourceNATS(method tools.NATSMethod, dt tools.DataType, payload []byte) {
if !slices.Contains(workspaceResourceTypes, dt) {
return
}
tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
FromApp: config.GetAppName(),
Datatype: dt,
Method: int(method),
Payload: payload,
})
}
func (dca *ResourceMongoAccessor[T]) DeleteOne(id string) (utils.DBObject, int, error) {
data, code, err := dca.AbstractAccessor.LoadOne(id)
if err != nil {
return data, code, err
}
res, code, err := dca.AbstractAccessor.DeleteOne(id)
if err == nil && data != nil {
b, _ := json.Marshal(data)
go emitResourceNATS(tools.REMOVE_RESOURCE, dca.GetType(), b)
}
return res, code, err
}
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE {
delete(set, "architecture")
@@ -193,9 +229,119 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
"resources_id": idsToUpdate,
}, i)
}
if err == nil && res != nil {
b, _ := json.Marshal(res)
go emitResourceNATS(tools.CREATE_RESOURCE, dca.GetType(), b)
}
return res, code, err
}
// PurgedResourcePayload holds a silently-deleted resource's type and serialized payload.
type PurgedResourcePayload struct {
DT tools.DataType
Payload []byte
}
// purgeByType searches and silently deletes all resources of type T created by creatorID.
// Uses AbstractAccessor.DeleteOne directly to bypass the NATS-emitting override.
func purgeByType[T ResourceInterface](dt tools.DataType, creatorID string) []PurgedResourcePayload {
a := NewAccessor[T](dt, nil)
if a == nil {
return nil
}
res, _, _ := a.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 10000)
var result []PurgedResourcePayload
for _, item := range res {
b, err := json.Marshal(item)
if err != nil {
continue
}
a.AbstractAccessor.DeleteOne(item.GetID())
result = append(result, PurgedResourcePayload{DT: dt, Payload: b})
}
return result
}
// PurgeCreatorResources deletes all catalog resources created by creatorPeerID from
// the DB without emitting NATS. Used for non-blacklist peer privilege downgrades where
// workspace state should be left untouched.
func PurgeCreatorResources(creatorPeerID string) []PurgedResourcePayload {
var result []PurgedResourcePayload
result = append(result, purgeByType[*ComputeResource](tools.COMPUTE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*DataResource](tools.DATA_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ProcessingResource](tools.PROCESSING_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*StorageResource](tools.STORAGE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*WorkflowResource](tools.WORKFLOW_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ServiceResource](tools.SERVICE_RESOURCE, creatorPeerID)...)
return result
}
// FilterMapFromResourcePayload deserializes a resource payload by DataType, zeros out
// the AbstractInstanciatedResource (and its AbstractResource / Instances sub-fields),
// then marshals back to get only the concrete type's own JSON fields.
// Returns nil for WORKFLOW_RESOURCE and unknown types.
// JSON keys only — not BSON paths.
func FilterMapFromResourcePayload(dt tools.DataType, payload []byte) map[string]interface{} {
var m map[string]interface{}
switch dt {
case tools.COMPUTE_RESOURCE:
var r ComputeResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ComputeResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.DATA_RESOURCE:
var r DataResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*DataInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.PROCESSING_RESOURCE:
var r ProcessingResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ProcessingInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.STORAGE_RESOURCE:
var r StorageResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*StorageResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.SERVICE_RESOURCE:
var r ServiceResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ServiceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.WORKFLOW_RESOURCE:
var r WorkflowResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractResource = AbstractResource{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
default:
return nil
}
return m
}
func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return dca.StoreOne(data)
}
+1 -1
View File
@@ -150,7 +150,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
ao.UpdateDate = time.Now()
ao.UpdaterID = peer
ao.UserUpdaterID = user
if create && ao.CreatorID != "" {
if create && ao.CreatorID == "" {
ao.CreationDate = time.Now()
ao.CreatorID = peer
ao.UserCreatorID = user
+83 -4
View File
@@ -128,10 +128,8 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
r.Sign()
}
loaded := r.Serialize(r) // get the loaded object
for k, v := range change { // apply the changes, with a flatten method
loaded[k] = v
}
loaded := r.Serialize(r) // get the loaded object
deepMerge(loaded, change)
newObj := a.NewObj()
b, err = json.Marshal(loaded)
if err != nil {
@@ -255,6 +253,87 @@ func IsMySelf(peerID string, wfa Accessor) (bool, string) {
return peerID == pp.GetID(), pp.GetID()
}
// deepMerge overlays patch values onto base, preserving base values for keys
// absent from patch, nil patch values, and empty strings when base is non-empty.
// This prevents partial frontend payloads from silently erasing server-managed
// fields (source, env, country, owners, creator_id, creation_date, …).
func deepMerge(base, patch map[string]interface{}) {
for k, pv := range patch {
bv := base[k]
switch pvTyped := pv.(type) {
case map[string]interface{}:
if bvMap, ok := bv.(map[string]interface{}); ok {
deepMerge(bvMap, pvTyped)
} else {
base[k] = pv
}
case []interface{}:
if bvSlice, ok := bv.([]interface{}); ok {
base[k] = mergeSlices(bvSlice, pvTyped)
} else {
base[k] = pv
}
case string:
// Don't overwrite a non-empty base value with an empty string.
if pvTyped != "" {
base[k] = pv
}
default:
if pv != nil {
base[k] = pv
}
}
}
}
// mergeSlices merges two slices element-wise.
// For slices of maps it matches elements by their "id" field when available;
// falls back to positional matching. An empty patch slice leaves base intact.
func mergeSlices(base, patch []interface{}) []interface{} {
if len(patch) == 0 {
return base
}
for _, e := range patch {
if _, ok := e.(map[string]interface{}); !ok {
return patch // non-map elements: replace wholesale
}
}
baseByID := map[string]map[string]interface{}{}
for _, e := range base {
if em, ok := e.(map[string]interface{}); ok {
if id, ok := em["id"].(string); ok && id != "" {
baseByID[id] = em
}
}
}
result := make([]interface{}, 0, len(patch))
for i, pe := range patch {
pm, _ := pe.(map[string]interface{})
if pm == nil {
result = append(result, pe)
continue
}
var baseElem map[string]interface{}
if id, ok := pm["id"].(string); ok && id != "" {
baseElem = baseByID[id]
}
if baseElem == nil && i < len(base) {
baseElem, _ = base[i].(map[string]interface{})
}
if baseElem != nil {
merged := make(map[string]interface{}, len(baseElem))
for k, v := range baseElem {
merged[k] = v
}
deepMerge(merged, pm)
result = append(result, merged)
} else {
result = append(result, pe)
}
}
return result
}
func GenerateNodeID() (string, error) {
folderStatic := "/var/lib/opencloud-node"
if _, err := os.Stat(folderStatic); err == nil {
+226 -14
View File
@@ -54,7 +54,11 @@ type Workflow struct {
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty"`
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workflow's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
}
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -1064,8 +1068,16 @@ const (
ViolationInvertedArrow ViolationType = "inverted_arrow"
ViolationIsolatedProcessing ViolationType = "isolated_processing"
ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
ViolationDynamicNotConfigured ViolationType = "dynamic_not_configured"
ViolationAnchorOnNonStorage ViolationType = "anchor_on_non_storage"
)
// ocAnchorAccess is the magic value used by oc-monitord to inject storage
// access credentials into a workflow step at runtime. It must only appear as
// an output of a storage (or dynamic-storage) element.
const ocAnchorAccess = "§oc:access§"
// IntegrityViolation describes a single structural or semantic problem
// found in the workflow graph.
type IntegrityViolation struct {
@@ -1103,6 +1115,8 @@ func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
violations = append(violations, w.detectInvertedArrows()...)
violations = append(violations, w.detectIsolatedProcessings()...)
violations = append(violations, w.detectOrphanedStorages()...)
violations = append(violations, w.validateDynamicFilters()...)
violations = append(violations, w.validateAnchorOutputs()...)
return violations
}
@@ -1180,14 +1194,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
var name string
switch {
case w.Graph.IsProcessing(item) && item.Processing != nil:
// IsService processings are long-running services and don't need a Compute booking.
if item.Processing.IsService {
continue
}
needsCompute = true
name = item.Processing.GetName()
case w.Graph.IsService(item) && item.Service != nil:
// HOSTED services use an existing endpoint — no Compute booking needed.
inst := item.Service.GetSelectedInstance(nil)
if inst != nil {
if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
@@ -1196,6 +1208,9 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
}
needsCompute = true
name = item.Service.GetName()
case w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.PROCESSING_RESOURCE:
needsCompute = true
name = w.itemName(id)
}
if !needsCompute {
continue
@@ -1210,7 +1225,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
} else {
continue
}
if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) {
other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
// A concrete compute OR a dynamic node typed as compute satisfies the requirement.
if w.Graph.IsCompute(other) || (w.Graph.IsDynamic(other) && other.Dynamic.Type == tools.COMPUTE_RESOURCE) {
hasCompute = true
break
}
@@ -1301,12 +1321,41 @@ func (w *Workflow) detectCycles() []IntegrityViolation {
// validateDataStorageLinks checks that every Data item with a non-empty Source
// has at least one Storage linked — the builder needs this to inject the
// download step (curl or NATS/Minio protocol).
// isStorageEquivalent returns true for items that can satisfy a Data node's
// storage requirement: concrete storage, dynamic-storage, or a compute that
// has at least one embedded storage available on one of its instances.
func (w *Workflow) isStorageEquivalent(item graph.GraphItem) bool {
if w.Graph.IsStorage(item) {
return true
}
if w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE {
return true
}
if w.Graph.IsCompute(item) && item.Compute != nil {
for _, inst := range item.Compute.Instances {
if len(inst.AvailableStorages) > 0 {
return true
}
}
}
return false
}
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
var violations []IntegrityViolation
dataStorageLinks := w.Graph.GetDataStorageLinks()
// Build the set of data item IDs that have a valid storage-equivalent link.
linkedStorage := map[string]struct{}{}
for _, dsl := range dataStorageLinks {
linkedStorage[dsl.DataItemID] = struct{}{}
for _, link := range w.Graph.Links {
srcItem, srcOk := w.Graph.Items[link.Source.ID]
dstItem, dstOk := w.Graph.Items[link.Destination.ID]
if !srcOk || !dstOk {
continue
}
if w.Graph.IsData(srcItem) && w.isStorageEquivalent(dstItem) {
linkedStorage[link.Source.ID] = struct{}{}
} else if w.isStorageEquivalent(srcItem) && w.Graph.IsData(dstItem) {
linkedStorage[link.Destination.ID] = struct{}{}
}
}
for id, item := range w.Graph.Items {
if !w.Graph.IsData(item) || item.Data == nil {
@@ -1492,12 +1541,162 @@ func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
return violations
}
// ---------------------------------------------------------------------------
// AE validation helpers — centralised so both oc-scheduler and oc-schedulerd
// share the same logic without code duplication.
// ---------------------------------------------------------------------------
// BuildResourceIDSet constructs the per-type resource-ID map and the flat
// coupling-membership set used by ValidateWorkflowAE.
//
// selectedEmbeddedStorages and selectedInstances come from the scheduling
// request (WorkflowSchedule) or from the WorkflowExecution at launch time.
// Embedded storages are NOT stored in Workflow.Storages (they are inside
// ComputeResourceInstance.AvailableStorages), so they must be resolved here
// to make them visible to the AE coupling check.
func (w *Workflow) BuildResourceIDSet(
selectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection,
selectedInstances ConfigItem,
) (map[tools.DataType][]string, map[string]struct{}) {
resourcesByType := map[tools.DataType][]string{
tools.DATA_RESOURCE: w.Datas,
tools.PROCESSING_RESOURCE: w.Processings,
tools.STORAGE_RESOURCE: append([]string{}, w.Storages...),
tools.COMPUTE_RESOURCE: w.Computes,
tools.WORKFLOW_RESOURCE: w.Workflows,
tools.SERVICE_RESOURCE: w.Services,
}
idSet := map[string]struct{}{}
for _, ids := range resourcesByType {
for _, id := range ids {
idSet[id] = struct{}{}
}
}
for graphItemID, sel := range selectedEmbeddedStorages {
if sel == nil {
continue
}
c, ok := w.Graph.Items[graphItemID]
if !ok {
continue
}
_, computeRes := c.GetResource()
computeResource, ok := computeRes.(*resources.ComputeResource)
if !ok {
continue
}
computeIdx := 0
if d := selectedInstances.Get(computeResource.GetID()); d != nil {
computeIdx = *d
}
if computeIdx >= len(computeResource.Instances) {
continue
}
computeInst := computeResource.Instances[computeIdx]
if sel.StorageIndex >= len(computeInst.AvailableStorages) {
continue
}
storageID := computeInst.AvailableStorages[sel.StorageIndex].GetID()
if storageID == "" {
continue
}
idSet[storageID] = struct{}{}
resourcesByType[tools.STORAGE_RESOURCE] = append(resourcesByType[tools.STORAGE_RESOURCE], storageID)
}
return resourcesByType, idSet
}
// ValidateWorkflowAE checks the ExploitationAuthorizations of every resource
// referenced in resourcesByType against the coupling/peer/workflow constraints.
//
// loadResource is injected by the caller to avoid a circular import
// (oc-lib/models/resources → oclib → oc-lib/models → resources).
// A nil return from loadResource means "resource not found — skip".
func (w *Workflow) ValidateWorkflowAE(
workflowID, consumerPeerID string,
resourcesByType map[tools.DataType][]string,
idSet map[string]struct{},
loadResource func(tools.DataType, string) resources.ResourceInterface,
) []resources.AEViolation {
now := time.Now().UTC()
var violations []resources.AEViolation
for dt, ids := range resourcesByType {
for _, id := range ids {
res := loadResource(dt, id)
if res == nil {
continue
}
for _, ae := range res.GetExploitationAuthorizations() {
violations = append(violations, ae.CheckAE(id, workflowID, consumerPeerID, idSet, now)...)
}
}
}
return violations
}
// detectOrphanedStorages warns when a storage node is not linked to any
// processing node — it contributes no data flow to the workflow.
// validateDynamicFilters mirrors oc-front's dynamic-not-configured check:
// a dynamic node with no filters cannot be resolved by the scheduler.
func (w *Workflow) validateDynamicFilters() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsDynamic(item) {
continue
}
f := item.Dynamic.Filters
if len(f.And) == 0 && len(f.Or) == 0 {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationDynamicNotConfigured,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" is a dynamic %s with no filters — at least one filter is required for the scheduler to resolve it`,
w.itemName(id), item.Dynamic.Type,
),
})
}
}
return violations
}
// validateAnchorOutputs mirrors oc-front's anchor-on-non-storage check:
// the §oc:access§ magic value must only appear as an output of a storage,
// dynamic-storage, or compute node that has an active embedded storage selected.
func (w *Workflow) validateAnchorOutputs() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if w.Graph.IsStorage(item) || (w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue
}
if w.Graph.IsCompute(item) {
if _, ok := w.SelectedEmbeddedStorages[id]; ok {
continue
}
}
for _, p := range w.Outputs[id] {
if p.Value == ocAnchorAccess {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationAnchorOnNonStorage,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" has an access anchor (%s) as output — access anchors may only be outputs of storage elements`,
w.itemName(id), ocAnchorAccess,
),
})
break
}
}
}
return violations
}
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsStorage(item) {
// Check both concrete storage and dynamic-storage nodes.
if !w.Graph.IsStorage(item) && !(w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue
}
linkedTopics := map[string]struct{}{}
@@ -1510,15 +1709,28 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
} else {
continue
}
if other, ok := w.Graph.Items[otherID]; ok {
switch {
case w.Graph.IsProcessing(other):
other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
switch {
case w.Graph.IsProcessing(other):
linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other):
linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other):
linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other):
linkedTopics["service"] = struct{}{}
case w.Graph.IsDynamic(other):
switch other.Dynamic.Type {
case tools.PROCESSING_RESOURCE:
linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other):
case tools.COMPUTE_RESOURCE:
linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other):
case tools.DATA_RESOURCE:
linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other):
case tools.SERVICE_RESOURCE:
linkedTopics["service"] = struct{}{}
}
}
+2 -2
View File
@@ -160,7 +160,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
if err == nil && len(resource) > 0 { // if the workspace already exists, update it
w := &workspace.Workspace{
Active: active,
ResourceSet: resources.ResourceSet{
WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas,
Processings: workflow.Processings,
Storages: workflow.Storages,
@@ -173,7 +173,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
a.workspaceAccessor.StoreOne(&workspace.Workspace{
Active: active,
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
ResourceSet: resources.ResourceSet{
WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas,
Processings: workflow.Processings,
Storages: workflow.Storages,
@@ -48,6 +48,10 @@ type WorkflowExecution struct {
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
// ResourceConsents records which consent strings the user acknowledged per resource
// (resource_id → list of acknowledged ConsentString values) at scheduling time.
ResourceConsents map[string][]string `json:"resource_consents,omitempty" bson:"resource_consents,omitempty"`
// Graph is a lightweight, real-time summary of the workflow execution graph.
// Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event.
// Consumed by oc-front to render the live execution panel via websocket updates.
@@ -97,7 +101,7 @@ func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject)
}
func (r *WorkflowExecution) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
return true // only draft bookings can be deleted
}
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
+176 -6
View File
@@ -1,23 +1,49 @@
package workspace
import (
"fmt"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// trustedRelations holds peer relations that yield TrustMap = true for a resource.
var trustedRelations = map[peer.PeerRelation]bool{
peer.PARTNER: true,
peer.MASTER: true,
peer.NANO: true,
peer.ORGANIZATION_MASTER: true,
peer.ORGANIZATION_MEMBER: true,
peer.ORGANIZATION_PARTNER: true,
}
// Workspace is a struct that represents a workspace
type Workspace struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, compute, processing, storage, workflow)
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual
Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active
Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.WorkspaceResourceSet // WorkspaceResourceSet persists both IDs and complete resource objects
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"`
Active bool `json:"active" bson:"active" default:"false"`
Shared string `json:"shared,omitempty" bson:"shared,omitempty"`
// Notifications accumulates strings for auto-modifications (e.g. resource removed after peer blacklist).
// Cleared by the owner via the notifications update endpoint.
Notifications []string `json:"notifications,omitempty" bson:"notifications,omitempty"`
// TrustMap maps resource ID → trust bool based on the creator peer's relation.
// Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean.
TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workspace's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
}
func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
return NewAccessor(request)
}
func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool {
@@ -30,3 +56,147 @@ func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool
}
return ao.AbstractObject.VerifyAuth(callName, request)
}
// ComputeTrustAndClean populates TrustMap for all resources embedded in this workspace,
// removes resources whose creator peer is blacklisted, and appends a deletion notification
// for each removal. Returns true when at least one resource was removed (caller should persist).
func (w *Workspace) ComputeTrustAndClean() bool {
w.TrustMap = map[string]bool{}
selfPeer, _ := utils.GetMySelf(peer.NewShallowAccessor())
var selfPeerID string
if selfPeer != nil {
if p, ok := selfPeer.(*peer.Peer); ok {
selfPeerID = p.PeerID
}
}
// Cache peer relations to avoid redundant DB lookups per workspace load.
cache := map[string]peer.PeerRelation{}
relation := func(creatorID string) peer.PeerRelation {
if r, ok := cache[creatorID]; ok {
return r
}
if creatorID == selfPeerID {
cache[creatorID] = peer.SELF
return peer.SELF
}
results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 1)
rel := peer.NONE
if len(results) > 0 {
if p, ok := results[0].(*peer.Peer); ok {
rel = p.Relation
}
}
cache[creatorID] = rel
return rel
}
setTrust := func(id, creatorID string, rel peer.PeerRelation) {
w.TrustMap[id] = (creatorID == selfPeerID) || trustedRelations[rel]
}
changed := false
var keptData []*resources.DataResource
for _, r := range w.DataResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptData = append(keptData, r)
}
}
w.DataResources = keptData
var keptCompute []*resources.ComputeResource
for _, r := range w.ComputeResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptCompute = append(keptCompute, r)
}
}
w.ComputeResources = keptCompute
var keptStorage []*resources.StorageResource
for _, r := range w.StorageResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptStorage = append(keptStorage, r)
}
}
w.StorageResources = keptStorage
var keptProcessing []*resources.ProcessingResource
for _, r := range w.ProcessingResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptProcessing = append(keptProcessing, r)
}
}
w.ProcessingResources = keptProcessing
var keptWorkflow []*resources.WorkflowResource
for _, r := range w.WorkflowResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptWorkflow = append(keptWorkflow, r)
}
}
w.WorkflowResources = keptWorkflow
var keptService []*resources.ServiceResource
for _, r := range w.ServiceResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptService = append(keptService, r)
}
}
w.ServiceResources = keptService
var keptDynamic []*resources.DynamicResource
for _, r := range w.DynamicResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptDynamic = append(keptDynamic, r)
}
}
w.DynamicResources = keptDynamic
var keptNative []*resources.NativeTool
for _, r := range w.NativeTools {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptNative = append(keptNative, r)
}
}
w.NativeTools = keptNative
return changed
}
+70 -6
View File
@@ -7,10 +7,60 @@ import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
func init() {
resources.WorkspaceCandidatesProvider = func(dt tools.DataType, request *tools.APIRequest) []resources.ResourceInterface {
res, _, _ := NewAccessor(request).Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"user_creator_id": {{Operator: dbs.EQUAL.String(), Value: request.Username}},
"active": {{Operator: dbs.EQUAL.String(), Value: true}},
},
}, "", false, 0, 1)
if len(res) == 0 {
return []resources.ResourceInterface{}
}
ws, ok := res[0].(*Workspace)
if !ok {
return []resources.ResourceInterface{}
}
// ws.Fill was already called by Search — typed slices are populated.
// Return an empty non-nil slice when the workspace exists but has no
// resources of the requested type: caller must not fall back to catalog.
out := []resources.ResourceInterface{}
switch dt {
case tools.COMPUTE_RESOURCE:
for _, c := range ws.ComputeResources {
out = append(out, c)
}
case tools.DATA_RESOURCE:
for _, c := range ws.DataResources {
out = append(out, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range ws.StorageResources {
out = append(out, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range ws.ProcessingResources {
out = append(out, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range ws.WorkflowResources {
out = append(out, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range ws.ServiceResources {
out = append(out, c)
}
}
return out
}
}
// Workspace is a struct that represents a workspace
type workspaceMongoAccessor struct {
utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller)
@@ -88,25 +138,39 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
d.(*Workspace).Fill(a.GetRequest())
return d, 200, nil
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w, 200, nil
}, a)
}
func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit)
}
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit)
}
// applyTrustAndClean calls ComputeTrustAndClean and, when resources were removed due to
// blacklisted peers, persists the cleaned workspace back to the database.
func (a *workspaceMongoAccessor) applyTrustAndClean(w *Workspace) {
if changed := w.ComputeTrustAndClean(); changed {
utils.GenericUpdateOne(w.Serialize(w), w.GetID(), a)
}
}
/*
This function is used to share the workspace with the peers
*/
+52 -3
View File
@@ -40,6 +40,7 @@ const (
REFUND
DISCOUNT
SUBSCRIPTION
POLICY
)
var NOAPI = func() string {
@@ -104,6 +105,7 @@ var InnerDefaultAPI = [...]func() string{
NOAPI,
NOAPI,
NOAPI,
PEERSAPI,
}
// Bind the standard data name to the data type
@@ -138,6 +140,7 @@ var Str = [...]string{
"refund",
"discount",
"subscription",
"policy",
}
func FromString(comp string) int {
@@ -174,7 +177,7 @@ func DataTypeList() []DataType {
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE,
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION}
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION, POLICY}
}
type PropalgationMessage struct {
@@ -208,6 +211,40 @@ const (
// for a private source resource (isReachable=false, Phase 4).
// oc-discovery routes it to the resource owner peer via ProtocolSourcePresignResource.
PB_SOURCE_PRESIGN
// PB_ORG_PARTNER is propagated via PB_PROPAGATE through oc-discovery to the
// organization master's oc-discovery, which notifies its oc-peer via
// ORG_PARTNER_EVENT. The master's oc-peer confirms or rejects by emitting a
// PROPALGATION_EVENT back, which oc-discovery routes to the originating
// oc-discovery, which in turn notifies our oc-peer via ORG_PARTNER_EVENT to
// finalize the relation.
PB_ORG_PARTNER
// PB_WATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// stored in a workspace. oc-discovery contacts the creator peer to register
// the watching peerID in the creator's watcher cache so it receives future
// CREATE/DELETE events for that resource.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_WATCH_RESOURCE
// PB_UNWATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// removed from all workspaces. oc-discovery contacts the creator peer to
// deregister the watching peerID from the creator's watcher cache.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_UNWATCH_RESOURCE
// PB_BOOKING_SYNC is emitted by master every 24 h to each known NANO.
// Payload: {"peer_id": nano.PeerID, "booking_sync_ids": ["id1", "id2", ...]}
// Nano compares the list against its own confirmed bookings and calls
// SendBookingToMaster for any it has that master is missing.
PB_BOOKING_SYNC
// PB_VERIFY_RESOURCE is emitted by oc-workspace or oc-workflow on workspace
// activation / workflow opening to verify that an embedded non-self resource
// is still current. oc-discovery forwards the request to the creator peer via
// ProtocolVerifyResource; the result comes back as a VERIFY_RESOURCE NATS event.
// Payload: { "creator_peer_id": "…", "data_type": N, "resource_payload": {…} }
PB_VERIFY_RESOURCE
)
func GetActionString(ss string) PubSubAction {
@@ -242,6 +279,14 @@ func GetActionString(ss string) PubSubAction {
return PB_PROPAGATE
case "source_presign":
return PB_SOURCE_PRESIGN
case "org_partner":
return PB_ORG_PARTNER
case "watch_resource":
return PB_WATCH_RESOURCE
case "unwatch_resource":
return PB_UNWATCH_RESOURCE
case "booking_sync":
return PB_BOOKING_SYNC
default:
return NONE
}
@@ -264,8 +309,12 @@ var path = []string{
"none", // 12 NONE
"observe", // 13 PB_OBSERVE
"observe_close", // 14 PB_OBSERVE_CLOSE
"propagate", // 15 PB_PROPAGATE
"source_presign", // 16 PB_SOURCE_PRESIGN
"propagate", // 15 PB_PROPAGATE
"source_presign", // 16 PB_SOURCE_PRESIGN
"org_partner", // 17 PB_ORG_PARTNER
"watch_resource", // 18 PB_WATCH_RESOURCE
"unwatch_resource", // 19 PB_UNWATCH_RESOURCE
"booking_sync", // 20 PB_BOOKING_SYNC
}
func (m PubSubAction) String() string {
+11 -2
View File
@@ -32,7 +32,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
"considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event",
"peer behavior event", "peer observe response event", "peer observe event",
"source presign event",
"source presign event", "org partner event", "verify resource event",
}
const (
@@ -85,6 +85,15 @@ const (
// oc-datacenter listens to it to generate a pre-signed Minio URL and reply
// via PB_CONSIDERS (Phase 4 — isReachable=false).
SOURCE_PRESIGN_EVENT
// ORG_PARTNER_EVENT is emitted by a peer to its OrganizationMaster to ask:
// "is peer X one of your members?". The master replies via the same channel.
ORG_PARTNER_EVENT
// VERIFY_RESOURCE is emitted by oc-discovery when it receives a verify response
// from a remote peer via ProtocolVerifyResource. oc-workspace and oc-workflow
// listen to this event to update their stale caches.
VERIFY_RESOURCE
)
func (n NATSMethod) String() string {
@@ -98,7 +107,7 @@ func NameToMethod(name string) NATSMethod {
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT,
SOURCE_PRESIGN_EVENT} {
SOURCE_PRESIGN_EVENT, ORG_PARTNER_EVENT, VERIFY_RESOURCE} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v
}