2024-08-30 09:14:03 +02:00
package workflow
2024-07-19 10:54:58 +02:00
import (
2026-01-12 11:59:05 +01:00
"bufio"
"encoding/json"
2024-08-12 16:11:25 +02:00
"errors"
2026-01-12 11:59:05 +01:00
"fmt"
2026-01-12 14:26:29 +01:00
"mime/multipart"
2026-05-27 15:50:23 +02:00
"regexp"
2026-03-18 08:30:02 +01:00
"strconv"
2026-01-12 11:59:05 +01:00
"strings"
2024-12-12 16:25:47 +01:00
"time"
2024-07-19 10:54:58 +02:00
2026-06-02 08:42:20 +02:00
"cloud.o-forge.io/core/oc-lib/dbs"
2026-01-13 16:04:31 +01:00
"cloud.o-forge.io/core/oc-lib/models/booking"
2026-03-20 16:14:07 +01:00
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
2024-12-03 10:57:28 +01:00
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
2025-01-15 10:56:44 +01:00
"cloud.o-forge.io/core/oc-lib/models/common"
2026-02-18 12:24:19 +01:00
"cloud.o-forge.io/core/oc-lib/models/common/models"
2024-12-12 16:25:47 +01:00
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
2026-03-17 16:09:39 +01:00
"cloud.o-forge.io/core/oc-lib/models/live"
2024-08-13 09:49:42 +02:00
"cloud.o-forge.io/core/oc-lib/models/peer"
2024-07-26 10:36:23 +02:00
"cloud.o-forge.io/core/oc-lib/models/resources"
2026-03-18 08:30:02 +01:00
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
2024-07-19 10:54:58 +02:00
"cloud.o-forge.io/core/oc-lib/models/utils"
2024-11-28 16:49:41 +01:00
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
2024-08-12 16:11:25 +02:00
"cloud.o-forge.io/core/oc-lib/tools"
2026-01-12 11:59:05 +01:00
"github.com/google/uuid"
2024-07-19 10:54:58 +02:00
)
2026-01-13 16:04:31 +01:00
// ConfigItem associates string keys with integer values.
type ConfigItem map[string]int

// Get returns a pointer to a copy of the value stored under key.
//
// The result is never nil: when key is absent (or the map itself is nil) it
// points to 0. Writing through the returned pointer does not modify the map.
func (c ConfigItem) Get(key string) *int {
	// Indexing a map with a missing key yields the zero value, which is
	// exactly the default we want to hand back.
	value := c[key]
	return &value
}
2024-08-30 14:50:48 +02:00
/*
2025-01-15 10:56:44 +01:00
* Workflow is a struct that represents a workflow
* it defines the native workflow
2024-08-30 14:50:48 +02:00
*/
2025-01-15 10:56:44 +01:00
type Workflow struct {
utils . AbstractObject // AbstractObject contains the basic fields of an object (id, name)
2024-07-26 10:36:23 +02:00
resources . ResourceSet
2026-04-02 14:31:19 +02:00
Graph * graph . Graph ` bson:"graph,omitempty" json:"graph,omitempty" ` // Graph UI & logic representation of the workflow
2024-12-12 16:25:47 +01:00
// Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
2026-04-02 14:31:19 +02:00
Shared [ ] string ` json:"shared,omitempty" bson:"shared,omitempty" ` // Shared is the ID of the shared workflow // AbstractWorkflow contains the basic fields of a workflow
2026-04-28 08:55:08 +02:00
Env map [ string ] [ ] models . Param ` json:"env" bson:"env" `
Inputs map [ string ] [ ] models . Param ` json:"inputs" bson:"inputs" `
Outputs map [ string ] [ ] models . Param ` json:"outputs" bson:"outputs" `
Args map [ string ] [ ] string ` json:"args" bson:"args" `
Exposes map [ string ] [ ] models . Expose ` bson:"exposes" json:"exposes" ` // Expose is the execution
2026-06-03 15:33:39 +02:00
SelectedEmbeddedStorages map [ string ] * resources . EmbeddedStorageSelection ` json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty" `
2024-07-26 10:36:23 +02:00
}
2024-12-12 16:25:47 +01:00
// GetAccessor returns a new workflow accessor bound to the given request.
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
	// A fresh accessor is created on every call.
	return NewAccessor(request)
}
2026-01-12 11:59:05 +01:00
func ( d * Workflow ) GetResources ( dt tools . DataType ) [ ] resources . ResourceInterface {
itf := [ ] resources . ResourceInterface { }
switch dt {
2026-01-13 16:04:31 +01:00
case tools . NATIVE_TOOL :
for _ , d := range d . NativeTools {
itf = append ( itf , d )
}
return itf
2026-01-12 11:59:05 +01:00
case tools . DATA_RESOURCE :
for _ , d := range d . DataResources {
itf = append ( itf , d )
}
return itf
case tools . PROCESSING_RESOURCE :
for _ , d := range d . ProcessingResources {
itf = append ( itf , d )
}
return itf
case tools . COMPUTE_RESOURCE :
for _ , d := range d . ComputeResources {
itf = append ( itf , d )
}
return itf
case tools . WORKFLOW_RESOURCE :
for _ , d := range d . WorkflowResources {
itf = append ( itf , d )
}
return itf
case tools . STORAGE_RESOURCE :
for _ , d := range d . StorageResources {
itf = append ( itf , d )
}
return itf
2026-04-23 09:24:02 +02:00
case tools . SERVICE_RESOURCE :
for _ , d := range d . ServiceResources {
itf = append ( itf , d )
}
return itf
2026-04-28 13:24:25 +02:00
case tools . DYNAMIC_RESOURCE :
for _ , d := range d . DynamicResources {
itf = append ( itf , d )
}
return itf
2026-01-12 11:59:05 +01:00
}
2026-04-28 13:24:25 +02:00
2026-01-12 11:59:05 +01:00
return itf
}
2026-06-02 08:42:20 +02:00
func ( d * Workflow ) ExtractFromPlantUML ( plantUML multipart . File , request * tools . APIRequest ) ( * Workflow , [ ] string , error ) {
2026-01-12 11:59:05 +01:00
if plantUML == nil {
2026-06-02 08:42:20 +02:00
return d , nil , errors . New ( "no file available to export" )
2026-01-12 11:59:05 +01:00
}
2026-01-12 14:26:29 +01:00
2026-01-12 11:59:05 +01:00
defer plantUML . Close ( )
d . Datas = [ ] string { }
d . Storages = [ ] string { }
d . Processings = [ ] string { }
d . Computes = [ ] string { }
d . Workflows = [ ] string { }
2026-04-28 13:24:25 +02:00
d . Dynamics = [ ] string { }
d . Services = [ ] string { }
2026-01-12 11:59:05 +01:00
d . DataResources = [ ] * resources . DataResource { }
d . StorageResources = [ ] * resources . StorageResource { }
d . ProcessingResources = [ ] * resources . ProcessingResource { }
d . ComputeResources = [ ] * resources . ComputeResource { }
d . WorkflowResources = [ ] * resources . WorkflowResource { }
2026-04-28 13:24:25 +02:00
d . DynamicResources = [ ] * resources . DynamicResource { }
d . ServiceResources = [ ] * resources . ServiceResource { }
2026-01-12 11:59:05 +01:00
d . Graph = graph . NewGraph ( )
resourceCatalog := map [ string ] func ( ) resources . ResourceInterface {
"Processing" : func ( ) resources . ResourceInterface {
return & resources . ProcessingResource {
AbstractInstanciatedResource : resources . AbstractInstanciatedResource [ * resources . ProcessingInstance ] {
Instances : [ ] * resources . ProcessingInstance { } ,
} ,
}
} ,
"Storage" : func ( ) resources . ResourceInterface {
return & resources . StorageResource {
AbstractInstanciatedResource : resources . AbstractInstanciatedResource [ * resources . StorageResourceInstance ] {
Instances : [ ] * resources . StorageResourceInstance { } ,
} ,
}
} ,
"Data" : func ( ) resources . ResourceInterface {
return & resources . DataResource {
AbstractInstanciatedResource : resources . AbstractInstanciatedResource [ * resources . DataInstance ] {
Instances : [ ] * resources . DataInstance { } ,
} ,
}
} ,
"ComputeUnit" : func ( ) resources . ResourceInterface {
return & resources . ComputeResource {
AbstractInstanciatedResource : resources . AbstractInstanciatedResource [ * resources . ComputeResourceInstance ] {
Instances : [ ] * resources . ComputeResourceInstance { } ,
} ,
}
} ,
2026-04-28 13:24:25 +02:00
"Service" : func ( ) resources . ResourceInterface {
return & resources . ServiceResource {
AbstractInstanciatedResource : resources . AbstractInstanciatedResource [ * resources . ServiceInstance ] {
Instances : [ ] * resources . ServiceInstance { } ,
} ,
}
} ,
"Dynamic" : func ( ) resources . ResourceInterface {
return & resources . DynamicResource { }
} ,
2026-03-18 08:30:02 +01:00
// WorkflowEvent creates a NativeTool of Kind=WORKFLOW_EVENT directly,
// without DB lookup. It has no user-defined instance.
"WorkflowEvent" : func ( ) resources . ResourceInterface {
return & resources . NativeTool {
Kind : int ( native_tools . WORKFLOW_EVENT ) ,
}
} ,
2026-01-12 11:59:05 +01:00
}
2026-03-17 14:26:11 +01:00
graphVarName := map [ string ] graph . GraphItem { }
2026-01-12 11:59:05 +01:00
2026-03-18 09:41:09 +01:00
// Collect all lines first to support look-ahead (comment on the line after
// the declaration, as produced by ToPlantUML).
scanner := bufio . NewScanner ( plantUML )
var lines [ ] string
2026-01-12 11:59:05 +01:00
for scanner . Scan ( ) {
2026-03-18 09:41:09 +01:00
lines = append ( lines , scanner . Text ( ) )
}
if err := scanner . Err ( ) ; err != nil {
2026-06-02 08:42:20 +02:00
return d , nil , err
2026-03-18 09:41:09 +01:00
}
2026-06-02 08:42:20 +02:00
var warnings [ ] string
2026-03-18 09:41:09 +01:00
for i , line := range lines {
trimmed := strings . TrimSpace ( line )
// Skip pure comment lines and PlantUML directives — they must never be
// parsed as resource declarations or links. Without this guard, a comment
// like "' source: http://my-server.com" would match the "-" link check.
if strings . HasPrefix ( trimmed , "'" ) ||
strings . HasPrefix ( trimmed , "!" ) ||
strings . HasPrefix ( trimmed , "@" ) ||
trimmed == "" {
continue
}
// Build the parse line: if the current line has no inline comment and the
// next line is a pure comment, append it so parsers receive one combined line.
// Also handles the legacy inline-comment format unchanged.
parseLine := line
if ! strings . Contains ( line , "'" ) && i + 1 < len ( lines ) {
if next := strings . TrimSpace ( lines [ i + 1 ] ) ; strings . HasPrefix ( next , "'" ) {
parseLine = line + " " + next
}
}
2026-06-01 15:01:45 +02:00
// Handle links outside the catalog loop: each link line must be processed
// exactly once (the catalog loop would otherwise call extractLink once per
// catalog entry, producing N× 7 duplicate links in the graph).
if strings . Contains ( line , "-->" ) {
if err := d . extractLink ( parseLine , graphVarName , "-->" , false ) ; err != nil {
fmt . Println ( err )
}
continue
}
if strings . Contains ( line , "<--" ) {
if err := d . extractLink ( parseLine , graphVarName , "<--" , true ) ; err != nil {
fmt . Println ( err )
}
continue
}
if strings . Contains ( line , "--" ) {
if err := d . extractLink ( parseLine , graphVarName , "--" , false ) ; err != nil {
fmt . Println ( err )
}
continue
}
2026-06-02 08:42:20 +02:00
for n , newFn := range resourceCatalog {
2026-06-01 15:01:45 +02:00
if strings . Contains ( line , n + "(" ) && ! strings . Contains ( line , "!procedure" ) && ! strings . Contains ( line , "!define" ) {
2026-06-02 08:42:20 +02:00
newRes := newFn ( )
2026-03-17 14:17:49 +01:00
newRes . SetID ( uuid . New ( ) . String ( ) )
2026-06-02 08:42:20 +02:00
varName , graphItem , warns , err := d . extractResourcePlantUML ( parseLine , newRes , n , request . PeerID , request )
2026-01-12 11:59:05 +01:00
if err != nil {
2026-06-02 08:42:20 +02:00
return d , warnings , err
2026-01-12 11:59:05 +01:00
}
2026-06-02 08:42:20 +02:00
warnings = append ( warnings , warns ... )
2026-03-17 14:26:11 +01:00
if graphItem != nil {
graphVarName [ varName ] = * graphItem
}
2026-06-01 15:01:45 +02:00
break
2026-01-12 11:59:05 +01:00
}
}
}
d . generateResource ( d . GetResources ( tools . DATA_RESOURCE ) , request )
d . generateResource ( d . GetResources ( tools . PROCESSING_RESOURCE ) , request )
2026-04-23 09:24:02 +02:00
d . generateResource ( d . GetResources ( tools . SERVICE_RESOURCE ) , request )
2026-01-12 11:59:05 +01:00
d . generateResource ( d . GetResources ( tools . STORAGE_RESOURCE ) , request )
d . generateResource ( d . GetResources ( tools . COMPUTE_RESOURCE ) , request )
d . generateResource ( d . GetResources ( tools . WORKFLOW_RESOURCE ) , request )
2026-04-28 13:24:25 +02:00
d . generateResource ( d . GetResources ( tools . DYNAMIC_RESOURCE ) , request )
2026-03-17 14:26:11 +01:00
d . Graph . Items = graphVarName
2026-06-02 08:42:20 +02:00
return d , warnings , nil
2026-01-12 11:59:05 +01:00
}
func ( d * Workflow ) generateResource ( datas [ ] resources . ResourceInterface , request * tools . APIRequest ) error {
for _ , d := range datas {
2026-03-17 16:09:39 +01:00
if d . GetType ( ) == tools . COMPUTE_RESOURCE . String ( ) {
access := live . NewAccessor [ * live . LiveDatacenter ] ( tools . LIVE_DATACENTER , request )
if b , err := json . Marshal ( d ) ; err == nil {
var liv live . LiveDatacenter
json . Unmarshal ( b , & liv )
data , _ , err := access . StoreOne ( & liv )
if err == nil {
2026-03-19 10:50:00 +01:00
access . CopyOne ( data )
2026-03-17 16:09:39 +01:00
}
}
continue
} else if d . GetType ( ) == tools . STORAGE_RESOURCE . String ( ) {
access := live . NewAccessor [ * live . LiveStorage ] ( tools . LIVE_STORAGE , request )
if b , err := json . Marshal ( d ) ; err == nil {
var liv live . LiveStorage
json . Unmarshal ( b , & liv )
data , _ , err := access . StoreOne ( & liv )
if err == nil {
2026-03-19 10:50:00 +01:00
access . CopyOne ( data )
2026-03-17 16:09:39 +01:00
}
}
2026-01-12 11:59:05 +01:00
continue
}
2026-03-19 10:50:00 +01:00
d . GetAccessor ( request ) . StoreOne ( d )
2026-01-12 11:59:05 +01:00
}
return nil
}
2026-03-18 08:30:02 +01:00
// setNestedKey sets a value in a nested map using a dot-notation path.
// "access.container.image" → m["access"]["container"]["image"] = value
//
// Missing intermediate maps are created on the way. When an intermediate key
// already holds something that is not a map[string]any, that entry is left
// untouched and the value is not written.
func setNestedKey(m map[string]any, path string, value any) {
	key, rest, nested := strings.Cut(path, ".")
	if !nested {
		m[path] = value
		return
	}
	if _, ok := m[key]; !ok {
		m[key] = map[string]any{}
	}
	if sub, ok := m[key].(map[string]any); ok {
		setNestedKey(sub, rest, value)
	}
}

// parseHumanFriendlyAttrs converts a human-friendly comment into JSON bytes.
// Supports:
//   - flat:   "source: http://example.com, encryption: true, size: 500"
//   - nested: "access.container.image: nginx, access.container.tag: latest"
//   - raw JSON passthrough (backward-compat): '{"key": "value"}'
//
// Values are auto-typed:
//   - "true" / "false" (any letter case) become booleans;
//   - finite numbers become float64, so "1" and "0" stay numbers;
//   - everything else, including "nan", "inf", "t" or "f", stays a string.
//
// The first ':' in each pair is the key/value separator, so URLs like
// "http://..." are handled correctly. Pairs without a ':' are skipped.
// Values cannot contain ',' because it separates pairs.
func parseHumanFriendlyAttrs(comment string) []byte {
	comment = strings.TrimSpace(comment)
	if strings.HasPrefix(comment, "{") {
		return []byte(comment)
	}
	attrs := map[string]any{}
	for _, pair := range strings.Split(comment, ",") {
		rawKey, rawVal, ok := strings.Cut(strings.TrimSpace(pair), ":")
		if !ok {
			continue
		}
		key := strings.TrimSpace(rawKey)
		val := strings.TrimSpace(rawVal)

		var typed any = val
		switch strings.ToLower(val) {
		case "true":
			typed = true
		case "false":
			typed = false
		default:
			// n*0 == 0 holds only for finite numbers: NaN and ±Inf cannot be
			// encoded as JSON and would make the whole attribute set unusable.
			if n, err := strconv.ParseFloat(val, 64); err == nil && n*0 == 0 {
				typed = n
			}
		}
		setNestedKey(attrs, key, typed)
	}
	encoded, err := json.Marshal(attrs)
	if err != nil {
		// Not expected with the value types produced above; fall back to an
		// empty object so callers always receive valid JSON.
		return []byte("{}")
	}
	return encoded
}
2026-03-17 14:26:11 +01:00
func ( d * Workflow ) extractLink ( line string , graphVarName map [ string ] graph . GraphItem , pattern string , reverse bool ) error {
2026-01-12 11:59:05 +01:00
splitted := strings . Split ( line , pattern )
if len ( splitted ) < 2 {
return errors . New ( "links elements not found" )
}
2026-06-01 15:01:45 +02:00
// Source: trim surrounding whitespace.
srcVar := strings . TrimSpace ( splitted [ 0 ] )
// Destination: trim whitespace then stop at the first space or apostrophe
// (the rest may be a trailing comment produced by ToPlantUML look-ahead).
dstTokens := strings . FieldsFunc ( strings . TrimSpace ( splitted [ 1 ] ) , func ( r rune ) bool {
return r == ' ' || r == '\t' || r == '\''
} )
if len ( dstTokens ) == 0 {
return errors . New ( "link destination var name not found" )
}
dstVar := dstTokens [ 0 ]
srcItem , srcOk := graphVarName [ srcVar ]
dstItem , dstOk := graphVarName [ dstVar ]
if ! srcOk || srcItem . ID == "" {
return fmt . Errorf ( "link source %q not declared" , srcVar )
}
if ! dstOk || dstItem . ID == "" {
return fmt . Errorf ( "link destination %q not declared" , dstVar )
}
2026-01-12 11:59:05 +01:00
link := & graph . GraphLink {
Source : graph . Position {
2026-06-01 15:01:45 +02:00
ID : srcItem . ID ,
2026-01-12 11:59:05 +01:00
X : 0 ,
Y : 0 ,
} ,
Destination : graph . Position {
2026-06-01 15:01:45 +02:00
ID : dstItem . ID ,
2026-01-12 11:59:05 +01:00
X : 0 ,
Y : 0 ,
} ,
}
2026-01-13 16:04:31 +01:00
if reverse {
tmp := link . Destination
link . Destination = link . Source
link . Source = tmp
}
2026-01-12 11:59:05 +01:00
splittedComments := strings . Split ( line , "'" )
2026-03-17 14:17:49 +01:00
if len ( splittedComments ) > 1 {
2026-03-18 08:30:02 +01:00
comment := strings . ReplaceAll ( splittedComments [ 1 ] , "'" , "" )
json . Unmarshal ( parseHumanFriendlyAttrs ( comment ) , link )
2026-01-12 11:59:05 +01:00
}
d . Graph . Links = append ( d . Graph . Links , * link )
return nil
}
2026-06-02 08:42:20 +02:00
func ( d * Workflow ) extractResourcePlantUML ( line string , resource resources . ResourceInterface , dataName string , peerID string , request * tools . APIRequest ) ( string , * graph . GraphItem , [ ] string , error ) {
2026-01-12 11:59:05 +01:00
splittedFunc := strings . Split ( line , "(" )
if len ( splittedFunc ) <= 1 {
2026-06-02 08:42:20 +02:00
return "" , nil , nil , errors . New ( "Can't deserialize Object, there's no func" )
2026-01-12 11:59:05 +01:00
}
splittedParams := strings . Split ( splittedFunc [ 1 ] , "," )
2026-03-18 08:30:02 +01:00
if len ( splittedParams ) <= 1 {
2026-06-02 08:42:20 +02:00
return "" , nil , nil , errors . New ( "Can't deserialize Object, there's no params" )
2026-01-12 11:59:05 +01:00
}
varName := splittedParams [ 0 ]
splitted := strings . Split ( splittedParams [ 1 ] , "\"" )
if len ( splitted ) <= 1 {
2026-06-02 08:42:20 +02:00
return "" , nil , nil , errors . New ( "Can't deserialize Object, there's no name" )
}
name := strings . ReplaceAll ( splitted [ 1 ] , "\\n" , " " )
// Extract comment text (if present) for metadata parsing.
comment := ""
if parts := strings . Split ( line , "'" ) ; len ( parts ) > 1 {
comment = strings . ReplaceAll ( parts [ 1 ] , "'" , "" )
2026-01-12 11:59:05 +01:00
}
2026-06-02 08:42:20 +02:00
var warns [ ] string
// Try to resolve an existing catalog resource (by id, then by name).
if existing , warn := d . resolveExistingResource ( resource , dataName , name , comment , request ) ; existing != nil {
warns = append ( warns , warn )
item := d . addExistingGraphItem ( dataName , existing )
return varName , item , warns , nil
}
// No existing resource — create new.
resource . SetName ( name )
instance := d . getNewInstance ( dataName , name , peerID )
2026-03-18 08:30:02 +01:00
if instance != nil {
if b , err := json . Marshal ( resource ) ; err == nil {
json . Unmarshal ( b , instance )
}
2026-06-02 08:42:20 +02:00
if comment != "" {
2026-03-18 08:30:02 +01:00
json . Unmarshal ( parseHumanFriendlyAttrs ( comment ) , instance )
2026-03-17 13:19:51 +01:00
}
resource . AddInstances ( instance )
}
2026-03-18 08:30:02 +01:00
2026-03-17 13:52:43 +01:00
item := d . getNewGraphItem ( dataName , resource )
if item != nil {
d . Graph . Items [ item . ID ] = * item
2026-03-17 13:36:36 +01:00
}
2026-06-02 08:42:20 +02:00
return varName , item , warns , nil
}
// resolveExistingResource looks for an already stored resource that matches a
// PlantUML declaration, using the accessor of proto.
//
// Lookup order:
//  1. by the "id" attribute of the comment, when it is a non-empty string;
//  2. by exact name, among the results of a name search.
//
// It returns the resource together with a warning message describing how it
// was found, or (nil, "") when nothing matches.
func (d *Workflow) resolveExistingResource(
	proto resources.ResourceInterface,
	dataName, name, comment string,
	request *tools.APIRequest,
) (resources.ResourceInterface, string) {
	accessor := proto.GetAccessor(request)

	// 1. Lookup by id taken from the comment ("id: <uuid>").
	if comment != "" {
		attrs := map[string]any{}
		json.Unmarshal(parseHumanFriendlyAttrs(comment), &attrs)
		if id, isString := attrs["id"].(string); isString && id != "" {
			loaded, _, err := accessor.LoadOne(id)
			if err == nil && loaded != nil {
				if found, ok := loaded.(resources.ResourceInterface); ok {
					return found, fmt.Sprintf(` [import warning] %s "%s": existing resource retrieved by id %s `, dataName, name, id)
				}
			}
		}
	}

	// 2. Search by exact name.
	filter := &dbs.Filters{
		Or: map[string][]dbs.Filter{
			"abstractobject.name": {{Operator: dbs.EQUAL.String(), Value: name}},
		},
	}
	results, _, err := accessor.Search(filter, "", false, 0, 10)
	if err != nil {
		return nil, ""
	}
	for _, candidate := range results {
		if candidate.GetName() != name {
			continue
		}
		loaded, _, loadErr := accessor.LoadOne(candidate.GetID())
		if loadErr != nil || loaded == nil {
			continue
		}
		if found, ok := loaded.(resources.ResourceInterface); ok {
			return found, fmt.Sprintf(` [import warning] %s "%s": existing resource found by name and retrieved instead of creating a new one `, dataName, name)
		}
	}
	return nil, ""
}
// addExistingGraphItem registers an already-existing resource in the workflow's
// ID lists and graph, without adding it to the resource lists (so generateResource
// won't try to store it again).
func ( d * Workflow ) addExistingGraphItem ( dataName string , resource resources . ResourceInterface ) * graph . GraphItem {
graphItem := & graph . GraphItem {
ID : uuid . New ( ) . String ( ) ,
ItemResource : & resources . ItemResource { } ,
}
switch dataName {
case "Data" :
d . Datas = append ( d . Datas , resource . GetID ( ) )
if r , ok := resource . ( * resources . DataResource ) ; ok {
graphItem . Data = r
}
case "Processing" :
d . Processings = append ( d . Processings , resource . GetID ( ) )
if r , ok := resource . ( * resources . ProcessingResource ) ; ok {
graphItem . Processing = r
}
case "Service" :
d . Services = append ( d . Services , resource . GetID ( ) )
if r , ok := resource . ( * resources . ServiceResource ) ; ok {
graphItem . Service = r
}
case "Storage" :
d . Storages = append ( d . Storages , resource . GetID ( ) )
if r , ok := resource . ( * resources . StorageResource ) ; ok {
graphItem . Storage = r
}
case "ComputeUnit" :
d . Computes = append ( d . Computes , resource . GetID ( ) )
if r , ok := resource . ( * resources . ComputeResource ) ; ok {
graphItem . Compute = r
}
default :
return nil
}
return graphItem
2026-01-12 11:59:05 +01:00
}
2026-03-17 13:52:43 +01:00
func ( d * Workflow ) getNewGraphItem ( dataName string , resource resources . ResourceInterface ) * graph . GraphItem {
2026-03-17 13:36:36 +01:00
if resource == nil {
return nil
}
2026-03-17 13:52:43 +01:00
graphItem := & graph . GraphItem {
2026-03-17 14:03:19 +01:00
ID : uuid . New ( ) . String ( ) ,
ItemResource : & resources . ItemResource { } ,
2026-03-17 13:52:43 +01:00
}
2026-01-12 11:59:05 +01:00
switch dataName {
case "Data" :
d . Datas = append ( d . Datas , resource . GetID ( ) )
d . DataResources = append ( d . DataResources , resource . ( * resources . DataResource ) )
graphItem . Data = resource . ( * resources . DataResource )
2026-01-13 16:04:31 +01:00
case "Processing" :
2026-01-12 11:59:05 +01:00
d . Processings = append ( d . Processings , resource . GetID ( ) )
d . ProcessingResources = append ( d . ProcessingResources , resource . ( * resources . ProcessingResource ) )
graphItem . Processing = resource . ( * resources . ProcessingResource )
2026-04-28 13:24:25 +02:00
case "Service" :
d . Services = append ( d . Services , resource . GetID ( ) )
d . ServiceResources = append ( d . ServiceResources , resource . ( * resources . ServiceResource ) )
graphItem . Service = resource . ( * resources . ServiceResource )
case "Dynamic" :
d . Dynamics = append ( d . Dynamics , resource . GetID ( ) )
d . DynamicResources = append ( d . DynamicResources , resource . ( * resources . DynamicResource ) )
graphItem . Dynamic = resource . ( * resources . DynamicResource )
2026-03-18 08:30:02 +01:00
case "WorkflowEvent" :
// The resource is already a *NativeTool with Kind=WORKFLOW_EVENT set by the
// catalog factory. We use it directly without any DB lookup.
nt := resource . ( * resources . NativeTool )
nt . Name = native_tools . WORKFLOW_EVENT . String ( )
d . NativeTool = append ( d . NativeTool , nt . GetID ( ) )
graphItem . NativeTool = nt
2026-01-12 11:59:05 +01:00
case "Storage" :
d . Storages = append ( d . Storages , resource . GetID ( ) )
d . StorageResources = append ( d . StorageResources , resource . ( * resources . StorageResource ) )
graphItem . Storage = resource . ( * resources . StorageResource )
case "ComputeUnit" :
d . Computes = append ( d . Computes , resource . GetID ( ) )
d . ComputeResources = append ( d . ComputeResources , resource . ( * resources . ComputeResource ) )
graphItem . Compute = resource . ( * resources . ComputeResource )
default :
2026-03-17 13:52:43 +01:00
return nil
2026-01-12 11:59:05 +01:00
}
2026-03-17 14:37:06 +01:00
return graphItem
2026-01-12 11:59:05 +01:00
}
func ( d * Workflow ) getNewInstance ( dataName string , name string , peerID string ) resources . ResourceInstanceITF {
switch dataName {
case "Data" :
return resources . NewDataInstance ( name , peerID )
case "Processing" :
return resources . NewProcessingInstance ( name , peerID )
case "Storage" :
return resources . NewStorageResourceInstance ( name , peerID )
case "ComputeUnit" :
return resources . NewComputeResourceInstance ( name , peerID )
2026-04-28 13:24:25 +02:00
case "Service" :
return resources . NewServiceInstance ( name , peerID )
2026-01-12 11:59:05 +01:00
default :
return nil
}
}
2025-06-12 10:42:05 +02:00
// Deps describes one dependency edge found in a workflow graph.
type Deps struct {
	// Source is the name of the resource at the other end of the link.
	Source string
	// Dest is the ID of the graph item at the other end of the link.
	Dest string
}
2025-06-12 10:47:38 +02:00
func ( w * Workflow ) IsDependancy ( id string ) [ ] Deps {
2025-06-12 10:42:05 +02:00
dependancyOfIDs := [ ] Deps { }
for _ , link := range w . Graph . Links {
if _ , ok := w . Graph . Items [ link . Destination . ID ] ; ! ok {
continue
}
source := w . Graph . Items [ link . Destination . ID ] . Processing
if id == link . Source . ID && source != nil {
dependancyOfIDs = append ( dependancyOfIDs , Deps { Source : source . GetName ( ) , Dest : link . Destination . ID } )
}
sourceWF := w . Graph . Items [ link . Destination . ID ] . Workflow
if id == link . Source . ID && sourceWF != nil {
dependancyOfIDs = append ( dependancyOfIDs , Deps { Source : sourceWF . GetName ( ) , Dest : link . Destination . ID } )
}
}
return dependancyOfIDs
}
2026-01-13 16:04:31 +01:00
// GetFirstItems returns the graph items that have no dependency, i.e. the
// items for which GetDependencies reports nothing.
//
// The dependencies are looked up per graph item (item.ID). Querying them with
// the workflow's own ID, as was done before, never matches a link endpoint and
// made every item look like a first item.
func (w *Workflow) GetFirstItems() []graph.GraphItem {
	return w.GetGraphItems(func(item graph.GraphItem) bool {
		return len(w.GetDependencies(item.ID)) == 0
	})
}
2025-06-12 10:42:05 +02:00
// GetDependencies lists what the graph item identified by id depends on, i.e.
// the sources of the links whose destination is id.
//
// Only sources that exist in the graph and hold a processing are reported.
// Each entry carries the processing name in Source and the source item ID in
// Dest. The result is nil when there is no dependency.
func (w *Workflow) GetDependencies(id string) (dependencies []Deps) {
	for _, link := range w.Graph.Links {
		origin, known := w.Graph.Items[link.Source.ID]
		if !known {
			continue
		}
		processing := origin.Processing
		if link.Destination.ID != id || processing == nil {
			continue
		}
		dependencies = append(dependencies, Deps{Source: processing.GetName(), Dest: link.Source.ID})
	}
	return dependencies
}
2025-01-15 10:56:44 +01:00
func ( w * Workflow ) GetGraphItems ( f func ( item graph . GraphItem ) bool ) ( list_datas [ ] graph . GraphItem ) {
2024-10-03 17:25:54 +02:00
for _ , item := range w . Graph . Items {
2024-12-12 16:25:47 +01:00
if f ( item ) {
list_datas = append ( list_datas , item )
2024-10-03 17:25:54 +02:00
}
}
return
}
2025-06-19 08:11:11 +02:00
func ( w * Workflow ) GetPricedItem (
2026-01-13 16:04:31 +01:00
f func ( item graph . GraphItem ) bool , request * tools . APIRequest ,
instance int ,
partnership int ,
buying int ,
strategy int ,
bookingMode int ,
buyingStrategy int ,
pricingStrategy int ) ( map [ string ] pricing . PricedItemITF , error ) {
2024-12-12 16:25:47 +01:00
list_datas := map [ string ] pricing . PricedItemITF { }
for _ , item := range w . Graph . Items {
if f ( item ) {
2025-01-13 11:24:07 +01:00
dt , res := item . GetResource ( )
2026-02-18 12:24:19 +01:00
2026-01-13 16:04:31 +01:00
ord , err := res . ConvertToPricedResource ( dt , & instance , & partnership , & buying , & strategy , & bookingMode , request )
if err != nil {
return list_datas , err
}
2024-12-12 16:25:47 +01:00
list_datas [ res . GetID ( ) ] = ord
2026-02-18 12:24:19 +01:00
2024-11-14 10:02:18 +01:00
}
}
2026-01-13 16:04:31 +01:00
return list_datas , nil
2024-11-14 10:02:18 +01:00
}
2025-01-30 09:45:13 +01:00
type Related struct {
2025-01-30 11:11:34 +01:00
Node resources . ResourceInterface
2025-01-30 09:45:13 +01:00
Links [ ] graph . GraphLink
}
func ( w * Workflow ) GetByRelatedProcessing ( processingID string , g func ( item graph . GraphItem ) bool ) map [ string ] Related {
related := map [ string ] Related { }
2024-10-10 09:38:27 +02:00
for _ , link := range w . Graph . Links {
2024-12-12 16:25:47 +01:00
nodeID := link . Destination . ID
2025-01-13 11:24:07 +01:00
var node resources . ResourceInterface
2024-12-12 16:25:47 +01:00
if g ( w . Graph . Items [ link . Source . ID ] ) {
item := w . Graph . Items [ link . Source . ID ]
2025-01-13 11:24:07 +01:00
_ , node = item . GetResource ( )
2024-12-12 16:25:47 +01:00
}
if node == nil && g ( w . Graph . Items [ link . Destination . ID ] ) { // if the source is not a storage, we consider that the destination is the storage
nodeID = link . Source . ID
item := w . Graph . Items [ link . Destination . ID ] // and the processing is the source
2025-01-13 11:24:07 +01:00
_ , node = item . GetResource ( ) // we are looking for the storage as destination
2024-10-10 09:38:27 +02:00
}
if processingID == nodeID && node != nil { // if the storage is linked to the processing
2025-08-08 16:15:53 +02:00
relID := node . GetID ( )
rel := Related { }
2025-01-30 11:11:34 +01:00
rel . Node = node
2025-01-30 09:45:13 +01:00
rel . Links = append ( rel . Links , link )
2025-08-08 16:15:53 +02:00
related [ relID ] = rel
2024-10-10 09:38:27 +02:00
}
}
2025-01-30 09:45:13 +01:00
return related
2024-10-10 09:38:27 +02:00
}
2026-01-13 16:04:31 +01:00
func ( ao * Workflow ) VerifyAuth ( callName string , request * tools . APIRequest ) bool {
2024-12-03 10:57:28 +01:00
isAuthorized := false
if len ( ao . Shared ) > 0 {
for _ , shared := range ao . Shared {
2024-12-12 16:25:47 +01:00
shared , code , _ := shallow_collaborative_area . NewAccessor ( request ) . LoadOne ( shared )
2024-12-03 10:57:28 +01:00
if code != 200 || shared == nil {
isAuthorized = false
2025-02-03 12:21:50 +01:00
} else {
2026-01-13 16:04:31 +01:00
isAuthorized = shared . VerifyAuth ( callName , request )
2024-12-03 10:57:28 +01:00
}
}
}
2026-01-13 16:04:31 +01:00
return ao . AbstractObject . VerifyAuth ( callName , request ) || isAuthorized
2024-12-03 10:57:28 +01:00
}
2026-02-18 12:24:19 +01:00
// TODO : Check Booking... + Storage
2024-08-30 14:50:48 +02:00
/*
* CheckBooking is a function that checks the booking of the workflow on peers (even ourselves)
*/
2024-08-23 09:53:37 +02:00
func ( wfa * Workflow ) CheckBooking ( caller * tools . HTTPCaller ) ( bool , error ) {
2024-08-12 14:18:13 +02:00
// check if
2024-08-30 14:50:48 +02:00
if wfa . Graph == nil { // no graph no booking
2024-08-12 14:18:13 +02:00
return false , nil
}
2024-12-12 16:25:47 +01:00
accessor := ( & resources . ComputeResource { } ) . GetAccessor ( & tools . APIRequest { Caller : caller } )
2024-08-12 16:11:25 +02:00
for _ , link := range wfa . Graph . Links {
2024-12-12 16:25:47 +01:00
if ok , compute_id := link . IsComputeLink ( * wfa . Graph ) ; ok { // check if the link is a link between a compute and a resource
compute , code , _ := accessor . LoadOne ( compute_id )
2024-08-12 16:11:25 +02:00
if code != 200 {
continue
}
2024-11-07 11:05:24 +01:00
// CHECK BOOKING ON PEER, compute could be a remote one
2024-12-12 16:25:47 +01:00
peerID := compute . ( * resources . ComputeResource ) . CreatorID
2024-08-13 09:49:42 +02:00
if peerID == "" {
return false , errors . New ( "no peer id" )
2024-08-30 14:50:48 +02:00
} // no peer id no booking, we need to know where to book
2024-12-12 16:25:47 +01:00
_ , err := ( & peer . Peer { } ) . LaunchPeerExecution ( peerID , compute_id , tools . BOOKING , tools . GET , nil , caller )
2024-08-12 16:11:25 +02:00
if err != nil {
return false , err
}
}
2024-08-12 14:18:13 +02:00
}
2024-08-12 16:11:25 +02:00
return true , nil
}
2025-01-13 11:24:07 +01:00
2026-03-20 16:14:07 +01:00
// preemptDelay is the minimum lead time granted before a preempted booking
// starts: such a booking never starts earlier than now + preemptDelay.
const preemptDelay time.Duration = 30 * time.Second
// Planify computes the scheduled start/end for every resource in the workflow.
//
// bookingMode controls availability checking when p (a live planner snapshot) is provided:
// - PREEMPTED : start from now+preemptDelay regardless of existing load.
// - WHEN_POSSIBLE: start from max(now, start); if a slot conflicts, slide to the next free window.
// - PLANNED : use start as-is; return an error if the slot is not available.
//
// Passing p = nil skips all availability checks (useful for sub-workflow recursion).
func ( wf * Workflow ) Planify ( start time . Time , end * time . Time , instances ConfigItem , partnerships ConfigItem , buyings ConfigItem , strategies ConfigItem , bookingMode int , p planner . PlannerITF , request * tools . APIRequest ) ( bool , float64 , map [ tools . DataType ] map [ string ] pricing . PricedItemITF , * Workflow , error ) {
// 1. Adjust global start based on booking mode.
now := time . Now ( )
switch booking . BookingMode ( bookingMode ) {
case booking . PREEMPTED :
if earliest := now . Add ( preemptDelay ) ; start . Before ( earliest ) {
start = earliest
}
case booking . WHEN_POSSIBLE :
if start . Before ( now ) {
start = now
}
2026-04-02 14:31:19 +02:00
// PLANNED: honour the caller's start date as-is.
2026-03-20 16:14:07 +01:00
}
2025-02-13 09:10:24 +01:00
priceds := map [ tools . DataType ] map [ string ] pricing . PricedItemITF { }
2026-03-20 16:14:07 +01:00
var err error
2026-04-23 09:24:02 +02:00
// 2. Plan processings and services first so we can derive the total workflow duration.
// Services in DEPLOYMENT mode return duration=-1 (open-ended); HOSTED mode returns a bounded call window.
2026-01-13 16:04:31 +01:00
ps , priceds , err := plan [ * resources . ProcessingResource ] ( tools . PROCESSING_RESOURCE , instances , partnerships , buyings , strategies , bookingMode , wf , priceds , request , wf . Graph . IsProcessing ,
func ( res resources . ResourceInterface , priced pricing . PricedItemITF ) ( time . Time , float64 , error ) {
d , err := wf . Graph . GetAverageTimeProcessingBeforeStart ( 0 , res . GetID ( ) ,
* instances . Get ( res . GetID ( ) ) , * partnerships . Get ( res . GetID ( ) ) , * buyings . Get ( res . GetID ( ) ) , * strategies . Get ( res . GetID ( ) ) ,
bookingMode , request )
if err != nil {
return start , 0 , err
}
return start . Add ( time . Duration ( d ) * time . Second ) , priced . GetExplicitDurationInS ( ) , nil
} , func ( started time . Time , duration float64 ) ( * time . Time , error ) {
2026-03-20 10:30:30 +01:00
s := started . Add ( time . Duration ( duration ) * time . Second )
2026-01-13 16:04:31 +01:00
return & s , nil
2025-01-15 10:56:44 +01:00
} )
if err != nil {
2026-01-13 16:04:31 +01:00
return false , 0 , priceds , nil , err
2025-01-13 11:24:07 +01:00
}
2026-04-23 09:24:02 +02:00
if _ , priceds , err = plan [ * resources . ServiceResource ] ( tools . SERVICE_RESOURCE , instances , partnerships , buyings , strategies , bookingMode , wf , priceds , request , wf . Graph . IsService ,
func ( res resources . ResourceInterface , priced pricing . PricedItemITF ) ( time . Time , float64 , error ) {
d , err := wf . Graph . GetAverageTimeProcessingBeforeStart ( 0 , res . GetID ( ) ,
* instances . Get ( res . GetID ( ) ) , * partnerships . Get ( res . GetID ( ) ) , * buyings . Get ( res . GetID ( ) ) , * strategies . Get ( res . GetID ( ) ) ,
bookingMode , request )
if err != nil {
return start , 0 , err
}
return start . Add ( time . Duration ( d ) * time . Second ) , priced . GetExplicitDurationInS ( ) , nil
} , func ( started time . Time , duration float64 ) ( * time . Time , error ) {
if duration < 0 {
return nil , nil // DEPLOYMENT mode: open-ended
}
s := started . Add ( time . Duration ( duration ) * time . Second )
return & s , nil
} ) ; err != nil {
return false , 0 , priceds , nil , err
}
2026-03-20 16:14:07 +01:00
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
workflowDuration := common . GetPlannerLongestTime ( priceds )
2026-01-13 16:04:31 +01:00
if _ , priceds , err = plan [ resources . ResourceInterface ] ( tools . NATIVE_TOOL , instances , partnerships , buyings , strategies , bookingMode , wf , priceds , request ,
wf . Graph . IsNativeTool , func ( res resources . ResourceInterface , priced pricing . PricedItemITF ) ( time . Time , float64 , error ) {
return start , 0 , nil
} , func ( started time . Time , duration float64 ) ( * time . Time , error ) {
return end , nil
2025-01-15 10:56:44 +01:00
} ) ; err != nil {
2026-01-13 16:04:31 +01:00
return false , 0 , priceds , nil , err
2025-01-13 11:24:07 +01:00
}
2026-01-13 16:04:31 +01:00
if _ , priceds , err = plan [ resources . ResourceInterface ] ( tools . DATA_RESOURCE , instances , partnerships , buyings , strategies , bookingMode , wf , priceds , request ,
wf . Graph . IsData , func ( res resources . ResourceInterface , priced pricing . PricedItemITF ) ( time . Time , float64 , error ) {
return start , 0 , nil
} , func ( started time . Time , duration float64 ) ( * time . Time , error ) {
return end , nil
} ) ; err != nil {
return false , 0 , priceds , nil , err
}
2026-03-20 16:14:07 +01:00
// 3. Compute/storage: duration = total workflow duration (conservative bound).
2026-01-13 16:04:31 +01:00
for k , f := range map [ tools . DataType ] func ( graph . GraphItem ) bool { tools . STORAGE_RESOURCE : wf . Graph . IsStorage ,
tools . COMPUTE_RESOURCE : wf . Graph . IsCompute } {
if _ , priceds , err = plan [ resources . ResourceInterface ] ( k , instances , partnerships , buyings , strategies , bookingMode , wf , priceds , request ,
f , func ( res resources . ResourceInterface , priced pricing . PricedItemITF ) ( time . Time , float64 , error ) {
2026-03-20 16:14:07 +01:00
nearestStart , _ , err := wf . Graph . GetAverageTimeRelatedToProcessingActivity ( ps , res , func ( i graph . GraphItem ) ( r resources . ResourceInterface ) {
2025-01-13 11:24:07 +01:00
if f ( i ) {
2025-01-15 10:56:44 +01:00
_ , r = i . GetResource ( )
2025-01-13 11:24:07 +01:00
}
2025-01-15 10:56:44 +01:00
return r
2026-01-13 16:04:31 +01:00
} , * instances . Get ( res . GetID ( ) ) , * partnerships . Get ( res . GetID ( ) ) ,
* buyings . Get ( res . GetID ( ) ) , * strategies . Get ( res . GetID ( ) ) , bookingMode , request )
if err != nil {
2026-03-20 16:14:07 +01:00
return start , workflowDuration , err
2026-01-13 16:04:31 +01:00
}
2026-03-20 16:14:07 +01:00
return start . Add ( time . Duration ( nearestStart ) * time . Second ) , workflowDuration , nil
2026-01-13 16:04:31 +01:00
} , func ( started time . Time , duration float64 ) ( * time . Time , error ) {
2026-03-20 16:14:07 +01:00
if duration < 0 {
return nil , nil // service: open-ended booking
}
2026-03-20 10:30:30 +01:00
s := started . Add ( time . Duration ( duration ) * time . Second )
2026-01-13 16:04:31 +01:00
return & s , nil
2025-01-15 10:56:44 +01:00
} ) ; err != nil {
2026-01-13 16:04:31 +01:00
return false , 0 , priceds , nil , err
2025-01-13 11:24:07 +01:00
}
}
2026-03-20 16:14:07 +01:00
longest := workflowDuration
2026-01-13 16:04:31 +01:00
if _ , priceds , err = plan [ resources . ResourceInterface ] ( tools . WORKFLOW_RESOURCE , instances , partnerships , buyings , strategies ,
bookingMode , wf , priceds , request , wf . Graph . IsWorkflow ,
func ( res resources . ResourceInterface , priced pricing . PricedItemITF ) ( time . Time , float64 , error ) {
2026-03-20 14:20:26 +01:00
start := start . Add ( time . Duration ( common . GetPlannerNearestStart ( start , priceds ) ) * time . Second )
2025-01-15 10:56:44 +01:00
longest := float64 ( - 1 )
r , code , err := res . GetAccessor ( request ) . LoadOne ( res . GetID ( ) )
if code != 200 || err != nil {
2026-01-13 16:04:31 +01:00
return start , longest , err
2025-01-13 11:24:07 +01:00
}
2026-03-20 16:14:07 +01:00
_ , neoLongest , priceds2 , _ , err := r . ( * Workflow ) . Planify ( start , end , instances , partnerships , buyings , strategies , bookingMode , nil , request )
2026-01-13 16:04:31 +01:00
// should ... import priced
if err != nil {
return start , longest , err
2025-01-15 10:56:44 +01:00
} else if neoLongest > longest {
longest = neoLongest
2025-01-13 11:24:07 +01:00
}
2026-01-13 16:04:31 +01:00
for k , v := range priceds2 {
if priceds [ k ] == nil {
priceds [ k ] = map [ string ] pricing . PricedItemITF { }
}
for k2 , v2 := range v {
if priceds [ k ] [ k2 ] != nil {
v2 . AddQuantity ( priceds [ k ] [ k2 ] . GetQuantity ( ) )
}
}
}
2026-03-20 14:20:26 +01:00
return start . Add ( time . Duration ( common . GetPlannerNearestStart ( start , priceds ) ) * time . Second ) , longest , nil
2026-01-13 16:04:31 +01:00
} , func ( start time . Time , longest float64 ) ( * time . Time , error ) {
2025-01-22 11:11:04 +01:00
s := start . Add ( time . Duration ( longest ) * time . Second )
2026-01-13 16:04:31 +01:00
return & s , nil
2025-01-15 10:56:44 +01:00
} ) ; err != nil {
2026-01-13 16:04:31 +01:00
return false , 0 , priceds , nil , err
}
2026-03-20 16:14:07 +01:00
// 4. Availability check against the live planner (skipped for PREEMPTED and sub-workflows).
if p != nil && booking . BookingMode ( bookingMode ) != booking . PREEMPTED {
slide , err := plannerAvailabilitySlide ( p , priceds , booking . BookingMode ( bookingMode ) )
if err != nil {
return false , 0 , priceds , nil , err
}
if slide > 0 {
// Re-plan from the corrected start; pass nil planner to avoid infinite recursion.
return wf . Planify ( start . Add ( slide ) , end , instances , partnerships , buyings , strategies , bookingMode , nil , request )
}
}
2026-01-13 16:04:31 +01:00
isPreemptible := true
for _ , first := range wf . GetFirstItems ( ) {
_ , res := first . GetResource ( )
if res . GetBookingModes ( ) [ booking . PREEMPTED ] == nil {
isPreemptible = false
break
}
2025-01-13 11:24:07 +01:00
}
2026-01-13 16:04:31 +01:00
return isPreemptible , longest , priceds , wf , nil
2025-01-13 11:24:07 +01:00
}
2026-03-20 16:14:07 +01:00
// plannerAvailabilitySlide confronts every compute and storage item of priceds
// with the planner p.
//
// Items without a start or an end location are open-ended and are not checked.
// For the others, the planner is asked for the next available start of a window
// of the same length; a later answer is a conflict:
//   - in PLANNED mode the first conflict met is returned as an error;
//   - otherwise the largest delay found is returned, i.e. the duration to add
//     to the global start to clear every conflict (0 when there is none).
func plannerAvailabilitySlide(p planner.PlannerITF, priceds map[tools.DataType]map[string]pricing.PricedItemITF, mode booking.BookingMode) (time.Duration, error) {
	var worst time.Duration
	checked := [...]tools.DataType{tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE}
	for _, kind := range checked {
		for _, item := range priceds[kind] {
			from, to := item.GetLocationStart(), item.GetLocationEnd()
			if from == nil || to == nil {
				continue // open-ended: skip availability check
			}
			window := to.Sub(*from)
			delay := p.NextAvailableStart(item.GetID(), item.GetInstanceID(), *from, window).Sub(*from)
			switch {
			case delay <= 0:
				// the requested slot is free
			case mode == booking.PLANNED:
				return 0, errors.New("requested slot is not available for resource " + item.GetID())
			case delay > worst:
				worst = delay
			}
		}
	}
	return worst, nil
}
2026-01-12 14:26:29 +01:00
// Returns a map of DataType (processing,computing,data,storage,worfklow) where each resource (identified by its UUID)
2025-07-30 18:28:19 +02:00
// is mapped to the list of its items (different appearance) in the graph
// ex: if the same Minio storage is represented by several nodes in the graph, in [tools.STORAGE_RESSOURCE] its UUID will be mapped to
2026-01-12 14:26:29 +01:00
// the list of GraphItem ID that correspond to the ID of each node
func ( w * Workflow ) GetItemsByResources ( ) map [ tools . DataType ] map [ string ] [ ] string {
2025-07-30 18:21:09 +02:00
res := make ( map [ tools . DataType ] map [ string ] [ ] string )
2025-07-30 18:01:23 +02:00
dtMethodMap := map [ tools . DataType ] func ( ) [ ] graph . GraphItem {
2026-01-12 14:26:29 +01:00
tools . STORAGE_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsStorage ) } ,
tools . DATA_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsData ) } ,
tools . COMPUTE_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsCompute ) } ,
tools . PROCESSING_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsProcessing ) } ,
2026-04-23 09:24:02 +02:00
tools . SERVICE_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsService ) } ,
2026-01-12 14:26:29 +01:00
tools . WORKFLOW_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsWorkflow ) } ,
2026-04-28 13:24:25 +02:00
tools . DYNAMIC_RESOURCE : func ( ) [ ] graph . GraphItem { return w . GetGraphItems ( w . Graph . IsDynamic ) } ,
2025-07-30 18:01:23 +02:00
}
for dt , meth := range dtMethodMap {
2025-07-30 18:15:55 +02:00
res [ dt ] = make ( map [ string ] [ ] string )
2025-07-30 18:01:23 +02:00
items := meth ( )
for _ , i := range items {
_ , r := i . GetResource ( )
rId := r . GetID ( )
2026-01-12 14:26:29 +01:00
res [ dt ] [ rId ] = append ( res [ dt ] [ rId ] , i . ID )
2025-07-30 18:01:23 +02:00
}
}
2025-07-30 18:21:09 +02:00
return res
2025-07-30 18:01:23 +02:00
}
2025-06-19 08:11:11 +02:00
func plan [ T resources . ResourceInterface ] (
2026-01-13 16:04:31 +01:00
dt tools . DataType , instances ConfigItem , partnerships ConfigItem , buyings ConfigItem , strategies ConfigItem , bookingMode int , wf * Workflow , priceds map [ tools . DataType ] map [ string ] pricing . PricedItemITF , request * tools . APIRequest ,
f func ( graph . GraphItem ) bool ,
start func ( resources . ResourceInterface , pricing . PricedItemITF ) ( time . Time , float64 , error ) ,
end func ( time . Time , float64 ) ( * time . Time , error ) ) ( [ ] T , map [ tools . DataType ] map [ string ] pricing . PricedItemITF , error ) {
2025-01-15 10:56:44 +01:00
resources := [ ] T { }
for _ , item := range wf . GetGraphItems ( f ) {
if priceds [ dt ] == nil {
2025-02-13 09:10:24 +01:00
priceds [ dt ] = map [ string ] pricing . PricedItemITF { }
2025-01-13 11:24:07 +01:00
}
2025-01-15 10:56:44 +01:00
dt , realItem := item . GetResource ( )
if realItem == nil {
return resources , priceds , errors . New ( "could not load the processing resource" )
}
2026-01-13 16:04:31 +01:00
priced , err := realItem . ConvertToPricedResource ( dt , instances . Get ( realItem . GetID ( ) ) ,
partnerships . Get ( realItem . GetID ( ) ) , buyings . Get ( realItem . GetID ( ) ) , strategies . Get ( realItem . GetID ( ) ) , & bookingMode , request )
if err != nil {
return resources , priceds , err
}
2025-06-27 17:10:32 +02:00
// Should be commented once the Pricing selection feature has been implemented, related to the commit d35ad440fa77763ec7f49ab34a85e47e75581b61
// if priced.SelectPricing() == nil {
// return resources, priceds, errors.New("no pricings are selected... can't proceed")
// }
2026-01-13 16:04:31 +01:00
started , duration , err := start ( realItem , priced )
if err != nil {
return resources , priceds , err
}
2025-01-15 10:56:44 +01:00
priced . SetLocationStart ( started )
if duration >= 0 {
2026-01-13 16:04:31 +01:00
if e , err := end ( started , duration ) ; err == nil && e != nil {
2025-01-22 11:11:04 +01:00
priced . SetLocationEnd ( * e )
}
}
2025-01-15 10:56:44 +01:00
resources = append ( resources , realItem . ( T ) )
2026-01-13 16:04:31 +01:00
if priceds [ dt ] [ item . ID ] != nil {
priced . AddQuantity ( priceds [ dt ] [ item . ID ] . GetQuantity ( ) )
}
2025-02-13 09:10:24 +01:00
priceds [ dt ] [ item . ID ] = priced
2026-01-13 16:04:31 +01:00
2025-01-13 11:24:07 +01:00
}
2025-01-15 10:56:44 +01:00
return resources , priceds , nil
2025-01-13 11:24:07 +01:00
}
2026-05-27 15:50:23 +02:00
// ── Integrity validation ─────────────────────────────────────────────────────

// arrowDirectionBackward is the arrow direction value of a link drawn
// backward. It matches the flutter_flow_chart ArrowDirection enum
// (index order: forward=0, backward=1, bidirectionnal=2).
const arrowDirectionBackward int64 = 1
// ViolationSeverity tells blocking errors apart from non-blocking warnings.
type ViolationSeverity int

const (
	// SeverityError blocks scheduling — the violation must be fixed.
	SeverityError ViolationSeverity = iota
	// SeverityWarning is reported but does not block scheduling.
	SeverityWarning
)
// ViolationType names the category a violation belongs to.
// It mirrors the TopologyErrorType / TopologyWarningType enums in oc-front.
type ViolationType string

// Errors — these violation types block scheduling.
const (
	ViolationVariableNotFound      ViolationType = "variable_not_found"
	ViolationMissingComputeUnit    ViolationType = "missing_compute_unit"
	ViolationCycle                 ViolationType = "cycle"
	ViolationMissingDataStorage    ViolationType = "missing_data_storage"
	ViolationRequiredOutputMissing ViolationType = "required_output_missing"
)

// Warnings — these violation types are non-blocking and reported for UX.
const (
	ViolationInvertedArrow                ViolationType = "inverted_arrow"
	ViolationIsolatedProcessing           ViolationType = "isolated_processing"
	ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
)
// IntegrityViolation is one structural or semantic problem found in the
// workflow graph.
type IntegrityViolation struct {
	// Severity tells whether the violation is an error or a warning.
	Severity ViolationSeverity
	// Type is the category of the violation.
	Type ViolationType
	// ItemIDs lists the graph item IDs involved in the violation.
	ItemIDs []string
	// Message is the human-readable description of the violation.
	Message string
}

// IsError reports whether the violation has the SeverityError severity.
func (v IntegrityViolation) IsError() bool {
	return v.Severity == SeverityError
}

// IsWarning reports whether the violation has the SeverityWarning severity.
func (v IntegrityViolation) IsWarning() bool {
	return v.Severity == SeverityWarning
}
// ValidateIntegrity checks the structural and semantic integrity of the workflow
// graph. It must be called by both oc-front (UX enforcement) and oc-schedulerd
// (sovereign enforcement, regardless of submission source — the front can be
// bypassed via direct API calls).
//
// Errors (block scheduling):
// 1. Variable not found — an arg references $varName not defined in env/inputs.
// 2. Missing compute — a Processing/non-HOSTED Service has no Compute linked.
// 3. Cycle — the processing DAG contains a directed cycle.
// 4. Missing data storage — a Data with Source has no Storage linked.
//
// Warnings (non-blocking):
// 5. Inverted arrow — a backward link between two processing nodes.
// 6. Isolated processing — a processing node with no processing neighbours.
// 7. Storage not linked to processing — a storage node orphaned from any processing.
func ( w * Workflow ) ValidateIntegrity ( ) [ ] IntegrityViolation {
var violations [ ] IntegrityViolation
violations = append ( violations , w . validateVariables ( ) ... )
2026-06-01 16:45:05 +02:00
violations = append ( violations , w . validateRequiredInputs ( ) ... )
2026-05-27 15:50:23 +02:00
violations = append ( violations , w . validateComputeLinks ( ) ... )
violations = append ( violations , w . detectCycles ( ) ... )
violations = append ( violations , w . validateDataStorageLinks ( ) ... )
violations = append ( violations , w . detectInvertedArrows ( ) ... )
violations = append ( violations , w . detectIsolatedProcessings ( ) ... )
violations = append ( violations , w . detectOrphanedStorages ( ) ... )
return violations
}
// HasCriticalViolations reports whether ValidateIntegrity found at least one
// violation of error severity.
// oc-schedulerd uses this to reject a workflow without inspecting each violation.
func (w *Workflow) HasCriticalViolations() bool {
	found := w.ValidateIntegrity()
	for i := range found {
		if found[i].Severity == SeverityError {
			return true
		}
	}
	return false
}
// itemName gives a human-readable name for a graph item: the name of the
// resource it carries, or itemID itself when the item is unknown or carries
// no resource.
func (w *Workflow) itemName(itemID string) string {
	if item, found := w.Graph.Items[itemID]; found {
		if _, res := item.GetResource(); res != nil {
			return res.GetName()
		}
	}
	return itemID
}
// varRefPattern matches a variable reference inside an argument, written
// either $name or ${name}; the first submatch is the variable name.
var varRefPattern = regexp.MustCompile(`\$\{?([A-Za-z_][A-Za-z0-9_]*)\}?`)

// validateVariables checks that every $varName reference inside w.Args is
// defined in the corresponding element's env or inputs — mirroring
// WorkflowFactory.validateArgs() in oc-front.
//
// One error violation is produced per undefined reference found.
func (w *Workflow) validateVariables() []IntegrityViolation {
	var violations []IntegrityViolation
	for itemID, args := range w.Args {
		if len(args) == 0 {
			continue
		}
		// Names the item may reference: its env entries plus its inputs.
		defined := map[string]bool{}
		for _, env := range w.Env[itemID] {
			if env.Name != "" {
				defined[env.Name] = true
			}
		}
		for _, input := range w.Inputs[itemID] {
			if input.Name != "" {
				defined[input.Name] = true
			}
		}
		label := w.itemName(itemID)
		for _, arg := range args {
			for _, match := range varRefPattern.FindAllStringSubmatch(arg, -1) {
				ref := match[1]
				if defined[ref] {
					continue
				}
				violations = append(violations, IntegrityViolation{
					Severity: SeverityError,
					Type:     ViolationVariableNotFound,
					ItemIDs:  []string{itemID},
					Message:  fmt.Sprintf(`"%s": arg "%s" → variable $%s is not defined in env or inputs`, label, arg, ref),
				})
			}
		}
	}
	return violations
}
// validateComputeLinks checks that every Processing node (and every non-HOSTED
// Service node) is linked to at least one Compute — mirroring the
// computeErrors block in oc-front's checkTopology().
//
// A link counts whatever its direction. One error violation is produced per
// node lacking a compute.
func (w *Workflow) validateComputeLinks() []IntegrityViolation {
	var violations []IntegrityViolation
	for id, item := range w.Graph.Items {
		var name string
		switch {
		case w.Graph.IsProcessing(item) && item.Processing != nil:
			// IsService processings are long-running services and don't need a Compute booking.
			if item.Processing.IsService {
				continue
			}
			name = item.Processing.GetName()
		case w.Graph.IsService(item) && item.Service != nil:
			// HOSTED services use an existing endpoint — no Compute booking needed.
			if si, ok := item.Service.GetSelectedInstance(nil).(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
				continue
			}
			name = item.Service.GetName()
		default:
			// Neither a processing nor a service: no compute required.
			continue
		}

		linked := false
		for _, link := range w.Graph.Links {
			// Find the item at the other end of the link, if the link touches id.
			var otherID string
			switch id {
			case link.Source.ID:
				otherID = link.Destination.ID
			case link.Destination.ID:
				otherID = link.Source.ID
			default:
				continue
			}
			if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) {
				linked = true
				break
			}
		}
		if linked {
			continue
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityError,
			Type:     ViolationMissingComputeUnit,
			ItemIDs:  []string{id},
			Message:  fmt.Sprintf(`"%s" has no compute unit linked`, name),
		})
	}
	return violations
}
// detectCycles runs a depth-first search with node colouring on the directed
// graph of execution nodes (processings, services and native tools) and
// reports every back-edge as a cycle error — mirroring dfsCycle() in oc-front.
//
// Links are followed from source to destination, except backward arrows which
// are followed from destination to source. Each offending edge is reported once.
func (w *Workflow) detectCycles() []IntegrityViolation {
	const (
		unvisited = iota // white: not reached yet
		onStack          // grey: being explored
		finished         // black: fully explored
	)

	// Execution nodes, each with its (initially empty) list of successors.
	next := map[string][]string{}
	for id, item := range w.Graph.Items {
		if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
			next[id] = []string{}
		}
	}
	for _, link := range w.Graph.Links {
		from, to := link.Source.ID, link.Destination.ID
		if _, ok := next[from]; !ok {
			continue
		}
		if _, ok := next[to]; !ok {
			continue
		}
		if link.Style != nil && link.Style.ArrowDirection == arrowDirectionBackward {
			// Visual arrow reversed: the destination runs before the source.
			from, to = to, from
		}
		next[from] = append(next[from], to)
	}

	var violations []IntegrityViolation
	state := make(map[string]int, len(next))
	reported := map[string]bool{}
	var visit func(node string)
	visit = func(node string) {
		state[node] = onStack
		for _, succ := range next[node] {
			switch state[succ] {
			case onStack:
				// succ is an ancestor of node: this edge closes a cycle.
				edge := node + "→" + succ
				if reported[edge] {
					break
				}
				reported[edge] = true
				violations = append(violations, IntegrityViolation{
					Severity: SeverityError,
					Type:     ViolationCycle,
					ItemIDs:  []string{node, succ},
					Message: fmt.Sprintf(`Infinite loop: "%s" → "%s" creates a cycle that would block execution indefinitely`,
						w.itemName(node), w.itemName(succ)),
				})
			case unvisited:
				visit(succ)
			}
		}
		state[node] = finished
	}
	for id := range next {
		if state[id] == unvisited {
			visit(id)
		}
	}
	return violations
}
// validateDataStorageLinks checks that every Data item having at least one
// instance with a source is linked to a Storage — the builder needs this to
// inject the download step (curl or NATS/Minio protocol).
//
// One error violation is produced per Data item lacking such a link.
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
	// Data items that appear in a data↔storage link.
	withStorage := map[string]bool{}
	for _, dsl := range w.Graph.GetDataStorageLinks() {
		withStorage[dsl.DataItemID] = true
	}

	var violations []IntegrityViolation
	for id, item := range w.Graph.Items {
		if !w.Graph.IsData(item) || item.Data == nil {
			continue
		}
		sourced := false
		for _, inst := range item.Data.Instances {
			if inst.Access.HasSource() {
				sourced = true
				break
			}
		}
		if !sourced || withStorage[id] {
			continue
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityError,
			Type:     ViolationMissingDataStorage,
			ItemIDs:  []string{id},
			Message:  fmt.Sprintf(`data "%s" has a source but no Storage linked`, item.Data.GetName()),
		})
	}
	return violations
}
2026-06-01 16:45:05 +02:00
// validateRequiredInputs checks, for each execution node (processing, service
// or native tool) having a required named input, that every immediate
// predecessor outputs a parameter with that name.
// Mirrors the requiredOutputMissing check in oc-front's checkTopology().
//
// Predecessors follow the link direction: the source precedes the destination,
// except for backward arrows where the destination precedes the source.
// One error violation is produced per (input, predecessor) pair that fails.
func (w *Workflow) validateRequiredInputs() []IntegrityViolation {
	flux := map[string]bool{}
	for id, item := range w.Graph.Items {
		if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
			flux[id] = true
		}
	}

	// Direct predecessors of each execution node.
	before := make(map[string][]string, len(flux))
	for _, link := range w.Graph.Links {
		from, to := link.Source.ID, link.Destination.ID
		if !flux[from] || !flux[to] {
			continue
		}
		if link.Style != nil && link.Style.ArrowDirection == arrowDirectionBackward {
			from, to = to, from
		}
		before[to] = append(before[to], from)
	}

	var violations []IntegrityViolation
	for id, inputs := range w.Inputs {
		if !flux[id] {
			continue
		}
		for _, input := range inputs {
			if !input.Required || input.Name == "" {
				continue
			}
			for _, predID := range before[id] {
				if w.nodeHasOutput(predID, input.Name) {
					continue
				}
				violations = append(violations, IntegrityViolation{
					Severity: SeverityError,
					Type:     ViolationRequiredOutputMissing,
					ItemIDs:  []string{id, predID},
					Message: fmt.Sprintf(
						`"%s" requires input "%s" but "%s" does not output it`,
						w.itemName(id), input.Name, w.itemName(predID),
					),
				})
			}
		}
	}
	return violations
}
// nodeHasOutput reports whether the node nodeID outputs a parameter called
// name. Workflow-level outputs declared for the node are looked up first, then
// the outputs of the processing or service the graph item carries.
func (w *Workflow) nodeHasOutput(nodeID, name string) bool {
	for _, out := range w.Outputs[nodeID] {
		if out.Name == name {
			return true
		}
	}

	item, known := w.Graph.Items[nodeID]
	if !known {
		return false
	}
	// Only processings and services expose outputs of their own here.
	var producer resources.ResourceInterface
	if item.Processing != nil {
		producer = item.Processing
	} else if item.Service != nil {
		producer = item.Service
	}
	if producer == nil {
		return false
	}
	for _, out := range producer.GetOutputs() {
		if out.Name == name {
			return true
		}
	}
	return false
}
2026-05-27 15:50:23 +02:00
// detectInvertedArrows emits a warning for every link drawn with a backward
// arrow whose two ends are both processings or services — mirroring the
// invertedArrow warning in oc-front.
func (w *Workflow) detectInvertedArrows() []IntegrityViolation {
	var violations []IntegrityViolation
	for _, link := range w.Graph.Links {
		backward := link.Style != nil && link.Style.ArrowDirection == arrowDirectionBackward
		if !backward {
			continue
		}
		srcID, dstID := link.Source.ID, link.Destination.ID
		src, hasSrc := w.Graph.Items[srcID]
		dst, hasDst := w.Graph.Items[dstID]
		if !hasSrc || !hasDst {
			continue
		}
		if !w.Graph.IsProcessing(src) && !w.Graph.IsService(src) {
			continue
		}
		if !w.Graph.IsProcessing(dst) && !w.Graph.IsService(dst) {
			continue
		}
		// With a reversed arrow the destination is the one that runs first.
		first, second := w.itemName(dstID), w.itemName(srcID)
		violations = append(violations, IntegrityViolation{
			Severity: SeverityWarning,
			Type:     ViolationInvertedArrow,
			ItemIDs:  []string{srcID, dstID},
			Message: fmt.Sprintf(`Reversed arrow between "%s" & "%s": "%s" will execute before "%s" unexpectedly`,
				first, second, first, second),
		})
	}
	return violations
}
// detectIsolatedProcessings emits a warning for every execution node
// (processing, service or native tool) that shares no link with another
// execution node — such a node will execute synchronously with the workflow's
// first elements.
func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
	flux := map[string]bool{}
	for id, item := range w.Graph.Items {
		if w.Graph.IsProcessing(item) || w.Graph.IsService(item) || w.Graph.IsNativeTool(item) {
			flux[id] = true
		}
	}

	var violations []IntegrityViolation
	for id := range flux {
		connected := false
		for _, link := range w.Graph.Links {
			// Find the item at the other end of the link, if the link touches id.
			var otherID string
			switch id {
			case link.Source.ID:
				otherID = link.Destination.ID
			case link.Destination.ID:
				otherID = link.Source.ID
			default:
				continue
			}
			if flux[otherID] {
				connected = true
				break
			}
		}
		if connected {
			continue
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityWarning,
			Type:     ViolationIsolatedProcessing,
			ItemIDs:  []string{id},
			Message: fmt.Sprintf(`"%s" is isolated (no connection with another processing) — will execute synchronously with the workflow's first element(s)`,
				w.itemName(id)),
		})
	}
	return violations
}
2026-06-04 11:31:03 +02:00
// ---------------------------------------------------------------------------
// AE validation helpers — centralised so both oc-scheduler and oc-schedulerd
// share the same logic without code duplication.
// ---------------------------------------------------------------------------

// BuildResourceIDSet constructs the per-type resource-ID map and the flat
// coupling-membership set used by ValidateWorkflowAE.
//
// selectedEmbeddedStorages (keyed by graph item ID) and selectedInstances
// (keyed by resource ID) come from the scheduling request (WorkflowSchedule)
// or from the WorkflowExecution at launch time.
// Embedded storages are NOT stored in Workflow.Storages (they are inside
// ComputeResourceInstance.AvailableStorages), so they must be resolved here
// to make them visible to the AE coupling check.
//
// A selection is silently ignored when it is nil, when its graph item is
// unknown or is not a compute, when the selected instance or storage index is
// out of range, or when the storage has no ID.
//
// It returns the resource IDs grouped by data type (the storage list is a
// copy, extended with the resolved embedded storages) and the set of all
// those IDs.
func (w *Workflow) BuildResourceIDSet(
	selectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection,
	selectedInstances ConfigItem,
) (map[tools.DataType][]string, map[string]struct{}) {
	resourcesByType := map[tools.DataType][]string{
		tools.DATA_RESOURCE:       w.Datas,
		tools.PROCESSING_RESOURCE: w.Processings,
		// Copied because embedded storages are appended below.
		tools.STORAGE_RESOURCE:  append([]string{}, w.Storages...),
		tools.COMPUTE_RESOURCE:  w.Computes,
		tools.WORKFLOW_RESOURCE: w.Workflows,
		tools.SERVICE_RESOURCE:  w.Services,
	}
	idSet := map[string]struct{}{}
	for _, ids := range resourcesByType {
		for _, id := range ids {
			idSet[id] = struct{}{}
		}
	}
	if w.Graph == nil {
		// Embedded storages are resolved through graph items: nothing to add.
		return resourcesByType, idSet
	}
	for graphItemID, sel := range selectedEmbeddedStorages {
		if sel == nil {
			continue
		}
		item, ok := w.Graph.Items[graphItemID]
		if !ok {
			continue
		}
		_, res := item.GetResource()
		compute, ok := res.(*resources.ComputeResource)
		if !ok || compute == nil {
			continue
		}
		// Get never returns nil; an unselected compute defaults to instance 0.
		computeIdx := *selectedInstances.Get(compute.GetID())
		if computeIdx < 0 || computeIdx >= len(compute.Instances) {
			continue
		}
		computeInst := compute.Instances[computeIdx]
		if sel.StorageIndex < 0 || sel.StorageIndex >= len(computeInst.AvailableStorages) {
			continue
		}
		storageID := computeInst.AvailableStorages[sel.StorageIndex].GetID()
		if storageID == "" {
			continue
		}
		idSet[storageID] = struct{}{}
		resourcesByType[tools.STORAGE_RESOURCE] = append(resourcesByType[tools.STORAGE_RESOURCE], storageID)
	}
	return resourcesByType, idSet
}
// ValidateWorkflowAE runs the ExploitationAuthorizations check of every
// resource listed in resourcesByType and gathers the resulting violations.
//
// Each authorization is evaluated through CheckAE with the resource ID, the
// given workflowID and consumerPeerID, the idSet membership set and a single
// UTC timestamp taken once at the start of the call.
//
// loadResource is supplied by the caller so that this package does not need
// to import the loader itself, which would create a circular import
// (oc-lib/models/resources → oclib → oc-lib/models → resources).
// When loadResource returns nil the resource is considered not found and is
// skipped.
//
// The result is nil when no violation is reported. Resource types are visited
// in map iteration order, so the order of the violations is unspecified.
func (w *Workflow) ValidateWorkflowAE(
	workflowID, consumerPeerID string,
	resourcesByType map[tools.DataType][]string,
	idSet map[string]struct{},
	loadResource func(tools.DataType, string) resources.ResourceInterface,
) []resources.AEViolation {
	var found []resources.AEViolation
	checkedAt := time.Now().UTC()

	for dataType, resourceIDs := range resourcesByType {
		for _, resourceID := range resourceIDs {
			loaded := loadResource(dataType, resourceID)
			if loaded != nil {
				for _, ae := range loaded.GetExploitationAuthorizations() {
					hits := ae.CheckAE(resourceID, workflowID, consumerPeerID, idSet, checkedAt)
					found = append(found, hits...)
				}
			}
		}
	}
	return found
}
2026-05-27 15:50:23 +02:00
// detectOrphanedStorages warns when a storage node is not linked to any
// processing node — it contributes no data flow to the workflow.
//
// For each such storage it returns one SeverityWarning violation of type
// ViolationStorageNotLinkedToProcessing. The message either says the storage
// is linked to nothing, or lists the kinds of neighbours it does have
// (compute, data, service) in a fixed order so the text is stable across
// runs. The order of the returned slice itself follows map iteration order
// and is unspecified.
//
// NOTE(review): neighbours that are neither processing, compute, data nor
// service (or whose ID is absent from the graph items) are not recorded, so a
// storage linked only to such nodes is reported as "not linked to anything" —
// confirm this is intended.
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
	// Reporting order of the non-processing neighbour kinds; ranging over the
	// set directly would yield a different message on every run.
	reportOrder := [...]string{"compute", "data", "service"}

	var violations []IntegrityViolation
	for id, item := range w.Graph.Items {
		if !w.Graph.IsStorage(item) {
			continue
		}

		linkedToProcessing := false
		linkedTopics := map[string]struct{}{}
		for _, link := range w.Graph.Links {
			var otherID string
			switch {
			case link.Source.ID == id:
				otherID = link.Destination.ID
			case link.Destination.ID == id:
				otherID = link.Source.ID
			default:
				continue
			}
			other, ok := w.Graph.Items[otherID]
			if !ok {
				continue
			}
			switch {
			case w.Graph.IsProcessing(other):
				linkedToProcessing = true
			case w.Graph.IsCompute(other):
				linkedTopics["compute"] = struct{}{}
			case w.Graph.IsData(other):
				linkedTopics["data"] = struct{}{}
			case w.Graph.IsService(other):
				linkedTopics["service"] = struct{}{}
			}
			if linkedToProcessing {
				// One processing neighbour is enough; no need to scan further.
				break
			}
		}
		if linkedToProcessing {
			continue
		}

		name := w.itemName(id)
		var msg string
		if len(linkedTopics) == 0 {
			msg = fmt.Sprintf(` "%s" is isolated (not linked to anything) `, name)
		} else {
			topics := make([]string, 0, len(linkedTopics))
			for _, t := range reportOrder {
				if _, ok := linkedTopics[t]; ok {
					topics = append(topics, t)
				}
			}
			msg = fmt.Sprintf(` "%s" is not linked to any processing (only linked to: %s) `, name, strings.Join(topics, ", "))
		}
		violations = append(violations, IntegrityViolation{
			Severity: SeverityWarning,
			Type:     ViolationStorageNotLinkedToProcessing,
			ItemIDs:  []string{id},
			Message:  msg,
		})
	}
	return violations
}