package infrastructure

import (
	"context"
	"encoding/json"
	"fmt"
	"slices"
	"sync"
	"time"

	"oc-scheduler/conf"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/config"
	"cloud.o-forge.io/core/oc-lib/models/booking"
	"cloud.o-forge.io/core/oc-lib/models/booking/planner"
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/models/workflow"
	"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"cloud.o-forge.io/core/oc-lib/tools"

	"github.com/nats-io/nats.go"
)

// plannerTTL is how long a cached planner snapshot stays valid before it is
// automatically evicted (see evictAfter).
const plannerTTL = 24 * time.Hour

// ---------------------------------------------------------------------------
// Planner cache — protected by plannerMu
// ---------------------------------------------------------------------------

var plannerMu sync.RWMutex
var PlannerCache = map[string]*planner.Planner{} // peerID → latest planner snapshot
var plannerAddedAt = map[string]time.Time{}      // peerID → first-seen timestamp

// ---------------------------------------------------------------------------
// Subscriber registries — one keyed by peerID, one by workflowID
// ---------------------------------------------------------------------------

var subsMu sync.RWMutex
var plannerSubs = map[string][]chan struct{}{}  // peerID → notification channels
var workflowSubs = map[string][]chan struct{}{} // workflowID → notification channels

// SubscribePlannerUpdates registers interest in planner changes for the given
// peer IDs. The returned channel receives one struct{} (non-blocking) each time
// any of those planners is updated. Call cancel to unregister.
func SubscribePlannerUpdates(peerIDs []string) (<-chan struct{}, func()) {
	return subscribe(&subsMu, plannerSubs, peerIDs)
}

// SubscribeWorkflowUpdates registers interest in workflow modifications for the
// given workflow ID. The returned channel is signalled when the workflow changes
// (peer list may have grown or shrunk). Call cancel to unregister.
func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
	return subscribe(&subsMu, workflowSubs, []string{wfID})
}

// subscribe is the generic helper used by both registries. It adds one shared
// buffered channel (capacity 1, so notifications coalesce) under every key and
// returns a cancel func that removes it again.
func subscribe(mu *sync.RWMutex, registry map[string][]chan struct{}, keys []string) (<-chan struct{}, func()) {
	ch := make(chan struct{}, 1)
	mu.Lock()
	for _, k := range keys {
		registry[k] = append(registry[k], ch)
	}
	mu.Unlock()
	cancel := func() {
		mu.Lock()
		for _, k := range keys {
			subs := registry[k]
			for i, s := range subs {
				if s == ch {
					registry[k] = append(subs[:i], subs[i+1:]...)
					break
				}
			}
			// Fix: drop the key once its subscriber list is empty; previously
			// cancelled keys left empty slices behind, so the registry grew
			// without bound (one stale entry per workflow/peer ever watched).
			if len(registry[k]) == 0 {
				delete(registry, k)
			}
		}
		mu.Unlock()
	}
	return ch, cancel
}

func notifyPlannerWatchers(peerID string) { notify(&subsMu, plannerSubs, peerID) }

func notifyWorkflowWatchers(wfID string) { notify(&subsMu, workflowSubs, wfID) }

// notify signals every subscriber registered under key. The send is
// non-blocking: a subscriber whose buffer is already full simply keeps its
// single pending notification (coalescing behaviour).
func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) {
	mu.RLock()
	subs := registry[key]
	mu.RUnlock()
	for _, ch := range subs {
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}

// ---------------------------------------------------------------------------
// Cache helpers
// ---------------------------------------------------------------------------

// storePlanner inserts or updates a planner for peerID.
// On first insertion it schedules an automatic eviction after plannerTTL.
// All subscribers interested in this peer are notified.
func storePlanner(peerID string, p *planner.Planner) {
	plannerMu.Lock()
	isNew := PlannerCache[peerID] == nil
	PlannerCache[peerID] = p
	if isNew {
		plannerAddedAt[peerID] = time.Now()
		go evictAfter(peerID, plannerTTL)
	}
	plannerMu.Unlock()
	notifyPlannerWatchers(peerID)
}

// evictAfter waits ttl from the first-seen time for peerID then emits a
// PB_CLOSE_PLANNER event, which removes the entry from the cache and notifies
// NATS.
func evictAfter(peerID string, ttl time.Duration) { time.Sleep(ttl) plannerMu.RLock() _, exists := PlannerCache[peerID] plannerMu.RUnlock() if exists { EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER}) } } // --------------------------------------------------------------------------- // NATS emission // --------------------------------------------------------------------------- func EmitNATS(peerID string, message tools.PropalgationMessage) { if message.Action == tools.PB_CLOSE_PLANNER { plannerMu.Lock() delete(PlannerCache, peerID) delete(plannerAddedAt, peerID) plannerMu.Unlock() notifyPlannerWatchers(peerID) // let streams re-evaluate (will warn "no planner") } b, _ := json.Marshal(message) tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: -1, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } type executionConsidersPayload struct { ID string `json:"id"` ExecutionsID string `json:"executions_id"` ExecutionID string `json:"execution_id"` PeerIDs []string `json:"peer_ids"` } // emitConsiders broadcasts a PROPALGATION_EVENT with the Considers action, // carrying the stored resource ID and its datatype (BOOKING or PURCHASE_RESOURCE). 
func emitConsiders(id string, executionID string, dt tools.DataType) { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil) data := access.LoadOne(executionID) if data.ToWorkflowExecution() != nil { exec := data.ToWorkflowExecution() if peers, err := GetWorkflowPeerIDs(exec.WorkflowID, &tools.APIRequest{Admin: true}); err == nil { payload, _ := json.Marshal(&executionConsidersPayload{ ID: id, ExecutionsID: exec.ExecutionsID, ExecutionID: executionID, PeerIDs: peers, }) b, _ := json.Marshal(tools.PropalgationMessage{ DataType: int(dt), Action: tools.PB_CONSIDERS, Payload: payload, }) tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: dt, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } } } // EmitConsidersExecution broadcasts a Considers / WORKFLOW_EXECUTION message to all // storage and compute peers of wf once the execution has transitioned to SCHEDULED. // Each receiving peer will use it to confirm (IsDraft=false) their local drafts. 
func EmitConsidersExecution(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow) { if wf == nil || wf.Graph == nil { return } peerIDs, err := GetWorkflowPeerIDs(wf.GetID(), &tools.APIRequest{Admin: true}) if err != nil { return } if len(peerIDs) == 0 { return } payload, err := json.Marshal(executionConsidersPayload{ ID: exec.GetID(), ExecutionID: exec.GetID(), ExecutionsID: exec.ExecutionsID, PeerIDs: peerIDs}) if err != nil { return } b, err := json.Marshal(tools.PropalgationMessage{ DataType: int(tools.WORKFLOW_EXECUTION), Action: tools.PB_CONSIDERS, Payload: payload, }) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-scheduler", Datatype: tools.WORKFLOW_EXECUTION, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } // updateExecutionState sets BookingsState[id]=true (dt==BOOKING) or // PurchasesState[id]=true (dt==PURCHASE_RESOURCE) on the target execution. // payload must be JSON-encoded {"id":"...", "execution_id":"..."}. 
func updateExecutionState(payload []byte, dt tools.DataType) { var data executionConsidersPayload if err := json.Unmarshal(payload, &data); err != nil || data.ID == "" || data.ExecutionID == "" { return } adminReq := &tools.APIRequest{Admin: true} res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(data.ExecutionID) if err != nil || res == nil { fmt.Printf("updateExecutionState: could not load execution %s: %v\n", data.ExecutionID, err) return } exec := res.(*workflow_execution.WorkflowExecution) switch dt { case tools.BOOKING: if exec.BookingsState == nil { exec.BookingsState = map[string]bool{} } exec.BookingsState[data.ID] = true case tools.PURCHASE_RESOURCE: if exec.PurchasesState == nil { exec.PurchasesState = map[string]bool{} } exec.PurchasesState[data.ID] = true } found := true for _, st := range exec.BookingsState { if !st { found = false break } } for _, st := range exec.PurchasesState { if !st { found = false break } } if found { exec.State = enum.SCHEDULED } if _, _, err := utils.GenericRawUpdateOne(exec, data.ExecutionID, workflow_execution.NewAccessor(adminReq)); err != nil { fmt.Printf("updateExecutionState: could not update execution %s: %v\n", data.ExecutionID, err) } } // confirmExecutionDrafts is called when a Considers/WORKFLOW_EXECUTION message // is received from oc-discovery, meaning the originating peer has confirmed the // execution as SCHEDULED. For every booking and purchase ID listed in the // execution's states, we confirm the local draft (IsDraft=false). 
// confirmExecutionDrafts loads the execution referenced in payload and, for
// every booking/purchase ID recorded in its state maps, confirms the local
// draft asynchronously via confirmResource.
func confirmExecutionDrafts(payload []byte) {
	var data executionConsidersPayload
	if err := json.Unmarshal(payload, &data); err != nil {
		fmt.Printf("confirmExecutionDrafts: could not parse payload: %v\n", err)
		return
	}
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.WORKFLOW_EXECUTION), nil)
	d := access.LoadOne(data.ExecutionID)
	if exec := d.ToWorkflowExecution(); exec != nil {
		// One goroutine per resource: confirmations are independent DB updates.
		for id := range exec.BookingsState {
			go confirmResource(id, tools.BOOKING)
		}
		for id := range exec.PurchasesState {
			go confirmResource(id, tools.PURCHASE_RESOURCE)
		}
	}
}

// ---------------------------------------------------------------------------
// NATS listeners
// ---------------------------------------------------------------------------

// ListenNATS registers the scheduler's NATS handlers:
//   - PROPALGATION_EVENT (only from oc-discovery):
//       PB_PLANNER   → cache the pushed planner snapshot under its peer_id.
//       PB_CONSIDERS → BOOKING / PURCHASE_RESOURCE mark the resource as
//                      considered on the target execution; WORKFLOW_EXECUTION
//                      confirms all local drafts listed in its states.
//   - REMOVE_RESOURCE (WORKFLOW) → wake workflow watchers.
//   - CREATE_RESOURCE:
//       WORKFLOW → refresh peer planner entries and notify CheckStream watchers.
//       BOOKING  → if destined for us, validate, store as draft, start 10-min
//                  expiry timer, and emit a Considers response.
//       PURCHASE → if destined for us, store as draft, start 10-min expiry
//                  timer, and emit a Considers response.
func ListenNATS() {
	tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
		// Receive planner snapshots pushed by oc-discovery and cache them.
		// Considers messages:
		//   BOOKING / PURCHASE_RESOURCE → mark the individual resource as
		//   considered in the target WorkflowExecution (BookingsState / PurchasesState).
		//   WORKFLOW_EXECUTION → the execution reached SCHEDULED; confirm all
		//   local draft bookings and purchases listed in its states.
		tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
			if resp.FromApp != "oc-discovery" {
				return
			}
			var prop tools.PropalgationMessage
			if err := json.Unmarshal(resp.Payload, &prop); err != nil {
				return
			}
			switch prop.Action {
			case tools.PB_PLANNER:
				// The payload is decoded twice: once as a generic map to read
				// the "peer_id" field (not part of planner.Planner), once as
				// the typed planner snapshot.
				m := map[string]interface{}{}
				p := planner.Planner{}
				if err := json.Unmarshal(prop.Payload, &m); err != nil {
					return
				}
				if err := json.Unmarshal(prop.Payload, &p); err != nil {
					return
				}
				storePlanner(fmt.Sprintf("%v", m["peer_id"]), &p)
			case tools.PB_CONSIDERS:
				switch tools.DataType(prop.DataType) {
				case tools.BOOKING, tools.PURCHASE_RESOURCE:
					updateExecutionState(prop.Payload, tools.DataType(prop.DataType))
				case tools.WORKFLOW_EXECUTION:
					confirmExecutionDrafts(prop.Payload)
				}
			}
		},
		// Incoming resource creation events:
		//  - WORKFLOW → refresh peer planner entries and notify CheckStream watchers.
		//  - BOOKING → if destined for us, validate, store as draft, start 10-min
		//    expiry timer, and emit a "considers_booking" response.
		//  - PURCHASE → if destined for us, store as draft, start 10-min expiry
		//    timer, and emit a "considers_purchase" response.
		tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
			switch resp.Datatype {
			case tools.WORKFLOW:
				wf := workflow.Workflow{}
				if err := json.Unmarshal(resp.Payload, &wf); err != nil {
					return
				}
				notifyWorkflowWatchers(wf.GetID())
			}
		},
		tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
			switch resp.Datatype {
			case tools.WORKFLOW:
				wf := workflow.Workflow{}
				if err := json.Unmarshal(resp.Payload, &wf); err != nil {
					return
				}
				broadcastPlanner(&wf)
				notifyWorkflowWatchers(wf.GetID())
			case tools.BOOKING:
				var bk booking.Booking
				if err := json.Unmarshal(resp.Payload, &bk); err != nil {
					return
				}
				// Ignore bookings addressed to another peer.
				self, err := oclib.GetMySelf()
				if err != nil || self == nil || bk.DestPeerID != self.GetID() {
					return
				}
				// Reject bookings whose start date is already in the past.
				if !bk.ExpectedStartDate.IsZero() && bk.ExpectedStartDate.Before(time.Now()) {
					fmt.Println("ListenNATS: booking start date is in the past, discarding")
					return
				}
				// Verify the slot is free in our planner (if we have one).
				// NOTE(review): the cache is read via self.PeerID while the
				// destination check above uses self.GetID() — confirm both
				// identifiers are equivalent for the self peer.
				plannerMu.RLock()
				p := PlannerCache[self.PeerID]
				plannerMu.RUnlock()
				if p != nil && !checkInstance(p, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
					fmt.Println("ListenNATS: booking conflicts with local planner, discarding")
					return
				}
				adminReq := &tools.APIRequest{Admin: true}
				bk.IsDraft = true
				stored, _, err := booking.NewAccessor(adminReq).StoreOne(&bk)
				if err != nil {
					fmt.Println("ListenNATS: could not store booking:", err)
					return
				}
				storedID := stored.GetID()
				// Refresh our own planner snapshot to include the new draft,
				// then arm the 10-minute draft-expiry timer and answer with a
				// Considers message.
				go refreshSelfPlanner(self.PeerID, adminReq)
				time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.BOOKING) })
				go emitConsiders(storedID, stored.(*booking.Booking).ExecutionID, tools.BOOKING)
			case tools.PURCHASE_RESOURCE:
				var pr purchase_resource.PurchaseResource
				if err := json.Unmarshal(resp.Payload, &pr); err != nil {
					return
				}
				// Ignore purchases addressed to another peer.
				self, err := oclib.GetMySelf()
				if err != nil || self == nil || pr.DestPeerID != self.GetID() {
					return
				}
				adminReq := &tools.APIRequest{Admin: true}
				pr.IsDraft = true
				stored, _, err := purchase_resource.NewAccessor(adminReq).StoreOne(&pr)
				if err != nil {
					fmt.Println("ListenNATS: could not store purchase:", err)
					return
				}
				storedID := stored.GetID()
				time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.PURCHASE_RESOURCE) })
				go emitConsiders(storedID, stored.(*purchase_resource.PurchaseResource).ExecutionID, tools.PURCHASE_RESOURCE)
			}
		},
	})
}

// ---------------------------------------------------------------------------
// Draft timeout
// ---------------------------------------------------------------------------

// draftTimeout deletes a booking or purchase resource if it is still a draft
// after the 10-minute confirmation window has elapsed.
// draftTimeout is the 10-minute expiry callback armed by ListenNATS when a
// draft booking/purchase is stored. If the resource is still a draft when it
// fires, the resource is deleted; confirmed resources are left untouched.
func draftTimeout(id string, dt tools.DataType) {
	adminReq := &tools.APIRequest{Admin: true}
	var res utils.DBObject
	var loadErr error
	switch dt {
	case tools.BOOKING:
		res, _, loadErr = booking.NewAccessor(adminReq).LoadOne(id)
	case tools.PURCHASE_RESOURCE:
		res, _, loadErr = purchase_resource.NewAccessor(adminReq).LoadOne(id)
	default:
		// Only bookings and purchases have draft timeouts.
		return
	}
	// Already deleted, unloadable, or confirmed in the meantime → nothing to do.
	if loadErr != nil || res == nil || !res.IsDrafted() {
		return
	}
	switch dt {
	case tools.BOOKING:
		booking.NewAccessor(adminReq).DeleteOne(id)
	case tools.PURCHASE_RESOURCE:
		purchase_resource.NewAccessor(adminReq).DeleteOne(id)
	}
	fmt.Printf("draftTimeout: %s %s deleted (still draft after 10 min)\n", dt.String(), id)
}

// ---------------------------------------------------------------------------
// Confirm channels
// ---------------------------------------------------------------------------

// confirmResource sets IsDraft=false for a booking or purchase resource.
// For bookings it also advances State to SCHEDULED, creates the execution's
// Kubernetes namespace locally, and refreshes the local planner.
func confirmResource(id string, dt tools.DataType) {
	adminReq := &tools.APIRequest{Admin: true}
	switch dt {
	case tools.BOOKING:
		res, _, err := booking.NewAccessor(adminReq).LoadOne(id)
		if err != nil || res == nil {
			fmt.Printf("confirmResource: could not load booking %s: %v\n", id, err)
			return
		}
		bk := res.(*booking.Booking)
		bk.IsDraft = false
		bk.State = enum.SCHEDULED
		if _, _, err := utils.GenericRawUpdateOne(bk, id, booking.NewAccessor(adminReq)); err != nil {
			fmt.Printf("confirmResource: could not confirm booking %s: %v\n", id, err)
			return
		}
		// NOTE(review): createNamespace's error is intentionally ignored here
		// (best effort) — confirm this is acceptable for scheduling.
		createNamespace(bk.ExecutionsID) // create Namespace locally
		self, err := oclib.GetMySelf()
		if err == nil && self != nil {
			go refreshSelfPlanner(self.PeerID, adminReq)
		}
	case tools.PURCHASE_RESOURCE:
		res, _, err := purchase_resource.NewAccessor(adminReq).LoadOne(id)
		if err != nil || res == nil {
			fmt.Printf("confirmResource: could not load purchase %s: %v\n", id, err)
			return
		}
		pr := res.(*purchase_resource.PurchaseResource)
		pr.IsDraft = false
		if _, _, err := utils.GenericRawUpdateOne(pr, id, purchase_resource.NewAccessor(adminReq)); err != nil {
			fmt.Printf("confirmResource: could not confirm purchase %s: %v\n", id, err)
		}
	}
}

// listenConfirmChannel subscribes to a NATS subject and calls confirmResource
// for each message received. The message body is expected to be the plain
// resource ID (UTF-8 string).
//
// NOTE(review): ch is never closed by this function, and nats.go does not
// close subscription channels on disconnect by default — so the range loop
// may block forever after a connection loss, which would also keep
// ListenConfirm's wg.Wait() from returning and prevent reconnection. Verify
// against the nats.go ChanSubscribe semantics in use.
func listenConfirmChannel(nc *nats.Conn, subject string, dt tools.DataType, wg *sync.WaitGroup) {
	defer wg.Done()
	ch := make(chan *nats.Msg, 64)
	sub, err := nc.ChanSubscribe(subject, ch)
	if err != nil {
		fmt.Printf("listenConfirmChannel: could not subscribe to %s: %v\n", subject, err)
		return
	}
	defer sub.Unsubscribe()
	for msg := range ch {
		confirmResource(string(msg.Data), dt)
	}
}

// ListenConfirm opens a direct NATS connection and subscribes to the hardcoded
// "confirm_booking" and "confirm_purchase" subjects. It reconnects automatically
// if the connection is lost.
func ListenConfirm() {
	natsURL := config.GetConfig().NATSUrl
	if natsURL == "" {
		fmt.Println("ListenConfirm: NATS_SERVER not set, skipping confirm listeners")
		return
	}
	for {
		nc, err := nats.Connect(natsURL)
		if err != nil {
			// Retry with a fixed 1-minute backoff.
			fmt.Println("ListenConfirm: could not connect to NATS:", err)
			time.Sleep(time.Minute)
			continue
		}
		var wg sync.WaitGroup
		wg.Add(2)
		go listenConfirmChannel(nc, "confirm_booking", tools.BOOKING, &wg)
		go listenConfirmChannel(nc, "confirm_purchase", tools.PURCHASE_RESOURCE, &wg)
		// Wait for both listeners to exit before closing and reconnecting
		// (see the NOTE on listenConfirmChannel about when that happens).
		wg.Wait()
		nc.Close()
	}
}

// ---------------------------------------------------------------------------
// Self-planner initialisation
// ---------------------------------------------------------------------------

// InitSelfPlanner bootstraps our own planner entry at startup.
// It waits (with 15-second retries) for our peer record to be present in the
// database before generating the first planner snapshot and broadcasting it
// on PB_PLANNER. This handles the race between oc-scheduler starting before
// oc-peer has fully registered our node.
func InitSelfPlanner() { for { self, err := oclib.GetMySelf() if err != nil || self == nil { fmt.Println("InitSelfPlanner: self peer not found yet, retrying in 15s...") time.Sleep(15 * time.Second) continue } refreshSelfPlanner(self.PeerID, &tools.APIRequest{Admin: true}) return } } // --------------------------------------------------------------------------- // Self-planner refresh // --------------------------------------------------------------------------- // refreshSelfPlanner regenerates the local planner from the current state of // the booking DB, stores it in PlannerCache under our own node UUID, and // broadcasts it on PROPALGATION_EVENT / PB_PLANNER so all listeners (including // oc-discovery) are kept in sync. // // It should be called whenever a booking for our own peer is created, whether // by direct DB insertion (self-peer routing) or upon receiving a CREATE_RESOURCE // BOOKING message from oc-discovery. func refreshSelfPlanner(peerID string, request *tools.APIRequest) { p, err := planner.GenerateShallow(request) if err != nil { fmt.Println("refreshSelfPlanner: could not generate planner:", err) return } // Update the local cache and notify any waiting CheckStream goroutines. storePlanner(peerID, p) // Broadcast the updated planner so remote peers (and oc-discovery) can // refresh their view of our availability. 
type plannerWithPeer struct { PeerID string `json:"peer_id"` *planner.Planner } plannerPayload, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: p}) if err != nil { return } EmitNATS(peerID, tools.PropalgationMessage{ Action: tools.PB_PLANNER, Payload: plannerPayload, }) } // --------------------------------------------------------------------------- // Planner broadcast // --------------------------------------------------------------------------- // broadcastPlanner iterates the storage and compute peers of the given workflow // and, for each peer not yet in the cache, emits a PB_PLANNER propagation so // downstream consumers (oc-discovery, other schedulers) refresh their state. func broadcastPlanner(wf *workflow.Workflow) { if wf.Graph == nil { return } items := []graph.GraphItem{} items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...) items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...) seen := []string{} for _, item := range items { i := item _, res := i.GetResource() if res == nil { continue } creatorID := res.GetCreatorID() if slices.Contains(seen, creatorID) { continue } data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID) p := data.ToPeer() if p == nil { continue } plannerMu.RLock() cached := PlannerCache[p.PeerID] plannerMu.RUnlock() if cached == nil { payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID}) if err != nil { continue } seen = append(seen, creatorID) EmitNATS(p.PeerID, tools.PropalgationMessage{ Action: tools.PB_PLANNER, Payload: payload, }) } } } func createNamespace(ns string) error { /* * This function is used to create a namespace. 
* It takes the following parameters: * - ns: the namespace you want to create */ logger := oclib.GetLogger() serv, err := tools.NewKubernetesService( conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData) if err != nil { return nil } c := context.Background() ok, err := serv.GetNamespace(c, ns) if ok != nil && err == nil { logger.Debug().Msg("A namespace with name " + ns + " already exists") return nil } if err != nil { return err } err = serv.CreateNamespace(c, ns) if err != nil { return err } err = serv.CreateServiceAccount(c, ns) if err != nil { return err } role := "argo-role" err = serv.CreateRole(c, ns, role, [][]string{ {"coordination.k8s.io"}, {""}, {""}}, [][]string{ {"leases"}, {"secrets"}, {"pods"}}, [][]string{ {"get", "create", "update"}, {"get"}, {"patch"}}) if err != nil { return err } return serv.CreateRoleBinding(c, ns, "argo-role-binding", role) }