From d9723e6431efac2213481a2abe6ce5d7b498adae Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 4 Jun 2026 12:04:04 +0200 Subject: [PATCH] oc lib test --- models/workflow/workflow.go | 136 ++++++++++++++++++++++++++++++++---- 1 file changed, 123 insertions(+), 13 deletions(-) diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index bfac8dc..e33c66e 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -1064,8 +1064,16 @@ const ( ViolationInvertedArrow ViolationType = "inverted_arrow" ViolationIsolatedProcessing ViolationType = "isolated_processing" ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing" + ViolationDynamicNotConfigured ViolationType = "dynamic_not_configured" + ViolationAnchorOnNonStorage ViolationType = "anchor_on_non_storage" ) +// ocAnchorAccess is the magic value used by oc-monitord to inject storage +// access credentials into a workflow step at runtime. It must only appear as +// an output of a storage (or dynamic-storage) element. +const ocAnchorAccess = "§oc:access§" + + // IntegrityViolation describes a single structural or semantic problem // found in the workflow graph. type IntegrityViolation struct { @@ -1103,6 +1111,8 @@ func (w *Workflow) ValidateIntegrity() []IntegrityViolation { violations = append(violations, w.detectInvertedArrows()...) violations = append(violations, w.detectIsolatedProcessings()...) violations = append(violations, w.detectOrphanedStorages()...) + violations = append(violations, w.validateDynamicFilters()...) + violations = append(violations, w.validateAnchorOutputs()...) return violations } @@ -1180,14 +1190,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation { var name string switch { case w.Graph.IsProcessing(item) && item.Processing != nil: - // IsService processings are long-running services and don't need a Compute booking. if item.Processing.IsService { continue } needsCompute = true name = item.Processing.GetName() case w.Graph.IsService(item) && item.Service != nil: - // HOSTED services use an existing endpoint — no Compute booking needed. inst := item.Service.GetSelectedInstance(nil) if inst != nil { if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED { @@ -1196,6 +1204,9 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation { } needsCompute = true name = item.Service.GetName() + case w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.PROCESSING_RESOURCE: + needsCompute = true + name = w.itemName(id) } if !needsCompute { continue @@ -1210,7 +1221,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation { } else { continue } - if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) { + other, ok := w.Graph.Items[otherID] + if !ok { + continue + } + // A concrete compute OR a dynamic node typed as compute satisfies the requirement. + if w.Graph.IsCompute(other) || (w.Graph.IsDynamic(other) && other.Dynamic.Type == tools.COMPUTE_RESOURCE) { hasCompute = true break } @@ -1301,12 +1317,41 @@ func (w *Workflow) detectCycles() []IntegrityViolation { // validateDataStorageLinks checks that every Data item with a non-empty Source // has at least one Storage linked — the builder needs this to inject the // download step (curl or NATS/Minio protocol). +// isStorageEquivalent returns true for items that can satisfy a Data node's +// storage requirement: concrete storage, dynamic-storage, or a compute that +// has at least one embedded storage available on one of its instances. +func (w *Workflow) isStorageEquivalent(item graph.GraphItem) bool { + if w.Graph.IsStorage(item) { + return true + } + if w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE { + return true + } + if w.Graph.IsCompute(item) && item.Compute != nil { + for _, inst := range item.Compute.Instances { + if len(inst.AvailableStorages) > 0 { + return true + } + } + } + return false +} + func (w *Workflow) validateDataStorageLinks() []IntegrityViolation { var violations []IntegrityViolation - dataStorageLinks := w.Graph.GetDataStorageLinks() + // Build the set of data item IDs that have a valid storage-equivalent link. linkedStorage := map[string]struct{}{} - for _, dsl := range dataStorageLinks { - linkedStorage[dsl.DataItemID] = struct{}{} + for _, link := range w.Graph.Links { + srcItem, srcOk := w.Graph.Items[link.Source.ID] + dstItem, dstOk := w.Graph.Items[link.Destination.ID] + if !srcOk || !dstOk { + continue + } + if w.Graph.IsData(srcItem) && w.isStorageEquivalent(dstItem) { + linkedStorage[link.Source.ID] = struct{}{} + } else if w.isStorageEquivalent(srcItem) && w.Graph.IsData(dstItem) { + linkedStorage[link.Destination.ID] = struct{}{} + } } for id, item := range w.Graph.Items { if !w.Graph.IsData(item) || item.Data == nil { @@ -1587,10 +1632,62 @@ func (w *Workflow) ValidateWorkflowAE( // detectOrphanedStorages warns when a storage node is not linked to any // processing node — it contributes no data flow to the workflow. +// validateDynamicFilters mirrors oc-front's dynamic-not-configured check: +// a dynamic node with no filters cannot be resolved by the scheduler. +func (w *Workflow) validateDynamicFilters() []IntegrityViolation { + var violations []IntegrityViolation + for id, item := range w.Graph.Items { + if !w.Graph.IsDynamic(item) { + continue + } + f := item.Dynamic.Filters + if len(f.And) == 0 && len(f.Or) == 0 { + violations = append(violations, IntegrityViolation{ + Severity: SeverityError, + Type: ViolationDynamicNotConfigured, + ItemIDs: []string{id}, + Message: fmt.Sprintf( + `"%s" is a dynamic %s with no filters — at least one filter is required for the scheduler to resolve it`, + w.itemName(id), item.Dynamic.Type, + ), + }) + } + } + return violations +} + +// validateAnchorOutputs mirrors oc-front's anchor-on-non-storage check: +// the §oc:access§ magic value must only appear as an output of a storage +// (or dynamic-storage) element. +func (w *Workflow) validateAnchorOutputs() []IntegrityViolation { + var violations []IntegrityViolation + for id, item := range w.Graph.Items { + if w.Graph.IsStorage(item) || (w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) { + continue + } + for _, p := range w.Outputs[id] { + if p.Value == ocAnchorAccess { + violations = append(violations, IntegrityViolation{ + Severity: SeverityError, + Type: ViolationAnchorOnNonStorage, + ItemIDs: []string{id}, + Message: fmt.Sprintf( + `"%s" has an access anchor (%s) as output — access anchors may only be outputs of storage elements`, + w.itemName(id), ocAnchorAccess, + ), + }) + break + } + } + } + return violations +} + func (w *Workflow) detectOrphanedStorages() []IntegrityViolation { var violations []IntegrityViolation for id, item := range w.Graph.Items { - if !w.Graph.IsStorage(item) { + // Check both concrete storage and dynamic-storage nodes. + if !w.Graph.IsStorage(item) && !(w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) { continue } linkedTopics := map[string]struct{}{} @@ -1603,15 +1700,28 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation { } else { continue } - if other, ok := w.Graph.Items[otherID]; ok { - switch { - case w.Graph.IsProcessing(other): + other, ok := w.Graph.Items[otherID] + if !ok { + continue + } + switch { + case w.Graph.IsProcessing(other): + linkedTopics["processing"] = struct{}{} + case w.Graph.IsCompute(other): + linkedTopics["compute"] = struct{}{} + case w.Graph.IsData(other): + linkedTopics["data"] = struct{}{} + case w.Graph.IsService(other): + linkedTopics["service"] = struct{}{} + case w.Graph.IsDynamic(other): + switch other.Dynamic.Type { + case tools.PROCESSING_RESOURCE: linkedTopics["processing"] = struct{}{} - case w.Graph.IsCompute(other): + case tools.COMPUTE_RESOURCE: linkedTopics["compute"] = struct{}{} - case w.Graph.IsData(other): + case tools.DATA_RESOURCE: linkedTopics["data"] = struct{}{} - case w.Graph.IsService(other): + case tools.SERVICE_RESOURCE: linkedTopics["service"] = struct{}{} } }