Files
oc-lib/models/workflow/workflow.go
T
2026-06-03 15:33:39 +02:00

1549 lines
53 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package workflow
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"mime/multipart"
"regexp"
"strconv"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
)
// ConfigItem maps a key (Planify looks entries up by resource ID) to an
// integer setting.
type ConfigItem map[string]int

// Get returns a pointer to a copy of the value stored under key, or a pointer
// to 0 when the key is absent. The result is never nil, and writing through it
// does not modify the map.
func (c ConfigItem) Get(key string) *int {
	value := c[key] // a missing key (or a nil map) reads as the zero value
	return &value
}
/*
* Workflow is a struct that represents a workflow
* it defines the native workflow
*
* It embeds the generic object fields (utils.AbstractObject) and the set of
* resources used by the workflow (resources.ResourceSet), and carries the
* graph that links those resources together.
*/
type Workflow struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet
Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow
// Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared lists IDs that VerifyAuth loads through the shallow collaborative area accessor
// Env, Inputs, Outputs, Args and Exposes are keyed maps.
// NOTE(review): the meaning of the map keys is not visible in this file section — confirm against callers.
Env map[string][]models.Param `json:"env" bson:"env"`
Inputs map[string][]models.Param `json:"inputs" bson:"inputs"`
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
}
// GetAccessor returns the workflow accessor built by NewAccessor for the
// given request.
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}
// GetResources returns the workflow's embedded resources of the requested data
// type, each wrapped as a resources.ResourceInterface. An unsupported data type
// or an empty resource list yields an empty, non-nil slice.
func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface {
	out := []resources.ResourceInterface{}
	switch dt {
	case tools.NATIVE_TOOL:
		for _, res := range d.NativeTools {
			out = append(out, res)
		}
	case tools.DATA_RESOURCE:
		for _, res := range d.DataResources {
			out = append(out, res)
		}
	case tools.PROCESSING_RESOURCE:
		for _, res := range d.ProcessingResources {
			out = append(out, res)
		}
	case tools.COMPUTE_RESOURCE:
		for _, res := range d.ComputeResources {
			out = append(out, res)
		}
	case tools.WORKFLOW_RESOURCE:
		for _, res := range d.WorkflowResources {
			out = append(out, res)
		}
	case tools.STORAGE_RESOURCE:
		for _, res := range d.StorageResources {
			out = append(out, res)
		}
	case tools.SERVICE_RESOURCE:
		for _, res := range d.ServiceResources {
			out = append(out, res)
		}
	case tools.DYNAMIC_RESOURCE:
		for _, res := range d.DynamicResources {
			out = append(out, res)
		}
	}
	return out
}
// ExtractFromPlantUML rebuilds the receiver (resource ID lists, resource lists
// and graph) from a PlantUML file.
//
// The receiver's existing resource lists and graph are reset first. The file is
// read fully, then each line is handled once: comment lines, directives and
// blank lines are skipped; lines containing "-->", "<--" or "--" are parsed as
// links; lines containing "<CatalogName>(" are parsed as resource declarations.
// Resources created during parsing are then stored through generateResource.
//
// Returns the receiver, the import warnings collected from resource
// declarations, and an error when plantUML is nil, the scanner fails, or a
// resource declaration cannot be parsed. Link errors are only printed to
// standard output; they are neither returned nor added to the warnings.
//
// NOTE(review): request is dereferenced (request.PeerID), so a nil request panics.
// NOTE(review): Graph.Items is finally replaced by a map keyed by PlantUML
// variable name, while links reference GraphItem.ID values — confirm that
// consumers looking items up by link ID still find them.
func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.APIRequest) (*Workflow, []string, error) {
if plantUML == nil {
return d, nil, errors.New("no file available to export")
}
defer plantUML.Close()
// Reset every ID list, resource list and the graph before importing.
d.Datas = []string{}
d.Storages = []string{}
d.Processings = []string{}
d.Computes = []string{}
d.Workflows = []string{}
d.Dynamics = []string{}
d.Services = []string{}
d.DataResources = []*resources.DataResource{}
d.StorageResources = []*resources.StorageResource{}
d.ProcessingResources = []*resources.ProcessingResource{}
d.ComputeResources = []*resources.ComputeResource{}
d.WorkflowResources = []*resources.WorkflowResource{}
d.DynamicResources = []*resources.DynamicResource{}
d.ServiceResources = []*resources.ServiceResource{}
d.Graph = graph.NewGraph()
// resourceCatalog maps a PlantUML macro name to a factory producing an empty
// resource of the matching kind.
resourceCatalog := map[string]func() resources.ResourceInterface{
"Processing": func() resources.ResourceInterface {
return &resources.ProcessingResource{
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.ProcessingInstance]{
Instances: []*resources.ProcessingInstance{},
},
}
},
"Storage": func() resources.ResourceInterface {
return &resources.StorageResource{
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.StorageResourceInstance]{
Instances: []*resources.StorageResourceInstance{},
},
}
},
"Data": func() resources.ResourceInterface {
return &resources.DataResource{
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.DataInstance]{
Instances: []*resources.DataInstance{},
},
}
},
"ComputeUnit": func() resources.ResourceInterface {
return &resources.ComputeResource{
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.ComputeResourceInstance]{
Instances: []*resources.ComputeResourceInstance{},
},
}
},
"Service": func() resources.ResourceInterface {
return &resources.ServiceResource{
AbstractInstanciatedResource: resources.AbstractInstanciatedResource[*resources.ServiceInstance]{
Instances: []*resources.ServiceInstance{},
},
}
},
"Dynamic": func() resources.ResourceInterface {
return &resources.DynamicResource{}
},
// WorkflowEvent creates a NativeTool of Kind=WORKFLOW_EVENT directly,
// without DB lookup. It has no user-defined instance.
"WorkflowEvent": func() resources.ResourceInterface {
return &resources.NativeTool{
Kind: int(native_tools.WORKFLOW_EVENT),
}
},
}
// graphVarName maps a PlantUML variable name to the graph item declared under it.
graphVarName := map[string]graph.GraphItem{}
// Collect all lines first to support look-ahead (comment on the line after
// the declaration, as produced by ToPlantUML).
scanner := bufio.NewScanner(plantUML)
var lines []string
for scanner.Scan() {
lines = append(lines, scanner.Text())
}
if err := scanner.Err(); err != nil {
return d, nil, err
}
var warnings []string
for i, line := range lines {
trimmed := strings.TrimSpace(line)
// Skip pure comment lines and PlantUML directives — they must never be
// parsed as resource declarations or links. Without this guard, a comment
// like "' source: http://my-server.com" would match the "-" link check.
if strings.HasPrefix(trimmed, "'") ||
strings.HasPrefix(trimmed, "!") ||
strings.HasPrefix(trimmed, "@") ||
trimmed == "" {
continue
}
// Build the parse line: if the current line has no inline comment and the
// next line is a pure comment, append it so parsers receive one combined line.
// Also handles the legacy inline-comment format unchanged.
parseLine := line
if !strings.Contains(line, "'") && i+1 < len(lines) {
if next := strings.TrimSpace(lines[i+1]); strings.HasPrefix(next, "'") {
parseLine = line + " " + next
}
}
// Handle links outside the catalog loop: each link line must be processed
// exactly once (the catalog loop would otherwise call extractLink once per
// catalog entry, producing N×7 duplicate links in the graph).
// The arrow forms are tested before the plain "--" because both contain it.
if strings.Contains(line, "-->") {
if err := d.extractLink(parseLine, graphVarName, "-->", false); err != nil {
fmt.Println(err)
}
continue
}
if strings.Contains(line, "<--") {
if err := d.extractLink(parseLine, graphVarName, "<--", true); err != nil {
fmt.Println(err)
}
continue
}
if strings.Contains(line, "--") {
if err := d.extractLink(parseLine, graphVarName, "--", false); err != nil {
fmt.Println(err)
}
continue
}
// Resource declaration: the first catalog name found as "<name>(" wins.
// Map iteration order is random, which only matters if a line matches
// several catalog names.
for n, newFn := range resourceCatalog {
if strings.Contains(line, n+"(") && !strings.Contains(line, "!procedure") && !strings.Contains(line, "!define") {
newRes := newFn()
newRes.SetID(uuid.New().String())
varName, graphItem, warns, err := d.extractResourcePlantUML(parseLine, newRes, n, request.PeerID, request)
if err != nil {
return d, warnings, err
}
warnings = append(warnings, warns...)
if graphItem != nil {
graphVarName[varName] = *graphItem
}
break
}
}
}
// Store the resources created during the import; the error returned by
// generateResource is not checked here.
d.generateResource(d.GetResources(tools.DATA_RESOURCE), request)
d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
d.generateResource(d.GetResources(tools.DYNAMIC_RESOURCE), request)
d.Graph.Items = graphVarName
return d, warnings, nil
}
// generateResource stores every resource of datas through its accessor.
//
// Compute and storage resources are converted (via a JSON round-trip) into
// their live counterparts (live.LiveDatacenter / live.LiveStorage), stored
// with the live accessor and, when the store succeeds, passed to CopyOne.
// Every other resource is stored with its own accessor.
//
// Storing is best-effort: a failure on one resource never prevents the
// following ones from being processed. The first error encountered is
// returned (nil when everything succeeded) so that callers can surface it
// instead of it being silently dropped.
func (d *Workflow) generateResource(datas []resources.ResourceInterface, request *tools.APIRequest) error {
	var firstErr error
	record := func(res resources.ResourceInterface, err error) {
		if err != nil && firstErr == nil {
			firstErr = fmt.Errorf("storing %s resource %q: %w", res.GetType(), res.GetID(), err)
		}
	}
	for _, res := range datas {
		switch res.GetType() {
		case tools.COMPUTE_RESOURCE.String():
			access := live.NewAccessor[*live.LiveDatacenter](tools.LIVE_DATACENTER, request)
			raw, err := json.Marshal(res)
			if err != nil {
				record(res, err)
				continue
			}
			var liv live.LiveDatacenter
			// A decode error may be partial (some fields still filled), so it is
			// reported but the store is still attempted, as before.
			record(res, json.Unmarshal(raw, &liv))
			data, _, err := access.StoreOne(&liv)
			if err != nil {
				record(res, err)
				continue
			}
			access.CopyOne(data)
		case tools.STORAGE_RESOURCE.String():
			access := live.NewAccessor[*live.LiveStorage](tools.LIVE_STORAGE, request)
			raw, err := json.Marshal(res)
			if err != nil {
				record(res, err)
				continue
			}
			var liv live.LiveStorage
			record(res, json.Unmarshal(raw, &liv))
			data, _, err := access.StoreOne(&liv)
			if err != nil {
				record(res, err)
				continue
			}
			access.CopyOne(data)
		default:
			_, _, err := res.GetAccessor(request).StoreOne(res)
			record(res, err)
		}
	}
	return firstErr
}
// setNestedKey sets a value in a nested map using a dot-notation path.
// "access.container.image" → m["access"]["container"]["image"] = value
//
// Missing intermediate maps are created. If an intermediate key already holds
// something that is not a map[string]any, the value is silently dropped and m
// is left unchanged.
func setNestedKey(m map[string]any, path string, value any) {
	current := m
	for {
		key, rest, nested := strings.Cut(path, ".")
		if !nested {
			current[path] = value
			return
		}
		child, exists := current[key]
		if !exists {
			child = map[string]any{}
			current[key] = child
		}
		next, isMap := child.(map[string]any)
		if !isMap {
			return
		}
		current, path = next, rest
	}
}

// parseHumanFriendlyAttrs converts a human-friendly comment into JSON bytes.
// Supports:
//   - flat: "source: http://example.com, encryption: true, size: 500"
//   - nested: "access.container.image: nginx, access.container.tag: latest"
//   - raw JSON passthrough (backward-compat): '{"key": "value"}'
//
// Values are auto-typed:
//   - "true"/"false" (also "True"/"TRUE"/"False"/"FALSE") become bool;
//   - finite numbers become float64, so "1" and "0" stay numbers instead of
//     being read as booleans;
//   - everything else, including "NaN" and "Inf" which JSON cannot encode,
//     stays a string.
//
// Note: the first ':' in each pair is the key/value separator,
// so URLs like "http://..." are handled correctly. Pairs are separated by
// ',', so a value cannot itself contain a comma. Pairs without ':' are
// ignored. Returns nil only if the JSON encoding fails.
func parseHumanFriendlyAttrs(comment string) []byte {
	comment = strings.TrimSpace(comment)
	if strings.HasPrefix(comment, "{") {
		return []byte(comment)
	}
	attrs := map[string]any{}
	for _, pair := range strings.Split(comment, ",") {
		rawKey, rawVal, ok := strings.Cut(pair, ":")
		if !ok {
			continue
		}
		key, val := strings.TrimSpace(rawKey), strings.TrimSpace(rawVal)
		var typed any = val
		switch val {
		case "true", "True", "TRUE":
			typed = true
		case "false", "False", "FALSE":
			typed = false
		default:
			// n-n is 0 only for finite values (NaN and ±Inf both give NaN), which
			// keeps values json.Marshal cannot encode as plain strings.
			if n, err := strconv.ParseFloat(val, 64); err == nil && n-n == 0 {
				typed = n
			}
		}
		setNestedKey(attrs, key, typed)
	}
	encoded, err := json.Marshal(attrs)
	if err != nil {
		return nil
	}
	return encoded
}
// extractLink parses one PlantUML link line ("src <pattern> dst ' attrs") and
// appends the resulting link to the workflow graph.
//
// line is split on pattern: the text before it (trimmed) is the source
// variable, the first token after it (ending at a space, tab or apostrophe) is
// the destination variable. Both must already be present in graphVarName with
// a non-empty ID, otherwise an error is returned and no link is added. When
// reverse is true the two ends are swapped. If the line carries a comment
// (text after the first apostrophe), its attributes are decoded onto the link;
// decoding errors are ignored.
func (d *Workflow) extractLink(line string, graphVarName map[string]graph.GraphItem, pattern string, reverse bool) error {
	sides := strings.Split(line, pattern)
	if len(sides) < 2 {
		return errors.New("links elements not found")
	}
	// The destination may be followed by a trailing comment (ToPlantUML
	// look-ahead), so only its first token is kept.
	isDelimiter := func(r rune) bool { return r == ' ' || r == '\t' || r == '\'' }
	dstFields := strings.FieldsFunc(strings.TrimSpace(sides[1]), isDelimiter)
	if len(dstFields) == 0 {
		return errors.New("link destination var name not found")
	}
	srcVar, dstVar := strings.TrimSpace(sides[0]), dstFields[0]

	srcItem, srcKnown := graphVarName[srcVar]
	if !srcKnown || srcItem.ID == "" {
		return fmt.Errorf("link source %q not declared", srcVar)
	}
	dstItem, dstKnown := graphVarName[dstVar]
	if !dstKnown || dstItem.ID == "" {
		return fmt.Errorf("link destination %q not declared", dstVar)
	}

	fromID, toID := srcItem.ID, dstItem.ID
	if reverse {
		fromID, toID = toID, fromID
	}
	link := graph.GraphLink{
		Source:      graph.Position{ID: fromID},
		Destination: graph.Position{ID: toID},
	}
	if parts := strings.Split(line, "'"); len(parts) > 1 {
		json.Unmarshal(parseHumanFriendlyAttrs(parts[1]), &link)
	}
	d.Graph.Links = append(d.Graph.Links, link)
	return nil
}
// extractResourcePlantUML parses one PlantUML resource declaration of the form
//
//	Kind(varName, "display name") ' key: value, ...
//
// and registers the matching resource in the workflow.
//
// It returns the PlantUML variable name (trimmed, so it matches the trimmed
// names used by link lines), the graph item created for the declaration (nil
// when dataName has no graph representation), the import warnings, and an
// error when the declaration has no '(', no ',' or no quoted name.
//
// The display name is the text between the first pair of double quotes after
// the variable name, so it may contain ',' or '('. "\n" sequences in the name
// are replaced by spaces. The comment is looked for after the closing quote,
// so an apostrophe inside the name is not mistaken for a comment start.
//
// If an existing catalog resource matches (by id from the comment, then by
// name) and its kind can be represented by addExistingGraphItem, it is reused
// and a warning is emitted. Otherwise resource is named, given a new instance
// (when the kind has instances) filled from the comment attributes, and added
// to the workflow through getNewGraphItem.
func (d *Workflow) extractResourcePlantUML(line string, resource resources.ResourceInterface, dataName string, peerID string, request *tools.APIRequest) (string, *graph.GraphItem, []string, error) {
	_, args, hasParen := strings.Cut(line, "(")
	if !hasParen {
		return "", nil, nil, errors.New("Can't deserialize Object, there's no func")
	}
	rawVar, afterVar, hasComma := strings.Cut(args, ",")
	if !hasComma {
		return "", nil, nil, errors.New("Can't deserialize Object, there's no params")
	}
	_, quoted, hasQuote := strings.Cut(afterVar, "\"")
	if !hasQuote {
		return "", nil, nil, errors.New("Can't deserialize Object, there's no name")
	}
	label, tail, closed := strings.Cut(quoted, "\"")
	commentSource := tail
	if !closed {
		// Unterminated quote: stay lenient and bound the name at the next
		// argument separator; look for the comment in the whole line.
		if end := strings.IndexAny(label, ",("); end >= 0 {
			label = label[:end]
		}
		commentSource = line
	}
	varName := strings.TrimSpace(rawVar)
	name := strings.ReplaceAll(label, "\\n", " ")

	// Extract comment text (if present) for metadata parsing: the text between
	// the first apostrophe and the next one (or the end of the line).
	comment := ""
	if _, afterMark, ok := strings.Cut(commentSource, "'"); ok {
		comment, _, _ = strings.Cut(afterMark, "'")
	}

	var warns []string
	// Try to resolve an existing catalog resource (by id, then by name).
	if existing, warn := d.resolveExistingResource(resource, dataName, name, comment, request); existing != nil {
		if item := d.addExistingGraphItem(dataName, existing); item != nil {
			warns = append(warns, warn)
			return varName, item, warns, nil
		}
		// addExistingGraphItem has no representation for this kind (it returns
		// nil without touching the workflow): create the item below instead of
		// silently dropping the node from the graph.
	}

	// No reusable resource — create new.
	resource.SetName(name)
	instance := d.getNewInstance(dataName, name, peerID)
	if instance != nil {
		if b, err := json.Marshal(resource); err == nil {
			json.Unmarshal(b, instance)
		}
		if comment != "" {
			if err := json.Unmarshal(parseHumanFriendlyAttrs(comment), instance); err != nil {
				warns = append(warns, fmt.Sprintf(`[import warning] %s "%s": attributes could not be fully applied: %v`, dataName, name, err))
			}
		}
		resource.AddInstances(instance)
	}
	item := d.getNewGraphItem(dataName, resource)
	if item != nil {
		d.Graph.Items[item.ID] = *item
	}
	return varName, item, warns, nil
}
// resolveExistingResource looks for an already stored resource that the
// PlantUML declaration refers to, using the accessor of proto.
//
// Lookup order:
//  1. by the "id" attribute found in comment, when present and non-empty;
//  2. by exact name, among at most 10 search results.
//
// It returns the loaded resource together with an import warning describing
// how it was found, or (nil, "") when nothing matches.
func (d *Workflow) resolveExistingResource(
	proto resources.ResourceInterface,
	dataName, name, comment string,
	request *tools.APIRequest,
) (resources.ResourceInterface, string) {
	accessor := proto.GetAccessor(request)

	// fetch loads one object by id and returns it only if it is a resource.
	fetch := func(id string) resources.ResourceInterface {
		loaded, _, err := accessor.LoadOne(id)
		if err != nil || loaded == nil {
			return nil
		}
		if found, ok := loaded.(resources.ResourceInterface); ok {
			return found
		}
		return nil
	}

	// 1. Lookup by id from the comment ("id: <uuid>").
	if comment != "" {
		attrs := map[string]any{}
		json.Unmarshal(parseHumanFriendlyAttrs(comment), &attrs)
		if id, _ := attrs["id"].(string); id != "" {
			if found := fetch(id); found != nil {
				return found, fmt.Sprintf(`[import warning] %s "%s": existing resource retrieved by id %s`, dataName, name, id)
			}
		}
	}

	// 2. Search by exact name.
	byName := &dbs.Filters{
		Or: map[string][]dbs.Filter{
			"abstractobject.name": {{Operator: dbs.EQUAL.String(), Value: name}},
		},
	}
	results, _, err := accessor.Search(byName, "", false, 0, 10)
	if err != nil {
		return nil, ""
	}
	for _, candidate := range results {
		if candidate.GetName() != name {
			continue
		}
		if found := fetch(candidate.GetID()); found != nil {
			return found, fmt.Sprintf(`[import warning] %s "%s": existing resource found by name and retrieved instead of creating a new one`, dataName, name)
		}
	}
	return nil, ""
}
// addExistingGraphItem creates a graph item for a resource that already exists
// in the catalog. The resource ID is appended to the workflow's ID list for
// its kind, but the resource itself is not added to the resource lists, so
// generateResource will not try to store it again.
//
// Only "Data", "Processing", "Service", "Storage" and "ComputeUnit" are
// handled; any other dataName returns nil and leaves the workflow untouched.
// If resource is not of the concrete type expected for dataName, the graph
// item is returned without an attached resource.
func (d *Workflow) addExistingGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
	item := &graph.GraphItem{
		ID:           uuid.New().String(),
		ItemResource: &resources.ItemResource{},
	}
	switch dataName {
	case "Data":
		d.Datas = append(d.Datas, resource.GetID())
		item.Data, _ = resource.(*resources.DataResource)
	case "Processing":
		d.Processings = append(d.Processings, resource.GetID())
		item.Processing, _ = resource.(*resources.ProcessingResource)
	case "Service":
		d.Services = append(d.Services, resource.GetID())
		item.Service, _ = resource.(*resources.ServiceResource)
	case "Storage":
		d.Storages = append(d.Storages, resource.GetID())
		item.Storage, _ = resource.(*resources.StorageResource)
	case "ComputeUnit":
		d.Computes = append(d.Computes, resource.GetID())
		item.Compute, _ = resource.(*resources.ComputeResource)
	default:
		return nil
	}
	return item
}
// getNewGraphItem registers a newly created resource in the workflow and
// returns the graph item wrapping it.
//
// For every supported dataName the resource ID is appended to the matching ID
// list, the resource is appended to the matching resource list (except for
// "WorkflowEvent", which only records the ID) and attached to the graph item.
// A nil resource or an unsupported dataName returns nil. The resource must be
// of the concrete type implied by dataName, otherwise the type assertion
// panics.
func (d *Workflow) getNewGraphItem(dataName string, resource resources.ResourceInterface) *graph.GraphItem {
	if resource == nil {
		return nil
	}
	item := &graph.GraphItem{
		ID:           uuid.New().String(),
		ItemResource: &resources.ItemResource{},
	}
	switch dataName {
	case "Data":
		d.Datas = append(d.Datas, resource.GetID())
		data := resource.(*resources.DataResource)
		d.DataResources = append(d.DataResources, data)
		item.Data = data
	case "Processing":
		d.Processings = append(d.Processings, resource.GetID())
		processing := resource.(*resources.ProcessingResource)
		d.ProcessingResources = append(d.ProcessingResources, processing)
		item.Processing = processing
	case "Service":
		d.Services = append(d.Services, resource.GetID())
		service := resource.(*resources.ServiceResource)
		d.ServiceResources = append(d.ServiceResources, service)
		item.Service = service
	case "Dynamic":
		d.Dynamics = append(d.Dynamics, resource.GetID())
		dynamic := resource.(*resources.DynamicResource)
		d.DynamicResources = append(d.DynamicResources, dynamic)
		item.Dynamic = dynamic
	case "WorkflowEvent":
		// The catalog factory already produced a *NativeTool with
		// Kind=WORKFLOW_EVENT; it is used as-is, without any DB lookup, and its
		// name is forced to the event kind's name.
		event := resource.(*resources.NativeTool)
		event.Name = native_tools.WORKFLOW_EVENT.String()
		d.NativeTool = append(d.NativeTool, event.GetID())
		item.NativeTool = event
	case "Storage":
		d.Storages = append(d.Storages, resource.GetID())
		storage := resource.(*resources.StorageResource)
		d.StorageResources = append(d.StorageResources, storage)
		item.Storage = storage
	case "ComputeUnit":
		d.Computes = append(d.Computes, resource.GetID())
		compute := resource.(*resources.ComputeResource)
		d.ComputeResources = append(d.ComputeResources, compute)
		item.Compute = compute
	default:
		return nil
	}
	return item
}
// getNewInstance builds a fresh resource instance for the given PlantUML kind,
// using name and peerID. Kinds without instances (anything other than "Data",
// "Processing", "Storage", "ComputeUnit" and "Service") yield nil.
func (d *Workflow) getNewInstance(dataName string, name string, peerID string) resources.ResourceInstanceITF {
	var instance resources.ResourceInstanceITF
	switch dataName {
	case "Data":
		instance = resources.NewDataInstance(name, peerID)
	case "Processing":
		instance = resources.NewProcessingInstance(name, peerID)
	case "Storage":
		instance = resources.NewStorageResourceInstance(name, peerID)
	case "ComputeUnit":
		instance = resources.NewComputeResourceInstance(name, peerID)
	case "Service":
		instance = resources.NewServiceInstance(name, peerID)
	}
	return instance
}
// Deps describes one dependency found by following a graph link.
// As filled by IsDependancy and GetDependencies, Source holds the name of the
// resource at the other end of the link and Dest holds that end's graph ID.
type Deps struct {
Source string
Dest string
}
// IsDependancy lists the processing and workflow items that the graph item id
// points to: for every link whose source is id and whose destination is a
// known graph item, one Deps entry is produced per processing and per workflow
// attached to that destination (processing first). The result is never nil.
func (w *Workflow) IsDependancy(id string) []Deps {
	deps := []Deps{}
	for _, link := range w.Graph.Links {
		target, known := w.Graph.Items[link.Destination.ID]
		if !known || id != link.Source.ID {
			continue
		}
		if processing := target.Processing; processing != nil {
			deps = append(deps, Deps{Source: processing.GetName(), Dest: link.Destination.ID})
		}
		if subWorkflow := target.Workflow; subWorkflow != nil {
			deps = append(deps, Deps{Source: subWorkflow.GetName(), Dest: link.Destination.ID})
		}
	}
	return deps
}
// GetFirstItems returns the graph items that have no dependency, i.e. the
// items for which GetDependencies reports no incoming link from a processing
// item.
//
// The check is made per item, using the item's own graph ID: link endpoints
// hold graph item IDs, so querying with the workflow ID (as done previously)
// never matched any link and every item was reported as a first item.
func (w *Workflow) GetFirstItems() []graph.GraphItem {
	return w.GetGraphItems(func(item graph.GraphItem) bool {
		return len(w.GetDependencies(item.ID)) == 0
	})
}
// GetDependencies lists the processing items that point to the graph item id:
// one Deps entry per link whose destination is id and whose source is a known
// graph item carrying a processing. Source is the processing name and Dest the
// source item's graph ID. The result is nil when there is no such link.
func (w *Workflow) GetDependencies(id string) (dependencies []Deps) {
	for _, link := range w.Graph.Links {
		if id != link.Destination.ID {
			continue
		}
		origin, known := w.Graph.Items[link.Source.ID]
		if !known || origin.Processing == nil {
			continue
		}
		dependencies = append(dependencies, Deps{Source: origin.Processing.GetName(), Dest: link.Source.ID})
	}
	return dependencies
}
// GetGraphItems returns the graph items accepted by the predicate f. The order
// follows map iteration and is therefore not stable; the result is nil when no
// item is accepted.
func (w *Workflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) {
	for _, candidate := range w.Graph.Items {
		if !f(candidate) {
			continue
		}
		list_datas = append(list_datas, candidate)
	}
	return list_datas
}
// GetPricedItem converts every graph item accepted by f into a priced item and
// returns them keyed by resource ID.
//
// instance, partnership, buying, strategy and bookingMode are forwarded (by
// pointer) to ConvertToPricedResource; buyingStrategy and pricingStrategy are
// currently unused. On the first conversion error the items priced so far are
// returned together with that error.
func (w *Workflow) GetPricedItem(
	f func(item graph.GraphItem) bool, request *tools.APIRequest,
	instance int,
	partnership int,
	buying int,
	strategy int,
	bookingMode int,
	buyingStrategy int,
	pricingStrategy int) (map[string]pricing.PricedItemITF, error) {
	pricedByID := map[string]pricing.PricedItemITF{}
	for _, item := range w.Graph.Items {
		if !f(item) {
			continue
		}
		dataType, res := item.GetResource()
		priced, err := res.ConvertToPricedResource(dataType, &instance, &partnership, &buying, &strategy, &bookingMode, request)
		if err != nil {
			return pricedByID, err
		}
		pricedByID[res.GetID()] = priced
	}
	return pricedByID, nil
}
// Related groups a resource with the graph links that connect it to a given
// item; it is the value type of the map returned by GetByRelatedProcessing.
type Related struct {
Node resources.ResourceInterface // the related resource
Links []graph.GraphLink // the graph links associated with Node
}
// GetByRelatedProcessing returns the resources accepted by g that are directly
// linked to the graph item processingID, keyed by resource ID.
//
// For each link, the end accepted by g (source first, then destination) is the
// candidate resource and the opposite end must be processingID. All links
// connecting the same resource to processingID are accumulated in
// Related.Links: the same resource can appear under several graph nodes, and
// previously each new match replaced the entry so only the last link survived.
func (w *Workflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) map[string]Related {
	related := map[string]Related{}
	for _, link := range w.Graph.Links {
		// By default the candidate is the source, so the processing is expected
		// at the destination.
		nodeID := link.Destination.ID
		var node resources.ResourceInterface
		if src := w.Graph.Items[link.Source.ID]; g(src) {
			_, node = src.GetResource()
		}
		if node == nil {
			// The source did not qualify: try the destination as the candidate,
			// the processing then being the source.
			if dst := w.Graph.Items[link.Destination.ID]; g(dst) {
				nodeID = link.Source.ID
				_, node = dst.GetResource()
			}
		}
		if processingID != nodeID || node == nil {
			continue
		}
		relID := node.GetID()
		rel := related[relID] // zero value the first time this resource is seen
		rel.Node = node
		rel.Links = append(rel.Links, link)
		related[relID] = rel
	}
	return related
}
// VerifyAuth reports whether the request is allowed to perform callName on the
// workflow. Access is granted when the underlying AbstractObject authorizes
// it, or when at least one of the areas listed in Shared can be loaded and
// authorizes it.
//
// Previously the result of each shared area overwrote the previous one, so
// only the last entry of Shared was taken into account; the loop now stops at
// the first area that grants access. Areas that cannot be loaded are skipped.
func (ao *Workflow) VerifyAuth(callName string, request *tools.APIRequest) bool {
	sharedAuthorized := false
	for _, areaID := range ao.Shared {
		area, code, _ := shallow_collaborative_area.NewAccessor(request).LoadOne(areaID)
		if code != 200 || area == nil {
			continue
		}
		if area.VerifyAuth(callName, request) {
			sharedAuthorized = true
			break
		}
	}
	return ao.AbstractObject.VerifyAuth(callName, request) || sharedAuthorized
}
// TODO : Check Booking... + Storage

// CheckBooking checks the booking of the workflow on peers (even ourselves).
//
// For every graph link identified as a compute link, the compute resource is
// loaded and a booking GET is launched on the peer recorded as its creator.
// Compute resources that cannot be loaded (code other than 200) are skipped.
//
// It returns (false, nil) when the workflow has no graph, (false, err) when a
// compute resource has no creator peer or the peer call fails, and (true, nil)
// otherwise.
func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
	if wfa.Graph == nil { // no graph no booking
		return false, nil
	}
	computeAccessor := (&resources.ComputeResource{}).GetAccessor(&tools.APIRequest{Caller: caller})
	for _, link := range wfa.Graph.Links {
		isCompute, computeID := link.IsComputeLink(*wfa.Graph)
		if !isCompute {
			continue
		}
		loaded, code, _ := computeAccessor.LoadOne(computeID)
		if code != 200 {
			continue
		}
		// The compute may be a remote one: the booking is checked on its peer.
		peerID := loaded.(*resources.ComputeResource).CreatorID
		if peerID == "" {
			// no peer id no booking, we need to know where to book
			return false, errors.New("no peer id")
		}
		if _, err := (&peer.Peer{}).LaunchPeerExecution(peerID, computeID, tools.BOOKING, tools.GET, nil, caller); err != nil {
			return false, err
		}
	}
	return true, nil
}
// preemptDelay is the minimum lead time granted before a preempted booking starts:
// in PREEMPTED mode Planify never schedules the start earlier than now+preemptDelay.
const preemptDelay = 30 * time.Second
// Planify computes the scheduled start/end for every resource in the workflow.
//
// bookingMode controls availability checking when p (a live planner snapshot) is provided:
// - PREEMPTED : start from now+preemptDelay regardless of existing load.
// - WHEN_POSSIBLE: start from max(now, start); if a slot conflicts, slide to the next free window.
// - PLANNED : use start as-is; return an error if the slot is not available.
//
// Passing p = nil skips all availability checks (useful for sub-workflow recursion).
//
// instances, partnerships, buyings and strategies are looked up by resource ID
// (missing entries read as 0) and forwarded to the planning helpers.
//
// Returns, in order: whether every first item of the graph exposes a PREEMPTED
// booking mode, the workflow duration derived from the planned processings and
// services, the priced items grouped by data type, the workflow itself, and an
// error. On error the boolean is false, the duration 0 and the workflow nil.
//
// NOTE(review): wf.Graph is dereferenced without a nil check.
func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, p planner.PlannerITF, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
// 1. Adjust global start based on booking mode.
now := time.Now()
switch booking.BookingMode(bookingMode) {
case booking.PREEMPTED:
if earliest := now.Add(preemptDelay); start.Before(earliest) {
start = earliest
}
case booking.WHEN_POSSIBLE:
if start.Before(now) {
start = now
}
// PLANNED: honour the caller's start date as-is.
}
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
var err error
// 2. Plan processings and services first so we can derive the total workflow duration.
// Services in DEPLOYMENT mode return duration=-1 (open-ended); HOSTED mode returns a bounded call window.
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
*instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()),
bookingMode, request)
if err != nil {
return start, 0, err
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
})
if err != nil {
return false, 0, priceds, nil, err
}
if _, priceds, err = plan[*resources.ServiceResource](tools.SERVICE_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsService,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
*instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()),
bookingMode, request)
if err != nil {
return start, 0, err
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
if duration < 0 {
return nil, nil // DEPLOYMENT mode: open-ended
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
workflowDuration := common.GetPlannerLongestTime(priceds)
// Native tools and data start at the global start and end at the caller's end.
if _, priceds, err = plan[resources.ResourceInterface](tools.NATIVE_TOOL, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
wf.Graph.IsNativeTool, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
return start, 0, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
return end, nil
}); err != nil {
return false, 0, priceds, nil, err
}
if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
wf.Graph.IsData, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
return start, 0, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
return end, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// 3. Compute/storage: duration = total workflow duration (conservative bound).
for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage,
tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
if _, priceds, err = plan[resources.ResourceInterface](k, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
nearestStart, _, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
if f(i) {
_, r = i.GetResource()
}
return r
}, *instances.Get(res.GetID()), *partnerships.Get(res.GetID()),
*buyings.Get(res.GetID()), *strategies.Get(res.GetID()), bookingMode, request)
if err != nil {
return start, workflowDuration, err
}
return start.Add(time.Duration(nearestStart) * time.Second), workflowDuration, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
if duration < 0 {
return nil, nil // service: open-ended booking
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
}
longest := workflowDuration
// Sub-workflows: each one is loaded and planned recursively with a nil planner.
// NOTE(review): inside the closure below, `start` and `longest` shadow the outer
// variables, so the outer `longest` returned by Planify is never updated by a
// sub-workflow's duration — confirm this is intended.
// NOTE(review): the merge loop only adds quantities onto priceds2 entries that
// already exist in priceds; entries of priceds2 are not copied into priceds.
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, instances, partnerships, buyings, strategies,
bookingMode, wf, priceds, request, wf.Graph.IsWorkflow,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds)) * time.Second)
longest := float64(-1)
r, code, err := res.GetAccessor(request).LoadOne(res.GetID())
if code != 200 || err != nil {
return start, longest, err
}
_, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, nil, request)
// should ... import priced
if err != nil {
return start, longest, err
} else if neoLongest > longest {
longest = neoLongest
}
for k, v := range priceds2 {
if priceds[k] == nil {
priceds[k] = map[string]pricing.PricedItemITF{}
}
for k2, v2 := range v {
if priceds[k][k2] != nil {
v2.AddQuantity(priceds[k][k2].GetQuantity())
}
}
}
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds)) * time.Second), longest, nil
}, func(start time.Time, longest float64) (*time.Time, error) {
s := start.Add(time.Duration(longest) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// 4. Availability check against the live planner (skipped for PREEMPTED and sub-workflows).
if p != nil && booking.BookingMode(bookingMode) != booking.PREEMPTED {
slide, err := plannerAvailabilitySlide(p, priceds, booking.BookingMode(bookingMode))
if err != nil {
return false, 0, priceds, nil, err
}
if slide > 0 {
// Re-plan from the corrected start; pass nil planner to avoid infinite recursion.
return wf.Planify(start.Add(slide), end, instances, partnerships, buyings, strategies, bookingMode, nil, request)
}
}
// The workflow is preemptible only if every first item offers a PREEMPTED booking mode.
isPreemptible := true
for _, first := range wf.GetFirstItems() {
_, res := first.GetResource()
if res.GetBookingModes()[booking.PREEMPTED] == nil {
isPreemptible = false
break
}
}
return isPreemptible, longest, priceds, wf, nil
}
// plannerAvailabilitySlide compares every compute and storage item of priceds
// with the planner's next available start for the same time window.
//
// Items without both a start and an end are open-ended and skipped. In PLANNED
// mode the first conflicting item aborts with an error. In the other modes the
// largest delay needed to clear every conflict is returned (to be added to the
// global start), or 0 when the plan is already conflict-free.
func plannerAvailabilitySlide(p planner.PlannerITF, priceds map[tools.DataType]map[string]pricing.PricedItemITF, mode booking.BookingMode) (time.Duration, error) {
	var worst time.Duration
	bookable := [...]tools.DataType{tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE}
	for _, dt := range bookable {
		for _, item := range priceds[dt] {
			from, to := item.GetLocationStart(), item.GetLocationEnd()
			if from == nil || to == nil {
				continue // open-ended: skip availability check
			}
			nextFree := p.NextAvailableStart(item.GetID(), item.GetInstanceID(), *from, to.Sub(*from))
			delay := nextFree.Sub(*from)
			switch {
			case delay <= 0:
				// The requested window is free.
			case mode == booking.PLANNED:
				return 0, errors.New("requested slot is not available for resource " + item.GetID())
			case delay > worst:
				worst = delay
			}
		}
	}
	return worst, nil
}
// GetItemsByResources groups the graph items of the workflow by data type
// (storage, data, compute, processing, service, workflow, dynamic) and then by
// resource ID.
//
// For each data type, every resource (identified by its ID) is mapped to the
// list of graph item IDs that represent it in the graph.
// ex: if the same Minio storage is drawn as several nodes in the graph, its ID
// is mapped, under tools.STORAGE_RESOURCE, to the IDs of all those nodes.
//
// Every data type listed above is always present in the result, possibly with
// an empty map. Graph items whose resource is not set are skipped.
func (w *Workflow) GetItemsByResources() map[tools.DataType]map[string][]string {
	res := make(map[tools.DataType]map[string][]string)
	dtMethodMap := map[tools.DataType]func() []graph.GraphItem{
		tools.STORAGE_RESOURCE:    func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsStorage) },
		tools.DATA_RESOURCE:       func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsData) },
		tools.COMPUTE_RESOURCE:    func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsCompute) },
		tools.PROCESSING_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsProcessing) },
		tools.SERVICE_RESOURCE:    func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsService) },
		tools.WORKFLOW_RESOURCE:   func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsWorkflow) },
		tools.DYNAMIC_RESOURCE:    func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsDynamic) },
	}
	for dt, meth := range dtMethodMap {
		byResource := make(map[string][]string)
		res[dt] = byResource
		for _, i := range meth() {
			_, r := i.GetResource()
			if r == nil {
				// The item carries no resource: calling GetID on a nil
				// interface would panic, and there is no ID to index it by.
				continue
			}
			rID := r.GetID()
			byResource[rID] = append(byResource[rID], i.ID)
		}
	}
	return res
}
// plan turns every graph item of wf accepted by the filter f into a priced
// item, positions it in time and records it in priceds.
//
// For each selected item:
//   - its resource is converted to a priced item using the per-resource
//     settings found in instances, partnerships, buyings and strategies
//     (looked up by resource ID) and the given bookingMode;
//   - start is called to obtain the location start and a duration; the start
//     is always set on the priced item;
//   - when the duration is not negative, end is called to compute the location
//     end. A failure of end is deliberately not fatal: the priced item is
//     simply left without a location end;
//   - the priced item is stored in priceds under the data type reported by the
//     item itself and the graph item ID. If an entry already exists for that
//     ID, its quantity is added to the new priced item before it is replaced.
//
// dt is the data type requested by the caller; its bucket is created in
// priceds as soon as one item is processed.
//
// It returns the resources of the processed items (as T), the updated priceds
// map, and the first error met. An error is returned when an item has no
// resource, when its resource is not a T, or when the conversion or start fail.
func plan[T resources.ResourceInterface](
	dt tools.DataType, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
	f func(graph.GraphItem) bool,
	start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64, error),
	end func(time.Time, float64) (*time.Time, error)) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) {
	// Not named "resources" so the imported package of the same name stays usable.
	planned := []T{}
	if priceds == nil {
		// A nil map cannot be written to; allocate one so it can be filled and returned.
		priceds = map[tools.DataType]map[string]pricing.PricedItemITF{}
	}
	for _, item := range wf.GetGraphItems(f) {
		if priceds[dt] == nil {
			priceds[dt] = map[string]pricing.PricedItemITF{}
		}
		itemDT, realItem := item.GetResource()
		if realItem == nil {
			return planned, priceds, errors.New("could not load the processing resource")
		}
		typed, ok := realItem.(T)
		if !ok {
			// Report the mismatch instead of panicking on an unchecked assertion.
			return planned, priceds, fmt.Errorf("resource %s does not have the resource type expected by the planner", realItem.GetID())
		}
		if priceds[itemDT] == nil {
			// The item may report a data type other than dt; make sure the
			// bucket that is actually written below exists.
			priceds[itemDT] = map[string]pricing.PricedItemITF{}
		}
		resourceID := realItem.GetID()
		priced, err := realItem.ConvertToPricedResource(itemDT, instances.Get(resourceID),
			partnerships.Get(resourceID), buyings.Get(resourceID), strategies.Get(resourceID), &bookingMode, request)
		if err != nil {
			return planned, priceds, err
		}
		// Should be commented once the Pricing selection feature has been implemented, related to the commit d35ad440fa77763ec7f49ab34a85e47e75581b61
		// if priced.SelectPricing() == nil {
		// 	return resources, priceds, errors.New("no pricings are selected... can't proceed")
		// }
		started, duration, err := start(realItem, priced)
		if err != nil {
			return planned, priceds, err
		}
		priced.SetLocationStart(started)
		if duration >= 0 {
			// Best effort: when end fails, the item keeps no location end.
			if e, err := end(started, duration); err == nil && e != nil {
				priced.SetLocationEnd(*e)
			}
		}
		planned = append(planned, typed)
		if previous := priceds[itemDT][item.ID]; previous != nil {
			priced.AddQuantity(previous.GetQuantity())
		}
		priceds[itemDT][item.ID] = priced
	}
	return planned, priceds, nil
}
// ── Integrity validation ─────────────────────────────────────────────────────

// arrowDirectionBackward is the ArrowDirection value of a link whose arrow is
// drawn from its destination towards its source. It matches the index of
// "backward" in the flutter_flow_chart ArrowDirection enum
// (forward=0, backward=1, bidirectional=2).
const arrowDirectionBackward int64 = 1
// ViolationSeverity tells whether an integrity violation blocks scheduling
// (error) or is only reported (warning).
type ViolationSeverity int

const (
	// SeverityError blocks scheduling — the violation must be fixed.
	SeverityError = ViolationSeverity(iota)
	// SeverityWarning is reported but does not block scheduling.
	SeverityWarning
)
// ViolationType names the category an integrity violation belongs to.
// The values mirror the TopologyErrorType / TopologyWarningType enums in oc-front.
type ViolationType string

// Errors — they block scheduling.
const (
	ViolationVariableNotFound      = ViolationType("variable_not_found")
	ViolationMissingComputeUnit    = ViolationType("missing_compute_unit")
	ViolationCycle                 = ViolationType("cycle")
	ViolationMissingDataStorage    = ViolationType("missing_data_storage")
	ViolationRequiredOutputMissing = ViolationType("required_output_missing")
)

// Warnings — non-blocking, reported for UX.
const (
	ViolationInvertedArrow                = ViolationType("inverted_arrow")
	ViolationIsolatedProcessing           = ViolationType("isolated_processing")
	ViolationStorageNotLinkedToProcessing = ViolationType("storage_not_linked_to_processing")
)
// IntegrityViolation is one structural or semantic problem detected in the
// workflow graph.
type IntegrityViolation struct {
	Severity ViolationSeverity // error (blocking) or warning (non-blocking)
	Type     ViolationType     // category of the problem
	ItemIDs  []string          // graph item IDs involved in the violation
	Message  string            // human-readable description
}

// IsError reports whether the violation has the SeverityError severity.
func (v IntegrityViolation) IsError() bool {
	return v.Severity == SeverityError
}

// IsWarning reports whether the violation has the SeverityWarning severity.
func (v IntegrityViolation) IsWarning() bool {
	return v.Severity == SeverityWarning
}
// ValidateIntegrity checks the structural and semantic integrity of the workflow
// graph. It must be called by both oc-front (UX enforcement) and oc-schedulerd
// (sovereign enforcement, regardless of submission source — the front can be
// bypassed via direct API calls).
//
// Errors (block scheduling):
//  1. Variable not found — an arg references $varName not defined in env/inputs.
//  2. Required output missing — a required input of a processing node is not
//     output by one of its immediate predecessors.
//  3. Missing compute — a Processing/non-HOSTED Service has no Compute linked.
//  4. Cycle — the processing DAG contains a directed cycle.
//  5. Missing data storage — a Data with Source has no Storage linked.
//
// Warnings (non-blocking):
//  6. Inverted arrow — a backward link between two processing nodes.
//  7. Isolated processing — a processing node with no processing neighbours.
//  8. Storage not linked to processing — a storage node orphaned from any processing.
//
// Violations are returned in the order of the list above. The result is nil
// when nothing is wrong, and also when the workflow or its graph is nil: there
// is then nothing to inspect.
func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
	if w == nil || w.Graph == nil {
		// Every check below dereferences w.Graph; without a graph they would panic.
		return nil
	}
	checks := []func() []IntegrityViolation{
		w.validateVariables,
		w.validateRequiredInputs,
		w.validateComputeLinks,
		w.detectCycles,
		w.validateDataStorageLinks,
		w.detectInvertedArrows,
		w.detectIsolatedProcessings,
		w.detectOrphanedStorages,
	}
	var violations []IntegrityViolation
	for _, check := range checks {
		violations = append(violations, check()...)
	}
	return violations
}
// HasCriticalViolations runs ValidateIntegrity and reports whether at least one
// of the violations found has the error severity.
// oc-schedulerd uses this to reject a workflow without inspecting each violation.
func (w *Workflow) HasCriticalViolations() bool {
	found := w.ValidateIntegrity()
	for idx := range found {
		if found[idx].Severity == SeverityError {
			return true
		}
	}
	return false
}
// itemName gives the display name of a graph item: the name of its resource
// when the item exists in the graph and carries a resource, itemID otherwise.
func (w *Workflow) itemName(itemID string) string {
	if item, found := w.Graph.Items[itemID]; found {
		if _, res := item.GetResource(); res != nil {
			return res.GetName()
		}
	}
	return itemID
}
// varRefPattern finds variable references in an argument string: a '$' followed
// by an identifier, optionally wrapped in braces ($name or ${name}).
// The first capture group holds the variable name.
var varRefPattern = regexp.MustCompile(`\$\{?([A-Za-z_][A-Za-z0-9_]*)\}?`)

// validateVariables verifies that every variable referenced in w.Args is
// declared, for the same graph item, in w.Env or in w.Inputs — mirroring
// WorkflowFactory.validateArgs() in oc-front.
//
// One error violation is produced per undefined reference; parameters with an
// empty name never count as declarations.
func (w *Workflow) validateVariables() []IntegrityViolation {
	var violations []IntegrityViolation
	for itemID, args := range w.Args {
		if len(args) == 0 {
			continue
		}
		// Names this item is allowed to reference.
		known := map[string]struct{}{}
		declare := func(name string) {
			if name != "" {
				known[name] = struct{}{}
			}
		}
		for _, envParam := range w.Env[itemID] {
			declare(envParam.Name)
		}
		for _, inputParam := range w.Inputs[itemID] {
			declare(inputParam.Name)
		}
		label := w.itemName(itemID)
		for _, arg := range args {
			for _, match := range varRefPattern.FindAllStringSubmatch(arg, -1) {
				ref := match[1]
				if _, defined := known[ref]; defined {
					continue
				}
				violations = append(violations, IntegrityViolation{
					Severity: SeverityError,
					Type:     ViolationVariableNotFound,
					ItemIDs:  []string{itemID},
					Message:  fmt.Sprintf(`"%s": arg "%s" → variable $%s is not defined in env or inputs`, label, arg, ref),
				})
			}
		}
	}
	return violations
}
// validateComputeLinks reports, as errors, the items that need a compute unit
// but are not linked to one — mirroring the computeErrors block in oc-front's
// checkTopology().
//
// An item needs a compute unit when it is:
//   - a processing item whose Processing is set and not flagged IsService, or
//   - a service item whose Service is set and whose selected instance is not a
//     HOSTED *resources.ServiceInstance.
//
// A link counts whatever its direction: the compute item may sit on either end.
func (w *Workflow) validateComputeLinks() []IntegrityViolation {
	var violations []IntegrityViolation

	// linkedToCompute reports whether some link attaches id to a compute item.
	linkedToCompute := func(id string) bool {
		for _, link := range w.Graph.Links {
			other := ""
			switch id {
			case link.Source.ID:
				other = link.Destination.ID
			case link.Destination.ID:
				other = link.Source.ID
			default:
				continue
			}
			if otherItem, found := w.Graph.Items[other]; found && w.Graph.IsCompute(otherItem) {
				return true
			}
		}
		return false
	}

	for id, item := range w.Graph.Items {
		var label string
		if w.Graph.IsProcessing(item) && item.Processing != nil {
			// IsService processings are long-running services and don't need a Compute booking.
			if item.Processing.IsService {
				continue
			}
			label = item.Processing.GetName()
		} else if w.Graph.IsService(item) && item.Service != nil {
			// HOSTED services use an existing endpoint — no Compute booking needed.
			if inst := item.Service.GetSelectedInstance(nil); inst != nil {
				if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
					continue
				}
			}
			label = item.Service.GetName()
		} else {
			// Any other kind of item does not require a compute unit.
			continue
		}
		if linkedToCompute(id) {
			continue
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityError,
			Type:     ViolationMissingComputeUnit,
			ItemIDs:  []string{id},
			Message:  fmt.Sprintf(`"%s" has no compute unit linked`, label),
		})
	}
	return violations
}
// detectCycles looks for directed cycles among the execution-flow items of the
// graph (processing, service and native-tool items) with a depth-first search,
// and reports every back-edge found as a cycle error — mirroring dfsCycle() in
// oc-front.
//
// A link contributes an edge only when both of its ends are execution-flow
// items. The edge goes from source to destination, except when the link style
// says the arrow is backward, in which case it goes from destination to source.
// The same edge is reported at most once.
func (w *Workflow) detectCycles() []IntegrityViolation {
	var violations []IntegrityViolation

	// next maps every execution-flow item to its direct successors; its key set
	// doubles as the set of execution-flow item IDs.
	next := map[string][]string{}
	for id, item := range w.Graph.Items {
		if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
			next[id] = []string{}
		}
	}
	for _, link := range w.Graph.Links {
		from, to := link.Source.ID, link.Destination.ID
		if _, ok := next[from]; !ok {
			continue
		}
		if _, ok := next[to]; !ok {
			continue
		}
		if link.Style != nil && link.Style.ArrowDirection == arrowDirectionBackward {
			// Visual arrow reversed: the destination runs before the source.
			from, to = to, from
		}
		next[from] = append(next[from], to)
	}

	// Visit states of the depth-first search.
	const (
		unvisited = iota // not reached yet
		onStack          // currently being explored
		finished         // fully explored
	)
	state := map[string]int{}
	alreadyReported := map[string]struct{}{}

	var visit func(node string)
	visit = func(node string) {
		state[node] = onStack
		for _, succ := range next[node] {
			switch state[succ] {
			case onStack:
				// succ is an ancestor of node in the current path: back-edge.
				edge := node + "→" + succ
				if _, dup := alreadyReported[edge]; dup {
					continue
				}
				alreadyReported[edge] = struct{}{}
				violations = append(violations, IntegrityViolation{
					Severity: SeverityError,
					Type:     ViolationCycle,
					ItemIDs:  []string{node, succ},
					Message: fmt.Sprintf(`Infinite loop: "%s" → "%s" creates a cycle that would block execution indefinitely`,
						w.itemName(node), w.itemName(succ)),
				})
			case unvisited:
				visit(succ)
			}
		}
		state[node] = finished
	}

	for id := range next {
		if state[id] == unvisited {
			visit(id)
		}
	}
	return violations
}
// validateDataStorageLinks reports, as errors, the data items that have at
// least one instance with a source but that do not appear in any data→storage
// link of the graph — the builder needs such a link to inject the download
// step (curl or NATS/Minio protocol).
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
	var violations []IntegrityViolation

	// IDs of the data items that are attached to a storage.
	stored := map[string]struct{}{}
	for _, dataLink := range w.Graph.GetDataStorageLinks() {
		stored[dataLink.DataItemID] = struct{}{}
	}

	for id, item := range w.Graph.Items {
		if !w.Graph.IsData(item) || item.Data == nil {
			continue
		}
		sourced := false
		for _, instance := range item.Data.Instances {
			if sourced = instance.Access.HasSource(); sourced {
				break
			}
		}
		if !sourced {
			// No instance has a source: no storage is required.
			continue
		}
		if _, linked := stored[id]; linked {
			continue
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityError,
			Type:     ViolationMissingDataStorage,
			ItemIDs:  []string{id},
			Message:  fmt.Sprintf(`data "%s" has a source but no Storage linked`, item.Data.GetName()),
		})
	}
	return violations
}
// validateRequiredInputs verifies that every required, named input declared in
// w.Inputs for an execution-flow item (processing, service or native tool) is
// output by each of its immediate predecessors. One error is reported per
// (item, input, predecessor) combination that fails.
// Mirrors the requiredOutputMissing check in oc-front's checkTopology().
//
// Predecessors are derived from the links joining two execution-flow items:
// the source precedes the destination, unless the link style marks the arrow
// as backward, in which case the destination precedes the source.
func (w *Workflow) validateRequiredInputs() []IntegrityViolation {
	var violations []IntegrityViolation

	// preds maps every execution-flow item to its direct predecessors; its key
	// set doubles as the set of execution-flow item IDs.
	preds := map[string][]string{}
	for id, item := range w.Graph.Items {
		if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
			preds[id] = []string{}
		}
	}
	for _, link := range w.Graph.Links {
		before, after := link.Source.ID, link.Destination.ID
		if _, ok := preds[before]; !ok {
			continue
		}
		if _, ok := preds[after]; !ok {
			continue
		}
		if link.Style != nil && link.Style.ArrowDirection == arrowDirectionBackward {
			before, after = after, before
		}
		preds[after] = append(preds[after], before)
	}

	for id, inputs := range w.Inputs {
		upstream, isFlowItem := preds[id]
		if !isFlowItem {
			continue
		}
		for _, input := range inputs {
			if !input.Required || input.Name == "" {
				continue
			}
			for _, predID := range upstream {
				if w.nodeHasOutput(predID, input.Name) {
					continue
				}
				violations = append(violations, IntegrityViolation{
					Severity: SeverityError,
					Type:     ViolationRequiredOutputMissing,
					ItemIDs:  []string{id, predID},
					Message: fmt.Sprintf(
						`"%s" requires input "%s" but "%s" does not output it`,
						w.itemName(id), input.Name, w.itemName(predID),
					),
				})
			}
		}
	}
	return violations
}
// nodeHasOutput reports whether the graph item nodeID outputs a parameter
// called name. The workflow-level outputs (w.Outputs) are searched first, then
// the outputs of the item's own Processing or, failing that, Service resource.
func (w *Workflow) nodeHasOutput(nodeID, name string) bool {
	for _, declared := range w.Outputs[nodeID] {
		if declared.Name == name {
			return true
		}
	}

	item, found := w.Graph.Items[nodeID]
	if !found {
		return false
	}

	var owner resources.ResourceInterface
	if item.Processing != nil {
		owner = item.Processing
	} else if item.Service != nil {
		owner = item.Service
	}
	if owner == nil {
		// Neither a processing nor a service: no resource-level outputs to search.
		return false
	}
	for _, produced := range owner.GetOutputs() {
		if produced.Name == name {
			return true
		}
	}
	return false
}
// detectInvertedArrows emits a warning for every link whose style marks the
// arrow as backward and whose two ends are both processing or service items —
// mirroring the invertedArrow warning in oc-front.
// Links without a style, or with an end missing from the graph, are ignored.
func (w *Workflow) detectInvertedArrows() []IntegrityViolation {
	var violations []IntegrityViolation

	runs := func(it graph.GraphItem) bool {
		return w.Graph.IsProcessing(it) || w.Graph.IsService(it)
	}

	for _, link := range w.Graph.Links {
		if link.Style == nil || link.Style.ArrowDirection != arrowDirectionBackward {
			continue
		}
		srcID, dstID := link.Source.ID, link.Destination.ID
		srcItem, hasSrc := w.Graph.Items[srcID]
		dstItem, hasDst := w.Graph.Items[dstID]
		if !hasSrc || !hasDst {
			continue
		}
		if !runs(srcItem) || !runs(dstItem) {
			continue
		}
		// With a backward arrow the destination is the one that runs first.
		first, second := w.itemName(dstID), w.itemName(srcID)
		violations = append(violations, IntegrityViolation{
			Severity: SeverityWarning,
			Type:     ViolationInvertedArrow,
			ItemIDs:  []string{srcID, dstID},
			Message: fmt.Sprintf(`Reversed arrow between "%s" & "%s": "%s" will execute before "%s" unexpectedly`,
				first, second, first, second),
		})
	}
	return violations
}
// detectIsolatedProcessings emits a warning for every execution-flow item
// (processing, service or native tool) that no link connects to another
// execution-flow item — such an item will execute synchronously with the
// workflow's first elements. The direction of the links is not considered.
func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
	var violations []IntegrityViolation

	flowItems := map[string]struct{}{}
	for id, item := range w.Graph.Items {
		if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
			flowItems[id] = struct{}{}
		}
	}

	// connected reports whether a link joins id to an execution-flow item.
	connected := func(id string) bool {
		for _, link := range w.Graph.Links {
			other := ""
			switch id {
			case link.Source.ID:
				other = link.Destination.ID
			case link.Destination.ID:
				other = link.Source.ID
			default:
				continue
			}
			if _, isFlowItem := flowItems[other]; isFlowItem {
				return true
			}
		}
		return false
	}

	for id := range flowItems {
		if connected(id) {
			continue
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityWarning,
			Type:     ViolationIsolatedProcessing,
			ItemIDs:  []string{id},
			Message: fmt.Sprintf(`"%s" is isolated (no connection with another processing) — will execute synchronously with the workflow's first element(s)`,
				w.itemName(id)),
		})
	}
	return violations
}
// detectOrphanedStorages emits a warning for every storage item that no link
// connects to a processing item — it contributes no data flow to the workflow.
//
// The message distinguishes a storage that is linked to nothing recognised
// from a storage that is only linked to other kinds of items (compute, data,
// service). In the latter case the kinds are always listed in the same order,
// so that the message is stable from one call to the next.
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
	var violations []IntegrityViolation
	// Listing order of the non-processing kinds in the warning message. Ranging
	// over the linkedTopics map instead would yield a random order.
	topicOrder := []string{"compute", "data", "service"}
	for id, item := range w.Graph.Items {
		if !w.Graph.IsStorage(item) {
			continue
		}
		// Kinds of items found at the other end of the links touching this storage.
		linkedTopics := map[string]struct{}{}
		for _, link := range w.Graph.Links {
			var otherID string
			if link.Source.ID == id {
				otherID = link.Destination.ID
			} else if link.Destination.ID == id {
				otherID = link.Source.ID
			} else {
				continue
			}
			if other, ok := w.Graph.Items[otherID]; ok {
				switch {
				case w.Graph.IsProcessing(other):
					linkedTopics["processing"] = struct{}{}
				case w.Graph.IsCompute(other):
					linkedTopics["compute"] = struct{}{}
				case w.Graph.IsData(other):
					linkedTopics["data"] = struct{}{}
				case w.Graph.IsService(other):
					linkedTopics["service"] = struct{}{}
				}
			}
		}
		if _, ok := linkedTopics["processing"]; ok {
			continue
		}
		name := w.itemName(id)
		var msg string
		if len(linkedTopics) == 0 {
			msg = fmt.Sprintf(`"%s" is isolated (not linked to anything)`, name)
		} else {
			topics := make([]string, 0, len(linkedTopics))
			for _, topic := range topicOrder {
				if _, ok := linkedTopics[topic]; ok {
					topics = append(topics, topic)
				}
			}
			msg = fmt.Sprintf(`"%s" is not linked to any processing (only linked to: %s)`, name, strings.Join(topics, ", "))
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityWarning,
			Type:     ViolationStorageNotLinkedToProcessing,
			ItemIDs:  []string{id},
			Message:  msg,
		})
	}
	return violations
}