From c8b8955c4bf4fbe8b5f7d5b19092619db06bd02a Mon Sep 17 00:00:00 2001 From: mr Date: Mon, 23 Feb 2026 18:10:47 +0100 Subject: [PATCH] Decentralized --- controllers/workflow_sheduler.go | 128 ++++++ docs/nats.md | 140 +++++++ go.mod | 3 +- go.sum | 55 ++- infrastructure/nats.go | 643 +++++++++++++++++++++++++++++++ infrastructure/scheduler.go | 419 ++++++++++++++++---- main.go | 4 + 7 files changed, 1311 insertions(+), 81 deletions(-) create mode 100644 docs/nats.md create mode 100644 infrastructure/nats.go diff --git a/controllers/workflow_sheduler.go b/controllers/workflow_sheduler.go index fce5298..acab35d 100644 --- a/controllers/workflow_sheduler.go +++ b/controllers/workflow_sheduler.go @@ -3,6 +3,7 @@ package controllers import ( "encoding/json" "fmt" + "net/http" "oc-scheduler/infrastructure" "slices" @@ -15,6 +16,7 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" beego "github.com/beego/beego/v2/server/web" "github.com/google/uuid" + gorillaws "github.com/gorilla/websocket" ) var orderCollection = oclib.LibDataEnum(oclib.ORDER) @@ -108,6 +110,132 @@ func (o *WorkflowSchedulerController) Schedule() { o.ServeJSON() } +var wsUpgrader = gorillaws.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, +} + +// @Title CheckStream +// @Description WebSocket stream of slot availability for a workflow. +// After the handshake the client sends one JSON frame containing the +// WorkflowSchedule parameters (start, end, booking_mode, duration_s, …). +// The server responds with a CheckResult frame immediately and again each time +// a planner for one of the workflow's storage/compute peers is updated. +// When the stream is interrupted the cache entries for those peers are evicted +// and a PB_CLOSE_PLANNER event is emitted on NATS. 
+// Query params: +// - as_possible=true ignore start date, search from now +// - preemption=true validate anyway, raise warnings +// +// @Param id path string true "workflow id" +// @Param as_possible query bool false "find nearest free slot from now" +// @Param preemption query bool false "validate anyway, raise warnings" +// @Success 101 +// @router /:id/check [get] +func (o *WorkflowSchedulerController) CheckStream() { + wfID := o.Ctx.Input.Param(":id") + asap, _ := o.GetBool("as_possible", false) + preemption, _ := o.GetBool("preemption", false) + + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + req := &tools.APIRequest{ + Username: user, + PeerID: peerID, + Groups: groups, + Caller: nil, + Admin: true, + } + + // Resolve the peer IDs concerned by this workflow before upgrading so we + // can abort cleanly with a plain HTTP error if the workflow is not found. + watchedPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req) + if err != nil { + o.Data["json"] = map[string]interface{}{"code": 404, "error": err.Error()} + o.ServeJSON() + return + } + + // Upgrade to WebSocket. + conn, err := wsUpgrader.Upgrade(o.Ctx.ResponseWriter, o.Ctx.Request, nil) + if err != nil { + return + } + + // Read the schedule parameters sent by the client as the first message. + var ws infrastructure.WorkflowSchedule + if err := conn.ReadJSON(&ws); err != nil { + conn.Close() + return + } + + // Subscribe to planner updates for the initially resolved peers and to + // workflow change notifications (peer list may change on workflow edit). + plannerCh, plannerUnsub := infrastructure.SubscribePlannerUpdates(watchedPeers) + wfCh, wfUnsub := infrastructure.SubscribeWorkflowUpdates(wfID) + + // Cleanup on exit: cancel subscriptions, evict planner cache entries, + // signal PB_CLOSE_PLANNER on NATS for each peer that was being watched. 
+ defer func() { + conn.Close() + plannerUnsub() + wfUnsub() + for _, peer := range watchedPeers { + if b, err := json.Marshal(map[string]interface{}{"peer_id": peer}); err == nil { + infrastructure.EmitNATS(peer, tools.PropalgationMessage{ + Action: tools.PB_CLOSE_PLANNER, + Payload: b, + }) + } + } + }() + + push := func() error { + result, checkErr := ws.Check(wfID, asap, preemption, req) + if checkErr != nil { + return checkErr + } + return conn.WriteJSON(result) + } + + // Initial check. + if err := push(); err != nil { + return + } + + // Detect client-side close in a separate goroutine. + closeCh := make(chan struct{}) + go func() { + defer close(closeCh) + for { + if _, _, err := conn.ReadMessage(); err != nil { + return + } + } + }() + + // Stream loop. + for { + select { + case <-wfCh: + // The workflow was modified: refresh the peer list and re-subscribe + // so the stream watches the correct set of planners going forward. + if newPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req); err == nil { + plannerUnsub() + watchedPeers = newPeers + plannerCh, plannerUnsub = infrastructure.SubscribePlannerUpdates(newPeers) + } + if err := push(); err != nil { + return + } + case <-plannerCh: + if err := push(); err != nil { + return + } + case <-closeCh: + return + } + } +} + // @Title UnSchedule // @Description schedule workflow // @Param id path string true "id execution" diff --git a/docs/nats.md b/docs/nats.md new file mode 100644 index 0000000..207b322 --- /dev/null +++ b/docs/nats.md @@ -0,0 +1,140 @@ +# NATS dans oc-scheduler + +## Vue d'ensemble + +`oc-scheduler` utilise NATS comme bus d'événements pour deux objectifs : + +1. **Recevoir les planners** (disponibilité des ressources) publiés par `oc-discovery`. +2. **Réagir aux modifications de workflows** pour diffuser un planner actualisé et signaler les streams WebSocket actifs. + +Tout le code NATS se trouve dans `infrastructure/nats.go`. 
+ +--- + +## Canaux écoutés + +### `PROPALGATION_EVENT` — réception des planners + +**Condition d'acceptation :** `resp.FromApp == "oc-discovery"` et `prop.Action == PB_PLANNER`. + +**Ce qui se passe :** +- Le payload est désérialisé en `planner.Planner`. +- Le champ `peer_id` est extrait pour identifier le pair. +- Le planner est stocké dans `PlannerCache[peerID]` via `storePlanner()`. +- Si c'est la **première apparition** de ce `peerID` dans le cache, une goroutine de TTL est lancée (voir §TTL ci-dessous). +- Tous les abonnés en attente d'un changement sur ce `peerID` sont notifiés. + +### `CREATE_RESOURCE` — modification d'un workflow + +**Condition d'acceptation :** `resp.Datatype == WORKFLOW`. + +**Ce qui se passe :** +1. Le payload est désérialisé en `workflow.Workflow`. +2. `broadcastPlanner(wf)` est appelé : pour chaque pair (storage + compute) du workflow dont le planner **n'est pas encore en cache**, un événement `PB_PLANNER` est émis sur NATS afin de demander un planner frais à `oc-discovery`. +3. `notifyWorkflowWatchers(wf.GetID())` est appelé : tous les streams WebSocket qui observent ce workflow sont signalés pour **rafraîchir leur liste de pairs surveillés**. + +--- + +## Canaux émis + +### `PROPALGATION_EVENT` — deux actions possibles + +| Action | Déclencheur | Effet attendu | +|---|---|---| +| `PB_PLANNER` | Workflow modifié, pair inconnu du cache | `oc-discovery` renvoie le planner du pair | +| `PB_CLOSE_PLANNER` | TTL expiré **ou** déconnexion WebSocket | Les consommateurs (oc-discovery, autres schedulers) libèrent leur état pour ce pair | + +--- + +## Cache des planners (`PlannerCache`) + +``` +PlannerCache : map[string]*planner.Planner // clé = peerID +plannerAddedAt : map[string]time.Time // horodatage de première insertion +``` + +- Protégé par `plannerMu` (RWMutex). +- Alimenté uniquement via `storePlanner()` (appelé par le listener NATS). +- Supprimé via `EmitNATS(peerID, PB_CLOSE_PLANNER)`, qui efface l'entrée **et** notifie les abonnés. 
+ +### TTL de 24 heures + +À la **première** insertion d'un `peerID`, une goroutine est lancée : + +``` +sleep(24h) +→ si l'entrée existe encore : EmitNATS(peerID, PB_CLOSE_PLANNER) +``` + +Cela évite que des planners obsolètes stagnent indéfiniment. L'entrée est supprimée et les streams actifs reçoivent une notification « plus de planner » pour ce pair. + +--- + +## Pub/sub interne + +Un registre d'abonnements en mémoire permet à d'autres composants (notamment le controller WebSocket) de réagir aux événements sans coupler directement le code NATS et les goroutines HTTP. + +Deux registres distincts : + +| Registre | Clé | Signification | +|---|---|---| +| `plannerSubs` | `peerID` | « le planner de ce pair a changé » | +| `workflowSubs` | `workflowID` | « ce workflow a été modifié » | + +### API + +```go +// S'abonner aux changements de planners pour plusieurs pairs +ch, cancel := SubscribePlannerUpdates(peerIDs []string) + +// S'abonner aux modifications d'un workflow +ch, cancel := SubscribeWorkflowUpdates(wfID string) +``` + +Chaque canal est bufférisé (`capacity 1`) : si un signal est déjà en attente, les suivants sont ignorés sans bloquer. + +--- + +## Intégration avec le stream WebSocket (`GET /oc/:id/check`) + +Le handler `CheckStream` dans `controllers/workflow_sheduler.go` exploite ces mécanismes : + +1. **Ouverture** : résolution des `peerIDs` du workflow, abonnement à `SubscribePlannerUpdates` et `SubscribeWorkflowUpdates`. +2. **Boucle de streaming** : + - `plannerCh` reçoit un signal → re-calcul du `CheckResult` et envoi au client. + - `wfCh` reçoit un signal (workflow modifié) → recalcul des `peerIDs`, désabonnement + ré-abonnement aux nouveaux pairs, re-calcul et envoi. +3. **Fermeture** (déconnexion client) : + - Désabonnement des deux registres. + - `EmitNATS(peerID, PB_CLOSE_PLANNER)` pour **chaque pair surveillé** : le cache est purgé et `oc-discovery` est informé que le scheduler n'a plus besoin du planner. 
+ +--- + +## Flux de données résumé + +``` +oc-discovery ──PROPALGATION_EVENT(PB_PLANNER)──► ListenNATS + │ + storePlanner() + PlannerCache[peerID] = planner + notifyPlannerWatchers(peerID) + │ + SubscribePlannerUpdates + │ + CheckStream (WS) ──► client + +Workflow modifié ──CREATE_RESOURCE(WORKFLOW)──► ListenNATS + │ + broadcastPlanner(wf) + PROPALGATION_EVENT(PB_PLANNER) → oc-discovery + notifyWorkflowWatchers(wfID) + │ + SubscribeWorkflowUpdates + │ + CheckStream refresh peerIDs ──► client + +TTL 24h / déconnexion WS ──► EmitNATS(PB_CLOSE_PLANNER) + │ + delete PlannerCache[peerID] + notifyPlannerWatchers(peerID) + PROPALGATION_EVENT(PB_CLOSE_PLANNER) → NATS bus +``` diff --git a/go.mod b/go.mod index 64ee8cb..19723c0 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-scheduler go 1.24.6 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260218112419-fa5c3a3c605b + cloud.o-forge.io/core/oc-lib v0.0.0-20260223162637-ff830065ec27 github.com/beego/beego/v2 v2.3.8 github.com/google/uuid v1.6.0 github.com/robfig/cron v1.2.0 @@ -24,6 +24,7 @@ require ( github.com/golang/snappy v1.0.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 // indirect github.com/goraz/onion v0.1.3 // indirect + github.com/gorilla/websocket v1.5.3 github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/klauspost/compress v1.18.0 // indirect diff --git a/go.sum b/go.sum index 4f4e10a..82dbe62 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,15 @@ -cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995 h1:ZDRvnzTTNHgMm5hYmseHdEPqQ6rn/4v+P9f/JIxPaNw= -cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= -cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13 h1:DNIPQ7C+7wjbj5RUx29wLxuIe/wiSOcuUMlLRIv6Fvs= -cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= 
-cloud.o-forge.io/core/oc-lib v0.0.0-20260218112419-fa5c3a3c605b h1:ws9S0QhCiwYuxCZNi6ZfZsRvdRJ6KkkOfLT/suMrcUk= -cloud.o-forge.io/core/oc-lib v0.0.0-20260218112419-fa5c3a3c605b/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223141827-5d32b4646a86 h1:/7XYbCzzo062lYbyBM3MA7KLrJII9iCQzvw4T5g/4oY= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223141827-5d32b4646a86/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223142248-b08bbf51ddc5 h1:qxLz4rrFxB1dmJa0/Q6AWBwQgmVt7LVXB0RgwpGYeXE= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223142248-b08bbf51ddc5/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223144148-f28e2c362020 h1:F7Ifw3WgtCnDur1p5+EuFZrM9yy7KSWoIyDQ8opQE90= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223144148-f28e2c362020/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223145010-e10bb5545561 h1:q5m2UMsEgrfN0OJsoa4Sme0v4OO1pnIt8OsAwdL+5/A= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223145010-e10bb5545561/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c h1:3PRvQdSSGjmw+Txkf0zWs3F+V9URq22zQCLR3o7bNBY= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223162637-ff830065ec27 h1:cw3R1/Ivlr3W1XZ2cCHRrLB6UG/3dhdvG0i+P5W1tYc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223162637-ff830065ec27/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc= github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg= @@ -21,6 +27,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs 
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/crypto/blake256 v1.1.0 h1:zPMNGQCm0g4QTY27fOCorQW7EryeQ/U0x++OzVrdms8= +github.com/decred/dcrd/crypto/blake256 v1.1.0/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= github.com/elazarl/go-bindata-assetfs v1.0.1 h1:m0kkaHRKEu7tUIUFVwhGGGYClXvyl4RE03qmvRTNfbw= @@ -49,14 +57,20 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/goraz/onion v0.1.3 h1:KhyvbDA2b70gcz/d5izfwTiOH8SmrvV43AsVzpng3n0= github.com/goraz/onion v0.1.3/go.mod h1:XEmz1XoBz+wxTgWB8NwuvRm4RAu3vKxvrmYtzK+XCuQ= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg= +github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= 
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= +github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -65,6 +79,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= +github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-libp2p/core v0.43.0-rc2 h1:1X1aDJNWhMfodJ/ynbaGLkgnC8f+hfBIqQDrzxFZOqI= github.com/libp2p/go-libp2p/core v0.43.0-rc2/go.mod h1:NYeJ9lvyBv9nbDk2IuGb8gFKEOkIv/W5YRIy1pAJB2Q= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -75,6 +91,8 @@ github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= 
+github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= @@ -84,6 +102,22 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= +github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= +github.com/multiformats/go-base32 v0.1.0 h1:pVx9xoSPqEIQG8o+UbAe7DNi51oej1NtK+aGkbLYxPE= +github.com/multiformats/go-base32 v0.1.0/go.mod h1:Kj3tFY6zNr+ABYMqeUNeGvkIC/UYgtWibDcT0rExnbI= +github.com/multiformats/go-base36 v0.2.0 h1:lFsAbNOGeKtuKozrtBsAkSVhv1p9D0/qedU9rQyccr0= +github.com/multiformats/go-base36 v0.2.0/go.mod h1:qvnKE++v+2MWCfePClUEjE78Z7P2a1UV0xHgWc0hkp4= +github.com/multiformats/go-multiaddr v0.16.0 h1:oGWEVKioVQcdIOBlYM8BH1rZDWOGJSqr9/BKl6zQ4qc= +github.com/multiformats/go-multiaddr v0.16.0/go.mod h1:JSVUmXDjsVFiW7RjIFMP7+Ev+h1DTbiJgVeTV/tcmP0= +github.com/multiformats/go-multibase v0.2.0 h1:isdYCVLvksgWlMW9OZRYJEa9pZETFivncJHmHnnd87g= +github.com/multiformats/go-multibase v0.2.0/go.mod h1:bFBZX4lKCA/2lyOFSAoKH5SS6oPyjtnzK/XTFDPkNuk= +github.com/multiformats/go-multicodec v0.9.1 h1:x/Fuxr7ZuR4jJV4Os5g444F7xC4XmyUaT/FWtE+9Zjo= +github.com/multiformats/go-multicodec v0.9.1/go.mod h1:LLWNMtyV5ithSBUo3vFIMaeDy+h3EbkMTek1m+Fybbo= +github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= 
+github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= +github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= +github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M= @@ -108,8 +142,7 @@ github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7D github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw= github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= -github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= -github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= @@ -123,6 +156,8 @@ github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYl github.com/smartystreets/goconvey v0.0.0-20190731233626-505e41936337/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hgR6gDIPg= github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= 
+github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= @@ -145,6 +180,8 @@ golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= +golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 h1:bsqhLWFR6G6xiQcb+JoGqdKdRU6WzPWmK8E0jxTjzo4= +golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -191,3 +228,5 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +lukechampine.com/blake3 v1.4.1 h1:I3Smz7gso8w4/TunLKec6K2fn+kyKtDxr/xcQEN84Wg= +lukechampine.com/blake3 v1.4.1/go.mod h1:QFosUxmjB8mnrWFSNwKmvxHpfY72bmD2tQ0kBMM3kwo= diff --git a/infrastructure/nats.go b/infrastructure/nats.go new file mode 100644 index 0000000..2cb8235 --- /dev/null +++ b/infrastructure/nats.go @@ -0,0 +1,643 @@ +package infrastructure + +import ( + "encoding/json" + "fmt" + "slices" + 
"sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/config" + "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/booking/planner" + "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/workflow" + "cloud.o-forge.io/core/oc-lib/models/workflow/graph" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/nats-io/nats.go" +) + +const plannerTTL = 24 * time.Hour + +// --------------------------------------------------------------------------- +// Planner cache — protected by plannerMu +// --------------------------------------------------------------------------- + +var plannerMu sync.RWMutex +var PlannerCache = map[string]*planner.Planner{} +var plannerAddedAt = map[string]time.Time{} // peerID → first-seen timestamp + +// --------------------------------------------------------------------------- +// Subscriber registries — one keyed by peerID, one by workflowID +// --------------------------------------------------------------------------- + +var subsMu sync.RWMutex +var plannerSubs = map[string][]chan struct{}{} // peerID → notification channels +var workflowSubs = map[string][]chan struct{}{} // workflowID → notification channels + +// SubscribePlannerUpdates registers interest in planner changes for the given +// peer IDs. The returned channel receives one struct{} (non-blocking) each time +// any of those planners is updated. Call cancel to unregister. +func SubscribePlannerUpdates(peerIDs []string) (<-chan struct{}, func()) { + return subscribe(&subsMu, plannerSubs, peerIDs) +} + +// SubscribeWorkflowUpdates registers interest in workflow modifications for the +// given workflow ID. The returned channel is signalled when the workflow changes +// (peer list may have grown or shrunk). 
Call cancel to unregister. +func SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) { + ch, cancel := subscribe(&subsMu, workflowSubs, []string{wfID}) + return ch, cancel +} + +// subscribe is the generic helper used by both registries. +func subscribe(mu *sync.RWMutex, registry map[string][]chan struct{}, keys []string) (<-chan struct{}, func()) { + ch := make(chan struct{}, 1) + mu.Lock() + for _, k := range keys { + registry[k] = append(registry[k], ch) + } + mu.Unlock() + cancel := func() { + mu.Lock() + for _, k := range keys { + subs := registry[k] + for i, s := range subs { + if s == ch { + registry[k] = append(subs[:i], subs[i+1:]...) + break + } + } + } + mu.Unlock() + } + return ch, cancel +} + +func notifyPlannerWatchers(peerID string) { + notify(&subsMu, plannerSubs, peerID) +} + +func notifyWorkflowWatchers(wfID string) { + notify(&subsMu, workflowSubs, wfID) +} + +func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) { + mu.RLock() + subs := registry[key] + mu.RUnlock() + for _, ch := range subs { + select { + case ch <- struct{}{}: + default: + } + } +} + +// --------------------------------------------------------------------------- +// Cache helpers +// --------------------------------------------------------------------------- + +// storePlanner inserts or updates a planner for peerID. +// On first insertion it schedules an automatic eviction after plannerTTL. +// All subscribers interested in this peer are notified. +func storePlanner(peerID string, p *planner.Planner) { + plannerMu.Lock() + isNew := PlannerCache[peerID] == nil + PlannerCache[peerID] = p + if isNew { + plannerAddedAt[peerID] = time.Now() + go evictAfter(peerID, plannerTTL) + } + plannerMu.Unlock() + notifyPlannerWatchers(peerID) +} + +// evictAfter waits ttl from the first-seen time for peerID then emits a +// PB_CLOSE_PLANNER event, which removes the entry from the cache and notifies +// NATS. 
+func evictAfter(peerID string, ttl time.Duration) { + time.Sleep(ttl) + plannerMu.RLock() + _, exists := PlannerCache[peerID] + plannerMu.RUnlock() + if exists { + EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER}) + } +} + +// --------------------------------------------------------------------------- +// NATS emission +// --------------------------------------------------------------------------- + +func EmitNATS(peerID string, message tools.PropalgationMessage) { + if message.Action == tools.PB_CLOSE_PLANNER { + plannerMu.Lock() + delete(PlannerCache, peerID) + delete(plannerAddedAt, peerID) + plannerMu.Unlock() + notifyPlannerWatchers(peerID) // let streams re-evaluate (will warn "no planner") + } + b, _ := json.Marshal(message) + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-scheduler", + Datatype: -1, + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) +} + +type executionConsidersPayload struct { + ID string `json:"id"` + ExecutionsID string `json:"executions_id"` + ExecutionID string `json:"execution_id"` + PeerIDs []string `json:"peer_ids"` +} + +// emitConsiders broadcasts a PROPALGATION_EVENT with the Considers action, +// carrying the stored resource ID and its datatype (BOOKING or PURCHASE_RESOURCE). 
+func emitConsiders(id string, executionID string, dt tools.DataType) { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil) + data := access.LoadOne(executionID) + if data.ToWorkflowExecution() != nil { + exec := data.ToWorkflowExecution() + if peers, err := GetWorkflowPeerIDs(exec.WorkflowID, &tools.APIRequest{Admin: true}); err == nil { + payload, _ := json.Marshal(&executionConsidersPayload{ + ID: id, + ExecutionsID: exec.ExecutionsID, + ExecutionID: executionID, + PeerIDs: peers, + }) + b, _ := json.Marshal(tools.PropalgationMessage{ + DataType: int(dt), + Action: tools.PB_CONSIDERS, + Payload: payload, + }) + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-scheduler", + Datatype: dt, + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) + } + } +} + +// EmitConsidersExecution broadcasts a Considers / WORKFLOW_EXECUTION message to all +// storage and compute peers of wf once the execution has transitioned to SCHEDULED. +// Each receiving peer will use it to confirm (IsDraft=false) their local drafts. 
+func EmitConsidersExecution(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow) { + if wf == nil || wf.Graph == nil { + return + } + peerIDs, err := GetWorkflowPeerIDs(wf.GetID(), &tools.APIRequest{Admin: true}) + if err != nil { + return + } + if len(peerIDs) == 0 { + return + } + payload, err := json.Marshal(executionConsidersPayload{ + ID: exec.GetID(), + ExecutionID: exec.GetID(), + ExecutionsID: exec.ExecutionsID, + PeerIDs: peerIDs}) + if err != nil { + return + } + b, err := json.Marshal(tools.PropalgationMessage{ + DataType: int(tools.WORKFLOW_EXECUTION), + Action: tools.PB_CONSIDERS, + Payload: payload, + }) + if err != nil { + return + } + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-scheduler", + Datatype: tools.WORKFLOW_EXECUTION, + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) +} + +// updateExecutionState sets BookingsState[id]=true (dt==BOOKING) or +// PurchasesState[id]=true (dt==PURCHASE_RESOURCE) on the target execution. +// payload must be JSON-encoded {"id":"...", "execution_id":"..."}. 
+func updateExecutionState(payload []byte, dt tools.DataType) { + var data executionConsidersPayload + if err := json.Unmarshal(payload, &data); err != nil || data.ID == "" || data.ExecutionID == "" { + return + } + adminReq := &tools.APIRequest{Admin: true} + res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(data.ExecutionID) + if err != nil || res == nil { + fmt.Printf("updateExecutionState: could not load execution %s: %v\n", data.ExecutionID, err) + return + } + exec := res.(*workflow_execution.WorkflowExecution) + switch dt { + case tools.BOOKING: + if exec.BookingsState == nil { + exec.BookingsState = map[string]bool{} + } + exec.BookingsState[data.ID] = true + case tools.PURCHASE_RESOURCE: + if exec.PurchasesState == nil { + exec.PurchasesState = map[string]bool{} + } + exec.PurchasesState[data.ID] = true + } + found := true + for _, st := range exec.BookingsState { + if !st { + found = false + break + } + } + for _, st := range exec.PurchasesState { + if !st { + found = false + break + } + } + if found { + exec.State = enum.SCHEDULED + } + if _, _, err := utils.GenericRawUpdateOne(exec, data.ExecutionID, workflow_execution.NewAccessor(adminReq)); err != nil { + fmt.Printf("updateExecutionState: could not update execution %s: %v\n", data.ExecutionID, err) + } +} + +// confirmExecutionDrafts is called when a Considers/WORKFLOW_EXECUTION message +// is received from oc-discovery, meaning the originating peer has confirmed the +// execution as SCHEDULED. For every booking and purchase ID listed in the +// execution's states, we confirm the local draft (IsDraft=false). 
+func confirmExecutionDrafts(payload []byte) { + var data executionConsidersPayload + if err := json.Unmarshal(payload, &data); err != nil { + fmt.Printf("confirmExecutionDrafts: could not parse payload: %v\n", err) + return + } + access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.WORKFLOW_EXECUTION), nil) + d := access.LoadOne(data.ExecutionID) + if exec := d.ToWorkflowExecution(); exec != nil { + for id := range exec.BookingsState { + go confirmResource(id, tools.BOOKING) + } + for id := range exec.PurchasesState { + go confirmResource(id, tools.PURCHASE_RESOURCE) + } + } +} + +// --------------------------------------------------------------------------- +// NATS listeners +// --------------------------------------------------------------------------- + +func ListenNATS() { + tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ + // Receive planner snapshots pushed by oc-discovery and cache them. + // Considers messages: + // BOOKING / PURCHASE_RESOURCE → mark the individual resource as + // considered in the target WorkflowExecution (BookingsState / PurchasesState). + // WORKFLOW_EXECUTION → the execution reached SCHEDULED; confirm all + // local draft bookings and purchases listed in its states. 
+ tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { + if resp.FromApp != "oc-discovery" { + return + } + var prop tools.PropalgationMessage + if err := json.Unmarshal(resp.Payload, &prop); err != nil { + return + } + switch prop.Action { + case tools.PB_PLANNER: + m := map[string]interface{}{} + p := planner.Planner{} + if err := json.Unmarshal(prop.Payload, &m); err != nil { + return + } + if err := json.Unmarshal(prop.Payload, &p); err != nil { + return + } + storePlanner(fmt.Sprintf("%v", m["peer_id"]), &p) + case tools.PB_CONSIDERS: + switch tools.DataType(prop.DataType) { + case tools.BOOKING, tools.PURCHASE_RESOURCE: + updateExecutionState(prop.Payload, tools.DataType(prop.DataType)) + case tools.WORKFLOW_EXECUTION: + confirmExecutionDrafts(prop.Payload) + } + } + }, + + // Incoming resource creation events: + // - WORKFLOW → refresh peer planner entries and notify CheckStream watchers. + // - BOOKING → if destined for us, validate, store as draft, start 10-min + // expiry timer, and emit a "considers_booking" response. + // - PURCHASE → if destined for us, store as draft, start 10-min expiry + // timer, and emit a "considers_purchase" response. + tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) { + switch resp.Datatype { + case tools.WORKFLOW: + wf := workflow.Workflow{} + if err := json.Unmarshal(resp.Payload, &wf); err != nil { + return + } + notifyWorkflowWatchers(wf.GetID()) + } + }, + tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { + switch resp.Datatype { + case tools.WORKFLOW: + wf := workflow.Workflow{} + if err := json.Unmarshal(resp.Payload, &wf); err != nil { + return + } + broadcastPlanner(&wf) + notifyWorkflowWatchers(wf.GetID()) + case tools.BOOKING: + var bk booking.Booking + if err := json.Unmarshal(resp.Payload, &bk); err != nil { + return + } + self, err := oclib.GetMySelf() + if err != nil || self == nil || bk.DestPeerID != self.GetID() { + return + } + // Reject bookings whose start date is already in the past. 
+ if !bk.ExpectedStartDate.IsZero() && bk.ExpectedStartDate.Before(time.Now()) { + fmt.Println("ListenNATS: booking start date is in the past, discarding") + return + } + // Verify the slot is free in our planner (if we have one). + plannerMu.RLock() + p := PlannerCache[self.PeerID] + plannerMu.RUnlock() + if p != nil && !checkInstance(p, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) { + fmt.Println("ListenNATS: booking conflicts with local planner, discarding") + return + } + adminReq := &tools.APIRequest{Admin: true} + bk.IsDraft = true + stored, _, err := booking.NewAccessor(adminReq).StoreOne(&bk) + if err != nil { + fmt.Println("ListenNATS: could not store booking:", err) + return + } + storedID := stored.GetID() + go refreshSelfPlanner(self.PeerID, adminReq) + time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.BOOKING) }) + go emitConsiders(storedID, stored.(*booking.Booking).ExecutionID, tools.BOOKING) + + case tools.PURCHASE_RESOURCE: + var pr purchase_resource.PurchaseResource + if err := json.Unmarshal(resp.Payload, &pr); err != nil { + return + } + self, err := oclib.GetMySelf() + if err != nil || self == nil || pr.DestPeerID != self.GetID() { + return + } + adminReq := &tools.APIRequest{Admin: true} + pr.IsDraft = true + stored, _, err := purchase_resource.NewAccessor(adminReq).StoreOne(&pr) + if err != nil { + fmt.Println("ListenNATS: could not store purchase:", err) + return + } + storedID := stored.GetID() + time.AfterFunc(10*time.Minute, func() { draftTimeout(storedID, tools.PURCHASE_RESOURCE) }) + go emitConsiders(storedID, stored.(*purchase_resource.PurchaseResource).ExecutionID, tools.PURCHASE_RESOURCE) + } + }, + }) +} + +// --------------------------------------------------------------------------- +// Draft timeout +// --------------------------------------------------------------------------- + +// draftTimeout deletes a booking or purchase resource if it is still a draft +// after the 10-minute 
confirmation window has elapsed. +func draftTimeout(id string, dt tools.DataType) { + adminReq := &tools.APIRequest{Admin: true} + var res utils.DBObject + var loadErr error + switch dt { + case tools.BOOKING: + res, _, loadErr = booking.NewAccessor(adminReq).LoadOne(id) + case tools.PURCHASE_RESOURCE: + res, _, loadErr = purchase_resource.NewAccessor(adminReq).LoadOne(id) + default: + return + } + if loadErr != nil || res == nil || !res.IsDrafted() { + return + } + switch dt { + case tools.BOOKING: + booking.NewAccessor(adminReq).DeleteOne(id) + case tools.PURCHASE_RESOURCE: + purchase_resource.NewAccessor(adminReq).DeleteOne(id) + } + fmt.Printf("draftTimeout: %s %s deleted (still draft after 10 min)\n", dt.String(), id) +} + +// --------------------------------------------------------------------------- +// Confirm channels +// --------------------------------------------------------------------------- + +// confirmResource sets IsDraft=false for a booking or purchase resource. +// For bookings it also advances State to SCHEDULED and refreshes the local planner. 
+func confirmResource(id string, dt tools.DataType) { + adminReq := &tools.APIRequest{Admin: true} + switch dt { + case tools.BOOKING: + res, _, err := booking.NewAccessor(adminReq).LoadOne(id) + if err != nil || res == nil { + fmt.Printf("confirmResource: could not load booking %s: %v\n", id, err) + return + } + bk := res.(*booking.Booking) + bk.IsDraft = false + bk.State = enum.SCHEDULED + if _, _, err := utils.GenericRawUpdateOne(bk, id, booking.NewAccessor(adminReq)); err != nil { + fmt.Printf("confirmResource: could not confirm booking %s: %v\n", id, err) + return + } + self, err := oclib.GetMySelf() + if err == nil && self != nil { + go refreshSelfPlanner(self.PeerID, adminReq) + } + case tools.PURCHASE_RESOURCE: + res, _, err := purchase_resource.NewAccessor(adminReq).LoadOne(id) + if err != nil || res == nil { + fmt.Printf("confirmResource: could not load purchase %s: %v\n", id, err) + return + } + pr := res.(*purchase_resource.PurchaseResource) + pr.IsDraft = false + if _, _, err := utils.GenericRawUpdateOne(pr, id, purchase_resource.NewAccessor(adminReq)); err != nil { + fmt.Printf("confirmResource: could not confirm purchase %s: %v\n", id, err) + } + } +} + +// listenConfirmChannel subscribes to a NATS subject and calls confirmResource +// for each message received. The message body is expected to be the plain +// resource ID (UTF-8 string). +func listenConfirmChannel(nc *nats.Conn, subject string, dt tools.DataType, wg *sync.WaitGroup) { + defer wg.Done() + ch := make(chan *nats.Msg, 64) + sub, err := nc.ChanSubscribe(subject, ch) + if err != nil { + fmt.Printf("listenConfirmChannel: could not subscribe to %s: %v\n", subject, err) + return + } + defer sub.Unsubscribe() + for msg := range ch { + confirmResource(string(msg.Data), dt) + } +} + +// ListenConfirm opens a direct NATS connection and subscribes to the hardcoded +// "confirm_booking" and "confirm_purchase" subjects. It reconnects automatically +// if the connection is lost. 
+func ListenConfirm() { + natsURL := config.GetConfig().NATSUrl + if natsURL == "" { + fmt.Println("ListenConfirm: NATS_SERVER not set, skipping confirm listeners") + return + } + for { + nc, err := nats.Connect(natsURL) + if err != nil { + fmt.Println("ListenConfirm: could not connect to NATS:", err) + time.Sleep(time.Minute) + continue + } + var wg sync.WaitGroup + wg.Add(2) + go listenConfirmChannel(nc, "confirm_booking", tools.BOOKING, &wg) + go listenConfirmChannel(nc, "confirm_purchase", tools.PURCHASE_RESOURCE, &wg) + wg.Wait() + nc.Close() + } +} + +// --------------------------------------------------------------------------- +// Self-planner initialisation +// --------------------------------------------------------------------------- + +// InitSelfPlanner bootstraps our own planner entry at startup. +// It waits (with 15-second retries) for our peer record to be present in the +// database before generating the first planner snapshot and broadcasting it +// on PB_PLANNER. This handles the race between oc-scheduler starting before +// oc-peer has fully registered our node. +func InitSelfPlanner() { + for { + self, err := oclib.GetMySelf() + if err != nil || self == nil { + fmt.Println("InitSelfPlanner: self peer not found yet, retrying in 15s...") + time.Sleep(15 * time.Second) + continue + } + refreshSelfPlanner(self.PeerID, &tools.APIRequest{Admin: true}) + return + } +} + +// --------------------------------------------------------------------------- +// Self-planner refresh +// --------------------------------------------------------------------------- + +// refreshSelfPlanner regenerates the local planner from the current state of +// the booking DB, stores it in PlannerCache under our own node UUID, and +// broadcasts it on PROPALGATION_EVENT / PB_PLANNER so all listeners (including +// oc-discovery) are kept in sync. 
+// +// It should be called whenever a booking for our own peer is created, whether +// by direct DB insertion (self-peer routing) or upon receiving a CREATE_RESOURCE +// BOOKING message from oc-discovery. +func refreshSelfPlanner(peerID string, request *tools.APIRequest) { + p, err := planner.GenerateShallow(request) + if err != nil { + fmt.Println("refreshSelfPlanner: could not generate planner:", err) + return + } + + // Update the local cache and notify any waiting CheckStream goroutines. + storePlanner(peerID, p) + + // Broadcast the updated planner so remote peers (and oc-discovery) can + // refresh their view of our availability. + type plannerWithPeer struct { + PeerID string `json:"peer_id"` + *planner.Planner + } + plannerPayload, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: p}) + if err != nil { + return + } + EmitNATS(peerID, tools.PropalgationMessage{ + Action: tools.PB_PLANNER, + Payload: plannerPayload, + }) +} + +// --------------------------------------------------------------------------- +// Planner broadcast +// --------------------------------------------------------------------------- + +// broadcastPlanner iterates the storage and compute peers of the given workflow +// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so +// downstream consumers (oc-discovery, other schedulers) refresh their state. +func broadcastPlanner(wf *workflow.Workflow) { + if wf.Graph == nil { + return + } + items := []graph.GraphItem{} + items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...) + items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...) 
+ + seen := []string{} + for _, item := range items { + i := item + _, res := i.GetResource() + if res == nil { + continue + } + creatorID := res.GetCreatorID() + if slices.Contains(seen, creatorID) { + continue + } + + data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID) + p := data.ToPeer() + if p == nil { + continue + } + + plannerMu.RLock() + cached := PlannerCache[p.PeerID] + plannerMu.RUnlock() + + if cached == nil { + payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID}) + if err != nil { + continue + } + seen = append(seen, creatorID) + EmitNATS(p.PeerID, tools.PropalgationMessage{ + Action: tools.PB_PLANNER, + Payload: payload, + }) + } + } +} diff --git a/infrastructure/scheduler.go b/infrastructure/scheduler.go index 614395c..a4de7fa 100644 --- a/infrastructure/scheduler.go +++ b/infrastructure/scheduler.go @@ -1,18 +1,21 @@ package infrastructure import ( + "encoding/json" "errors" "fmt" "strings" - "sync" "time" + oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/bill" "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/booking/planner" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/order" "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow" @@ -112,20 +115,6 @@ func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest purchased = append(purchased, exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds)...) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) 
} - - errCh := make(chan error, len(bookings)) - var m sync.Mutex - - for _, b := range bookings { - go getBooking(b, request, errCh, &m) - } - - for i := 0; i < len(bookings); i++ { - if err := <-errCh; err != nil { - return false, wf, execs, purchased, bookings, err - } - } - return true, wf, execs, purchased, bookings, nil } @@ -150,41 +139,6 @@ func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.Purchas } } -func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { - m.Lock() - c, err := getCallerCopy(request, errCh) - if err != nil { - errCh <- err - return - } - m.Unlock() - - meth := c.URLS[tools.BOOKING][tools.GET] - meth = strings.ReplaceAll(meth, ":id", b.ResourceID) - meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) - meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) - c.URLS[tools.BOOKING][tools.GET] = meth - _, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) - - if err != nil { - errCh <- fmt.Errorf("%s", "error on "+b.DestPeerID+err.Error()) - return - } - - errCh <- nil -} - -func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { - var c tools.HTTPCaller - err := request.Caller.DeepCopy(c) - if err != nil { - errCh <- err - return tools.HTTPCaller{}, nil - } - c.URLS = request.Caller.URLS - return c, err -} - func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) { if request == nil { return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no request found") @@ -204,27 +158,28 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* } ws.Workflow = wf - var errCh = make(chan error, len(bookings)) - var m sync.Mutex + // Resolve our own peer MongoDB-ID once; 
used to decide local vs NATS routing. + selfID, _ := oclib.GetMySelf() - for _, purchase := range purchases { // TODO on Decentralize Stream. - go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m) + errCh := make(chan error, len(purchases)) + for _, purchase := range purchases { + purchase.IsDraft = true + go propagateResource(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, selfID, request, errCh) } for i := 0; i < len(purchases); i++ { if err := <-errCh; err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + return ws, wf, executions, errors.New("could not propagate purchase: " + fmt.Sprintf("%v", err)) } } errCh = make(chan error, len(bookings)) - - for _, booking := range bookings { // TODO on Decentralize Stream. - go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m) + for _, bk := range bookings { + bk.IsDraft = true + go propagateResource(bk, bk.DestPeerID, tools.BOOKING, selfID, request, errCh) } - for i := 0; i < len(bookings); i++ { if err := <-errCh; err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + return ws, wf, executions, errors.New("could not propagate booking: " + fmt.Sprintf("%v", err)) } } @@ -240,6 +195,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* } exec.StoreDraftDefault() utils.GenericStoreOne(exec, workflow_execution.NewAccessor(request)) + go EmitConsidersExecution(exec, wf) } fmt.Println("Schedules") @@ -248,21 +204,40 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return ws, wf, executions, nil } -func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { - m.Lock() - c, err := getCallerCopy(request, errCh) +// propagateResource routes a 
purchase or booking to its destination:
+//   - If destPeerID matches our own peer (selfMongoID), the object is stored
+//     directly in the local DB as draft and the local planner is refreshed.
+//   - Otherwise a NATS CREATE_RESOURCE message is emitted so the destination
+//     peer can process it asynchronously.
+//
+// The caller is responsible for setting obj.IsDraft = true before calling.
+func propagateResource(obj utils.DBObject, destPeerID string, dt tools.DataType, selfMongoID *peer.Peer, request *tools.APIRequest, errCh chan error) {
+	if selfMongoID == nil {
+		return
+	} // NOTE(review): this return sends nothing on errCh, so the caller's receive loop in Schedules blocks forever — send an error here, or confirm the nil case is impossible
+	if destPeerID == selfMongoID.GetID() {
+		if _, _, err := obj.GetAccessor(request).StoreOne(obj); err != nil {
+			errCh <- fmt.Errorf("could not store %s locally: %w", dt.String(), err)
+			return
+		}
+		// The planner tracks booking time-slots only; purchases do not affect it.
+		if dt == tools.BOOKING {
+			go refreshSelfPlanner(selfMongoID.PeerID, request)
+		}
+		errCh <- nil
+		return
+	}
+	payload, err := json.Marshal(obj)
 	if err != nil {
-		errCh <- err
+		errCh <- fmt.Errorf("could not serialize %s: %w", dt.String(), err)
 		return
 	}
-	m.Unlock()
 
-	if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
-		errCh <- err
-		return
-	} else {
-		data := res["data"].(map[string]interface{})
-		purchase.SetID(fmt.Sprintf("%v", data["id"]))
-	}
+	tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
+		FromApp:  "oc-scheduler",
+		Datatype: dt,
+		Method:   int(tools.CREATE_RESOURCE),
+		Payload:  payload,
+	})
 	errCh <- nil
 }
 
@@ -360,3 +335,303 @@ type Schedule struct {
 * TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
 * SET PROTECTION BORDER TIME
 */
+
+// ---------------------------------------------------------------------------
+// Slot availability check
+// ---------------------------------------------------------------------------
+
+const (
+	checkWindowHours = 5  // how far ahead 
to scan for a free slot (hours) + checkStepMin = 15 // time increment per scan step (minutes) +) + +// CheckResult holds the outcome of a slot availability check. +type CheckResult struct { + Available bool `json:"available"` + Start time.Time `json:"start"` + End *time.Time `json:"end,omitempty"` + // NextSlot is the nearest free slot found within checkWindowHours when + // the requested slot is unavailable, or the preferred (conflict-free) slot + // when running in preemption mode. + NextSlot *time.Time `json:"next_slot,omitempty"` + Warnings []string `json:"warnings,omitempty"` + // Preemptible is true when the check was run in preemption mode. + Preemptible bool `json:"preemptible,omitempty"` +} + +// bookingResource is the minimum info needed to verify a resource against the +// planner cache. +type bookingResource struct { + id string + peerID string + instanceID string // resolved from WorkflowSchedule.SelectedInstances +} + +// Check verifies that all booking-relevant resources (storage and compute) of +// the given workflow have capacity for the requested time slot. +// +// - asap=true → ignore ws.Start, begin searching from time.Now() +// - preemption → always return Available=true but populate Warnings with +// conflicts and NextSlot with the nearest conflict-free alternative +func (ws *WorkflowSchedule) Check(wfID string, asap bool, preemption bool, request *tools.APIRequest) (*CheckResult, error) { + // 1. Load workflow + obj, code, err := workflow.NewAccessor(request).LoadOne(wfID) + if code != 200 || err != nil { + msg := "could not load workflow " + wfID + if err != nil { + msg += ": " + err.Error() + } + return nil, errors.New(msg) + } + wf := obj.(*workflow.Workflow) + + // 2. Resolve start + start := ws.Start + if asap || start.IsZero() { + start = time.Now() + } + + // 3. 
Resolve end – use explicit end/duration or estimate via Planify + end := ws.End + if end == nil { + if ws.DurationS > 0 { + e := start.Add(time.Duration(ws.DurationS * float64(time.Second))) + end = &e + } else { + _, longest, _, _, planErr := wf.Planify( + start, nil, + ws.SelectedInstances, ws.SelectedPartnerships, + ws.SelectedBuyings, ws.SelectedStrategies, + int(ws.BookingMode), request, + ) + if planErr == nil && longest > 0 { + e := start.Add(time.Duration(longest) * time.Second) + end = &e + } + } + } + + // 4. Extract booking-relevant (storage + compute) resources from the graph, + // resolving the selected instance for each resource. + checkables := collectBookingResources(wf, ws.SelectedInstances) + + // 5. Check every resource against its peer's planner + unavailable, warnings := checkResourceAvailability(checkables, start, end) + + result := &CheckResult{ + Start: start, + End: end, + Warnings: warnings, + } + + // 6. Preemption mode: mark as schedulable regardless of conflicts, but + // surface warnings and the nearest conflict-free alternative. + if preemption { + result.Available = true + result.Preemptible = true + if len(unavailable) > 0 { + result.NextSlot = findNextSlot(checkables, start, end, checkWindowHours) + } + return result, nil + } + + // 7. All resources are free + if len(unavailable) == 0 { + result.Available = true + return result, nil + } + + // 8. Slot unavailable – locate the nearest free slot within the window + result.Available = false + result.NextSlot = findNextSlot(checkables, start, end, checkWindowHours) + return result, nil +} + +// collectBookingResources returns unique storage and compute resources from the +// workflow graph. For each resource the selected instance ID is resolved from +// selectedInstances (the scheduler's SelectedInstances ConfigItem) so the planner +// check targets the exact instance chosen by the user. 
+func collectBookingResources(wf *workflow.Workflow, selectedInstances workflow.ConfigItem) []bookingResource { + if wf.Graph == nil { + return nil + } + seen := map[string]bool{} + var result []bookingResource + + resolveInstanceID := func(res interface { + GetID() string + GetCreatorID() string + }) string { + idx := selectedInstances.Get(res.GetID()) + switch r := res.(type) { + case *resources.StorageResource: + if inst := r.GetSelectedInstance(idx); inst != nil { + return inst.GetID() + } + case *resources.ComputeResource: + if inst := r.GetSelectedInstance(idx); inst != nil { + return inst.GetID() + } + } + return "" + } + + for _, item := range wf.GetGraphItems(wf.Graph.IsStorage) { + i := item + _, res := i.GetResource() + if res == nil { + continue + } + id, peerID := res.GetID(), res.GetCreatorID() + if peerID == "" || seen[id] { + continue + } + seen[id] = true + result = append(result, bookingResource{ + id: id, + peerID: peerID, + instanceID: resolveInstanceID(res), + }) + } + + for _, item := range wf.GetGraphItems(wf.Graph.IsCompute) { + i := item + _, res := i.GetResource() + if res == nil { + continue + } + id, peerID := res.GetID(), res.GetCreatorID() + if peerID == "" || seen[id] { + continue + } + seen[id] = true + result = append(result, bookingResource{ + id: id, + peerID: peerID, + instanceID: resolveInstanceID(res), + }) + } + + return result +} + +// checkResourceAvailability returns the IDs of unavailable resources and +// human-readable warning messages. 
+func checkResourceAvailability(res []bookingResource, start time.Time, end *time.Time) (unavailable []string, warnings []string) { + for _, r := range res { + plannerMu.RLock() + p := PlannerCache[r.peerID] + plannerMu.RUnlock() + if p == nil { + warnings = append(warnings, fmt.Sprintf( + "peer %s planner not in cache for resource %s – assuming available", r.peerID, r.id)) + continue + } + if !checkInstance(p, r.id, r.instanceID, start, end) { + unavailable = append(unavailable, r.id) + warnings = append(warnings, fmt.Sprintf( + "resource %s is not available in [%s – %s]", + r.id, start.Format(time.RFC3339), formatOptTime(end))) + } + } + return +} + +// checkInstance checks availability for the specific instance resolved by the +// scheduler. When instanceID is empty (no instance selected / none resolvable), +// it falls back to checking all instances known in the planner and returns true +// if any one has remaining capacity. Returns true when no capacity is recorded. +func checkInstance(p *planner.Planner, resourceID string, instanceID string, start time.Time, end *time.Time) bool { + if instanceID != "" { + return p.Check(resourceID, instanceID, nil, start, end) + } + // Fallback: accept if any known instance has free capacity + caps, ok := p.Capacities[resourceID] + if !ok || len(caps) == 0 { + return true // no recorded usage → assume free + } + for id := range caps { + if p.Check(resourceID, id, nil, start, end) { + return true + } + } + return false +} + +// findNextSlot scans forward from 'from' in checkStepMin increments for up to +// windowH hours and returns the first candidate start time at which all +// resources are simultaneously free. 
+func findNextSlot(resources []bookingResource, from time.Time, originalEnd *time.Time, windowH int) *time.Time { + duration := time.Hour + if originalEnd != nil { + if d := originalEnd.Sub(from); d > 0 { + duration = d + } + } + step := time.Duration(checkStepMin) * time.Minute + limit := from.Add(time.Duration(windowH) * time.Hour) + for t := from.Add(step); t.Before(limit); t = t.Add(step) { + e := t.Add(duration) + if unavail, _ := checkResourceAvailability(resources, t, &e); len(unavail) == 0 { + return &t + } + } + return nil +} + +func formatOptTime(t *time.Time) string { + if t == nil { + return "open" + } + return t.Format(time.RFC3339) +} + +// GetWorkflowPeerIDs loads the workflow and returns the deduplicated list of +// creator peer IDs for all its storage and compute resources. +// These are the peers whose planners must be watched by a check stream. +func GetWorkflowPeerIDs(wfID string, request *tools.APIRequest) ([]string, error) { + obj, code, err := workflow.NewAccessor(request).LoadOne(wfID) + if code != 200 || err != nil { + msg := "could not load workflow " + wfID + if err != nil { + msg += ": " + err.Error() + } + return nil, errors.New(msg) + } + wf := obj.(*workflow.Workflow) + if wf.Graph == nil { + return nil, nil + } + seen := map[string]bool{} + var peerIDs []string + for _, item := range wf.GetGraphItems(wf.Graph.IsStorage) { + i := item + _, res := i.GetResource() + if res == nil { + continue + } + if id := res.GetCreatorID(); id != "" && !seen[id] { + seen[id] = true + peerIDs = append(peerIDs, id) + } + } + for _, item := range wf.GetGraphItems(wf.Graph.IsCompute) { + i := item + _, res := i.GetResource() + if res == nil { + continue + } + if id := res.GetCreatorID(); id != "" && !seen[id] { + seen[id] = true + peerIDs = append(peerIDs, id) + } + } + realPeersID := []string{} + access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.PEER), nil) + for _, id := range peerIDs { + if data := access.LoadOne(id); data.Data != nil { + 
realPeersID = append(realPeersID, data.ToPeer().PeerID) + } + } + return realPeersID, nil +} diff --git a/main.go b/main.go index 85ab59a..31768cf 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "oc-scheduler/infrastructure" _ "oc-scheduler/routers" oclib "cloud.o-forge.io/core/oc-lib" @@ -11,5 +12,8 @@ const appname = "oc-scheduler" func main() { oclib.InitAPI(appname) + go infrastructure.ListenNATS() + go infrastructure.InitSelfPlanner() + go infrastructure.ListenConfirm() beego.Run() }