From 883c0bec3d87cb6c723c7e243614ec91cd627d83 Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 16 Apr 2026 15:19:36 +0200 Subject: [PATCH] graph --- models/workflow_execution/execution_graph.go | 168 ++++++++++++++++++ .../workflow_execution/workflow_execution.go | 5 + 2 files changed, 173 insertions(+) create mode 100644 models/workflow_execution/execution_graph.go diff --git a/models/workflow_execution/execution_graph.go b/models/workflow_execution/execution_graph.go new file mode 100644 index 0000000..550e133 --- /dev/null +++ b/models/workflow_execution/execution_graph.go @@ -0,0 +1,168 @@ +package workflow_execution + +import ( + "slices" + "time" + + workflowgraph "cloud.o-forge.io/core/oc-lib/models/workflow/graph" +) + +// ExecutionStepState is the runtime state of a single step in the execution graph. +type ExecutionStepState string + +const ( + StepWaiting ExecutionStepState = "waiting" + StepRunning ExecutionStepState = "running" + StepSuccess ExecutionStepState = "success" + StepFailure ExecutionStepState = "failure" +) + +// ExecutionGraphItem is the summarized view of one node in the workflow execution graph. +// +// - Name : human-readable label (resource name or item ID as fallback) +// - StartDate : set when the step transitions to StepRunning +// - EndDate : set when the step transitions to StepSuccess or StepFailure +// - State : current lifecycle state of the step +// - Deps : itemIDs that must reach StepSuccess before this step can start +// - WhenRunning : itemIDs (resources) that become active while this step is running +// (e.g. the compute node executing it, the storage it reads/writes) +type ExecutionGraphItem struct { + Name string `json:"name" bson:"name"` + StartDate *time.Time `json:"start_date,omitempty" bson:"start_date,omitempty"` + EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` + State ExecutionStepState `json:"state" bson:"state"` + Deps []string `json:"deps,omitempty" bson:"deps,omitempty"` + WhenRunning []string `json:"when_running,omitempty" bson:"when_running,omitempty"` +} + +// ExecutionGraph is a flat, scheduler-friendly summary of a workflow execution graph. +// The map key is the workflow graph item ID. +type ExecutionGraph map[string]ExecutionGraphItem + +// BuildExecutionGraph derives an initial ExecutionGraph (all steps in StepWaiting) +// from a workflow graph. It infers: +// - Deps : predecessor item IDs based on link direction +// - WhenRunning : sibling item IDs connected to a step by a link +// (i.e. resources that are co-active when the step runs) +func BuildExecutionGraph(g *workflowgraph.Graph) ExecutionGraph { + if g == nil { + return ExecutionGraph{} + } + + // deps[dst] = list of src item IDs that dst depends on + deps := map[string][]string{} + // whenRunning[id] = list of item IDs active while id is running + whenRunning := map[string][]string{} + + for _, link := range g.Links { + src := link.Source.ID + dst := link.Destination.ID + if src == "" || dst == "" { + continue + } + + srcItem, srcOk := g.Items[src] + dstItem, dstOk := g.Items[dst] + if !srcOk || !dstOk { + continue + } + + // Steps (logical nodes that sequence execution): Data, Processing, Workflow, NativeTool. + // Resources (infrastructure co-active while a step runs): Compute, Storage. + srcIsStep := srcItem.Data != nil || srcItem.Processing != nil || srcItem.Workflow != nil || srcItem.NativeTool != nil + dstIsStep := dstItem.Data != nil || dstItem.Processing != nil || dstItem.Workflow != nil || dstItem.NativeTool != nil + srcIsResource := srcItem.Compute != nil || srcItem.Storage != nil + dstIsResource := dstItem.Compute != nil || dstItem.Storage != nil + + switch { + case srcIsStep && dstIsStep: + // Sequential dependency: dst must wait for src to succeed. + deps[dst] = appendUnique(deps[dst], src) + + case srcIsStep && dstIsResource: + // src activates dst (compute/storage) while running. + whenRunning[src] = appendUnique(whenRunning[src], dst) + + case srcIsResource && dstIsStep: + // dst uses src (compute/storage) while running. + whenRunning[dst] = appendUnique(whenRunning[dst], src) + } + } + + eg := ExecutionGraph{} + for id, item := range g.Items { + name := id + _, r := item.GetResource() + if r != nil && r.GetName() != "" { + name = r.GetName() + } + eg[id] = ExecutionGraphItem{ + Name: name, + State: StepWaiting, + Deps: deps[id], + WhenRunning: whenRunning[id], + } + } + return eg +} + +// MarkRunning transitions the step to StepRunning and records the start time. +// It is a no-op if the step is already beyond StepRunning. +func (eg ExecutionGraph) MarkRunning(itemID string, at time.Time) { + item, ok := eg[itemID] + if !ok || item.State == StepSuccess || item.State == StepFailure { + return + } + item.State = StepRunning + item.StartDate = &at + eg[itemID] = item +} + +// MarkDone transitions the step to StepSuccess or StepFailure and records the end time. +func (eg ExecutionGraph) MarkDone(itemID string, success bool, at time.Time) { + item, ok := eg[itemID] + if !ok { + return + } + if success { + item.State = StepSuccess + } else { + item.State = StepFailure + } + item.EndDate = &at + eg[itemID] = item +} + +// Depssatisfied returns true when all deps of the given item have reached StepSuccess. +func (eg ExecutionGraph) Depssatisfied(itemID string) bool { + item, ok := eg[itemID] + if !ok { + return false + } + for _, dep := range item.Deps { + depItem, depOk := eg[dep] + if !depOk || depItem.State != StepSuccess { + return false + } + } + return true +} + +// ReadyToRun returns the IDs of all steps that are still waiting and whose deps +// are fully satisfied. Useful for the scheduler to decide what to start next. +func (eg ExecutionGraph) ReadyToRun() []string { + ready := []string{} + for id, item := range eg { + if item.State == StepWaiting && eg.Depssatisfied(id) { + ready = append(ready, id) + } + } + return ready +} + +func appendUnique(slice []string, val string) []string { + if slices.Contains(slice, val) { + return slice + } + return append(slice, val) +} diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 92fda7e..ea3d1d3 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -47,6 +47,11 @@ type WorkflowExecution struct { BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed + // Graph is a lightweight, real-time summary of the workflow execution graph. + // Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event. + // Consumed by oc-front to render the live execution panel via websocket updates. + Graph ExecutionGraph `json:"graph,omitempty" bson:"graph,omitempty"` + SelectedInstances workflow.ConfigItem `json:"selected_instances"` SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"` SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`