Files
oc-scheduler/infrastructure/nats.go

644 lines
21 KiB
Go
Raw Normal View History

2026-02-23 18:10:47 +01:00
package infrastructure
import (
"encoding/json"
"fmt"
"slices"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/nats-io/nats.go"
)
// plannerTTL is the delay after which a planner first stored for a peer is
// scheduled for eviction.
const plannerTTL = 24 * time.Hour

// ---------------------------------------------------------------------------
// Planner cache — protected by plannerMu
// ---------------------------------------------------------------------------

var (
	// plannerMu guards PlannerCache and plannerAddedAt.
	plannerMu sync.RWMutex

	// PlannerCache holds the most recently stored planner for each peer ID.
	PlannerCache = make(map[string]*planner.Planner)

	// plannerAddedAt maps peerID → first-seen timestamp.
	plannerAddedAt = make(map[string]time.Time)
)
// ---------------------------------------------------------------------------
// Subscriber registries — one keyed by peerID, one by workflowID
// ---------------------------------------------------------------------------

var (
	// subsMu guards both registries below.
	subsMu sync.RWMutex

	// plannerSubs maps peerID → notification channels.
	plannerSubs = make(map[string][]chan struct{})

	// workflowSubs maps workflowID → notification channels.
	workflowSubs = make(map[string][]chan struct{})
)
// SubscribePlannerUpdates registers a watcher for planner changes on each of
// the given peer IDs. The returned channel has a buffer of one and is
// signalled without blocking whenever one of those planners is stored or
// closed. Invoke the returned function to unregister the watcher.
func SubscribePlannerUpdates(peerIDs []string) (<-chan struct{}, func()) {
	updates, cancel := subscribe(&subsMu, plannerSubs, peerIDs)
	return updates, cancel
}
// SubscribeWorkflowUpdates registers a watcher for modifications of the
// workflow identified by wfID. The returned channel is signalled when the
// workflow changes (its peer list may have grown or shrunk). Invoke the
// returned function to unregister the watcher.
func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
	return subscribe(&subsMu, workflowSubs, []string{wfID})
}
// subscribe is the generic helper used by both registries.
//
// It creates a channel with a buffer of one, appends it to registry[k] for
// every k in keys while holding mu, and returns the receive side together
// with a cancel function that removes the channel from those same entries.
// cancel may be called more than once; later calls are no-ops.
//
// Removal is copy-on-write: cancel installs a freshly allocated slice instead
// of shifting elements inside the existing backing array, so a slice header
// obtained from the registry earlier is never modified underneath its reader.
// Keys whose last subscriber is removed are deleted from the registry so the
// map does not grow without bound.
func subscribe(mu *sync.RWMutex, registry map[string][]chan struct{}, keys []string) (<-chan struct{}, func()) {
	ch := make(chan struct{}, 1)
	// Detach from the caller's slice: cancel must see the keys that were
	// registered even if the caller reuses its slice afterwards.
	keys = slices.Clone(keys)

	mu.Lock()
	for _, k := range keys {
		registry[k] = append(registry[k], ch)
	}
	mu.Unlock()

	cancel := func() {
		mu.Lock()
		defer mu.Unlock()
		for _, k := range keys {
			subs := registry[k]
			i := slices.Index(subs, ch)
			if i < 0 {
				continue
			}
			if len(subs) == 1 {
				delete(registry, k)
				continue
			}
			remaining := make([]chan struct{}, 0, len(subs)-1)
			remaining = append(remaining, subs[:i]...)
			remaining = append(remaining, subs[i+1:]...)
			registry[k] = remaining
		}
	}
	return ch, cancel
}
func notifyPlannerWatchers(peerID string) {
notify(&subsMu, plannerSubs, peerID)
}
func notifyWorkflowWatchers(wfID string) {
notify(&subsMu, workflowSubs, wfID)
}
// notify performs a non-blocking send on every channel registered under key.
// A channel whose buffer is already full is skipped, so a pending signal is
// never duplicated and a slow subscriber cannot stall the caller.
//
// The read lock is held for the whole loop: the registry slice is rewritten
// by cancel functions under the write lock, and iterating it after releasing
// the lock would race with those writers. Holding the lock is safe because
// every send goes through a select with a default case and cannot park.
func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) {
	mu.RLock()
	defer mu.RUnlock()
	for _, ch := range registry[key] {
		select {
		case ch <- struct{}{}:
		default:
		}
	}
}
// ---------------------------------------------------------------------------
// Cache helpers
// ---------------------------------------------------------------------------

// storePlanner records p as the current planner for peerID.
// When no non-nil planner was cached for that peer beforehand, the insertion
// time is recorded and an eviction goroutine is started with plannerTTL.
// Watchers subscribed to this peer are notified once the cache lock has been
// released.
func storePlanner(peerID string, p *planner.Planner) {
	func() {
		plannerMu.Lock()
		defer plannerMu.Unlock()

		firstSeen := PlannerCache[peerID] == nil
		PlannerCache[peerID] = p
		if !firstSeen {
			return
		}
		plannerAddedAt[peerID] = time.Now()
		go evictAfter(peerID, plannerTTL)
	}()
	notifyPlannerWatchers(peerID)
}
// evictAfter waits ttl from the first-seen time for peerID then emits a
// PB_CLOSE_PLANNER event, which removes the entry from the cache and notifies
// NATS.
//
// Nothing is emitted when the peer is no longer cached. Nothing is emitted
// either when the cached entry was first seen less than ttl ago: that happens
// when the planner was closed and stored again while this goroutine was
// sleeping, in which case the newer insertion started its own timer and this
// stale one must not cut the fresh entry short.
func evictAfter(peerID string, ttl time.Duration) {
	time.Sleep(ttl)

	plannerMu.RLock()
	_, exists := PlannerCache[peerID]
	addedAt, tracked := plannerAddedAt[peerID]
	plannerMu.RUnlock()

	if !exists {
		return
	}
	if tracked && time.Since(addedAt) < ttl {
		return
	}
	EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER})
}
// ---------------------------------------------------------------------------
// NATS emission
// ---------------------------------------------------------------------------

// EmitNATS publishes message as a PROPALGATION_EVENT on behalf of
// "oc-scheduler".
//
// For a PB_CLOSE_PLANNER action the planner of peerID is first dropped from
// the local cache (together with its first-seen timestamp) and the watchers
// of that peer are notified; the message is published afterwards in every
// case.
func EmitNATS(peerID string, message tools.PropalgationMessage) {
	if message.Action == tools.PB_CLOSE_PLANNER {
		plannerMu.Lock()
		delete(plannerAddedAt, peerID)
		delete(PlannerCache, peerID)
		plannerMu.Unlock()
		// Let streams re-evaluate (will warn "no planner").
		notifyPlannerWatchers(peerID)
	}

	encoded, _ := json.Marshal(message)
	event := tools.NATSResponse{
		FromApp:  "oc-scheduler",
		Datatype: -1,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  encoded,
	}
	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, event)
}
// executionConsidersPayload is the JSON body carried by PB_CONSIDERS
// propagation messages exchanged by this file.
type executionConsidersPayload struct {
	// ID is the identifier of the resource the message is about.
	ID string `json:"id"`
	// ExecutionsID is copied from the execution's ExecutionsID field.
	ExecutionsID string `json:"executions_id"`
	// ExecutionID identifies the workflow execution concerned.
	ExecutionID string `json:"execution_id"`
	// PeerIDs lists the peers of the execution's workflow.
	PeerIDs []string `json:"peer_ids"`
}
// emitConsiders broadcasts a PROPALGATION_EVENT with the Considers action,
// carrying the stored resource ID and its datatype (BOOKING or PURCHASE_RESOURCE).
//
// The execution identified by executionID is loaded to obtain its workflow
// and the peers of that workflow. Nothing is published when the execution
// cannot be loaded or the peer list cannot be resolved. Encoding failures are
// logged and abort the emission rather than publishing an empty payload.
func emitConsiders(id string, executionID string, dt tools.DataType) {
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil)
	data := access.LoadOne(executionID)
	exec := data.ToWorkflowExecution()
	if exec == nil {
		return
	}
	peers, err := GetWorkflowPeerIDs(exec.WorkflowID, &tools.APIRequest{Admin: true})
	if err != nil {
		return
	}

	payload, err := json.Marshal(&executionConsidersPayload{
		ID:           id,
		ExecutionsID: exec.ExecutionsID,
		ExecutionID:  executionID,
		PeerIDs:      peers,
	})
	if err != nil {
		fmt.Printf("emitConsiders: could not encode payload for %s: %v\n", id, err)
		return
	}
	b, err := json.Marshal(tools.PropalgationMessage{
		DataType: int(dt),
		Action:   tools.PB_CONSIDERS,
		Payload:  payload,
	})
	if err != nil {
		fmt.Printf("emitConsiders: could not encode message for %s: %v\n", id, err)
		return
	}

	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-scheduler",
		Datatype: dt,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
}
// EmitConsidersExecution broadcasts a Considers / WORKFLOW_EXECUTION message to all
// storage and compute peers of wf once the execution has transitioned to SCHEDULED.
// Each receiving peer will use it to confirm (IsDraft=false) their local drafts.
//
// The call is a no-op when exec or wf is nil, when wf has no graph, when the
// workflow's peers cannot be resolved or are empty, or when the message
// cannot be encoded.
func EmitConsidersExecution(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow) {
	// exec is dereferenced below, so it needs the same nil guard as wf.
	if exec == nil || wf == nil || wf.Graph == nil {
		return
	}
	peerIDs, err := GetWorkflowPeerIDs(wf.GetID(), &tools.APIRequest{Admin: true})
	if err != nil || len(peerIDs) == 0 {
		return
	}

	payload, err := json.Marshal(executionConsidersPayload{
		ID:           exec.GetID(),
		ExecutionID:  exec.GetID(),
		ExecutionsID: exec.ExecutionsID,
		PeerIDs:      peerIDs,
	})
	if err != nil {
		return
	}
	b, err := json.Marshal(tools.PropalgationMessage{
		DataType: int(tools.WORKFLOW_EXECUTION),
		Action:   tools.PB_CONSIDERS,
		Payload:  payload,
	})
	if err != nil {
		return
	}

	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-scheduler",
		Datatype: tools.WORKFLOW_EXECUTION,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
}
// updateExecutionState marks one resource as considered on its execution:
// BookingsState[id]=true when dt is BOOKING, PurchasesState[id]=true when dt
// is PURCHASE_RESOURCE. payload must be JSON-encoded
// {"id":"...", "execution_id":"..."}; a payload that does not parse or lacks
// either field is ignored.
//
// Once no entry of either state map is false, the execution's State is set
// to SCHEDULED. The execution is written back in every case.
func updateExecutionState(payload []byte, dt tools.DataType) {
	var msg executionConsidersPayload
	if err := json.Unmarshal(payload, &msg); err != nil {
		return
	}
	if msg.ID == "" || msg.ExecutionID == "" {
		return
	}

	adminReq := &tools.APIRequest{Admin: true}
	loaded, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(msg.ExecutionID)
	if err != nil || loaded == nil {
		fmt.Printf("updateExecutionState: could not load execution %s: %v\n", msg.ExecutionID, err)
		return
	}
	exec := loaded.(*workflow_execution.WorkflowExecution)

	switch dt {
	case tools.PURCHASE_RESOURCE:
		if exec.PurchasesState == nil {
			exec.PurchasesState = map[string]bool{}
		}
		exec.PurchasesState[msg.ID] = true
	case tools.BOOKING:
		if exec.BookingsState == nil {
			exec.BookingsState = map[string]bool{}
		}
		exec.BookingsState[msg.ID] = true
	}

	// allTrue reports whether no entry of states is false (vacuously true
	// for an empty or nil map).
	allTrue := func(states map[string]bool) bool {
		for _, considered := range states {
			if !considered {
				return false
			}
		}
		return true
	}
	if allTrue(exec.BookingsState) && allTrue(exec.PurchasesState) {
		exec.State = enum.SCHEDULED
	}

	_, _, err = utils.GenericRawUpdateOne(exec, msg.ExecutionID, workflow_execution.NewAccessor(adminReq))
	if err != nil {
		fmt.Printf("updateExecutionState: could not update execution %s: %v\n", msg.ExecutionID, err)
	}
}
// confirmExecutionDrafts handles a Considers/WORKFLOW_EXECUTION message
// received from oc-discovery, i.e. the originating peer has confirmed the
// execution as SCHEDULED. The execution named in the payload is loaded and a
// confirmResource goroutine is started for every booking ID and every
// purchase ID listed in its state maps. Nothing happens when the execution
// cannot be loaded.
func confirmExecutionDrafts(payload []byte) {
	var msg executionConsidersPayload
	if err := json.Unmarshal(payload, &msg); err != nil {
		fmt.Printf("confirmExecutionDrafts: could not parse payload: %v\n", err)
		return
	}

	access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.WORKFLOW_EXECUTION), nil)
	loaded := access.LoadOne(msg.ExecutionID)
	exec := loaded.ToWorkflowExecution()
	if exec == nil {
		return
	}

	for bookingID := range exec.BookingsState {
		go confirmResource(bookingID, tools.BOOKING)
	}
	for purchaseID := range exec.PurchasesState {
		go confirmResource(purchaseID, tools.PURCHASE_RESOURCE)
	}
}
// ---------------------------------------------------------------------------
// NATS listeners
// ---------------------------------------------------------------------------

// ListenNATS registers the scheduler's handlers for PROPALGATION_EVENT,
// REMOVE_RESOURCE and CREATE_RESOURCE messages on the shared NATS caller.
func ListenNATS() {
	tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
		// Propagation events are only accepted from oc-discovery.
		// Planner messages:
		//   receive planner snapshots pushed by oc-discovery and cache them.
		// Considers messages:
		//   BOOKING / PURCHASE_RESOURCE → mark the individual resource as
		//     considered in the target WorkflowExecution (BookingsState / PurchasesState).
		//   WORKFLOW_EXECUTION → the execution reached SCHEDULED; confirm all
		//     local draft bookings and purchases listed in its states.
		tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
			if resp.FromApp != "oc-discovery" {
				return
			}
			var prop tools.PropalgationMessage
			if err := json.Unmarshal(resp.Payload, &prop); err != nil {
				return
			}
			switch prop.Action {
			case tools.PB_PLANNER:
				m := map[string]interface{}{}
				p := planner.Planner{}
				if err := json.Unmarshal(prop.Payload, &m); err != nil {
					return
				}
				if err := json.Unmarshal(prop.Payload, &p); err != nil {
					return
				}
				// A payload without peer_id must not be cached: formatting a
				// missing value would file the planner under the literal
				// key "<nil>".
				peerID := ""
				if raw := m["peer_id"]; raw != nil {
					peerID = fmt.Sprintf("%v", raw)
				}
				if peerID == "" {
					fmt.Println("ListenNATS: planner payload has no peer_id, discarding")
					return
				}
				storePlanner(peerID, &p)
			case tools.PB_CONSIDERS:
				switch tools.DataType(prop.DataType) {
				case tools.BOOKING, tools.PURCHASE_RESOURCE:
					updateExecutionState(prop.Payload, tools.DataType(prop.DataType))
				case tools.WORKFLOW_EXECUTION:
					confirmExecutionDrafts(prop.Payload)
				}
			}
		},
		// Incoming resource removal events:
		//   - WORKFLOW → notify the watchers of that workflow.
		tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
			switch resp.Datatype {
			case tools.WORKFLOW:
				wf := workflow.Workflow{}
				if err := json.Unmarshal(resp.Payload, &wf); err != nil {
					return
				}
				notifyWorkflowWatchers(wf.GetID())
			}
		},
		// Incoming resource creation events:
		//   - WORKFLOW → refresh peer planner entries and notify CheckStream watchers.
		//   - BOOKING  → if destined for us, validate, store as draft, start 10-min
		//     expiry timer, and emit a "considers" response for the booking.
		//   - PURCHASE → if destined for us, store as draft, start 10-min expiry
		//     timer, and emit a "considers" response for the purchase.
		tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
			switch resp.Datatype {
			case tools.WORKFLOW:
				wf := workflow.Workflow{}
				if err := json.Unmarshal(resp.Payload, &wf); err != nil {
					return
				}
				broadcastPlanner(&wf)
				notifyWorkflowWatchers(wf.GetID())
			case tools.BOOKING:
				var bk booking.Booking
				if err := json.Unmarshal(resp.Payload, &bk); err != nil {
					return
				}
				self, err := oclib.GetMySelf()
				if err != nil || self == nil || bk.DestPeerID != self.GetID() {
					return
				}
				// Reject bookings whose start date is already in the past.
				if !bk.ExpectedStartDate.IsZero() && bk.ExpectedStartDate.Before(time.Now()) {
					fmt.Println("ListenNATS: booking start date is in the past, discarding")
					return
				}
				// Verify the slot is free in our planner (if we have one).
				plannerMu.RLock()
				p := PlannerCache[self.PeerID]
				plannerMu.RUnlock()
				if p != nil && !checkInstance(p, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
					fmt.Println("ListenNATS: booking conflicts with local planner, discarding")
					return
				}
				adminReq := &tools.APIRequest{Admin: true}
				bk.IsDraft = true
				stored, _, err := booking.NewAccessor(adminReq).StoreOne(&bk)
				if err != nil {
					fmt.Println("ListenNATS: could not store booking:", err)
					return
				}
				storedID := stored.GetID()
				go refreshSelfPlanner(self.PeerID, adminReq)
				time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.BOOKING) })
				go emitConsiders(storedID, stored.(*booking.Booking).ExecutionID, tools.BOOKING)
			case tools.PURCHASE_RESOURCE:
				var pr purchase_resource.PurchaseResource
				if err := json.Unmarshal(resp.Payload, &pr); err != nil {
					return
				}
				self, err := oclib.GetMySelf()
				if err != nil || self == nil || pr.DestPeerID != self.GetID() {
					return
				}
				adminReq := &tools.APIRequest{Admin: true}
				pr.IsDraft = true
				stored, _, err := purchase_resource.NewAccessor(adminReq).StoreOne(&pr)
				if err != nil {
					fmt.Println("ListenNATS: could not store purchase:", err)
					return
				}
				storedID := stored.GetID()
				time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.PURCHASE_RESOURCE) })
				go emitConsiders(storedID, stored.(*purchase_resource.PurchaseResource).ExecutionID, tools.PURCHASE_RESOURCE)
			}
		},
	})
}
// ---------------------------------------------------------------------------
// Draft timeout
// ---------------------------------------------------------------------------

// draftTimeout deletes a booking or purchase resource if it is still a draft
// after the 10-minute confirmation window has elapsed.
//
// Any other datatype is ignored, as is a resource that cannot be loaded or is
// no longer a draft. The outcome of the deletion is checked so that a failed
// delete is reported as such instead of being logged as a success.
func draftTimeout(id string, dt tools.DataType) {
	adminReq := &tools.APIRequest{Admin: true}
	var res utils.DBObject
	var loadErr error
	switch dt {
	case tools.BOOKING:
		res, _, loadErr = booking.NewAccessor(adminReq).LoadOne(id)
	case tools.PURCHASE_RESOURCE:
		res, _, loadErr = purchase_resource.NewAccessor(adminReq).LoadOne(id)
	default:
		return
	}
	if loadErr != nil || res == nil || !res.IsDrafted() {
		return
	}

	var delErr error
	switch dt {
	case tools.BOOKING:
		_, _, delErr = booking.NewAccessor(adminReq).DeleteOne(id)
	case tools.PURCHASE_RESOURCE:
		_, _, delErr = purchase_resource.NewAccessor(adminReq).DeleteOne(id)
	}
	if delErr != nil {
		fmt.Printf("draftTimeout: could not delete %s %s: %v\n", dt.String(), id, delErr)
		return
	}
	fmt.Printf("draftTimeout: %s %s deleted (still draft after 10 min)\n", dt.String(), id)
}
// ---------------------------------------------------------------------------
// Confirm channels
// ---------------------------------------------------------------------------

// confirmResource clears the draft flag (IsDraft=false) of the booking or
// purchase resource identified by id. A booking additionally has its State
// set to SCHEDULED and, once the update succeeded, triggers an asynchronous
// refresh of the local planner. Other datatypes are ignored; load and update
// failures are logged.
func confirmResource(id string, dt tools.DataType) {
	adminReq := &tools.APIRequest{Admin: true}

	switch dt {
	case tools.PURCHASE_RESOURCE:
		loaded, _, err := purchase_resource.NewAccessor(adminReq).LoadOne(id)
		if err != nil || loaded == nil {
			fmt.Printf("confirmResource: could not load purchase %s: %v\n", id, err)
			return
		}
		purchase := loaded.(*purchase_resource.PurchaseResource)
		purchase.IsDraft = false
		_, _, err = utils.GenericRawUpdateOne(purchase, id, purchase_resource.NewAccessor(adminReq))
		if err != nil {
			fmt.Printf("confirmResource: could not confirm purchase %s: %v\n", id, err)
		}

	case tools.BOOKING:
		loaded, _, err := booking.NewAccessor(adminReq).LoadOne(id)
		if err != nil || loaded == nil {
			fmt.Printf("confirmResource: could not load booking %s: %v\n", id, err)
			return
		}
		confirmed := loaded.(*booking.Booking)
		confirmed.IsDraft = false
		confirmed.State = enum.SCHEDULED
		_, _, err = utils.GenericRawUpdateOne(confirmed, id, booking.NewAccessor(adminReq))
		if err != nil {
			fmt.Printf("confirmResource: could not confirm booking %s: %v\n", id, err)
			return
		}
		if self, selfErr := oclib.GetMySelf(); selfErr == nil && self != nil {
			go refreshSelfPlanner(self.PeerID, adminReq)
		}
	}
}
// listenConfirmChannel subscribes to a NATS subject and calls confirmResource
// for each message received. The message body is expected to be the plain
// resource ID (UTF-8 string).
//
// wg.Done is called when the function returns, which happens when the
// subscription cannot be created, when the channel is closed, or when the
// connection is found closed (or the subscription invalid) by the periodic
// liveness check. The check exists because the channel handed to
// ChanSubscribe is not closed by the client when the connection goes away,
// so a plain range over it would never end and the caller waiting on wg could
// never reconnect.
func listenConfirmChannel(nc *nats.Conn, subject string, dt tools.DataType, wg *sync.WaitGroup) {
	defer wg.Done()

	// How often the connection and subscription are checked for liveness.
	const livenessInterval = 5 * time.Second

	ch := make(chan *nats.Msg, 64)
	sub, err := nc.ChanSubscribe(subject, ch)
	if err != nil {
		fmt.Printf("listenConfirmChannel: could not subscribe to %s: %v\n", subject, err)
		return
	}
	defer sub.Unsubscribe()

	ticker := time.NewTicker(livenessInterval)
	defer ticker.Stop()

	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				return
			}
			confirmResource(string(msg.Data), dt)
		case <-ticker.C:
			if !nc.IsClosed() && sub.IsValid() {
				continue
			}
			// Process whatever is still buffered before giving up.
			for {
				select {
				case msg, ok := <-ch:
					if !ok {
						return
					}
					confirmResource(string(msg.Data), dt)
				default:
					return
				}
			}
		}
	}
}
// ListenConfirm opens a direct NATS connection and runs one listener on each
// of the hardcoded "confirm_booking" and "confirm_purchase" subjects. It
// returns immediately when no NATS URL is configured. Otherwise it loops
// forever: a failed connection attempt is retried after one minute, and once
// both listeners have returned the connection is closed and a new one is
// opened.
func ListenConfirm() {
	natsURL := config.GetConfig().NATSUrl
	if natsURL == "" {
		fmt.Println("ListenConfirm: NATS_SERVER not set, skipping confirm listeners")
		return
	}

	listeners := []struct {
		subject string
		dt      tools.DataType
	}{
		{subject: "confirm_booking", dt: tools.BOOKING},
		{subject: "confirm_purchase", dt: tools.PURCHASE_RESOURCE},
	}

	for {
		nc, err := nats.Connect(natsURL)
		if err != nil {
			fmt.Println("ListenConfirm: could not connect to NATS:", err)
			time.Sleep(time.Minute)
			continue
		}

		var wg sync.WaitGroup
		wg.Add(len(listeners))
		for _, l := range listeners {
			go listenConfirmChannel(nc, l.subject, l.dt, &wg)
		}
		wg.Wait()
		nc.Close()
	}
}
// ---------------------------------------------------------------------------
// Self-planner initialisation
// ---------------------------------------------------------------------------

// InitSelfPlanner bootstraps our own planner entry at startup.
// It polls for our own peer record every 15 seconds until it is available,
// then generates the first planner snapshot and broadcasts it through
// refreshSelfPlanner. This handles the race between oc-scheduler starting
// before oc-peer has fully registered our node.
func InitSelfPlanner() {
	self, err := oclib.GetMySelf()
	for err != nil || self == nil {
		fmt.Println("InitSelfPlanner: self peer not found yet, retrying in 15s...")
		time.Sleep(15 * time.Second)
		self, err = oclib.GetMySelf()
	}
	refreshSelfPlanner(self.PeerID, &tools.APIRequest{Admin: true})
}
// ---------------------------------------------------------------------------
// Self-planner refresh
// ---------------------------------------------------------------------------

// refreshSelfPlanner regenerates the local planner via
// planner.GenerateShallow, stores it in PlannerCache under peerID, and
// broadcasts it on PROPALGATION_EVENT / PB_PLANNER so all listeners
// (including oc-discovery) are kept in sync.
//
// It should be called whenever a booking for our own peer is created, whether
// by direct DB insertion (self-peer routing) or upon receiving a
// CREATE_RESOURCE BOOKING message from oc-discovery.
//
// A generation failure is logged and aborts the refresh; an encoding failure
// aborts the broadcast silently after the cache has already been updated.
func refreshSelfPlanner(peerID string, request *tools.APIRequest) {
	snapshot, err := planner.GenerateShallow(request)
	if err != nil {
		fmt.Println("refreshSelfPlanner: could not generate planner:", err)
		return
	}

	// Update the local cache first; this also wakes up waiting watchers.
	storePlanner(peerID, snapshot)

	// The broadcast body is the planner itself with the owning peer's ID
	// added under "peer_id".
	body := struct {
		PeerID string `json:"peer_id"`
		*planner.Planner
	}{
		PeerID:  peerID,
		Planner: snapshot,
	}
	encoded, err := json.Marshal(body)
	if err != nil {
		return
	}

	announcement := tools.PropalgationMessage{
		Action:  tools.PB_PLANNER,
		Payload: encoded,
	}
	EmitNATS(peerID, announcement)
}
// ---------------------------------------------------------------------------
// Planner broadcast
// ---------------------------------------------------------------------------

// broadcastPlanner iterates the storage and compute peers of the given workflow
// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so
// downstream consumers (oc-discovery, other schedulers) refresh their state.
// The emitted payload only carries the peer's ID.
//
// Every resource creator is looked up at most once per call: a creator is
// marked as seen before its peer record is loaded, so creators whose peer is
// already cached or cannot be loaded do not cause a database lookup for each
// graph item they own.
func broadcastPlanner(wf *workflow.Workflow) {
	if wf.Graph == nil {
		return
	}
	items := []graph.GraphItem{}
	items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...)
	items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...)

	seen := make(map[string]struct{}, len(items))
	for _, item := range items {
		_, res := item.GetResource()
		if res == nil {
			continue
		}
		creatorID := res.GetCreatorID()
		if _, done := seen[creatorID]; done {
			continue
		}
		seen[creatorID] = struct{}{}

		data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID)
		p := data.ToPeer()
		if p == nil {
			continue
		}

		plannerMu.RLock()
		cached := PlannerCache[p.PeerID]
		plannerMu.RUnlock()
		if cached != nil {
			continue
		}

		payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID})
		if err != nil {
			continue
		}
		EmitNATS(p.PeerID, tools.PropalgationMessage{
			Action:  tools.PB_PLANNER,
			Payload: payload,
		})
	}
}