oc lib test

This commit is contained in:
mr
2026-06-04 12:04:04 +02:00
parent c726361deb
commit d9723e6431
+123 -13
View File
@@ -1064,8 +1064,16 @@ const (
ViolationInvertedArrow ViolationType = "inverted_arrow" ViolationInvertedArrow ViolationType = "inverted_arrow"
ViolationIsolatedProcessing ViolationType = "isolated_processing" ViolationIsolatedProcessing ViolationType = "isolated_processing"
ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing" ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
ViolationDynamicNotConfigured ViolationType = "dynamic_not_configured"
ViolationAnchorOnNonStorage ViolationType = "anchor_on_non_storage"
) )
// ocAnchorAccess is the magic value used by oc-monitord to inject storage
// access credentials into a workflow step at runtime. It must only appear as
// an output of a storage (or dynamic-storage) element.
const ocAnchorAccess = "§oc:access§"
// IntegrityViolation describes a single structural or semantic problem // IntegrityViolation describes a single structural or semantic problem
// found in the workflow graph. // found in the workflow graph.
type IntegrityViolation struct { type IntegrityViolation struct {
@@ -1103,6 +1111,8 @@ func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
violations = append(violations, w.detectInvertedArrows()...) violations = append(violations, w.detectInvertedArrows()...)
violations = append(violations, w.detectIsolatedProcessings()...) violations = append(violations, w.detectIsolatedProcessings()...)
violations = append(violations, w.detectOrphanedStorages()...) violations = append(violations, w.detectOrphanedStorages()...)
violations = append(violations, w.validateDynamicFilters()...)
violations = append(violations, w.validateAnchorOutputs()...)
return violations return violations
} }
@@ -1180,14 +1190,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
var name string var name string
switch { switch {
case w.Graph.IsProcessing(item) && item.Processing != nil: case w.Graph.IsProcessing(item) && item.Processing != nil:
// IsService processings are long-running services and don't need a Compute booking.
if item.Processing.IsService { if item.Processing.IsService {
continue continue
} }
needsCompute = true needsCompute = true
name = item.Processing.GetName() name = item.Processing.GetName()
case w.Graph.IsService(item) && item.Service != nil: case w.Graph.IsService(item) && item.Service != nil:
// HOSTED services use an existing endpoint — no Compute booking needed.
inst := item.Service.GetSelectedInstance(nil) inst := item.Service.GetSelectedInstance(nil)
if inst != nil { if inst != nil {
if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED { if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
@@ -1196,6 +1204,9 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
} }
needsCompute = true needsCompute = true
name = item.Service.GetName() name = item.Service.GetName()
case w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.PROCESSING_RESOURCE:
needsCompute = true
name = w.itemName(id)
} }
if !needsCompute { if !needsCompute {
continue continue
@@ -1210,7 +1221,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
} else { } else {
continue continue
} }
if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) { other, ok := w.Graph.Items[otherID]
if !ok {
continue
}
// A concrete compute OR a dynamic node typed as compute satisfies the requirement.
if w.Graph.IsCompute(other) || (w.Graph.IsDynamic(other) && other.Dynamic.Type == tools.COMPUTE_RESOURCE) {
hasCompute = true hasCompute = true
break break
} }
@@ -1301,12 +1317,41 @@ func (w *Workflow) detectCycles() []IntegrityViolation {
// validateDataStorageLinks checks that every Data item with a non-empty Source // validateDataStorageLinks checks that every Data item with a non-empty Source
// has at least one Storage linked — the builder needs this to inject the // has at least one Storage linked — the builder needs this to inject the
// download step (curl or NATS/Minio protocol). // download step (curl or NATS/Minio protocol).
// isStorageEquivalent returns true for items that can satisfy a Data node's
// storage requirement: concrete storage, dynamic-storage, or a compute that
// has at least one embedded storage available on one of its instances.
func (w *Workflow) isStorageEquivalent(item graph.GraphItem) bool {
if w.Graph.IsStorage(item) {
return true
}
if w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE {
return true
}
if w.Graph.IsCompute(item) && item.Compute != nil {
for _, inst := range item.Compute.Instances {
if len(inst.AvailableStorages) > 0 {
return true
}
}
}
return false
}
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation { func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
var violations []IntegrityViolation var violations []IntegrityViolation
dataStorageLinks := w.Graph.GetDataStorageLinks() // Build the set of data item IDs that have a valid storage-equivalent link.
linkedStorage := map[string]struct{}{} linkedStorage := map[string]struct{}{}
for _, dsl := range dataStorageLinks { for _, link := range w.Graph.Links {
linkedStorage[dsl.DataItemID] = struct{}{} srcItem, srcOk := w.Graph.Items[link.Source.ID]
dstItem, dstOk := w.Graph.Items[link.Destination.ID]
if !srcOk || !dstOk {
continue
}
if w.Graph.IsData(srcItem) && w.isStorageEquivalent(dstItem) {
linkedStorage[link.Source.ID] = struct{}{}
} else if w.isStorageEquivalent(srcItem) && w.Graph.IsData(dstItem) {
linkedStorage[link.Destination.ID] = struct{}{}
}
} }
for id, item := range w.Graph.Items { for id, item := range w.Graph.Items {
if !w.Graph.IsData(item) || item.Data == nil { if !w.Graph.IsData(item) || item.Data == nil {
@@ -1587,10 +1632,62 @@ func (w *Workflow) ValidateWorkflowAE(
// detectOrphanedStorages warns when a storage node is not linked to any // detectOrphanedStorages warns when a storage node is not linked to any
// processing node — it contributes no data flow to the workflow. // processing node — it contributes no data flow to the workflow.
// validateDynamicFilters mirrors oc-front's dynamic-not-configured check:
// a dynamic node with no filters cannot be resolved by the scheduler.
func (w *Workflow) validateDynamicFilters() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsDynamic(item) {
continue
}
f := item.Dynamic.Filters
if len(f.And) == 0 && len(f.Or) == 0 {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationDynamicNotConfigured,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" is a dynamic %s with no filters — at least one filter is required for the scheduler to resolve it`,
w.itemName(id), item.Dynamic.Type,
),
})
}
}
return violations
}
// validateAnchorOutputs mirrors oc-front's anchor-on-non-storage check:
// the §oc:access§ magic value must only appear as an output of a storage
// (or dynamic-storage) element.
func (w *Workflow) validateAnchorOutputs() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if w.Graph.IsStorage(item) || (w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue
}
for _, p := range w.Outputs[id] {
if p.Value == ocAnchorAccess {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationAnchorOnNonStorage,
ItemIDs: []string{id},
Message: fmt.Sprintf(
`"%s" has an access anchor (%s) as output — access anchors may only be outputs of storage elements`,
w.itemName(id), ocAnchorAccess,
),
})
break
}
}
}
return violations
}
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation { func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
var violations []IntegrityViolation var violations []IntegrityViolation
for id, item := range w.Graph.Items { for id, item := range w.Graph.Items {
if !w.Graph.IsStorage(item) { // Check both concrete storage and dynamic-storage nodes.
if !w.Graph.IsStorage(item) && !(w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
continue continue
} }
linkedTopics := map[string]struct{}{} linkedTopics := map[string]struct{}{}
@@ -1603,15 +1700,28 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
} else { } else {
continue continue
} }
if other, ok := w.Graph.Items[otherID]; ok { other, ok := w.Graph.Items[otherID]
switch { if !ok {
case w.Graph.IsProcessing(other): continue
}
switch {
case w.Graph.IsProcessing(other):
linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other):
linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other):
linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other):
linkedTopics["service"] = struct{}{}
case w.Graph.IsDynamic(other):
switch other.Dynamic.Type {
case tools.PROCESSING_RESOURCE:
linkedTopics["processing"] = struct{}{} linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other): case tools.COMPUTE_RESOURCE:
linkedTopics["compute"] = struct{}{} linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other): case tools.DATA_RESOURCE:
linkedTopics["data"] = struct{}{} linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other): case tools.SERVICE_RESOURCE:
linkedTopics["service"] = struct{}{} linkedTopics["service"] = struct{}{}
} }
} }