Payment Flow + Access Flow Change

This commit is contained in:
mr
2026-05-27 15:50:23 +02:00
parent e6a9558cbf
commit cef23b5f30
40 changed files with 2227 additions and 410 deletions
+44
View File
@@ -145,6 +145,50 @@ func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingI
return max, nil
}
// DataStorageLink represents a resolved Data→Storage pair found in the graph.
type DataStorageLink struct {
DataItemID string
StorageItemID string
}
// GetDataStorageLinks returns all links that connect a Data item to a Storage item.
// These links are mandatory when the Data instance has a Source configured:
// the workflow builder uses them to know where to download the data before
// any processing step that consumes that storage.
func (g *Graph) GetDataStorageLinks() []DataStorageLink {
var result []DataStorageLink
for _, link := range g.Links {
srcItem, srcOk := g.Items[link.Source.ID]
dstItem, dstOk := g.Items[link.Destination.ID]
if !srcOk || !dstOk {
continue
}
if g.IsData(srcItem) && g.IsStorage(dstItem) {
result = append(result, DataStorageLink{
DataItemID: link.Source.ID,
StorageItemID: link.Destination.ID,
})
} else if g.IsStorage(srcItem) && g.IsData(dstItem) {
result = append(result, DataStorageLink{
DataItemID: link.Destination.ID,
StorageItemID: link.Source.ID,
})
}
}
return result
}
// GetLinkedStorageForData returns the storage item IDs linked to a given Data item.
func (g *Graph) GetLinkedStorageForData(dataItemID string) []string {
var storageIDs []string
for _, dsl := range g.GetDataStorageLinks() {
if dsl.DataItemID == dataItemID {
storageIDs = append(storageIDs, dsl.StorageItemID)
}
}
return storageIDs
}
func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterface) {
if item, ok := g.Items[id]; ok {
if item.NativeTool != nil {
+23 -23
View File
@@ -15,32 +15,32 @@ type GraphItem struct {
}
func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface) {
if g.Data != nil {
return tools.DATA_RESOURCE, g.Data
} else if g.Compute != nil {
return tools.COMPUTE_RESOURCE, g.Compute
} else if g.Workflow != nil {
return tools.WORKFLOW_RESOURCE, g.Workflow
} else if g.Processing != nil {
return tools.PROCESSING_RESOURCE, g.Processing
} else if g.Storage != nil {
return tools.STORAGE_RESOURCE, g.Storage
} else if g.NativeTool != nil {
return tools.NATIVE_TOOL, g.NativeTool
} else if g.Service != nil {
return tools.SERVICE_RESOURCE, g.Service
} else if g.Dynamic != nil {
return tools.DYNAMIC_RESOURCE, g.Dynamic
if g.ItemResource.Data != nil {
return tools.DATA_RESOURCE, g.ItemResource.Data
} else if g.ItemResource.Compute != nil {
return tools.COMPUTE_RESOURCE, g.ItemResource.Compute
} else if g.ItemResource.Workflow != nil {
return tools.WORKFLOW_RESOURCE, g.ItemResource.Workflow
} else if g.ItemResource.Processing != nil {
return tools.PROCESSING_RESOURCE, g.ItemResource.Processing
} else if g.ItemResource.Storage != nil {
return tools.STORAGE_RESOURCE, g.ItemResource.Storage
} else if g.ItemResource.NativeTool != nil {
return tools.NATIVE_TOOL, g.ItemResource.NativeTool
} else if g.ItemResource.Service != nil {
return tools.SERVICE_RESOURCE, g.ItemResource.Service
} else if g.ItemResource.Dynamic != nil {
return tools.DYNAMIC_RESOURCE, g.ItemResource.Dynamic
}
return tools.INVALID, nil
}
func (g *GraphItem) Clear() {
g.Data = nil
g.Compute = nil
g.Workflow = nil
g.Processing = nil
g.Storage = nil
g.Service = nil
g.Dynamic = nil
g.ItemResource.Data = nil
g.ItemResource.Compute = nil
g.ItemResource.Workflow = nil
g.ItemResource.Processing = nil
g.ItemResource.Storage = nil
g.ItemResource.Service = nil
g.ItemResource.Dynamic = nil
}
+4 -4
View File
@@ -23,11 +23,11 @@ func (l *GraphLink) IsComputeLink(g Graph) (bool, string) {
if g.Items == nil {
return false, ""
}
if d, ok := g.Items[l.Source.ID]; ok && d.Compute != nil {
return true, d.Compute.UUID
if d, ok := g.Items[l.Source.ID]; ok && d.ItemResource.Compute != nil {
return true, d.ItemResource.Compute.UUID
}
if d, ok := g.Items[l.Destination.ID]; ok && d.Compute != nil {
return true, d.Compute.UUID
if d, ok := g.Items[l.Destination.ID]; ok && d.ItemResource.Compute != nil {
return true, d.ItemResource.Compute.UUID
}
return false, ""
}
+18 -18
View File
@@ -104,17 +104,17 @@ func plantUMLVarNames(items map[string]graph.GraphItem) map[string]string {
func plantUMLPrefix(item graph.GraphItem) string {
switch {
case item.NativeTool != nil:
case item.ItemResource.NativeTool != nil:
return "e"
case item.Data != nil:
case item.ItemResource.Data != nil:
return "d"
case item.Processing != nil:
case item.ItemResource.Processing != nil:
return "p"
case item.Storage != nil:
case item.ItemResource.Storage != nil:
return "s"
case item.Compute != nil:
case item.ItemResource.Compute != nil:
return "c"
case item.Workflow != nil:
case item.ItemResource.Workflow != nil:
return "wf"
}
return "u"
@@ -123,24 +123,24 @@ func plantUMLPrefix(item graph.GraphItem) string {
// plantUMLItemLine builds the PlantUML declaration line for one graph item.
func plantUMLItemLine(varName string, item graph.GraphItem) string {
switch {
case item.NativeTool != nil:
case item.ItemResource.NativeTool != nil:
// WorkflowEvent has no instance and no configurable attributes.
return fmt.Sprintf("WorkflowEvent(%s, \"%s\")", varName, item.NativeTool.GetName())
return fmt.Sprintf("WorkflowEvent(%s, \"%s\")", varName, item.ItemResource.NativeTool.GetName())
case item.Data != nil:
return plantUMLResourceLine("Data", varName, item.Data)
case item.ItemResource.Data != nil:
return plantUMLResourceLine("Data", varName, item.ItemResource.Data)
case item.Processing != nil:
return plantUMLResourceLine("Processing", varName, item.Processing)
case item.ItemResource.Processing != nil:
return plantUMLResourceLine("Processing", varName, item.ItemResource.Processing)
case item.Storage != nil:
return plantUMLResourceLine("Storage", varName, item.Storage)
case item.ItemResource.Storage != nil:
return plantUMLResourceLine("Storage", varName, item.ItemResource.Storage)
case item.Compute != nil:
return plantUMLResourceLine("ComputeUnit", varName, item.Compute)
case item.ItemResource.Compute != nil:
return plantUMLResourceLine("ComputeUnit", varName, item.ItemResource.Compute)
case item.Workflow != nil:
return plantUMLResourceLine("Workflow", varName, item.Workflow)
case item.ItemResource.Workflow != nil:
return plantUMLResourceLine("Workflow", varName, item.ItemResource.Workflow)
}
return ""
}
+423
View File
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"mime/multipart"
"regexp"
"strconv"
"strings"
"time"
@@ -911,3 +912,425 @@ func plan[T resources.ResourceInterface](
}
return resources, priceds, nil
}
// ── Integrity validation ─────────────────────────────────────────────────────
// Arrow direction constants matching the flutter_flow_chart ArrowDirection enum
// (index order: forward=0, backward=1, bidirectionnal=2).
const (
arrowDirectionBackward int64 = 1
)
// ViolationSeverity distinguishes blocking errors from non-blocking warnings.
type ViolationSeverity int
const (
SeverityError ViolationSeverity = iota // Blocks scheduling — must be fixed.
SeverityWarning // Reported but non-blocking.
)
// ViolationType identifies the category of the violation.
// Mirrors the TopologyErrorType / TopologyWarningType enums in oc-front.
type ViolationType string
const (
// Errors — block scheduling
ViolationVariableNotFound ViolationType = "variable_not_found"
ViolationMissingComputeUnit ViolationType = "missing_compute_unit"
ViolationCycle ViolationType = "cycle"
ViolationMissingDataStorage ViolationType = "missing_data_storage"
// Warnings — non-blocking, reported for UX
ViolationInvertedArrow ViolationType = "inverted_arrow"
ViolationIsolatedProcessing ViolationType = "isolated_processing"
ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
)
// IntegrityViolation describes a single structural or semantic problem
// found in the workflow graph.
type IntegrityViolation struct {
Severity ViolationSeverity
Type ViolationType
ItemIDs []string // graph item IDs involved in the violation
Message string
}
func (v IntegrityViolation) IsError() bool { return v.Severity == SeverityError }
func (v IntegrityViolation) IsWarning() bool { return v.Severity == SeverityWarning }
// ValidateIntegrity checks the structural and semantic integrity of the workflow
// graph. It must be called by both oc-front (UX enforcement) and oc-schedulerd
// (sovereign enforcement, regardless of submission source — the front can be
// bypassed via direct API calls).
//
// Errors (block scheduling):
// 1. Variable not found — an arg references $varName not defined in env/inputs.
// 2. Missing compute — a Processing/non-HOSTED Service has no Compute linked.
// 3. Cycle — the processing DAG contains a directed cycle.
// 4. Missing data storage — a Data with Source has no Storage linked.
//
// Warnings (non-blocking):
// 5. Inverted arrow — a backward link between two processing nodes.
// 6. Isolated processing — a processing node with no processing neighbours.
// 7. Storage not linked to processing — a storage node orphaned from any processing.
func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
var violations []IntegrityViolation
violations = append(violations, w.validateVariables()...)
violations = append(violations, w.validateComputeLinks()...)
violations = append(violations, w.detectCycles()...)
violations = append(violations, w.validateDataStorageLinks()...)
violations = append(violations, w.detectInvertedArrows()...)
violations = append(violations, w.detectIsolatedProcessings()...)
violations = append(violations, w.detectOrphanedStorages()...)
return violations
}
// HasCriticalViolations returns true when ValidateIntegrity found at least one Error.
// oc-schedulerd uses this to reject a workflow without inspecting each violation.
func (w *Workflow) HasCriticalViolations() bool {
for _, v := range w.ValidateIntegrity() {
if v.IsError() {
return true
}
}
return false
}
// itemName returns a human-readable name for a graph item (falls back to itemID).
func (w *Workflow) itemName(itemID string) string {
item, ok := w.Graph.Items[itemID]
if !ok {
return itemID
}
_, res := item.GetResource()
if res != nil {
return res.GetName()
}
return itemID
}
// validateVariables checks that every $varName reference inside w.Args is
// defined in the corresponding element's env or inputs — mirroring
// WorkflowFactory.validateArgs() in oc-front.
var varRefPattern = regexp.MustCompile(`\$\{?([A-Za-z_][A-Za-z0-9_]*)\}?`)
func (w *Workflow) validateVariables() []IntegrityViolation {
var violations []IntegrityViolation
for itemID, argList := range w.Args {
if len(argList) == 0 {
continue
}
available := map[string]struct{}{}
for _, p := range w.Env[itemID] {
if p.Name != "" {
available[p.Name] = struct{}{}
}
}
for _, p := range w.Inputs[itemID] {
if p.Name != "" {
available[p.Name] = struct{}{}
}
}
name := w.itemName(itemID)
for _, arg := range argList {
for _, m := range varRefPattern.FindAllStringSubmatch(arg, -1) {
varName := m[1]
if _, ok := available[varName]; !ok {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationVariableNotFound,
ItemIDs: []string{itemID},
Message: fmt.Sprintf(`"%s": arg "%s" → variable $%s is not defined in env or inputs`, name, arg, varName),
})
}
}
}
}
return violations
}
// validateComputeLinks checks that every Processing node (and every non-HOSTED
// Service node) has at least one Compute linked — mirroring the computeErrors
// block in oc-front's checkTopology().
func (w *Workflow) validateComputeLinks() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
needsCompute := false
var name string
switch {
case w.Graph.IsProcessing(item) && item.Processing != nil:
// IsService processings are long-running services and don't need a Compute booking.
if item.Processing.IsService {
continue
}
needsCompute = true
name = item.Processing.GetName()
case w.Graph.IsService(item) && item.Service != nil:
// HOSTED services use an existing endpoint — no Compute booking needed.
inst := item.Service.GetSelectedInstance(nil)
if inst != nil {
if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
continue
}
}
needsCompute = true
name = item.Service.GetName()
}
if !needsCompute {
continue
}
hasCompute := false
for _, link := range w.Graph.Links {
var otherID string
if link.Source.ID == id {
otherID = link.Destination.ID
} else if link.Destination.ID == id {
otherID = link.Source.ID
} else {
continue
}
if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) {
hasCompute = true
break
}
}
if !hasCompute {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationMissingComputeUnit,
ItemIDs: []string{id},
Message: fmt.Sprintf(`"%s" has no compute unit linked`, name),
})
}
}
return violations
}
// detectCycles runs DFS colouring on the processing→processing directed graph
// and reports any back-edge as a cycle error — mirroring dfsCycle() in oc-front.
func (w *Workflow) detectCycles() []IntegrityViolation {
var violations []IntegrityViolation
// Collect processing + service + event node IDs (execution flux nodes).
procIDs := map[string]struct{}{}
for id, item := range w.Graph.Items {
if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
procIDs[id] = struct{}{}
}
}
// Build directed successors honoring ArrowDirection.
successors := map[string][]string{}
for id := range procIDs {
successors[id] = []string{}
}
for _, link := range w.Graph.Links {
src, dst := link.Source.ID, link.Destination.ID
_, srcIsProc := procIDs[src]
_, dstIsProc := procIDs[dst]
if !srcIsProc || !dstIsProc {
continue
}
dir := int64(0)
if link.Style != nil {
dir = link.Style.ArrowDirection
}
if dir == arrowDirectionBackward {
// Visual arrow reversed: dst runs before src.
successors[dst] = append(successors[dst], src)
} else {
successors[src] = append(successors[src], dst)
}
}
// DFS colouring: 0=white, 1=grey (in stack), 2=black (done).
color := map[string]int{}
reported := map[string]struct{}{}
var dfs func(u string)
dfs = func(u string) {
color[u] = 1
for _, v := range successors[u] {
if color[v] == 1 {
key := u + "→" + v
if _, seen := reported[key]; !seen {
reported[key] = struct{}{}
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationCycle,
ItemIDs: []string{u, v},
Message: fmt.Sprintf(`Infinite loop: "%s" → "%s" creates a cycle that would block execution indefinitely`,
w.itemName(u), w.itemName(v)),
})
}
} else if color[v] == 0 {
dfs(v)
}
}
color[u] = 2
}
for id := range procIDs {
if color[id] == 0 {
dfs(id)
}
}
return violations
}
// validateDataStorageLinks checks that every Data item with a non-empty Source
// has at least one Storage linked — the builder needs this to inject the
// download step (curl or NATS/Minio protocol).
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
var violations []IntegrityViolation
dataStorageLinks := w.Graph.GetDataStorageLinks()
linkedStorage := map[string]struct{}{}
for _, dsl := range dataStorageLinks {
linkedStorage[dsl.DataItemID] = struct{}{}
}
for id, item := range w.Graph.Items {
if !w.Graph.IsData(item) || item.Data == nil {
continue
}
hasSource := false
for _, inst := range item.Data.Instances {
if inst.Access.HasSource() {
hasSource = true
break
}
}
if !hasSource {
continue
}
if _, ok := linkedStorage[id]; !ok {
violations = append(violations, IntegrityViolation{
Severity: SeverityError,
Type: ViolationMissingDataStorage,
ItemIDs: []string{id},
Message: fmt.Sprintf(`data "%s" has a source but no Storage linked`, item.Data.GetName()),
})
}
}
return violations
}
// detectInvertedArrows warns when a link between two processing nodes uses a
// backward arrow direction — mirroring the invertedArrow warning in oc-front.
func (w *Workflow) detectInvertedArrows() []IntegrityViolation {
var violations []IntegrityViolation
for _, link := range w.Graph.Links {
if link.Style == nil || link.Style.ArrowDirection != arrowDirectionBackward {
continue
}
srcItem, srcOK := w.Graph.Items[link.Source.ID]
dstItem, dstOK := w.Graph.Items[link.Destination.ID]
if !srcOK || !dstOK {
continue
}
if (w.Graph.IsProcessing(srcItem) || w.Graph.IsService(srcItem)) &&
(w.Graph.IsProcessing(dstItem) || w.Graph.IsService(dstItem)) {
violations = append(violations, IntegrityViolation{
Severity: SeverityWarning,
Type: ViolationInvertedArrow,
ItemIDs: []string{link.Source.ID, link.Destination.ID},
Message: fmt.Sprintf(`Reversed arrow between "%s" & "%s": "%s" will execute before "%s" unexpectedly`,
w.itemName(link.Destination.ID), w.itemName(link.Source.ID),
w.itemName(link.Destination.ID), w.itemName(link.Source.ID)),
})
}
}
return violations
}
// detectIsolatedProcessings warns when a processing node has no link to another
// processing node — it will execute synchronously with the workflow's first elements.
func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
var violations []IntegrityViolation
procIDs := map[string]struct{}{}
for id, item := range w.Graph.Items {
if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
procIDs[id] = struct{}{}
}
}
for id := range procIDs {
hasProcNeighbour := false
for _, link := range w.Graph.Links {
var otherID string
if link.Source.ID == id {
otherID = link.Destination.ID
} else if link.Destination.ID == id {
otherID = link.Source.ID
} else {
continue
}
if _, ok := procIDs[otherID]; ok {
hasProcNeighbour = true
break
}
}
if !hasProcNeighbour {
violations = append(violations, IntegrityViolation{
Severity: SeverityWarning,
Type: ViolationIsolatedProcessing,
ItemIDs: []string{id},
Message: fmt.Sprintf(`"%s" is isolated (no connection with another processing) — will execute synchronously with the workflow's first element(s)`,
w.itemName(id)),
})
}
}
return violations
}
// detectOrphanedStorages warns when a storage node is not linked to any
// processing node — it contributes no data flow to the workflow.
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
var violations []IntegrityViolation
for id, item := range w.Graph.Items {
if !w.Graph.IsStorage(item) {
continue
}
linkedTopics := map[string]struct{}{}
for _, link := range w.Graph.Links {
var otherID string
if link.Source.ID == id {
otherID = link.Destination.ID
} else if link.Destination.ID == id {
otherID = link.Source.ID
} else {
continue
}
if other, ok := w.Graph.Items[otherID]; ok {
switch {
case w.Graph.IsProcessing(other):
linkedTopics["processing"] = struct{}{}
case w.Graph.IsCompute(other):
linkedTopics["compute"] = struct{}{}
case w.Graph.IsData(other):
linkedTopics["data"] = struct{}{}
case w.Graph.IsService(other):
linkedTopics["service"] = struct{}{}
}
}
}
if _, ok := linkedTopics["processing"]; ok {
continue
}
name := w.itemName(id)
var msg string
if len(linkedTopics) == 0 {
msg = fmt.Sprintf(`"%s" is isolated (not linked to anything)`, name)
} else {
topics := make([]string, 0, len(linkedTopics))
for t := range linkedTopics {
topics = append(topics, t)
}
msg = fmt.Sprintf(`"%s" is not linked to any processing (only linked to: %s)`, name, strings.Join(topics, ", "))
}
violations = append(violations, IntegrityViolation{
Severity: SeverityWarning,
Type: ViolationStorageNotLinkedToProcessing,
ItemIDs: []string{id},
Message: msg,
})
}
return violations
}