package execution

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/booking"
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	"cloud.o-forge.io/core/oc-lib/models/order"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/models/workflow"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"cloud.o-forge.io/core/oc-lib/tools"
	"go.mongodb.org/mongo-driver/bson/primitive"

	"oc-scheduler/conf"
	"oc-scheduler/infrastructure/planner"
	"oc-scheduler/infrastructure/scheduling_resources"
	infUtils "oc-scheduler/infrastructure/utils"
)

// ---------------------------------------------------------------------------
// Global execution lock registry
// ---------------------------------------------------------------------------

var (
	// execLocksMu guards execLocks.
	execLocksMu sync.RWMutex
	// execLocks maps an execution ID to the mutex serializing its state updates.
	execLocks = map[string]*sync.Mutex{}
)

// RegisterExecLock ensures a mutex exists for executionID.
//
// Registration is idempotent: an already registered mutex is kept, so a
// goroutine currently holding it is never bypassed by a fresh, unlocked one.
func RegisterExecLock(executionID string) {
	execLocksMu.Lock()
	defer execLocksMu.Unlock()
	if _, exists := execLocks[executionID]; !exists {
		execLocks[executionID] = &sync.Mutex{}
	}
}

// UnregisterExecLock removes the mutex registered for executionID, if any.
func UnregisterExecLock(executionID string) {
	execLocksMu.Lock()
	defer execLocksMu.Unlock()
	delete(execLocks, executionID)
}

// GetExecLock returns the mutex registered for executionID, or nil when no
// lock has been registered for it.
func GetExecLock(executionID string) *sync.Mutex {
	execLocksMu.RLock()
	defer execLocksMu.RUnlock()
	return execLocks[executionID]
}

// ---------------------------------------------------------------------------
// Considers payload
// ---------------------------------------------------------------------------

// ConsidersPayload is the JSON body decoded by UpdateExecutionState and
// ConfirmExecutionDrafts and produced by EmitConsidersExecution.
type ConsidersPayload struct {
	ID           string   `json:"id"`
	ExecutionsID string   `json:"executions_id"`
	ExecutionID  string   `json:"execution_id"`
	PeerIDs      []string `json:"peer_ids"`
}

// ---------------------------------------------------------------------------
// Execution state machine — considers
// ---------------------------------------------------------------------------

// UpdateExecutionState marks the booking or purchase identified by the payload
// as confirmed on its parent execution. Once every tracked booking and
// purchase is confirmed, the execution becomes SCHEDULED and leaves draft
// state, the session orders are confirmed and a considers message is emitted.
//
// The update is skipped when the payload is invalid, when the resource cannot
// be loaded, or when no lock is registered for the execution.
func UpdateExecutionState(payload []byte, dt tools.DataType) {
	var data ConsidersPayload
	if err := json.Unmarshal(payload, &data); err != nil || data.ID == "" {
		return
	}
	schdata := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil).LoadOne(data.ID)
	if schdata.Data == nil {
		return
	}
	sch := scheduling_resources.ToSchedulerObject(dt, schdata.Data)
	if sch == nil {
		return
	}
	execID := sch.GetExecutionId()

	mu := GetExecLock(execID)
	if mu == nil {
		fmt.Printf("UpdateExecutionState: no lock for execution %s, skipping\n", execID)
		return
	}
	// Serialize the read-modify-write of the execution document.
	mu.Lock()
	defer mu.Unlock()

	adminReq := &tools.APIRequest{Admin: true}
	res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(execID)
	if err != nil || res == nil {
		fmt.Printf("UpdateExecutionState: could not load execution %s: %v\n", execID, err)
		return
	}
	exec, ok := res.(*workflow_execution.WorkflowExecution)
	if !ok || exec == nil {
		fmt.Printf("UpdateExecutionState: unexpected type %T for execution %s\n", res, execID)
		return
	}

	switch dt {
	case tools.BOOKING:
		if exec.BookingsState == nil {
			exec.BookingsState = map[string]bool{}
		}
		exec.BookingsState[data.ID] = true
	case tools.PURCHASE_RESOURCE:
		if exec.PurchasesState == nil {
			exec.PurchasesState = map[string]bool{}
		}
		exec.PurchasesState[data.ID] = true
	}

	allConfirmed := true
	for _, st := range exec.BookingsState {
		if !st {
			allConfirmed = false
			break
		}
	}
	if allConfirmed {
		for _, st := range exec.PurchasesState {
			if !st {
				allConfirmed = false
				break
			}
		}
	}
	if allConfirmed {
		exec.State = enum.SCHEDULED
		exec.IsDraft = false
	}

	if _, _, err := utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq)); err != nil {
		fmt.Printf("UpdateExecutionState: could not update execution %s: %v\n", execID, err)
		return
	}
	if !allConfirmed {
		return
	}

	go confirmSessionOrder(exec.ExecutionsID, adminReq)
	obj, _, err := workflow.NewAccessor(adminReq).LoadOne(exec.WorkflowID)
	if err != nil || obj == nil {
		return
	}
	if wf, ok := obj.(*workflow.Workflow); ok {
		go EmitConsidersExecution(exec, wf)
	}
}

// confirmSessionOrder clears the draft flag on every order whose
// executions_id equals executionsID. Failures are logged and do not stop the
// remaining orders from being processed.
func confirmSessionOrder(executionsID string, adminReq *tools.APIRequest) {
	results, _, err := order.NewAccessor(adminReq).Search(
		&dbs.Filters{And: map[string][]dbs.Filter{
			"executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}},
		}}, "", true)
	if err != nil {
		fmt.Printf("confirmSessionOrder: order search failed for %s: %v\n", executionsID, err)
	}
	for _, obj := range results {
		o, ok := obj.(*order.Order)
		if !ok {
			continue
		}
		o.IsDraft = false
		if _, _, err := utils.GenericRawUpdateOne(o, o.GetID(), order.NewAccessor(adminReq)); err != nil {
			fmt.Printf("confirmSessionOrder: could not update order %s: %v\n", o.GetID(), err)
		}
	}
}

// ConfirmExecutionDrafts loads the execution named by the payload's
// execution_id and asynchronously confirms every booking and purchase tracked
// in its state maps.
func ConfirmExecutionDrafts(payload []byte) {
	var data ConsidersPayload
	if err := json.Unmarshal(payload, &data); err != nil {
		fmt.Printf("ConfirmExecutionDrafts: could not parse payload: %v\n", err)
		return
	}
	d := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.WORKFLOW_EXECUTION), nil).LoadOne(data.ExecutionID)
	exec := d.ToWorkflowExecution()
	if exec == nil {
		return
	}
	for id := range exec.BookingsState {
		go scheduling_resources.Confirm(id, tools.BOOKING)
	}
	for id := range exec.PurchasesState {
		go scheduling_resources.Confirm(id, tools.PURCHASE_RESOURCE)
	}
}

// EmitConsidersExecution publishes a PB_CONSIDERS propagation message for exec
// addressed to the peers of wf. Nothing is sent when exec or wf is nil, the
// workflow has no graph, no peer can be resolved, or marshalling fails.
func EmitConsidersExecution(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow) {
	if exec == nil || wf == nil || wf.Graph == nil {
		return
	}
	peerIDs, err := infUtils.GetWorkflowPeerIDs(wf.GetID(), &tools.APIRequest{Admin: true})
	if err != nil || len(peerIDs) == 0 {
		return
	}
	payload, err := json.Marshal(ConsidersPayload{
		ID:           exec.GetID(),
		ExecutionID:  exec.GetID(),
		ExecutionsID: exec.ExecutionsID,
		PeerIDs:      peerIDs,
	})
	if err != nil {
		return
	}
	b, err := json.Marshal(tools.PropalgationMessage{
		DataType: int(tools.WORKFLOW_EXECUTION),
		Action:   tools.PB_CONSIDERS,
		Payload:  payload,
	})
	if err != nil {
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-scheduler",
		Datatype: tools.WORKFLOW_EXECUTION,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
}

// ---------------------------------------------------------------------------
// Deadline watchers
// ---------------------------------------------------------------------------

// WatchDeadline arranges for handleDeadline to run one minute before execDate.
// When that instant is already in the past the handler is started immediately
// in its own goroutine.
func WatchDeadline(executionID string, ns string, execDate time.Time, request *tools.APIRequest) {
	delay := time.Until(execDate.UTC().Add(-1 * time.Minute))
	if delay <= 0 {
		go handleDeadline(executionID, ns, request)
		return
	}
	time.AfterFunc(delay, func() { handleDeadline(executionID, ns, request) })
}

// handleDeadline runs when an execution reaches its deadline. An execution
// that is still a draft is unscheduled and deleted; otherwise the namespace ns
// is provisioned and a teardown is scheduled through watchEnd.
func handleDeadline(executionID string, ns string, request *tools.APIRequest) {
	adminReq := &tools.APIRequest{Admin: true}
	res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(executionID)
	if err != nil || res == nil {
		fmt.Printf("handleDeadline: execution %s not found\n", executionID)
		return
	}
	exec, ok := res.(*workflow_execution.WorkflowExecution)
	if !ok || exec == nil {
		fmt.Printf("handleDeadline: unexpected type %T for execution %s\n", res, executionID)
		return
	}

	if exec.IsDraft {
		if err := Unschedule(executionID, request); err != nil {
			fmt.Printf("handleDeadline: could not unschedule draft %s: %v\n", executionID, err)
		}
		// Unschedule returns before deleting or unregistering when it fails,
		// so repeat both here; they are harmless after a successful Unschedule.
		workflow_execution.NewAccessor(adminReq).DeleteOne(executionID)
		UnregisterExecLock(executionID)
		fmt.Printf("handleDeadline: purged draft execution %s\n", executionID)
		return
	}

	if serv, err := tools.NewKubernetesService(
		conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData); err != nil {
		fmt.Printf("handleDeadline: k8s init failed for %s: %v\n", executionID, err)
	} else if err := serv.ProvisionExecutionNamespace(context.Background(), ns); err != nil &&
		!strings.Contains(err.Error(), "already exists") {
		// An "already exists" error is not reported as a failure.
		fmt.Printf("handleDeadline: failed to provision namespace %s: %v\n", ns, err)
	}

	go watchEnd(executionID, ns, exec.EndDate, exec.ExecDate)
}

// watchEnd tears down namespace ns when the execution ends. The end instant is
// endDate when set, otherwise five minutes after execDate. If that instant has
// already passed the teardown starts immediately.
func watchEnd(executionID string, ns string, endDate *time.Time, execDate time.Time) {
	end := execDate.UTC().Add(5 * time.Minute)
	if endDate != nil {
		end = *endDate
	}

	fire := func() {
		serv, err := tools.NewKubernetesService(
			conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
			conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
		if err != nil {
			fmt.Printf("watchEnd: k8s init failed for %s: %v\n", executionID, err)
			return
		}
		if err := serv.TeardownExecutionNamespace(context.Background(), ns); err != nil {
			fmt.Printf("watchEnd: failed to teardown namespace %s: %v\n", ns, err)
		}
	}

	delay := time.Until(end.UTC())
	if delay <= 0 {
		go fire()
		return
	}
	time.AfterFunc(delay, fire)
}

// ---------------------------------------------------------------------------
// Unschedule / Recovery
// ---------------------------------------------------------------------------

// Unschedule deletes every booking referenced by the execution's
// PeerBookByGraph, deletes the execution itself and releases its lock.
//
// It returns an error only when the execution cannot be loaded; bookings that
// fail to load are skipped.
func Unschedule(executionID string, request *tools.APIRequest) error {
	adminReq := &tools.APIRequest{Admin: true}
	res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(executionID)
	if err != nil {
		return fmt.Errorf("execution %s not found: %w", executionID, err)
	}
	if res == nil {
		// No underlying error to wrap: %w with a nil error would render "%!w(<nil>)".
		return fmt.Errorf("execution %s not found", executionID)
	}
	exec, ok := res.(*workflow_execution.WorkflowExecution)
	if !ok || exec == nil {
		return fmt.Errorf("execution %s has unexpected type %T", executionID, res)
	}

	for _, byResource := range exec.PeerBookByGraph {
		for _, bookingIDs := range byResource {
			for _, bkID := range bookingIDs {
				bkRes, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bkID)
				if loadErr != nil || bkRes == nil {
					continue
				}
				scheduling_resources.GetService().Delete(
					tools.BOOKING,
					scheduling_resources.ToSchedulerObject(tools.BOOKING, bkRes),
					request,
				)
			}
		}
	}

	workflow_execution.NewAccessor(adminReq).DeleteOne(executionID)
	UnregisterExecLock(executionID)
	return nil
}

// RecoverDraft registers a lock and starts a deadline watcher for every
// execution returned by the search.
func RecoverDraft() {
	adminReq := &tools.APIRequest{Admin: true}
	results, _, err := workflow_execution.NewAccessor(adminReq).Search(nil, "*", true)
	if err != nil {
		fmt.Printf("RecoverDraft: execution search failed: %v\n", err)
		return
	}
	for _, obj := range results {
		exec, ok := obj.(*workflow_execution.WorkflowExecution)
		if !ok {
			continue
		}
		RegisterExecLock(exec.GetID())
		go WatchDeadline(exec.GetID(), exec.ExecutionsID, exec.ExecDate, adminReq)
	}
	fmt.Printf("RecoverDraft: recovered %d executions\n", len(results))
}

// ---------------------------------------------------------------------------
// NATS workflow lifecycle handlers
// ---------------------------------------------------------------------------

// HandleWorkflowStarted moves the execution named in the event to STARTED and,
// when the event carries a real start time, stores it as the execution date.
func HandleWorkflowStarted(resp tools.NATSResponse) {
	var evt tools.WorkflowLifecycleEvent
	if err := json.Unmarshal(resp.Payload, &evt); err != nil {
		return
	}
	adminReq := &tools.APIRequest{Admin: true}
	res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(evt.ExecutionID)
	if err != nil || res == nil {
		return
	}
	exec, ok := res.(*workflow_execution.WorkflowExecution)
	if !ok || exec == nil {
		return
	}
	exec.State = enum.STARTED
	if evt.RealStart != nil {
		exec.ExecDate = *evt.RealStart
	}
	if _, _, err := utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq)); err != nil {
		fmt.Printf("HandleWorkflowStarted: could not update execution %s: %v\n", exec.GetID(), err)
	}
}

// HandleWorkflowDone stores the final state (and real end time, if present) on
// the execution, applies each reported step to its booking and asks the
// planner to refresh the local peer.
func HandleWorkflowDone(resp tools.NATSResponse) {
	var evt tools.WorkflowLifecycleEvent
	if err := json.Unmarshal(resp.Payload, &evt); err != nil {
		return
	}
	adminReq := &tools.APIRequest{Admin: true}
	res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(evt.ExecutionID)
	if err != nil || res == nil {
		return
	}
	exec, ok := res.(*workflow_execution.WorkflowExecution)
	if !ok || exec == nil {
		return
	}
	exec.State = enum.BookingStatus(evt.State)
	if evt.RealEnd != nil {
		exec.EndDate = evt.RealEnd
	}
	// A failed execution update is logged but does not prevent the per-step
	// booking updates below.
	if _, _, err := utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq)); err != nil {
		fmt.Printf("HandleWorkflowDone: could not update execution %s: %v\n", exec.GetID(), err)
	}

	for _, step := range evt.Steps {
		applyStepToBooking(step, adminReq)
	}

	self, err := oclib.GetMySelf()
	if err == nil && self != nil {
		go planner.GetPlannerService().RefreshSelf(self.PeerID, adminReq)
	}
}

// HandleWorkflowStepDone stores the step's state and real start/end times on
// the booking named in the event. When the booking reaches a terminal state
// the planner is asked to refresh the local peer.
func HandleWorkflowStepDone(resp tools.NATSResponse) {
	var evt tools.WorkflowLifecycleEvent
	if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.BookingID == "" {
		return
	}
	adminReq := &tools.APIRequest{Admin: true}
	res, _, err := booking.NewAccessor(adminReq).LoadOne(evt.BookingID)
	if err != nil || res == nil {
		return
	}
	bk, ok := res.(*booking.Booking)
	if !ok || bk == nil {
		return
	}
	bk.State = enum.BookingStatus(evt.State)
	if evt.RealStart != nil {
		bk.RealStartDate = evt.RealStart
	}
	if evt.RealEnd != nil {
		bk.RealEndDate = evt.RealEnd
	}
	if _, _, err := utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq)); err != nil {
		fmt.Printf("HandleWorkflowStepDone: could not update booking %s: %v\n", bk.GetID(), err)
	}

	switch bk.State {
	case enum.SUCCESS, enum.FAILURE, enum.FORGOTTEN, enum.CANCELLED:
		self, err := oclib.GetMySelf()
		if err == nil && self != nil {
			go planner.GetPlannerService().RefreshSelf(self.PeerID, adminReq)
		}
	}
}

// applyStepToBooking copies a step's state and real start/end times onto its
// booking. Bookings already in a terminal state are left untouched.
func applyStepToBooking(step tools.StepMetric, adminReq *tools.APIRequest) {
	if step.BookingID == "" {
		return
	}
	res, _, err := booking.NewAccessor(adminReq).LoadOne(step.BookingID)
	if err != nil || res == nil {
		return
	}
	bk, ok := res.(*booking.Booking)
	if !ok || bk == nil {
		return
	}
	switch bk.State {
	case enum.SUCCESS, enum.FAILURE, enum.FORGOTTEN, enum.CANCELLED:
		return
	}
	bk.State = enum.BookingStatus(step.State)
	if step.RealStart != nil {
		bk.RealStartDate = step.RealStart
	}
	if step.RealEnd != nil {
		bk.RealEndDate = step.RealEnd
	}
	if _, _, err := utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq)); err != nil {
		fmt.Printf("applyStepToBooking: could not update booking %s: %v\n", bk.GetID(), err)
	}
}

// ---------------------------------------------------------------------------
// Watchdog — stale execution safety net
// ---------------------------------------------------------------------------

var (
	// processedExecutions records the IDs of executions the watchdog has
	// already handled so that each one is processed at most once per process.
	processedExecutions sync.Map

	// terminalExecStates lists the states the watchdog never overrides.
	terminalExecStates = map[enum.BookingStatus]bool{
		enum.SUCCESS:   true,
		enum.FAILURE:   true,
		enum.FORGOTTEN: true,
		enum.CANCELLED: true,
	}
)

// WatchExecutions scans for stale executions once per minute and logs scan
// errors. The loop has no exit condition, so the call never returns.
func WatchExecutions() {
	logger := oclib.GetLogger()
	logger.Info().Msg("ExecutionWatchdog: started")
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for range ticker.C {
		if err := scanStaleExecutions(); err != nil {
			logger.Error().Msg("ExecutionWatchdog: " + err.Error())
		}
	}
}

// scanStaleExecutions searches for executions whose execution_date is at least
// one minute in the past and hands each of them to emitExecutionFailure in its
// own goroutine.
//
// NOTE(review): the filter only constrains execution_date, and
// emitExecutionFailure only skips terminal states, so non-terminal executions
// that are legitimately running also match — confirm this is intended.
func scanStaleExecutions() error {
	myself, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	if myself == nil {
		return fmt.Errorf("could not resolve local peer: no peer returned")
	}
	deadline := time.Now().UTC().Add(-time.Minute)
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", myself.GetID(), []string{}, nil).
		Search(&dbs.Filters{And: map[string][]dbs.Filter{
			"execution_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(deadline)}},
		}}, "", false)
	if res.Err != "" {
		return fmt.Errorf("stale execution search failed: %s", res.Err)
	}
	for _, dbo := range res.Data {
		if exec, ok := dbo.(*workflow_execution.WorkflowExecution); ok {
			go emitExecutionFailure(exec)
		}
	}
	return nil
}

// emitExecutionFailure publishes a FAILURE WORKFLOW_STEP_DONE_EVENT for every
// booking of exec, followed by a FAILURE WORKFLOW_DONE_EVENT for the execution
// itself. Executions already handled, or already in a terminal state, are
// skipped.
func emitExecutionFailure(exec *workflow_execution.WorkflowExecution) {
	if exec == nil {
		return
	}
	// Claim the execution atomically: a separate Load followed by a later Store
	// would let two overlapping scans both emit failure events for the same ID.
	if _, done := processedExecutions.LoadOrStore(exec.GetID(), struct{}{}); done {
		return
	}
	if terminalExecStates[exec.State] {
		return
	}

	logger := oclib.GetLogger()
	now := time.Now().UTC()
	steps := make([]tools.StepMetric, 0)
	for _, byGraph := range exec.PeerBookByGraph {
		for _, bookingIDs := range byGraph {
			for _, bookingID := range bookingIDs {
				payload, err := json.Marshal(tools.WorkflowLifecycleEvent{
					ExecutionID:  exec.GetID(),
					ExecutionsID: exec.ExecutionsID,
					BookingID:    bookingID,
					State:        enum.FAILURE.EnumIndex(),
					RealEnd:      &now,
				})
				if err != nil {
					continue
				}
				tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{
					FromApp: "oc-scheduler-watchdog",
					Method:  int(tools.WORKFLOW_STEP_DONE_EVENT),
					Payload: payload,
				})
				steps = append(steps, tools.StepMetric{
					BookingID: bookingID,
					State:     enum.FAILURE.EnumIndex(),
					RealEnd:   &now,
				})
			}
		}
	}

	donePayload, err := json.Marshal(tools.WorkflowLifecycleEvent{
		ExecutionID:  exec.GetID(),
		ExecutionsID: exec.ExecutionsID,
		State:        enum.FAILURE.EnumIndex(),
		RealEnd:      &now,
		Steps:        steps,
	})
	if err == nil {
		tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_DONE_EVENT, tools.NATSResponse{
			FromApp: "oc-scheduler-watchdog",
			Method:  int(tools.WORKFLOW_DONE_EVENT),
			Payload: donePayload,
		})
	}

	logger.Info().Msgf("ExecutionWatchdog: execution %s stale → emitting FAILURE (%d bookings)",
		exec.GetID(), len(steps))
}