package workflow_execution

import (
	"slices"
	"time"

	workflowgraph "cloud.o-forge.io/core/oc-lib/models/workflow/graph"
)

// ExecutionStepState is the runtime state of a single step in the execution graph.
type ExecutionStepState string

// Lifecycle states of a step. StepWaiting is the state assigned when the
// execution graph is first built; StepSuccess and StepFailure are terminal.
const (
	StepWaiting ExecutionStepState = "waiting"
	StepRunning ExecutionStepState = "running"
	StepSuccess ExecutionStepState = "success"
	StepFailure ExecutionStepState = "failure"
)

// ExecutionGraphItem is the summarized view of one node in the workflow execution graph.
//
//   - Name : human-readable label (resource name or item ID as fallback)
//   - StartDate : set when the step transitions to StepRunning
//   - EndDate : set when the step transitions to StepSuccess or StepFailure
//   - State : current lifecycle state of the step
//   - Deps : itemIDs that must reach StepSuccess before this step can start
//   - WhenRunning : itemIDs (resources) that become active while this step is running
//     (e.g. the compute node executing it, the storage it reads/writes)
//
// StartDate, EndDate, Deps and WhenRunning are omitted from the JSON and BSON
// encodings when unset or empty.
type ExecutionGraphItem struct {
	Name        string             `json:"name" bson:"name"`
	StartDate   *time.Time         `json:"start_date,omitempty" bson:"start_date,omitempty"`
	EndDate     *time.Time         `json:"end_date,omitempty" bson:"end_date,omitempty"`
	State       ExecutionStepState `json:"state" bson:"state"`
	Deps        []string           `json:"deps,omitempty" bson:"deps,omitempty"`
	WhenRunning []string           `json:"when_running,omitempty" bson:"when_running,omitempty"`
}

// ExecutionGraph is a flat, scheduler-friendly summary of a workflow execution graph.
// The map key is the workflow graph item ID.
type ExecutionGraph map[string]ExecutionGraphItem

// BuildExecutionGraph derives an initial ExecutionGraph (all steps in StepWaiting)
// from a workflow graph. It infers:
//   - Deps : predecessor item IDs based on link direction
//   - WhenRunning : sibling item IDs connected to a step by a link
//     (i.e.
resources that are co-active when the step runs) func BuildExecutionGraph(g *workflowgraph.Graph) ExecutionGraph { if g == nil { return ExecutionGraph{} } // deps[dst] = list of src item IDs that dst depends on deps := map[string][]string{} // whenRunning[id] = list of item IDs active while id is running whenRunning := map[string][]string{} for _, link := range g.Links { src := link.Source.ID dst := link.Destination.ID if src == "" || dst == "" { continue } srcItem, srcOk := g.Items[src] dstItem, dstOk := g.Items[dst] if !srcOk || !dstOk { continue } // Steps (logical nodes that sequence execution): Data, Processing, Workflow, NativeTool. // Resources (infrastructure co-active while a step runs): Compute, Storage. srcIsStep := srcItem.Data != nil || srcItem.Processing != nil || srcItem.Workflow != nil || srcItem.NativeTool != nil dstIsStep := dstItem.Data != nil || dstItem.Processing != nil || dstItem.Workflow != nil || dstItem.NativeTool != nil srcIsResource := srcItem.Compute != nil || srcItem.Storage != nil dstIsResource := dstItem.Compute != nil || dstItem.Storage != nil switch { case srcIsStep && dstIsStep: // Sequential dependency: dst must wait for src to succeed. deps[dst] = appendUnique(deps[dst], src) case srcIsStep && dstIsResource: // src activates dst (compute/storage) while running. whenRunning[src] = appendUnique(whenRunning[src], dst) case srcIsResource && dstIsStep: // dst uses src (compute/storage) while running. whenRunning[dst] = appendUnique(whenRunning[dst], src) } } eg := ExecutionGraph{} for id, item := range g.Items { name := id _, r := item.GetResource() if r != nil && r.GetName() != "" { name = r.GetName() } eg[id] = ExecutionGraphItem{ Name: name, State: StepWaiting, Deps: deps[id], WhenRunning: whenRunning[id], } } return eg } // MarkRunning transitions the step to StepRunning and records the start time. // It is a no-op if the step is already beyond StepRunning. 
func (eg ExecutionGraph) MarkRunning(itemID string, at time.Time) { item, ok := eg[itemID] if !ok || item.State == StepSuccess || item.State == StepFailure { return } item.State = StepRunning item.StartDate = &at eg[itemID] = item } // MarkDone transitions the step to StepSuccess or StepFailure and records the end time. func (eg ExecutionGraph) MarkDone(itemID string, success bool, at time.Time) { item, ok := eg[itemID] if !ok { return } if success { item.State = StepSuccess } else { item.State = StepFailure } item.EndDate = &at eg[itemID] = item } // Depssatisfied returns true when all deps of the given item have reached StepSuccess. func (eg ExecutionGraph) Depssatisfied(itemID string) bool { item, ok := eg[itemID] if !ok { return false } for _, dep := range item.Deps { depItem, depOk := eg[dep] if !depOk || depItem.State != StepSuccess { return false } } return true } // ReadyToRun returns the IDs of all steps that are still waiting and whose deps // are fully satisfied. Useful for the scheduler to decide what to start next. func (eg ExecutionGraph) ReadyToRun() []string { ready := []string{} for id, item := range eg { if item.State == StepWaiting && eg.Depssatisfied(id) { ready = append(ready, id) } } return ready } func appendUnique(slice []string, val string) []string { if slices.Contains(slice, val) { return slice } return append(slice, val) }