package infrastructure
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"oc-scheduler/conf"
|
|
"slices"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"cloud.o-forge.io/core/oc-lib/config"
|
|
"cloud.o-forge.io/core/oc-lib/models/booking"
|
|
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
|
|
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
|
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
|
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
"github.com/nats-io/nats.go"
|
|
)
|
|
|
|
// plannerTTL is how long a cached planner entry lives before evictAfter
// triggers a PB_CLOSE_PLANNER eviction (see storePlanner/evictAfter).
const plannerTTL = 24 * time.Hour
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Planner cache — protected by plannerMu
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// plannerMu guards PlannerCache and plannerAddedAt.
var plannerMu sync.RWMutex

// PlannerCache holds the latest planner snapshot received per peer ID.
var PlannerCache = map[string]*planner.Planner{}

var plannerAddedAt = map[string]time.Time{} // peerID → first-seen timestamp
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Subscriber registries — one keyed by peerID, one by workflowID
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// subsMu guards plannerSubs and workflowSubs.
var subsMu sync.RWMutex

var plannerSubs = map[string][]chan struct{}{} // peerID → notification channels

var workflowSubs = map[string][]chan struct{}{} // workflowID → notification channels
|
|
|
|
// SubscribePlannerUpdates registers interest in planner changes for the given
// peer IDs. The returned channel receives one struct{} (non-blocking) each time
// any of those planners is updated. Call cancel to unregister.
//
// Because the channel has capacity 1 and sends are non-blocking (see notify),
// notifications are coalesced: a receiver may see fewer signals than updates.
func SubscribePlannerUpdates(peerIDs []string) (<-chan struct{}, func()) {
	return subscribe(&subsMu, plannerSubs, peerIDs)
}
|
|
|
|
// SubscribeWorkflowUpdates registers interest in workflow modifications for the
|
|
// given workflow ID. The returned channel is signalled when the workflow changes
|
|
// (peer list may have grown or shrunk). Call cancel to unregister.
|
|
func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
|
|
ch, cancel := subscribe(&subsMu, workflowSubs, []string{wfID})
|
|
return ch, cancel
|
|
}
|
|
|
|
// subscribe is the generic helper used by both registries.
|
|
func subscribe(mu *sync.RWMutex, registry map[string][]chan struct{}, keys []string) (<-chan struct{}, func()) {
|
|
ch := make(chan struct{}, 1)
|
|
mu.Lock()
|
|
for _, k := range keys {
|
|
registry[k] = append(registry[k], ch)
|
|
}
|
|
mu.Unlock()
|
|
cancel := func() {
|
|
mu.Lock()
|
|
for _, k := range keys {
|
|
subs := registry[k]
|
|
for i, s := range subs {
|
|
if s == ch {
|
|
registry[k] = append(subs[:i], subs[i+1:]...)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
mu.Unlock()
|
|
}
|
|
return ch, cancel
|
|
}
|
|
|
|
// notifyPlannerWatchers signals (non-blocking, coalesced — see notify) every
// channel subscribed to planner updates for peerID.
func notifyPlannerWatchers(peerID string) {
	notify(&subsMu, plannerSubs, peerID)
}
|
|
|
|
// notifyWorkflowWatchers signals (non-blocking, coalesced — see notify) every
// channel subscribed to modifications of workflow wfID.
func notifyWorkflowWatchers(wfID string) {
	notify(&subsMu, workflowSubs, wfID)
}
|
|
|
|
func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) {
|
|
mu.RLock()
|
|
subs := registry[key]
|
|
mu.RUnlock()
|
|
for _, ch := range subs {
|
|
select {
|
|
case ch <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Cache helpers
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// storePlanner inserts or updates a planner for peerID.
|
|
// On first insertion it schedules an automatic eviction after plannerTTL.
|
|
// All subscribers interested in this peer are notified.
|
|
func storePlanner(peerID string, p *planner.Planner) {
|
|
plannerMu.Lock()
|
|
isNew := PlannerCache[peerID] == nil
|
|
PlannerCache[peerID] = p
|
|
if isNew {
|
|
plannerAddedAt[peerID] = time.Now()
|
|
go evictAfter(peerID, plannerTTL)
|
|
}
|
|
plannerMu.Unlock()
|
|
notifyPlannerWatchers(peerID)
|
|
}
|
|
|
|
// evictAfter waits ttl from the first-seen time for peerID then emits a
|
|
// PB_CLOSE_PLANNER event, which removes the entry from the cache and notifies
|
|
// NATS.
|
|
func evictAfter(peerID string, ttl time.Duration) {
|
|
time.Sleep(ttl)
|
|
plannerMu.RLock()
|
|
_, exists := PlannerCache[peerID]
|
|
plannerMu.RUnlock()
|
|
if exists {
|
|
EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER})
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// NATS emission
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func EmitNATS(peerID string, message tools.PropalgationMessage) {
|
|
if message.Action == tools.PB_CLOSE_PLANNER {
|
|
plannerMu.Lock()
|
|
delete(PlannerCache, peerID)
|
|
delete(plannerAddedAt, peerID)
|
|
plannerMu.Unlock()
|
|
notifyPlannerWatchers(peerID) // let streams re-evaluate (will warn "no planner")
|
|
}
|
|
b, _ := json.Marshal(message)
|
|
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
|
FromApp: "oc-scheduler",
|
|
Datatype: -1,
|
|
Method: int(tools.PROPALGATION_EVENT),
|
|
Payload: b,
|
|
})
|
|
}
|
|
|
|
// executionConsidersPayload is the JSON body carried by PB_CONSIDERS
// propagation messages.
type executionConsidersPayload struct {
	// ID of the stored booking/purchase resource — or the execution's own ID
	// when the message concerns the whole execution (see EmitConsidersExecution).
	ID string `json:"id"`
	// ExecutionsID is copied from the execution's ExecutionsID field —
	// presumably the parent executions-batch ID; confirm against oc-lib.
	ExecutionsID string `json:"executions_id"`
	// ExecutionID is the ID of the target WorkflowExecution.
	ExecutionID string `json:"execution_id"`
	// PeerIDs lists the workflow's storage and compute peer IDs.
	PeerIDs []string `json:"peer_ids"`
}
|
|
|
|
// emitConsiders broadcasts a PROPALGATION_EVENT with the Considers action,
|
|
// carrying the stored resource ID and its datatype (BOOKING or PURCHASE_RESOURCE).
|
|
func emitConsiders(id string, executionID string, dt tools.DataType) {
|
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil)
|
|
data := access.LoadOne(executionID)
|
|
if data.ToWorkflowExecution() != nil {
|
|
exec := data.ToWorkflowExecution()
|
|
if peers, err := GetWorkflowPeerIDs(exec.WorkflowID, &tools.APIRequest{Admin: true}); err == nil {
|
|
payload, _ := json.Marshal(&executionConsidersPayload{
|
|
ID: id,
|
|
ExecutionsID: exec.ExecutionsID,
|
|
ExecutionID: executionID,
|
|
PeerIDs: peers,
|
|
})
|
|
b, _ := json.Marshal(tools.PropalgationMessage{
|
|
DataType: int(dt),
|
|
Action: tools.PB_CONSIDERS,
|
|
Payload: payload,
|
|
})
|
|
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
|
FromApp: "oc-scheduler",
|
|
Datatype: dt,
|
|
Method: int(tools.PROPALGATION_EVENT),
|
|
Payload: b,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// EmitConsidersExecution broadcasts a Considers / WORKFLOW_EXECUTION message to all
|
|
// storage and compute peers of wf once the execution has transitioned to SCHEDULED.
|
|
// Each receiving peer will use it to confirm (IsDraft=false) their local drafts.
|
|
func EmitConsidersExecution(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow) {
|
|
if wf == nil || wf.Graph == nil {
|
|
return
|
|
}
|
|
peerIDs, err := GetWorkflowPeerIDs(wf.GetID(), &tools.APIRequest{Admin: true})
|
|
if err != nil {
|
|
return
|
|
}
|
|
if len(peerIDs) == 0 {
|
|
return
|
|
}
|
|
payload, err := json.Marshal(executionConsidersPayload{
|
|
ID: exec.GetID(),
|
|
ExecutionID: exec.GetID(),
|
|
ExecutionsID: exec.ExecutionsID,
|
|
PeerIDs: peerIDs})
|
|
if err != nil {
|
|
return
|
|
}
|
|
b, err := json.Marshal(tools.PropalgationMessage{
|
|
DataType: int(tools.WORKFLOW_EXECUTION),
|
|
Action: tools.PB_CONSIDERS,
|
|
Payload: payload,
|
|
})
|
|
if err != nil {
|
|
return
|
|
}
|
|
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
|
FromApp: "oc-scheduler",
|
|
Datatype: tools.WORKFLOW_EXECUTION,
|
|
Method: int(tools.PROPALGATION_EVENT),
|
|
Payload: b,
|
|
})
|
|
}
|
|
|
|
// updateExecutionState sets BookingsState[id]=true (dt==BOOKING) or
|
|
// PurchasesState[id]=true (dt==PURCHASE_RESOURCE) on the target execution.
|
|
// payload must be JSON-encoded {"id":"...", "execution_id":"..."}.
|
|
func updateExecutionState(payload []byte, dt tools.DataType) {
|
|
var data executionConsidersPayload
|
|
if err := json.Unmarshal(payload, &data); err != nil || data.ID == "" || data.ExecutionID == "" {
|
|
return
|
|
}
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(data.ExecutionID)
|
|
if err != nil || res == nil {
|
|
fmt.Printf("updateExecutionState: could not load execution %s: %v\n", data.ExecutionID, err)
|
|
return
|
|
}
|
|
exec := res.(*workflow_execution.WorkflowExecution)
|
|
switch dt {
|
|
case tools.BOOKING:
|
|
if exec.BookingsState == nil {
|
|
exec.BookingsState = map[string]bool{}
|
|
}
|
|
exec.BookingsState[data.ID] = true
|
|
case tools.PURCHASE_RESOURCE:
|
|
if exec.PurchasesState == nil {
|
|
exec.PurchasesState = map[string]bool{}
|
|
}
|
|
exec.PurchasesState[data.ID] = true
|
|
}
|
|
found := true
|
|
for _, st := range exec.BookingsState {
|
|
if !st {
|
|
found = false
|
|
break
|
|
}
|
|
}
|
|
for _, st := range exec.PurchasesState {
|
|
if !st {
|
|
found = false
|
|
break
|
|
}
|
|
}
|
|
if found {
|
|
exec.State = enum.SCHEDULED
|
|
}
|
|
if _, _, err := utils.GenericRawUpdateOne(exec, data.ExecutionID, workflow_execution.NewAccessor(adminReq)); err != nil {
|
|
fmt.Printf("updateExecutionState: could not update execution %s: %v\n", data.ExecutionID, err)
|
|
}
|
|
}
|
|
|
|
// confirmExecutionDrafts is called when a Considers/WORKFLOW_EXECUTION message
|
|
// is received from oc-discovery, meaning the originating peer has confirmed the
|
|
// execution as SCHEDULED. For every booking and purchase ID listed in the
|
|
// execution's states, we confirm the local draft (IsDraft=false).
|
|
func confirmExecutionDrafts(payload []byte) {
|
|
var data executionConsidersPayload
|
|
if err := json.Unmarshal(payload, &data); err != nil {
|
|
fmt.Printf("confirmExecutionDrafts: could not parse payload: %v\n", err)
|
|
return
|
|
}
|
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.WORKFLOW_EXECUTION), nil)
|
|
d := access.LoadOne(data.ExecutionID)
|
|
if exec := d.ToWorkflowExecution(); exec != nil {
|
|
for id := range exec.BookingsState {
|
|
go confirmResource(id, tools.BOOKING)
|
|
}
|
|
for id := range exec.PurchasesState {
|
|
go confirmResource(id, tools.PURCHASE_RESOURCE)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// NATS listeners
|
|
// ---------------------------------------------------------------------------
|
|
|
|
func ListenNATS() {
|
|
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
|
// Receive planner snapshots pushed by oc-discovery and cache them.
|
|
// Considers messages:
|
|
// BOOKING / PURCHASE_RESOURCE → mark the individual resource as
|
|
// considered in the target WorkflowExecution (BookingsState / PurchasesState).
|
|
// WORKFLOW_EXECUTION → the execution reached SCHEDULED; confirm all
|
|
// local draft bookings and purchases listed in its states.
|
|
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
|
if resp.FromApp != "oc-discovery" {
|
|
return
|
|
}
|
|
var prop tools.PropalgationMessage
|
|
if err := json.Unmarshal(resp.Payload, &prop); err != nil {
|
|
return
|
|
}
|
|
switch prop.Action {
|
|
case tools.PB_PLANNER:
|
|
m := map[string]interface{}{}
|
|
p := planner.Planner{}
|
|
if err := json.Unmarshal(prop.Payload, &m); err != nil {
|
|
return
|
|
}
|
|
if err := json.Unmarshal(prop.Payload, &p); err != nil {
|
|
return
|
|
}
|
|
storePlanner(fmt.Sprintf("%v", m["peer_id"]), &p)
|
|
case tools.PB_CONSIDERS:
|
|
switch tools.DataType(prop.DataType) {
|
|
case tools.BOOKING, tools.PURCHASE_RESOURCE:
|
|
updateExecutionState(prop.Payload, tools.DataType(prop.DataType))
|
|
case tools.WORKFLOW_EXECUTION:
|
|
confirmExecutionDrafts(prop.Payload)
|
|
}
|
|
}
|
|
},
|
|
|
|
// Incoming resource creation events:
|
|
// - WORKFLOW → refresh peer planner entries and notify CheckStream watchers.
|
|
// - BOOKING → if destined for us, validate, store as draft, start 10-min
|
|
// expiry timer, and emit a "considers_booking" response.
|
|
// - PURCHASE → if destined for us, store as draft, start 10-min expiry
|
|
// timer, and emit a "considers_purchase" response.
|
|
tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) {
|
|
switch resp.Datatype {
|
|
case tools.WORKFLOW:
|
|
wf := workflow.Workflow{}
|
|
if err := json.Unmarshal(resp.Payload, &wf); err != nil {
|
|
return
|
|
}
|
|
notifyWorkflowWatchers(wf.GetID())
|
|
}
|
|
},
|
|
tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
|
|
switch resp.Datatype {
|
|
case tools.WORKFLOW:
|
|
wf := workflow.Workflow{}
|
|
if err := json.Unmarshal(resp.Payload, &wf); err != nil {
|
|
return
|
|
}
|
|
broadcastPlanner(&wf)
|
|
notifyWorkflowWatchers(wf.GetID())
|
|
case tools.BOOKING:
|
|
var bk booking.Booking
|
|
if err := json.Unmarshal(resp.Payload, &bk); err != nil {
|
|
return
|
|
}
|
|
self, err := oclib.GetMySelf()
|
|
if err != nil || self == nil || bk.DestPeerID != self.GetID() {
|
|
return
|
|
}
|
|
// Reject bookings whose start date is already in the past.
|
|
if !bk.ExpectedStartDate.IsZero() && bk.ExpectedStartDate.Before(time.Now()) {
|
|
fmt.Println("ListenNATS: booking start date is in the past, discarding")
|
|
return
|
|
}
|
|
// Verify the slot is free in our planner (if we have one).
|
|
plannerMu.RLock()
|
|
p := PlannerCache[self.PeerID]
|
|
plannerMu.RUnlock()
|
|
if p != nil && !checkInstance(p, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
|
|
fmt.Println("ListenNATS: booking conflicts with local planner, discarding")
|
|
return
|
|
}
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
bk.IsDraft = true
|
|
stored, _, err := booking.NewAccessor(adminReq).StoreOne(&bk)
|
|
if err != nil {
|
|
fmt.Println("ListenNATS: could not store booking:", err)
|
|
return
|
|
}
|
|
storedID := stored.GetID()
|
|
go refreshSelfPlanner(self.PeerID, adminReq)
|
|
time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.BOOKING) })
|
|
go emitConsiders(storedID, stored.(*booking.Booking).ExecutionID, tools.BOOKING)
|
|
|
|
case tools.PURCHASE_RESOURCE:
|
|
var pr purchase_resource.PurchaseResource
|
|
if err := json.Unmarshal(resp.Payload, &pr); err != nil {
|
|
return
|
|
}
|
|
self, err := oclib.GetMySelf()
|
|
if err != nil || self == nil || pr.DestPeerID != self.GetID() {
|
|
return
|
|
}
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
pr.IsDraft = true
|
|
stored, _, err := purchase_resource.NewAccessor(adminReq).StoreOne(&pr)
|
|
if err != nil {
|
|
fmt.Println("ListenNATS: could not store purchase:", err)
|
|
return
|
|
}
|
|
storedID := stored.GetID()
|
|
time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.PURCHASE_RESOURCE) })
|
|
go emitConsiders(storedID, stored.(*purchase_resource.PurchaseResource).ExecutionID, tools.PURCHASE_RESOURCE)
|
|
}
|
|
},
|
|
})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Draft timeout
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// draftTimeout deletes a booking or purchase resource if it is still a draft
|
|
// after the 10-minute confirmation window has elapsed.
|
|
func draftTimeout(id string, dt tools.DataType) {
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
var res utils.DBObject
|
|
var loadErr error
|
|
switch dt {
|
|
case tools.BOOKING:
|
|
res, _, loadErr = booking.NewAccessor(adminReq).LoadOne(id)
|
|
case tools.PURCHASE_RESOURCE:
|
|
res, _, loadErr = purchase_resource.NewAccessor(adminReq).LoadOne(id)
|
|
default:
|
|
return
|
|
}
|
|
if loadErr != nil || res == nil || !res.IsDrafted() {
|
|
return
|
|
}
|
|
switch dt {
|
|
case tools.BOOKING:
|
|
booking.NewAccessor(adminReq).DeleteOne(id)
|
|
case tools.PURCHASE_RESOURCE:
|
|
purchase_resource.NewAccessor(adminReq).DeleteOne(id)
|
|
}
|
|
fmt.Printf("draftTimeout: %s %s deleted (still draft after 10 min)\n", dt.String(), id)
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Confirm channels
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// confirmResource sets IsDraft=false for a booking or purchase resource.
|
|
// For bookings it also advances State to SCHEDULED and refreshes the local planner.
|
|
func confirmResource(id string, dt tools.DataType) {
|
|
adminReq := &tools.APIRequest{Admin: true}
|
|
switch dt {
|
|
case tools.BOOKING:
|
|
res, _, err := booking.NewAccessor(adminReq).LoadOne(id)
|
|
if err != nil || res == nil {
|
|
fmt.Printf("confirmResource: could not load booking %s: %v\n", id, err)
|
|
return
|
|
}
|
|
bk := res.(*booking.Booking)
|
|
bk.IsDraft = false
|
|
bk.State = enum.SCHEDULED
|
|
if _, _, err := utils.GenericRawUpdateOne(bk, id, booking.NewAccessor(adminReq)); err != nil {
|
|
fmt.Printf("confirmResource: could not confirm booking %s: %v\n", id, err)
|
|
return
|
|
}
|
|
createNamespace(bk.ExecutionsID) // create Namespace locally
|
|
self, err := oclib.GetMySelf()
|
|
if err == nil && self != nil {
|
|
go refreshSelfPlanner(self.PeerID, adminReq)
|
|
}
|
|
case tools.PURCHASE_RESOURCE:
|
|
res, _, err := purchase_resource.NewAccessor(adminReq).LoadOne(id)
|
|
if err != nil || res == nil {
|
|
fmt.Printf("confirmResource: could not load purchase %s: %v\n", id, err)
|
|
return
|
|
}
|
|
pr := res.(*purchase_resource.PurchaseResource)
|
|
pr.IsDraft = false
|
|
if _, _, err := utils.GenericRawUpdateOne(pr, id, purchase_resource.NewAccessor(adminReq)); err != nil {
|
|
fmt.Printf("confirmResource: could not confirm purchase %s: %v\n", id, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// listenConfirmChannel subscribes to a NATS subject and calls confirmResource
|
|
// for each message received. The message body is expected to be the plain
|
|
// resource ID (UTF-8 string).
|
|
func listenConfirmChannel(nc *nats.Conn, subject string, dt tools.DataType, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
ch := make(chan *nats.Msg, 64)
|
|
sub, err := nc.ChanSubscribe(subject, ch)
|
|
if err != nil {
|
|
fmt.Printf("listenConfirmChannel: could not subscribe to %s: %v\n", subject, err)
|
|
return
|
|
}
|
|
defer sub.Unsubscribe()
|
|
for msg := range ch {
|
|
confirmResource(string(msg.Data), dt)
|
|
}
|
|
}
|
|
|
|
// ListenConfirm opens a direct NATS connection and subscribes to the hardcoded
|
|
// "confirm_booking" and "confirm_purchase" subjects. It reconnects automatically
|
|
// if the connection is lost.
|
|
func ListenConfirm() {
|
|
natsURL := config.GetConfig().NATSUrl
|
|
if natsURL == "" {
|
|
fmt.Println("ListenConfirm: NATS_SERVER not set, skipping confirm listeners")
|
|
return
|
|
}
|
|
for {
|
|
nc, err := nats.Connect(natsURL)
|
|
if err != nil {
|
|
fmt.Println("ListenConfirm: could not connect to NATS:", err)
|
|
time.Sleep(time.Minute)
|
|
continue
|
|
}
|
|
var wg sync.WaitGroup
|
|
wg.Add(2)
|
|
go listenConfirmChannel(nc, "confirm_booking", tools.BOOKING, &wg)
|
|
go listenConfirmChannel(nc, "confirm_purchase", tools.PURCHASE_RESOURCE, &wg)
|
|
wg.Wait()
|
|
nc.Close()
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Self-planner initialisation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// InitSelfPlanner bootstraps our own planner entry at startup.
|
|
// It waits (with 15-second retries) for our peer record to be present in the
|
|
// database before generating the first planner snapshot and broadcasting it
|
|
// on PB_PLANNER. This handles the race between oc-scheduler starting before
|
|
// oc-peer has fully registered our node.
|
|
func InitSelfPlanner() {
|
|
for {
|
|
self, err := oclib.GetMySelf()
|
|
if err != nil || self == nil {
|
|
fmt.Println("InitSelfPlanner: self peer not found yet, retrying in 15s...")
|
|
time.Sleep(15 * time.Second)
|
|
continue
|
|
}
|
|
refreshSelfPlanner(self.PeerID, &tools.APIRequest{Admin: true})
|
|
return
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Self-planner refresh
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// refreshSelfPlanner regenerates the local planner from the current state of
|
|
// the booking DB, stores it in PlannerCache under our own node UUID, and
|
|
// broadcasts it on PROPALGATION_EVENT / PB_PLANNER so all listeners (including
|
|
// oc-discovery) are kept in sync.
|
|
//
|
|
// It should be called whenever a booking for our own peer is created, whether
|
|
// by direct DB insertion (self-peer routing) or upon receiving a CREATE_RESOURCE
|
|
// BOOKING message from oc-discovery.
|
|
func refreshSelfPlanner(peerID string, request *tools.APIRequest) {
|
|
p, err := planner.GenerateShallow(request)
|
|
if err != nil {
|
|
fmt.Println("refreshSelfPlanner: could not generate planner:", err)
|
|
return
|
|
}
|
|
|
|
// Update the local cache and notify any waiting CheckStream goroutines.
|
|
storePlanner(peerID, p)
|
|
|
|
// Broadcast the updated planner so remote peers (and oc-discovery) can
|
|
// refresh their view of our availability.
|
|
type plannerWithPeer struct {
|
|
PeerID string `json:"peer_id"`
|
|
*planner.Planner
|
|
}
|
|
plannerPayload, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: p})
|
|
if err != nil {
|
|
return
|
|
}
|
|
EmitNATS(peerID, tools.PropalgationMessage{
|
|
Action: tools.PB_PLANNER,
|
|
Payload: plannerPayload,
|
|
})
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Planner broadcast
|
|
// ---------------------------------------------------------------------------
|
|
|
|
// broadcastPlanner iterates the storage and compute peers of the given workflow
|
|
// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so
|
|
// downstream consumers (oc-discovery, other schedulers) refresh their state.
|
|
func broadcastPlanner(wf *workflow.Workflow) {
|
|
if wf.Graph == nil {
|
|
return
|
|
}
|
|
items := []graph.GraphItem{}
|
|
items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...)
|
|
items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...)
|
|
|
|
seen := []string{}
|
|
for _, item := range items {
|
|
i := item
|
|
_, res := i.GetResource()
|
|
if res == nil {
|
|
continue
|
|
}
|
|
creatorID := res.GetCreatorID()
|
|
if slices.Contains(seen, creatorID) {
|
|
continue
|
|
}
|
|
|
|
data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID)
|
|
p := data.ToPeer()
|
|
if p == nil {
|
|
continue
|
|
}
|
|
|
|
plannerMu.RLock()
|
|
cached := PlannerCache[p.PeerID]
|
|
plannerMu.RUnlock()
|
|
|
|
if cached == nil {
|
|
payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID})
|
|
if err != nil {
|
|
continue
|
|
}
|
|
seen = append(seen, creatorID)
|
|
EmitNATS(p.PeerID, tools.PropalgationMessage{
|
|
Action: tools.PB_PLANNER,
|
|
Payload: payload,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
func createNamespace(ns string) error {
|
|
/*
|
|
* This function is used to create a namespace.
|
|
* It takes the following parameters:
|
|
* - ns: the namespace you want to create
|
|
*/
|
|
logger := oclib.GetLogger()
|
|
serv, err := tools.NewKubernetesService(
|
|
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, conf.GetConfig().KubeCA,
|
|
conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
c := context.Background()
|
|
|
|
ok, err := serv.GetNamespace(c, ns)
|
|
if ok != nil && err == nil {
|
|
logger.Debug().Msg("A namespace with name " + ns + " already exists")
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = serv.CreateNamespace(c, ns)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = serv.CreateServiceAccount(c, ns)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
role := "argo-role"
|
|
err = serv.CreateRole(c, ns, role,
|
|
[][]string{
|
|
{"coordination.k8s.io"},
|
|
{""},
|
|
{""}},
|
|
[][]string{
|
|
{"leases"},
|
|
{"secrets"},
|
|
{"pods"}},
|
|
[][]string{
|
|
{"get", "create", "update"},
|
|
{"get"},
|
|
{"patch"}})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return serv.CreateRoleBinding(c, ns, "argo-role-binding", role)
|
|
}
|