// Package infrastructure is the public façade for all scheduling sub-services. // Controllers and main.go import only this package; the sub-packages are // internal implementation details. package infrastructure import ( "fmt" "oc-scheduler/infrastructure/execution" "oc-scheduler/infrastructure/nats" "oc-scheduler/infrastructure/planner" "oc-scheduler/infrastructure/scheduler" "oc-scheduler/infrastructure/scheduling_resources" "oc-scheduler/infrastructure/session" "oc-scheduler/infrastructure/utils" "time" "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" ) // --------------------------------------------------------------------------- // Type re-exports // --------------------------------------------------------------------------- type WorkflowSchedule = scheduler.WorkflowSchedule type CheckResult = scheduler.CheckResult // --------------------------------------------------------------------------- // Bootstrap — called from main.go // --------------------------------------------------------------------------- func ListenNATS() { nats.ListenNATS() } func InitSelfPlanner() { planner.InitPlanner() } func RecoverDraftExecutions() { execution.RecoverDraft() } func WatchExecutions() { execution.WatchExecutions() } // EmitNATS broadcasts a propagation message via NATS. 
func EmitNATS(peerID string, message tools.PropalgationMessage) {
	utils.Propalgate(peerID, message)
}

// ---------------------------------------------------------------------------
// Utilities
// ---------------------------------------------------------------------------

// GetWorkflowPeerIDs forwards wfID and req to utils.GetWorkflowPeerIDs and
// returns its results unchanged.
func GetWorkflowPeerIDs(wfID string, req *tools.APIRequest) ([]string, error) {
	return utils.GetWorkflowPeerIDs(wfID, req)
}

// ---------------------------------------------------------------------------
// Planner subscriptions
// ---------------------------------------------------------------------------

// SubscribePlannerUpdates subscribes to the planner service for the given
// peer IDs. It returns the service's string channel together with the
// accompanying func() (presumably an unsubscribe/cancel hook — confirm
// against the planner package).
func SubscribePlannerUpdates(peerIDs []string) (<-chan string, func()) {
	plannerSvc := planner.GetPlannerService()
	return plannerSvc.SubscribePlannerUpdates(peerIDs...)
}

// SubscribeWorkflowUpdates subscribes to the planner service for the workflow
// identified by wfID. It returns the service's signal channel together with
// the accompanying func() (presumably an unsubscribe/cancel hook — confirm
// against the planner package).
func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
	plannerSvc := planner.GetPlannerService()
	return plannerSvc.SubscribeWorkflowUpdates(wfID)
}

// RequestPlannerRefresh calls Refresh on the planner service with the given
// peer IDs and executions ID, and returns the resulting string slice as is.
func RequestPlannerRefresh(peerIDs []string, executionsID string) []string {
	plannerSvc := planner.GetPlannerService()
	return plannerSvc.Refresh(peerIDs, executionsID)
}

// ReleaseRefreshOwnership calls ReleaseRefreshOwnership on the planner
// service with the given peer IDs and executions ID.
func ReleaseRefreshOwnership(peerIDs []string, executionsID string) {
	plannerSvc := planner.GetPlannerService()
	plannerSvc.ReleaseRefreshOwnership(peerIDs, executionsID)
}

// ---------------------------------------------------------------------------
// Session management
// ---------------------------------------------------------------------------

// UpsertSessionDrafts builds a session executions service for executionsID
// and hands it the drafts. Note that the underlying service method takes the
// arguments in a different order (purchases, bookings, execs, req) than this
// wrapper does.
func UpsertSessionDrafts(
	executionsID string,
	execs []*workflow_execution.WorkflowExecution,
	purchases, bookings []scheduling_resources.SchedulerObject,
	req *tools.APIRequest,
) {
	sessionSvc := session.NewSessionExecutionsService(executionsID)
	sessionSvc.UpsertSessionDrafts(purchases, bookings, execs, req)
}

// CleanupSession builds a session executions service for executionsID and
// invokes its CleanupSession with req.
func CleanupSession(executionsID string, req *tools.APIRequest) {
	sessionSvc := session.NewSessionExecutionsService(executionsID)
	sessionSvc.CleanupSession(req)
}

// UnscheduleExecution forwards to execution.Unschedule and returns its error.
func UnscheduleExecution(executionID string, req *tools.APIRequest) error {
	return execution.Unschedule(executionID, req)
}

// ---------------------------------------------------------------------------
// Schedule
confirmation // --------------------------------------------------------------------------- func Schedule( ws *WorkflowSchedule, wfID string, req *tools.APIRequest, ) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) { if req == nil { return ws, nil, nil, fmt.Errorf("no request provided") } if ws.UUID == "" { return ws, nil, nil, fmt.Errorf("no scheduling session: use the Check stream first") } svc := session.NewSessionExecutionsService(ws.UUID) executions := svc.LoadSessionExecs() for _, exec := range executions { if !exec.ExecDate.IsZero() && exec.ExecDate.Before(time.Now().UTC()) { return ws, nil, nil, fmt.Errorf("execution %s is obsolete (start date in the past)", exec.GetID()) } } if err := svc.ConfirmSession(req); err != nil { return ws, nil, nil, fmt.Errorf("confirm session failed: %w", err) } for _, exec := range executions { go execution.WatchDeadline(exec.GetID(), exec.ExecutionsID, exec.ExecDate, req) } adminReq := &tools.APIRequest{Admin: true} obj, _, _ := workflow.NewAccessor(req).LoadOne(wfID) if obj == nil { return ws, nil, executions, nil } wf := obj.(*workflow.Workflow) ws.Workflow = wf ws.WorkflowExecution = executions wf.GetAccessor(adminReq).UpdateOne(wf.Serialize(wf), wf.GetID()) return ws, wf, executions, nil }