This commit is contained in:
mr
2026-04-16 15:19:36 +02:00
parent dc0041999d
commit 883c0bec3d
2 changed files with 173 additions and 0 deletions

View File

@@ -0,0 +1,168 @@
package workflow_execution
import (
"slices"
"time"
workflowgraph "cloud.o-forge.io/core/oc-lib/models/workflow/graph"
)
// ExecutionStepState is the runtime state of a single step in the execution graph.
type ExecutionStepState string
const (
StepWaiting ExecutionStepState = "waiting"
StepRunning ExecutionStepState = "running"
StepSuccess ExecutionStepState = "success"
StepFailure ExecutionStepState = "failure"
)
// ExecutionGraphItem is the summarized view of one node in the workflow execution graph.
//
// - Name : human-readable label (resource name or item ID as fallback)
// - StartDate : set when the step transitions to StepRunning
// - EndDate : set when the step transitions to StepSuccess or StepFailure
// - State : current lifecycle state of the step
// - Deps : itemIDs that must reach StepSuccess before this step can start
// - WhenRunning : itemIDs (resources) that become active while this step is running
// (e.g. the compute node executing it, the storage it reads/writes)
type ExecutionGraphItem struct {
Name string `json:"name" bson:"name"`
StartDate *time.Time `json:"start_date,omitempty" bson:"start_date,omitempty"`
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"`
State ExecutionStepState `json:"state" bson:"state"`
Deps []string `json:"deps,omitempty" bson:"deps,omitempty"`
WhenRunning []string `json:"when_running,omitempty" bson:"when_running,omitempty"`
}
// ExecutionGraph is a flat, scheduler-friendly summary of a workflow execution graph.
// The map key is the workflow graph item ID.
type ExecutionGraph map[string]ExecutionGraphItem
// BuildExecutionGraph derives an initial ExecutionGraph (all steps in StepWaiting)
// from a workflow graph. It infers:
// - Deps : predecessor item IDs based on link direction
// - WhenRunning : sibling item IDs connected to a step by a link
// (i.e. resources that are co-active when the step runs)
func BuildExecutionGraph(g *workflowgraph.Graph) ExecutionGraph {
if g == nil {
return ExecutionGraph{}
}
// deps[dst] = list of src item IDs that dst depends on
deps := map[string][]string{}
// whenRunning[id] = list of item IDs active while id is running
whenRunning := map[string][]string{}
for _, link := range g.Links {
src := link.Source.ID
dst := link.Destination.ID
if src == "" || dst == "" {
continue
}
srcItem, srcOk := g.Items[src]
dstItem, dstOk := g.Items[dst]
if !srcOk || !dstOk {
continue
}
// Steps (logical nodes that sequence execution): Data, Processing, Workflow, NativeTool.
// Resources (infrastructure co-active while a step runs): Compute, Storage.
srcIsStep := srcItem.Data != nil || srcItem.Processing != nil || srcItem.Workflow != nil || srcItem.NativeTool != nil
dstIsStep := dstItem.Data != nil || dstItem.Processing != nil || dstItem.Workflow != nil || dstItem.NativeTool != nil
srcIsResource := srcItem.Compute != nil || srcItem.Storage != nil
dstIsResource := dstItem.Compute != nil || dstItem.Storage != nil
switch {
case srcIsStep && dstIsStep:
// Sequential dependency: dst must wait for src to succeed.
deps[dst] = appendUnique(deps[dst], src)
case srcIsStep && dstIsResource:
// src activates dst (compute/storage) while running.
whenRunning[src] = appendUnique(whenRunning[src], dst)
case srcIsResource && dstIsStep:
// dst uses src (compute/storage) while running.
whenRunning[dst] = appendUnique(whenRunning[dst], src)
}
}
eg := ExecutionGraph{}
for id, item := range g.Items {
name := id
_, r := item.GetResource()
if r != nil && r.GetName() != "" {
name = r.GetName()
}
eg[id] = ExecutionGraphItem{
Name: name,
State: StepWaiting,
Deps: deps[id],
WhenRunning: whenRunning[id],
}
}
return eg
}
// MarkRunning transitions the step to StepRunning and records the start time.
// It is a no-op if the step is already beyond StepRunning.
func (eg ExecutionGraph) MarkRunning(itemID string, at time.Time) {
item, ok := eg[itemID]
if !ok || item.State == StepSuccess || item.State == StepFailure {
return
}
item.State = StepRunning
item.StartDate = &at
eg[itemID] = item
}
// MarkDone transitions the step to StepSuccess or StepFailure and records the end time.
func (eg ExecutionGraph) MarkDone(itemID string, success bool, at time.Time) {
item, ok := eg[itemID]
if !ok {
return
}
if success {
item.State = StepSuccess
} else {
item.State = StepFailure
}
item.EndDate = &at
eg[itemID] = item
}
// Depssatisfied returns true when all deps of the given item have reached StepSuccess.
func (eg ExecutionGraph) Depssatisfied(itemID string) bool {
item, ok := eg[itemID]
if !ok {
return false
}
for _, dep := range item.Deps {
depItem, depOk := eg[dep]
if !depOk || depItem.State != StepSuccess {
return false
}
}
return true
}
// ReadyToRun returns the IDs of all steps that are still waiting and whose deps
// are fully satisfied. Useful for the scheduler to decide what to start next.
func (eg ExecutionGraph) ReadyToRun() []string {
ready := []string{}
for id, item := range eg {
if item.State == StepWaiting && eg.Depssatisfied(id) {
ready = append(ready, id)
}
}
return ready
}
func appendUnique(slice []string, val string) []string {
if slices.Contains(slice, val) {
return slice
}
return append(slice, val)
}