diff --git a/controllers/loki.go b/controllers/loki.go
index 3bc5df5..ce5d968 100644
--- a/controllers/loki.go
+++ b/controllers/loki.go
@@ -7,6 +7,7 @@ import (
     "net/http"
     "net/url"
     "strings"
+    "time"

     "cloud.o-forge.io/core/oc-lib/config"
     beego "github.com/beego/beego/v2/server/web"
@@ -97,53 +98,49 @@ func (o *LokiController) GetLogs() {
 // The server connects to Loki's /loki/api/v1/tail WebSocket endpoint and
 // forwards every message it receives until the client disconnects.
 func LogsStreamHandler(w http.ResponseWriter, r *http.Request) {
+    fmt.Println("LogsStreamHandler")
     execID := strings.TrimSuffix(
         strings.TrimPrefix(r.URL.Path, "/oc/logs/"),
         "",
     )
     conn, err := wsUpgrader.Upgrade(w, r, nil)
     if err != nil {
+        fmt.Println("LogsStreamHandler", err)
         return
     }
     defer conn.Close()

+    /*
+        var query map[string]interface{}
+        if err := conn.ReadJSON(&query); err != nil {
+            fmt.Println("LogsStreamHandler ReadJSON", err)
+            return
+        }
+    */
-    var query map[string]interface{}
-    if err := conn.ReadJSON(&query); err != nil {
-        return
-    }
-
-    start := fmt.Sprintf("%v", query["start"])
-    if len(start) > 10 {
-        start = start[:10]
-    }
-
+    start := time.Now().UTC().UnixNano()
     labels := []string{
        "workflow_execution_id=\"" + execID + "\"",
     }
-    for k, v := range query {
-        if k == "start" || k == "end" {
-            continue
-        }
-        labels = append(labels, fmt.Sprintf("%v=\"%v\"", k, v))
-    }
-
-    if len(labels) == 0 || len(start) < 10 {
-        _ = conn.WriteJSON(map[string]string{"error": "missing start or query labels"})
-        return
-    }
-
+    fmt.Println("LOKI START", start, labels)
     // Build Loki tail WS URL (http→ws, https→wss).
     lokiBase := config.GetConfig().LokiUrl
     lokiBase = strings.Replace(lokiBase, "https://", "wss://", 1)
     lokiBase = strings.Replace(lokiBase, "http://", "ws://", 1)
-
     lokiURL := lokiBase + "/loki/api/v1/tail?" + url.Values{
         "query": {"{" + strings.Join(labels, ", ") + "}"},
-        "start": {start + "000000000"}, // seconds → nanoseconds
+        "start": {fmt.Sprintf("%v", start)},
     }.Encode()

-    lokiConn, _, err := gorillaws.DefaultDialer.Dial(lokiURL, nil)
+    headers := http.Header{}
+    headers.Set("X-Scope-OrgID", "1")
+
+    lokiConn, resp, err := gorillaws.DefaultDialer.Dial(lokiURL, headers)
+    fmt.Println("LOKI LISTEN", lokiBase, err)
     if err != nil {
+        if resp != nil {
+            body, _ := io.ReadAll(resp.Body)
+            fmt.Printf("Handshake failed: status=%d body=%s\n", resp.StatusCode, string(body))
+        }
        _ = conn.WriteJSON(map[string]string{"error": "loki: " + err.Error()})
        return
     }
@@ -161,6 +158,7 @@ func LogsStreamHandler(w http.ResponseWriter, r *http.Request) {
         }
         var result map[string]interface{}
         if json.Unmarshal(msg, &result) == nil {
+            fmt.Println(result)
             if err := conn.WriteJSON(result); err != nil {
                 errCh <- err
                 return
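A minimal client sketch for the log stream proxied above. The host, port, and execution ID are illustrative assumptions; gorilla/websocket (used server-side as gorillaws) is assumed as the client library:

package main

import (
    "fmt"
    "log"

    gorillaws "github.com/gorilla/websocket"
)

func main() {
    // Hypothetical local endpoint; the execution ID is illustrative.
    url := "ws://localhost:8090/oc/logs/58314c99-c595-4ca2-8b5e-822a6774efed"
    conn, _, err := gorillaws.DefaultDialer.Dial(url, nil)
    if err != nil {
        log.Fatal("dial:", err)
    }
    defer conn.Close()
    // The proxy forwards each Loki tail frame verbatim as one JSON message.
    for {
        var frame map[string]interface{}
        if err := conn.ReadJSON(&frame); err != nil {
            return // server closed the stream or the connection dropped
        }
        fmt.Println(frame["streams"])
    }
}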
diff --git a/controllers/sheduler.go b/controllers/sheduler.go
index 712ffe8..b9286a4 100644
--- a/controllers/sheduler.go
+++ b/controllers/sheduler.go
@@ -71,6 +71,7 @@ func realPushCheckfunc(ctx context.Context, conn *gorillaws.Conn, req *tools.API
     go func() {
         select {
         case <-ctx.Done():
+            infrastructure.CleanupSession(executionsID, req)
-            // Session closed before timer fired — nothing to do, CleanupSession
-            // has already run (or will run) in the defer of CheckStreamHandler.
+            // Session closed before the timer fired — release the session's
+            // drafts here rather than waiting for the defer of CheckStreamHandler.
             return
@@ -87,6 +88,7 @@
 // CheckStreamHandler is the WebSocket handler for slot availability checking.
 // Query params: as_possible=true, preemption=true
 func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
+    fmt.Println("CheckStreamHandler")
     var err error
     wfID := strings.TrimSuffix(
         strings.TrimPrefix(r.URL.Path, "/oc/check/"),
@@ -97,7 +99,7 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
     asap := q.Get("as_possible") == "true"
     preemption := q.Get("preemption") == "true"

-    user, peerID, groups := oclib.ExtractTokenInfo(*r)
+    user, peerID, groups := oclib.ExtractTokenInfoWs(*r)
     req := &tools.APIRequest{
         Username: user,
         PeerID:   peerID,
@@ -205,6 +207,7 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
             workflowScheduler.UUID = executionsID
             _, _, _, schedErr := infrastructure.Schedule(&workflowScheduler, wfID, req)
             if schedErr != nil {
+                infrastructure.CleanupSession(executionsID, req)
                 _ = conn.WriteJSON(map[string]interface{}{
                     "error": schedErr.Error(),
                 })
@@ -213,8 +216,10 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
             fmt.Println("UPDATE CONFIRM — waiting for execution confirmation")
             select {
             case <-confirmCh:
+                fmt.Println("UPDATE CONFIRM done")
+                confirmed = true
                 _ = conn.WriteJSON(map[string]interface{}{
-                    "confirmed": true,
+                    "confirmed":     true,
+                    "scheduling_id": executionsID,
                 })
             case <-time.After(60 * time.Second):
@@ -225,8 +230,10 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
             case <-ctx.Done():
                 // client disconnected before confirmation
             }
-            confirmed = true
-            fmt.Println("UPDATE CONFIRM done")
+            if !confirmed {
+                infrastructure.CleanupSession(executionsID, req)
+                fmt.Println("UPDATE CONFIRM not done")
+            }
             return
         }
         // Detect mode change before updating local vars.
@@ -249,7 +256,8 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
             !reflect.DeepEqual(updated.SelectedInstances, workflowScheduler.SelectedInstances) ||
             !reflect.DeepEqual(updated.SelectedPartnerships, workflowScheduler.SelectedPartnerships) ||
             !reflect.DeepEqual(updated.SelectedBuyings, workflowScheduler.SelectedBuyings) ||
-            !reflect.DeepEqual(updated.SelectedStrategies, workflowScheduler.SelectedStrategies)
+            !reflect.DeepEqual(updated.SelectedStrategies, workflowScheduler.SelectedStrategies) ||
+            !reflect.DeepEqual(updated.SelectedEmbeddedStorages, workflowScheduler.SelectedEmbeddedStorages)

         infrastructure.CleanupSession(executionsID, req)
diff --git a/controllers/streams.go b/controllers/streams.go
new file mode 100644
index 0000000..c867d0e
--- /dev/null
+++ b/controllers/streams.go
@@ -0,0 +1,254 @@
+package controllers
+
+import (
+    "context"
+    "fmt"
+    "net/http"
+    "time"
+
+    oclib "cloud.o-forge.io/core/oc-lib"
+    "cloud.o-forge.io/core/oc-lib/dbs"
+    "cloud.o-forge.io/core/oc-lib/models/booking"
+    libutils "cloud.o-forge.io/core/oc-lib/models/utils"
+    "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
+    "cloud.o-forge.io/core/oc-lib/tools"
+    "go.mongodb.org/mongo-driver/bson/primitive"
+)
+
+// streamMsg is the envelope pushed over every stream WebSocket.
+type streamMsg struct {
+    Type    string      `json:"type"` // "snapshot" | "update" | "delete"
+    Data    interface{} `json:"data,omitempty"`
+    Deleted bool        `json:"deleted,omitempty"`
+}
+
+// ---------------------------------------------------------------------------
+// Booking stream
+// ---------------------------------------------------------------------------
+// BookingStreamHandler opens a WebSocket that:
+//  1. sends an immediate snapshot of matching bookings ("snapshot")
+//  2. pushes each subsequent create/update/delete as an individual "update"
+//     or "delete" message.
+//
+// Query params (all optional):
+//
+//	execution_id  — filter to a single booking by ID
+//	executions_id — filter to a specific scheduling session
+//	is_draft      — "true" | "false" (omit = non-draft)
+//	start_date    — YYYY-MM-DD (expected_start_date >=)
+//	end_date      — YYYY-MM-DD (expected_start_date <=)
+func BookingStreamHandler(w http.ResponseWriter, r *http.Request) {
+    user, peerID, groups := oclib.ExtractTokenInfoWs(*r)
+
+    q := r.URL.Query()
+    executionID := q.Get("execution_id")
+    executionsID := q.Get("executions_id")
+    isDraftStr := q.Get("is_draft")
+    onlyDraft := isDraftStr == "true"
+    filterDraft := isDraftStr != "" // whether the caller wants draft filtering at all
+    startDate, _ := time.ParseInLocation("2006-01-02", q.Get("start_date"), time.UTC)
+    endDate, _ := time.ParseInLocation("2006-01-02", q.Get("end_date"), time.UTC)
+
+    conn, err := upgrader.Upgrade(w, r, nil)
+    if err != nil {
+        return
+    }
+    defer conn.Close()
+
+    matchesFilter := func(b *booking.Booking) bool {
+        if executionID != "" && b.GetID() != executionID {
+            return false
+        }
+        if executionsID != "" && b.ExecutionsID != executionsID {
+            return false
+        }
+        if filterDraft && b.IsDraft != onlyDraft {
+            return false
+        }
+        if !startDate.IsZero() && b.ExpectedStartDate.Before(startDate) {
+            return false
+        }
+        if !endDate.IsZero() && b.ExpectedStartDate.After(endDate) {
+            return false
+        }
+        return true
+    }
+
+    // Build snapshot filters
+    andF := map[string][]dbs.Filter{}
+    if executionID != "" {
+        andF["id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionID}}
+    }
+    if executionsID != "" {
+        andF["executions_id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionsID}}
+    }
+    if !startDate.IsZero() {
+        andF["expected_start_date"] = append(andF["expected_start_date"],
+            dbs.Filter{Operator: "gte", Value: primitive.NewDateTimeFromTime(startDate)})
+    }
+    if !endDate.IsZero() {
+        andF["expected_start_date"] = append(andF["expected_start_date"],
+            dbs.Filter{Operator: "lte", Value: primitive.NewDateTimeFromTime(endDate)})
+    }
+    var snapshotFilter *dbs.Filters
+    if len(andF) > 0 {
+        snapshotFilter = &dbs.Filters{And: andF}
+    }
+
+    snapshot := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), user, peerID, groups, nil).
+        Search(snapshotFilter, "", onlyDraft, 0, 10000)
+    if err := conn.WriteJSON(streamMsg{Type: "snapshot", Data: snapshot.Data}); err != nil {
+        return
+    }
+
+    changeCh, unsub := libutils.SubscribeChanges(tools.BOOKING)
+    defer unsub()
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    // detect client disconnect
+    closeCh := make(chan struct{})
+    go func() {
+        defer close(closeCh)
+        for {
+            if _, _, err := conn.ReadMessage(); err != nil {
+                return
+            }
+        }
+    }()
+
+    for {
+        select {
+        case evt := <-changeCh:
+            b, ok := evt.Object.(*booking.Booking)
+            if !ok || !matchesFilter(b) {
+                continue
+            }
+            if evt.Deleted {
+                _ = conn.WriteJSON(streamMsg{Type: "delete", Data: b, Deleted: true})
+            } else {
+                _ = conn.WriteJSON(streamMsg{Type: "update", Data: b})
+            }
+        case <-closeCh:
+            return
+        case <-ctx.Done():
+            return
+        }
+    }
+}
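A sketch of a client consuming the snapshot/update/delete envelope defined by streamMsg above. The endpoint path comes from routers/router.go; the host and query values are illustrative:

package main

import (
    "encoding/json"
    "log"

    gorillaws "github.com/gorilla/websocket"
)

// Client-side mirror of the server's streamMsg envelope.
type streamMsg struct {
    Type    string          `json:"type"` // "snapshot" | "update" | "delete"
    Data    json.RawMessage `json:"data,omitempty"`
    Deleted bool            `json:"deleted,omitempty"`
}

func main() {
    conn, _, err := gorillaws.DefaultDialer.Dial(
        "ws://localhost:8090/oc/booking/stream?executions_id=abc&is_draft=false", nil)
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()
    for {
        var msg streamMsg
        if err := conn.ReadJSON(&msg); err != nil {
            return
        }
        switch msg.Type {
        case "snapshot":
            log.Println("initial state:", string(msg.Data)) // full matching set, sent once
        case "update":
            log.Println("upsert:", string(msg.Data)) // one created/updated object
        case "delete":
            log.Println("remove:", string(msg.Data)) // object removed server-side
        }
    }
}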
+
+// ---------------------------------------------------------------------------
+// WorkflowExecution stream
+// ---------------------------------------------------------------------------
+
+// ExecutionStreamHandler opens a WebSocket that:
+//  1. sends an immediate snapshot of matching executions ("snapshot")
+//  2. pushes each subsequent create/update/delete as "update" or "delete".
+//
+// Query params (all optional):
+//
+//	execution_id  — filter to a single execution by ID
+//	executions_id — filter to a specific scheduling session
+//	is_draft      — "true" | "false" (omit = non-draft)
+//	start_date    — YYYY-MM-DD (execution_date >=)
+//	end_date      — YYYY-MM-DD (execution_date <=)
+func ExecutionStreamHandler(w http.ResponseWriter, r *http.Request) {
+    user, peerID, groups := oclib.ExtractTokenInfoWs(*r)
+
+    q := r.URL.Query()
+    executionID := q.Get("execution_id")
+    executionsID := q.Get("executions_id")
+    isDraftStr := q.Get("is_draft")
+    onlyDraft := isDraftStr == "true"
+    filterDraft := isDraftStr != ""
+    startDate, _ := time.ParseInLocation("2006-01-02", q.Get("start_date"), time.UTC)
+    endDate, _ := time.ParseInLocation("2006-01-02", q.Get("end_date"), time.UTC)
+
+    conn, err := upgrader.Upgrade(w, r, nil)
+    if err != nil {
+        return
+    }
+    defer conn.Close()
+
+    matchesFilter := func(e *workflow_execution.WorkflowExecution) bool {
+        if executionID != "" && e.GetID() != executionID {
+            return false
+        }
+        if executionsID != "" && e.ExecutionsID != executionsID {
+            return false
+        }
+        if filterDraft && e.IsDraft != onlyDraft {
+            return false
+        }
+        if !startDate.IsZero() && e.ExecDate.Before(startDate) {
+            return false
+        }
+        if !endDate.IsZero() && e.ExecDate.After(endDate) {
+            return false
+        }
+        return true
+    }
+
+    // Build snapshot filters
+    andF := map[string][]dbs.Filter{}
+    if executionID != "" {
+        andF["id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionID}}
+    }
+    if executionsID != "" {
+        andF["executions_id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionsID}}
+    }
+    if !startDate.IsZero() {
+        andF["execution_date"] = append(andF["execution_date"],
+            dbs.Filter{Operator: "gte", Value: primitive.NewDateTimeFromTime(startDate)})
+    }
+    if !endDate.IsZero() {
+        andF["execution_date"] = append(andF["execution_date"],
+            dbs.Filter{Operator: "lte", Value: primitive.NewDateTimeFromTime(endDate)})
+    }
+    var snapshotFilter *dbs.Filters
+    if len(andF) > 0 {
+        snapshotFilter = &dbs.Filters{And: andF}
+    }
+
+    snapshot := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), user, peerID, groups, nil).
+        Search(snapshotFilter, "", onlyDraft, 0, 10000)
+    if err := conn.WriteJSON(streamMsg{Type: "snapshot", Data: snapshot.Data}); err != nil {
+        return
+    }
+
+    changeCh, unsub := libutils.SubscribeChanges(tools.WORKFLOW_EXECUTION)
+    defer unsub()
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+
+    closeCh := make(chan struct{})
+    go func() {
+        defer close(closeCh)
+        for {
+            if _, _, err := conn.ReadMessage(); err != nil {
+                return
+            }
+        }
+    }()
+
+    for {
+        select {
+        case evt := <-changeCh:
+            e, ok := evt.Object.(*workflow_execution.WorkflowExecution)
+            if !ok {
+                continue
+            }
+            fmt.Println("CHANGE!", e, matchesFilter(e))
+            if !matchesFilter(e) {
+                continue
+            }
+            if evt.Deleted {
+                _ = conn.WriteJSON(streamMsg{Type: "delete", Data: e, Deleted: true})
+            } else {
+                _ = conn.WriteJSON(streamMsg{Type: "update", Data: e})
+            }
+        case <-closeCh:
+            return
+        case <-ctx.Done():
+            return
+        }
+    }
+}
diff --git a/go.mod b/go.mod
index d79ccd3..c39e32a 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module oc-scheduler
 go 1.25.0

 require (
-    cloud.o-forge.io/core/oc-lib v0.0.0-20260410075751-d7b2ef6ae120
+    cloud.o-forge.io/core/oc-lib v0.0.0-20260429050913-47d487ea8011
     github.com/beego/beego/v2 v2.3.8
     github.com/google/uuid v1.6.0
     github.com/robfig/cron v1.2.0
diff --git a/go.sum b/go.sum
index 8485559..9236e4c 100644
--- a/go.sum
+++ b/go.sum
@@ -1,21 +1,11 @@
-cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIyvapWwHZ8t20wMaSaMU6NoZLkMCui8w=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 h1:pQf9k+GSzNGEmrUa00jn9Zcqfp9X4N1Z5ie7InvUf3g=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260330082109-a4ab3285e34c h1:M0y5jI9BO7fyi1nMa2S2hhY0jDbBC+Bg56+5tp9g/vs=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260330082109-a4ab3285e34c/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260403121807-913d9b3dfb0a h1:H7K91js08Vyx307MW6BwQ/kqNGTrQVMaR3xvrIrc2W8=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260403121807-913d9b3dfb0a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260407090336-526eaef33aa1 h1:uq6ZAHAqKVF9X45JnBA6+6Nu/nUwOgN8ezWWDz+bzaw=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260407090336-526eaef33aa1/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260407090927-6fe91eda875d h1:54Vl14gurwAkmZEaWZKUM5eDZfB7MF/fzWjibWLQljE=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260407090927-6fe91eda875d/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260408134044-284533ad1d7b h1:mOU+tc87/KEQgFmw1RcQ9E9Rbz8Q2jLOh5Cpu6po9Ww=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260408134044-284533ad1d7b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260409065442-c340146c8db1 h1:xpz/wRyZ7DfPjk9L5NjIggdWcGFI6wsoq+gvOh+9NKA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260409065442-c340146c8db1/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260410075751-d7b2ef6ae120 h1:CMOOpmpgkD63Gq7ukmXG6r+WlJxvpSgDRmalpWPhaIg=
-cloud.o-forge.io/core/oc-lib v0.0.0-20260410075751-d7b2ef6ae120/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260126113404-85a8857938f5 h1:pl6/u6UXyFcfCU+xyQcSY8Lkby68EVWswxG2Oaq476A=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260126113404-85a8857938f5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260427091650-f048b420d74d h1:jzgwgbZDASalQJSYbPF/L2L2RSP2OAbqhMB4YUXK27M=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260427091650-f048b420d74d/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260428065508-e3fbe7688ad5 h1:CVwlE1JgIcTAvVLCl+xeiJ54hndiTgP1XoFYS0vSvYA=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260428065508-e3fbe7688ad5/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260429050913-47d487ea8011 h1:owV5pQ+mS5xDCKEcGTO+BgsyYrKjkISL8LDsmjEb/3s=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260429050913-47d487ea8011/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
 github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
diff --git a/infrastructure/execution/execution.go b/infrastructure/execution/execution.go
index 8b74ed0..7a7bd87 100644
--- a/infrastructure/execution/execution.go
+++ b/infrastructure/execution/execution.go
@@ -13,10 +13,12 @@ import (
     "time"

     oclib "cloud.o-forge.io/core/oc-lib"
+    "cloud.o-forge.io/core/oc-lib/config"
     "cloud.o-forge.io/core/oc-lib/dbs"
     "cloud.o-forge.io/core/oc-lib/models/booking"
     "cloud.o-forge.io/core/oc-lib/models/common/enum"
     "cloud.o-forge.io/core/oc-lib/models/order"
+    "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
     "cloud.o-forge.io/core/oc-lib/models/utils"
     "cloud.o-forge.io/core/oc-lib/models/workflow"
     "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
@@ -144,13 +146,19 @@ func UpdateExecutionState(payload []byte, dt tools.DataType) {
         st := exec.BookingsState[data.ID]
         st.IsBooked = true
         exec.BookingsState[data.ID] = st
+        if config.GetConfig().IsNano {
+            scheduling_resources.SendBookingToMaster(schdata.Data.(*booking.Booking)) // TODO : ASK FOR RESPONSE...
+        }
     case tools.PURCHASE_RESOURCE:
         if exec.PurchasesState == nil {
             exec.PurchasesState = map[string]bool{}
         }
         exec.PurchasesState[data.ID] = true
+        if config.GetConfig().IsNano {
+            scheduling_resources.SendPurchaseToMaster(schdata.Data.(*purchase_resource.PurchaseResource)) // TODO : ASK FOR RESPONSE...
+        }
     }
-
+    // TODO REMOVE
     allConfirmed := true
     for _, st := range exec.BookingsState {
         if !st.IsBooked {
@@ -366,6 +374,13 @@ func HandleWorkflowStarted(resp tools.NATSResponse) {
         return
     }
     adminReq := &tools.APIRequest{Admin: true}
+
+    mu := GetExecLock(evt.ExecutionID)
+    if mu != nil {
+        mu.Lock()
+        defer mu.Unlock()
+    }
+
     res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(evt.ExecutionID)
     if err != nil || res == nil {
         return
     }
     if evt.RealStart != nil {
         exec.ExecDate = *evt.RealStart
     }
+
+    // Build the execution graph summary from the workflow graph on first start.
+    if len(exec.Graph) == 0 {
+        wfRes, _, wfErr := workflow.NewAccessor(adminReq).LoadOne(exec.WorkflowID)
+        if wfErr == nil && wfRes != nil {
+            exec.Graph = workflow_execution.BuildExecutionGraph(wfRes.(*workflow.Workflow).Graph)
+        }
+    }
+    // Advance steps whose deps are already satisfied (typically the entry nodes).
+    if len(exec.Graph) > 0 {
+        now := time.Now().UTC()
+        for _, id := range exec.Graph.ReadyToRun() {
+            exec.Graph.MarkRunning(id, now)
+        }
+    }
+
     utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq))
 }
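ReadyToRun, MarkRunning, and MarkDone above are oc-lib calls; a compact model of the semantics the handlers appear to rely on — the state names and the "all dependencies succeeded" rule are assumptions for illustration, not the library implementation:

package graphmodel

import "time"

type StepState int

const (
    StepWaiting StepState = iota
    StepRunning
    StepSuccess
    StepFailure
)

type Item struct {
    State      StepState
    Deps       []string // item IDs this step waits on
    Start, End time.Time
}

type Graph map[string]*Item

// ReadyToRun lists waiting items whose dependencies have all succeeded.
func (g Graph) ReadyToRun() []string {
    ready := []string{}
    for id, it := range g {
        if it.State != StepWaiting {
            continue
        }
        ok := true
        for _, dep := range it.Deps {
            if d := g[dep]; d == nil || d.State != StepSuccess {
                ok = false
                break
            }
        }
        if ok {
            ready = append(ready, id)
        }
    }
    return ready
}

// MarkRunning and MarkDone mirror the transitions made by the handlers.
func (g Graph) MarkRunning(id string, at time.Time) {
    if it := g[id]; it != nil && it.State == StepWaiting {
        it.State = StepRunning
        it.Start = at
    }
}

func (g Graph) MarkDone(id string, success bool, at time.Time) {
    it := g[id]
    if it == nil || it.State != StepRunning {
        return
    }
    it.End = at
    if success {
        it.State = StepSuccess
    } else {
        it.State = StepFailure
    }
}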
@@ -384,6 +415,13 @@ func HandleWorkflowDone(resp tools.NATSResponse) {
         return
     }
     adminReq := &tools.APIRequest{Admin: true}
+
+    mu := GetExecLock(evt.ExecutionID)
+    if mu != nil {
+        mu.Lock()
+        defer mu.Unlock()
+    }
+
     res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(evt.ExecutionID)
     if err != nil || res == nil {
         return
     }
     if evt.RealEnd != nil {
         exec.EndDate = evt.RealEnd
     }
-    // All bookings are no longer reserved and are done
+    // Release all booking reservations (workflow is over) without overwriting
+    // IsDone: individual step events already set the authoritative done state
+    // for each booking. Resetting everything here would lose that granularity.
     if exec.BookingsState == nil {
         exec.BookingsState = map[string]workflow_execution.BookingState{}
     }
-    for id := range exec.BookingsState {
-        exec.BookingsState[id] = workflow_execution.BookingState{IsBooked: false, IsDone: true}
+    for id, st := range exec.BookingsState {
+        st.IsBooked = false
+        exec.BookingsState[id] = st
+    }
+    // Graph items that already reached success/failure keep their state.
+    // Items still in the running state when the execution terminates receive
+    // the terminal state (the step was active but no step_done event arrived
+    // before the workflow finished — treat it as the execution outcome).
+    terminalSuccess := enum.BookingStatus(evt.State) == enum.SUCCESS
+    nowGraph := time.Now().UTC()
+    for itemID, item := range exec.Graph {
+        if item.State == workflow_execution.StepRunning {
+            exec.Graph.MarkDone(itemID, terminalSuccess, nowGraph)
+        }
     }
     utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq))
+
+    // Build a set of booking IDs already covered by per-step events so we only
+    // fall back for bookings the orchestrator never emitted a step for (e.g. storage).
+    coveredByStep := map[string]bool{}
     for _, step := range evt.Steps {
         applyStepToBooking(step, adminReq)
+        coveredByStep[step.BookingID] = true
     }
+
+    // Propagate the execution's terminal state to any booking that was not
+    // updated by a step event and is not already in a terminal state.
+    terminalState := enum.BookingStatus(evt.State)
+    now := time.Now().UTC()
+    for id := range exec.BookingsState {
+        if coveredByStep[id] {
+            continue
+        }
+        res, _, err := booking.NewAccessor(adminReq).LoadOne(id)
+        if err != nil || res == nil {
+            continue
+        }
+        bk := res.(*booking.Booking)
+        if terminalExecStates[bk.State] {
+            continue
+        }
+        bk.State = terminalState
+        bk.RealEndDate = &now
+        utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq))
+    }
+
     self, err := oclib.GetMySelf()
     if err == nil && self != nil {
         go planner.GetPlannerService().RefreshSelf(self.PeerID, adminReq)
     }
@@ -416,6 +495,8 @@ func HandleWorkflowStepDone(resp tools.NATSResponse) {
         return
     }
     adminReq := &tools.APIRequest{Admin: true}
+
+    // Update the booking itself first (no exec lock needed for the booking doc).
     res, _, err := booking.NewAccessor(adminReq).LoadOne(evt.BookingID)
     if err != nil || res == nil {
         return
     }
@@ -430,20 +511,56 @@ func HandleWorkflowStepDone(resp tools.NATSResponse) {
     }
     utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq))

-    // Update BookingsState in the parent WorkflowExecution: resource released, step done
-    execRes, _, execErr := workflow_execution.NewAccessor(adminReq).LoadOne(bk.ExecutionID)
-    if execErr == nil && execRes != nil {
-        exec := execRes.(*workflow_execution.WorkflowExecution)
-        if exec.BookingsState == nil {
-            exec.BookingsState = map[string]workflow_execution.BookingState{}
-        }
-        st := exec.BookingsState[evt.BookingID]
-        st.IsBooked = false
-        st.IsDone = true
-        exec.BookingsState[evt.BookingID] = st
-        utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq))
+    // Update the parent WorkflowExecution under its exec lock to avoid races
+    // between concurrent WORKFLOW_STEP_DONE_EVENT deliveries.
+    execID := bk.ExecutionID
+    mu := GetExecLock(execID)
+    if mu != nil {
+        mu.Lock()
+        defer mu.Unlock()
     }
+    execRes, _, execErr := workflow_execution.NewAccessor(adminReq).LoadOne(execID)
+    if execErr != nil || execRes == nil {
+        return
+    }
+    exec := execRes.(*workflow_execution.WorkflowExecution)
+
+    // BookingsState: resource released, step done.
+    if exec.BookingsState == nil {
+        exec.BookingsState = map[string]workflow_execution.BookingState{}
+    }
+    st := exec.BookingsState[evt.BookingID]
+    st.IsBooked = false
+    st.IsDone = true
+    exec.BookingsState[evt.BookingID] = st
+
+    // Advance the execution graph.
+    if len(exec.Graph) > 0 {
+        itemID := findItemIDByBookingID(exec, evt.BookingID)
+        if itemID != "" {
+            success := enum.BookingStatus(evt.State) == enum.SUCCESS
+            end := time.Now().UTC()
+            if evt.RealEnd != nil {
+                end = *evt.RealEnd
+            }
+            exec.Graph.MarkDone(itemID, success, end)
+
+            // Only advance when the step succeeded; a failure leaves dependents waiting.
+            if success {
+                start := end
+                if evt.RealStart != nil {
+                    start = *evt.RealStart
+                }
+                for _, nextID := range exec.Graph.ReadyToRun() {
+                    exec.Graph.MarkRunning(nextID, start)
+                }
+            }
+        }
+    }
+
+    utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq))
+
     switch bk.State {
     case enum.SUCCESS, enum.FAILURE, enum.FORGOTTEN, enum.CANCELLED:
         self, err := oclib.GetMySelf()
@@ -453,6 +570,21 @@ func HandleWorkflowStepDone(resp tools.NATSResponse) {
     }
 }
+// findItemIDByBookingID reverse-looks up a booking ID in PeerBookByGraph.
+// PeerBookByGraph layout: map[peerID]map[itemID][]bookingID
+func findItemIDByBookingID(exec *workflow_execution.WorkflowExecution, bookingID string) string {
+    for _, byItem := range exec.PeerBookByGraph {
+        for itemID, bookingIDs := range byItem {
+            for _, bkID := range bookingIDs {
+                if bkID == bookingID {
+                    return itemID
+                }
+            }
+        }
+    }
+    return ""
+}
+
 func applyStepToBooking(step tools.StepMetric, adminReq *tools.APIRequest) {
     res, _, err := booking.NewAccessor(adminReq).LoadOne(step.BookingID)
     if err != nil || res == nil {
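findItemIDByBookingID above rescans the nested map once per step event; for executions with many bookings, a one-shot inverse index avoids the repeated triple loop. A sketch of that alternative, using the same documented layout:

// buildBookingIndex inverts map[peerID]map[itemID][]bookingID into
// map[bookingID]itemID, so each step event becomes a single map lookup.
func buildBookingIndex(peerBookByGraph map[string]map[string][]string) map[string]string {
    idx := map[string]string{}
    for _, byItem := range peerBookByGraph {
        for itemID, bookingIDs := range byItem {
            for _, bkID := range bookingIDs {
                idx[bkID] = itemID
            }
        }
    }
    return idx
}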
diff --git a/infrastructure/nats/nats_handlers.go b/infrastructure/nats/nats_handlers.go
index 765164a..faf7fe6 100644
--- a/infrastructure/nats/nats_handlers.go
+++ b/infrastructure/nats/nats_handlers.go
@@ -6,9 +6,13 @@ import (
     "oc-scheduler/infrastructure/planner"
     "oc-scheduler/infrastructure/scheduling_resources"

+    oclib "cloud.o-forge.io/core/oc-lib"
     "cloud.o-forge.io/core/oc-lib/models/booking"
+    "cloud.o-forge.io/core/oc-lib/models/peer"
     "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
+    libutils "cloud.o-forge.io/core/oc-lib/models/utils"
     "cloud.o-forge.io/core/oc-lib/models/workflow"
+    "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
     "cloud.o-forge.io/core/oc-lib/tools"
 )
@@ -49,6 +53,13 @@ func handleRemoveResource(resp tools.NATSResponse) {
             return
         }
         scheduling_resources.GetService().HandleRemovePurchase(p, adminReq)
+    case tools.WORKFLOW_EXECUTION:
+        var p scheduling_resources.RemoveResourcePayload
+        if err := json.Unmarshal(resp.Payload, &p); err != nil || p.ID == "" {
+            return
+        }
+        // DeleteOne calls GenericDeleteOne internally which fires NotifyChange.
+        workflow_execution.NewAccessor(adminReq).DeleteOne(p.ID)
     }
 }
@@ -68,6 +79,21 @@ func handleCreateResource(resp tools.NATSResponse) {
         if err := json.Unmarshal(resp.Payload, &bk); err != nil {
             return
         }
+        if bk.FromNano != "" {
+            access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
+            pp := access.LoadOne(bk.FromNano)
+            if p := pp.ToPeer(); p == nil || p.Relation == peer.NANO {
+                return
+            }
+            access = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.BOOKING), nil)
+            d := access.LoadOne(bk.GetID())
+            if d.Data == nil {
+                access.StoreOne(bk.Serialize(&bk))
+            } else {
+                access.UpdateOne(bk.Serialize(&bk), bk.GetID())
+            }
+            return
+        }
         needsConsiders := scheduling_resources.GetService().HandleCreateBooking(&bk, adminReq)
         if needsConsiders {
             payload, _ := json.Marshal(execution.ConsidersPayload{ID: bk.GetID()})
@@ -78,10 +104,43 @@ func handleCreateResource(resp tools.NATSResponse) {
         if err := json.Unmarshal(resp.Payload, &pr); err != nil {
             return
         }
+        if pr.FromNano != "" {
+            access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
+            pp := access.LoadOne(pr.FromNano)
+            if p := pp.ToPeer(); p == nil || p.Relation == peer.NANO {
+                return
+            }
+            access = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PURCHASE_RESOURCE), nil)
+            d := access.LoadOne(pr.GetID())
+            if d.Data == nil {
+                access.StoreOne(pr.Serialize(&pr))
+            } else {
+                access.UpdateOne(pr.Serialize(&pr), pr.GetID())
+            }
+            return
+        }
         needsConsiders := scheduling_resources.GetService().HandleCreatePurchase(&pr, adminReq)
         if needsConsiders {
             payload, _ := json.Marshal(execution.ConsidersPayload{ID: pr.GetID()})
             execution.UpdateExecutionState(payload, tools.PURCHASE_RESOURCE)
         }
+    case tools.WORKFLOW_EXECUTION:
+        // Only propagate the state change onto an execution that oc-scheduler
+        // already owns. Never create executions from an external NATS event:
+        // creation is strictly oc-scheduler's responsibility (via the session
+        // flow), and blindly calling StoreOne here would trigger
+        // StoreDraftDefault (IsDraft=true, State=DRAFT), polluting the name-
+        // uniqueness index and breaking the check stream's first draft creation.
+        var update workflow_execution.WorkflowExecution
+        if err := json.Unmarshal(resp.Payload, &update); err != nil || update.GetID() == "" {
+            return
+        }
+        res, _, loadErr := workflow_execution.NewAccessor(adminReq).LoadOne(update.GetID())
+        if loadErr != nil || res == nil {
+            return
+        }
+        exec := res.(*workflow_execution.WorkflowExecution)
+        exec.State = update.State
+        libutils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq))
     }
 }
diff --git a/infrastructure/planner/planner.go b/infrastructure/planner/planner.go
index 84ad168..08b3b43 100644
--- a/infrastructure/planner/planner.go
+++ b/infrastructure/planner/planner.go
@@ -10,6 +10,7 @@ import (

     oclib "cloud.o-forge.io/core/oc-lib"
     "cloud.o-forge.io/core/oc-lib/models/booking/planner"
+    "cloud.o-forge.io/core/oc-lib/models/resources"
     "cloud.o-forge.io/core/oc-lib/models/workflow"
     "cloud.o-forge.io/core/oc-lib/models/workflow/graph"
     "cloud.o-forge.io/core/oc-lib/tools"
@@ -509,6 +510,126 @@ func (s *PlannerService) NotifyWorkflow(wfID string) {
     utils.Notify(&s.WorkflowSubMu, s.WorkflowSubs, wfID, struct{}{})
 }

+// FillForPeers fetches and waits for planners for an explicit list of peer PIDs.
+// Same mechanic as Fill but decoupled from the BookingResource map — used for
+// dynamic resource resolution where the peer set is not part of checkables.
+func (s *PlannerService) FillForPeers(peerPIDs []string, wfID string) {
+    if len(peerPIDs) == 0 {
+        return
+    }
+    const plannerFetchTimeout = 5 * time.Second
+    tmpSession := "check-dynamic-" + wfID
+
+    s.Mu.Lock()
+    myself, _ := oclib.GetMySelf()
+    for _, peerID := range peerPIDs {
+        entry := s.Cache[peerID]
+        if entry == nil {
+            entry = &plannerEntry{}
+            s.Cache[peerID] = entry
+            s.AddedAt[peerID] = time.Now().UTC()
+            go s.EvictAfter(peerID, plannerTTL)
+        }
+        entry.Planner = nil
+        if !entry.Refreshing {
+            entry.Refreshing = true
+            entry.RefreshOwner = tmpSession
+        }
+    }
+    s.Mu.Unlock()
+    defer s.ReleaseRefreshOwnership(peerPIDs, tmpSession)
+
+    for _, peerID := range peerPIDs {
+        if myself != nil && myself.PeerID == peerID {
+            go s.RefreshSelf(peerID, &tools.APIRequest{Admin: true})
+        } else {
+            payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
+            utils.Propalgate(peerID, tools.PropalgationMessage{
+                Action:  tools.PB_PLANNER,
+                Payload: payload,
+            })
+        }
+    }
+
+    deadline := time.Now().Add(plannerFetchTimeout)
+    remaining := slices.Clone(peerPIDs)
+    for len(remaining) > 0 {
+        wait := time.Until(deadline)
+        if wait <= 0 {
+            return
+        }
+        ch, cancelSub := SubscribeUpdates(s.Subs, &s.SubMu, remaining...)
+        select {
+        case <-ch:
+        case <-time.After(wait):
+            cancelSub()
+            return
+        }
+        cancelSub()
+        remaining = remaining[:0]
+        s.Mu.RLock()
+        for _, pid := range peerPIDs {
+            if entry := s.Cache[pid]; entry == nil || entry.Planner == nil {
+                remaining = append(remaining, pid)
+            }
+        }
+        s.Mu.RUnlock()
+    }
+}
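The subscribe/wait/re-check loop at the end of FillForPeers is a reusable shape: block until either an update arrives or the remaining deadline expires, then recompute what is still missing. A generic sketch of that pattern — the function-valued parameters are assumptions standing in for the concrete cache and subscription machinery above:

package plannerwait

import "time"

// waitFilled retries until missing() returns nothing or the deadline passes.
// subscribe registers interest in the given IDs and returns a signal channel
// plus a cancel function, mirroring SubscribeUpdates above.
func waitFilled(
    deadline time.Time,
    missing func() []string,
    subscribe func(ids ...string) (<-chan struct{}, func()),
) bool {
    for {
        rem := missing()
        if len(rem) == 0 {
            return true // everything arrived
        }
        wait := time.Until(deadline)
        if wait <= 0 {
            return false // give up; callers treat absent planners as non-responding
        }
        ch, cancel := subscribe(rem...)
        select {
        case <-ch: // at least one update landed; re-check what is still missing
        case <-time.After(wait):
            cancel()
            return false
        }
        cancel()
    }
}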
+// FillDynamic resolves all peer DIDs across the given dynamic resources to PIDs,
+// fetches their planners via FillForPeers, and returns the DID→PID mapping for use
+// in ResolveDynamic. All dynamics are batched into a single planner fetch round.
+func (s *PlannerService) FillDynamic(dynamics []*resources.DynamicResource, wfID string) map[string]string {
+    didToPID := map[string]string{}
+    peerPIDs := []string{}
+    access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.PEER), nil)
+    for _, d := range dynamics {
+        for _, did := range d.PeerIds {
+            if did == "" || didToPID[did] != "" {
+                continue
+            }
+            if data := access.LoadOne(did); data.Data != nil {
+                if p := data.ToPeer(); p != nil {
+                    didToPID[did] = p.PeerID
+                    peerPIDs = append(peerPIDs, p.PeerID)
+                }
+            }
+        }
+    }
+    s.FillForPeers(peerPIDs, wfID)
+    return didToPID
+}
+
+// ResolveDynamic walks the sorted instance list of a DynamicResource via
+// GetSelectedInstance and returns true as soon as it finds an instance whose
+// peer's planner confirms availability for [start, end].
+// d.SelectedIndex is updated to the elected instance on success.
+// Peers that did not respond (no planner in cache) are skipped.
+func (s *PlannerService) ResolveDynamic(d *resources.DynamicResource, didToPID map[string]string, start time.Time, end *time.Time) bool {
+    for {
+        inst := d.GetSelectedInstance(nil)
+        if inst == nil {
+            return false // exhausted all candidates
+        }
+        did := d.PeerIds[d.SelectedIndex]
+        resourceID := d.ResourceIds[d.SelectedIndex]
+        pid, ok := didToPID[did]
+        if !ok {
+            continue // peer DID could not be resolved
+        }
+        s.Mu.RLock()
+        entry := s.Cache[pid]
+        s.Mu.RUnlock()
+        if entry == nil || entry.Planner == nil {
+            continue // peer did not respond in time
+        }
+        if s.checkInstance(entry.Planner, resourceID, inst.GetID(), start, end) {
+            return true // d.SelectedIndex points to the elected instance
+        }
+    }
+}
+
 // checkInstance checks availability for the specific instance resolved by the
 // scheduler. When instanceID is empty (no instance selected / none resolvable),
 // it falls back to checking all instances known in the planner and returns true
diff --git a/infrastructure/scheduler/scheduler.go b/infrastructure/scheduler/scheduler.go
index 36ec71e..fe37445 100644
--- a/infrastructure/scheduler/scheduler.go
+++ b/infrastructure/scheduler/scheduler.go
@@ -13,6 +13,7 @@ import (
     "cloud.o-forge.io/core/oc-lib/models/booking"
     "cloud.o-forge.io/core/oc-lib/models/common/enum"
     "cloud.o-forge.io/core/oc-lib/models/common/pricing"
+    "cloud.o-forge.io/core/oc-lib/models/resources"
     "cloud.o-forge.io/core/oc-lib/models/utils"
     "cloud.o-forge.io/core/oc-lib/models/workflow"
     "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
@@ -40,12 +41,14 @@ type WorkflowSchedule struct {
     DurationS float64 `json:"duration_s" default:"-1"`
     Cron      string  `json:"cron,omitempty"`

-    BookingMode             booking.BookingMode     `json:"booking_mode,omitempty"`
-    SelectedInstances       workflow.ConfigItem     `json:"selected_instances"`
-    SelectedPartnerships    workflow.ConfigItem     `json:"selected_partnerships"`
-    SelectedBuyings         workflow.ConfigItem     `json:"selected_buyings"`
-    SelectedStrategies      workflow.ConfigItem     `json:"selected_strategies"`
-    SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"`
+    BookingMode              booking.BookingMode                            `json:"booking_mode,omitempty"`
+    SelectedInstances        workflow.ConfigItem                            `json:"selected_instances"`
+    SelectedPartnerships     workflow.ConfigItem                            `json:"selected_partnerships"`
+    SelectedBuyings          workflow.ConfigItem                            `json:"selected_buyings"`
+    SelectedStrategies       workflow.ConfigItem                            `json:"selected_strategies"`
+    SelectedPaymentType      workflow.ConfigItem                            `json:"selected_payment_type"`
+    SelectedBillingStrategy  pricing.BillingStrategy                        `json:"selected_billing_strategy"`
+    SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty"`

     // Confirm, when true, triggers Schedule() to confirm the drafts held by this session.
     Confirm bool `json:"confirm,omitempty"`
@@ -119,6 +122,28 @@ func (ws *WorkflowSchedule) Check(wfID string, asap bool, preemption bool, reque
     checkables := infUtils.CollectBookingResources(wf, ws.SelectedInstances)
     start, end, available, preemptible, warnings := planner.GetPlannerService().FindDate(wfID, checkables, start, end, preemption, asap)

+    // Dynamic resources are resolved separately: their peer planners are fetched
+    // and the sorted instance list is walked until an available one is found.
+    var dynamics []*resources.DynamicResource
+    for _, item := range wf.GetGraphItems(wf.Graph.IsDynamic) {
+        _, res := item.GetResource()
+        if res == nil {
+            continue
+        }
+        d := res.(*resources.DynamicResource)
+        d.SetAllowedInstances(request)
+        dynamics = append(dynamics, d)
+    }
+    if len(dynamics) > 0 {
+        didToPID := planner.GetPlannerService().FillDynamic(dynamics, wfID)
+        for _, d := range dynamics {
+            if !planner.GetPlannerService().ResolveDynamic(d, didToPID, start, end) {
+                available = false
+                warnings = append(warnings, "no available instance for dynamic resource "+d.GetName())
+            }
+        }
+    }
+
     return &CheckResult{
         Start: start,
         End:   end,
@@ -197,12 +222,17 @@ func (ws *WorkflowSchedule) GenerateExecutions(wf *workflow.Workflow, isPreempti
             UUID: uuid.New().String(),
             Name: wf.Name + " execution " + date.Start.Format("2006-01-02 15:04"),
         },
-        Priority:     1,
-        ExecutionsID: ws.UUID,
-        ExecDate:     date.Start,
-        EndDate:      date.End,
-        State:        enum.DRAFT,
-        WorkflowID:   wf.GetID(),
+        SelectedInstances:        ws.SelectedInstances,
+        SelectedPartnerships:     ws.SelectedPartnerships,
+        SelectedBuyings:          ws.SelectedBuyings,
+        SelectedStrategies:       ws.SelectedStrategies,
+        SelectedEmbeddedStorages: ws.SelectedEmbeddedStorages,
+        Priority:                 1,
+        ExecutionsID:             ws.UUID,
+        ExecDate:                 date.Start,
+        EndDate:                  date.End,
+        State:                    enum.DRAFT,
+        WorkflowID:               wf.GetID(),
     }
     if ws.BookingMode != booking.PLANNED {
         obj.Priority = 0
diff --git a/infrastructure/scheduling_resources/service.go b/infrastructure/scheduling_resources/service.go
index 396b49f..47270f2 100644
--- a/infrastructure/scheduling_resources/service.go
+++ b/infrastructure/scheduling_resources/service.go
@@ -10,6 +10,8 @@ import (
     "oc-scheduler/infrastructure/planner"

     oclib "cloud.o-forge.io/core/oc-lib"
+    "cloud.o-forge.io/core/oc-lib/config"
+    "cloud.o-forge.io/core/oc-lib/dbs"
     "cloud.o-forge.io/core/oc-lib/models/booking"
     "cloud.o-forge.io/core/oc-lib/models/common/enum"
     "cloud.o-forge.io/core/oc-lib/models/peer"
@@ -221,6 +223,9 @@ func (s *SchedulingResourcesService) Delete(dt tools.DataType, bk SchedulerObjec
         if dt == tools.BOOKING {
             planner.GetPlannerService().RefreshSelf(selfID.PeerID, request)
         }
+        if (dt == tools.BOOKING || dt == tools.PURCHASE_RESOURCE) && config.GetConfig().IsNano {
+            SendRemoveToMaster(bk, dt)
+        }
         return
     }
     EmitNATSRemove(bk.GetID(), bk.GetPeerSession(), bk.GetExecutionsId(), dt)
@@ -299,8 +304,14 @@ func DraftTimeout(id string, dt tools.DataType) {
     switch dt {
     case tools.BOOKING:
         booking.NewAccessor(adminReq).DeleteOne(id)
+        if config.GetConfig().IsNano {
+            SendRemoveToMaster(res, dt)
+        }
     case tools.PURCHASE_RESOURCE:
         purchase_resource.NewAccessor(adminReq).DeleteOne(id)
+        if config.GetConfig().IsNano {
+            SendRemoveToMaster(res, dt)
+        }
     }
     fmt.Printf("DraftTimeout: %s %s deleted (still draft after 10 min)\n", dt.String(), id)
 }
@@ -316,7 +327,6 @@ func (s *SchedulingResourcesService) HandleCreateBooking(bk *booking.Booking, ad
     if self == nil {
         return false
     }
-
     if existing, _, loadErr := booking.NewAccessor(adminReq).LoadOne(bk.GetID()); loadErr == nil && existing != nil {
         prev := existing.(*booking.Booking)
         if prev.SchedulerPeerID != bk.SchedulerPeerID || prev.ExecutionsID != bk.ExecutionsID {
@@ -329,13 +339,20 @@ func (s *SchedulingResourcesService) HandleCreateBooking(bk *booking.Booking, ad
         if !bk.IsDraft && !prev.ExpectedStartDate.IsZero() && prev.ExpectedStartDate.Before(time.Now().UTC()) {
             fmt.Println("HandleCreateBooking: expired, deleting", bk.GetID())
             booking.NewAccessor(adminReq).DeleteOne(bk.GetID())
+            if config.GetConfig().IsNano {
+                SendRemoveToMaster(bk, tools.BOOKING)
+            }
             return false
         }
         if _, _, err := utils.GenericRawUpdateOne(bk, bk.GetID(), booking.NewAccessor(adminReq)); err != nil {
             fmt.Println("HandleCreateBooking: update failed:", err)
             return false
         }
+        if config.GetConfig().IsNano {
+            SendBookingToMaster(bk)
+        }
         planner.GetPlannerService().RefreshSelf(self.PeerID, adminReq)
+
         return !bk.IsDraft
     }
@@ -348,6 +365,7 @@ func (s *SchedulingResourcesService) HandleCreateBooking(bk *booking.Booking, ad
         fmt.Println("HandleCreateBooking: conflicts with local planner, discarding")
         return false
     }
+
     bk.IsDraft = true
     stored, _, err := booking.NewAccessor(adminReq).StoreOne(bk)
     if err != nil {
@@ -355,11 +373,126 @@ func (s *SchedulingResourcesService) HandleCreateBooking(bk *booking.Booking, ad
         return false
     }
     storedID := stored.GetID()
+
     planner.GetPlannerService().RefreshSelf(self.PeerID, adminReq)
+
     time.AfterFunc(10*time.Minute, func() { DraftTimeout(storedID, tools.BOOKING) })
+
+    if config.GetConfig().IsNano {
+        SendBookingToMaster(bk) // TODO : ASK FOR RESPONSE...
+    }
     return false
 }

+func SendBookingToMaster(booking *booking.Booking) {
+    self, _ := oclib.GetMySelf()
+    if booking.GetCreatorID() != self.GetID() {
+        return
+    }
+    d := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).Search(&dbs.Filters{
+        And: map[string][]dbs.Filter{
+            "relation": {{Operator: dbs.EQUAL.String(), Value: peer.MASTER}},
+        },
+    }, "", false, 0, 1)
+    for _, dd := range d.Data {
+        booking.IsDraft = false
+        booking.FromNano = self.GetID()
+        m := map[string]interface{}{}
+        // Serialize the booking itself so the master receives its fields
+        // (including FromNano), then inject the target peer_id.
+        i, err := json.Marshal(booking)
+        if err == nil {
+            json.Unmarshal(i, &m)
+            m["peer_id"] = dd.(*peer.Peer).PeerID
+            if payloadd, err := json.Marshal(m); err == nil {
+                b, err := json.Marshal(&tools.PropalgationMessage{
+                    DataType: tools.BOOKING.EnumIndex(),
+                    Action:   tools.PB_CREATE,
+                    Payload:  payloadd,
+                })
+                if err == nil {
+                    tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
+                        FromApp:  "oc-scheduler",
+                        Datatype: tools.BOOKING,
+                        Method:   int(tools.PROPALGATION_EVENT),
+                        Payload:  b,
+                    })
+                }
+            }
+        }
+    }
+}
+func SendRemoveToMaster(obj utils.DBObject, dt tools.DataType) {
+    self, _ := oclib.GetMySelf()
+    if obj.GetCreatorID() != self.GetID() {
+        return
+    }
+    d := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).Search(&dbs.Filters{
+        And: map[string][]dbs.Filter{
+            "relation": {{Operator: dbs.EQUAL.String(), Value: peer.MASTER}},
+        },
+    }, "", false, 0, 1)
+    for _, dd := range d.Data {
+        m := map[string]interface{}{}
+        // Serialize the object so the payload carries its id, then inject
+        // the target peer_id.
+        i, err := json.Marshal(obj)
+        if err == nil {
+            json.Unmarshal(i, &m)
+            m["peer_id"] = dd.(*peer.Peer).PeerID
+            if payloadd, err := json.Marshal(m); err == nil {
+                b, err := json.Marshal(&tools.PropalgationMessage{
+                    DataType: dt.EnumIndex(),
+                    Action:   tools.PB_DELETE,
+                    Payload:  payloadd,
+                })
+                if err == nil {
+                    tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
+                        FromApp:  "oc-scheduler",
+                        Datatype: dt,
+                        Method:   int(tools.PROPALGATION_EVENT),
+                        Payload:  b,
+                    })
+                }
+            }
+        }
+    }
+}
+
+func SendPurchaseToMaster(purchase *purchase_resource.PurchaseResource) {
+    self, _ := oclib.GetMySelf()
+    if purchase.GetCreatorID() != self.GetID() {
+        return
+    }
+    d := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).Search(&dbs.Filters{
+        And: map[string][]dbs.Filter{
+            "relation": {{Operator: dbs.EQUAL.String(), Value: peer.MASTER}},
+        },
+    }, "", false, 0, 1)
+    for _, dd := range d.Data {
+        purchase.IsDraft = false
+        purchase.FromNano = self.GetID()
+        m := map[string]interface{}{}
+        // Serialize the purchase itself so the master receives its fields.
+        i, err := json.Marshal(purchase)
+        if err == nil {
+            json.Unmarshal(i, &m)
+            m["peer_id"] = dd.(*peer.Peer).PeerID
+            if payloadd, err := json.Marshal(m); err == nil {
+                b, err := json.Marshal(&tools.PropalgationMessage{
+                    DataType: tools.PURCHASE_RESOURCE.EnumIndex(),
+                    Action:   tools.PB_CREATE,
+                    Payload:  payloadd,
+                })
+                if err == nil {
+                    tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
+                        FromApp:  "oc-scheduler",
+                        Datatype: tools.PURCHASE_RESOURCE,
+                        Method:   int(tools.PROPALGATION_EVENT),
+                        Payload:  b,
+                    })
+                }
+            }
+        }
+    }
+}
+
 // HandleCreatePurchase processes an incoming purchase from NATS.
 // Returns true if considers must be triggered.
 func (s *SchedulingResourcesService) HandleCreatePurchase(pr *purchase_resource.PurchaseResource, adminReq *tools.APIRequest) bool {
@@ -393,6 +526,9 @@ func (s *SchedulingResourcesService) HandleCreatePurchase(pr *purchase_resource.
         fmt.Println("HandleCreatePurchase: could not store:", err)
         return false
     }
+    if config.GetConfig().IsNano {
+        SendPurchaseToMaster(pr) // TODO : ASK FOR RESPONSE...
+    }
     storedID := stored.GetID()
     time.AfterFunc(10*time.Minute, func() { DraftTimeout(storedID, tools.PURCHASE_RESOURCE) })
     return false
@@ -405,14 +541,17 @@ func (s *SchedulingResourcesService) HandleRemoveBooking(p RemoveResourcePayload
         return
     }
     existing := res.(*booking.Booking)
-    if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID {
+    if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID || existing.IsDraft {
         fmt.Println("HandleRemoveBooking: auth mismatch, ignoring", p.ID)
         return
     }
-    booking.NewAccessor(adminReq).DeleteOne(p.ID)
+    d, _, _ := booking.NewAccessor(adminReq).DeleteOne(p.ID)
     if self := s.Self(); self != nil {
         planner.GetPlannerService().RefreshSelf(self.PeerID, adminReq)
     }
+    if config.GetConfig().IsNano && d != nil {
+        SendRemoveToMaster(d, tools.BOOKING) // TODO : ASK FOR RESPONSE...
+    }
 }

 // HandleRemovePurchase verifies auth and deletes the purchase.
@@ -422,11 +561,14 @@ func (s *SchedulingResourcesService) HandleRemovePurchase(p RemoveResourcePayloa
         return
     }
     existing := res.(*purchase_resource.PurchaseResource)
-    if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID {
+    if existing.SchedulerPeerID != p.SchedulerPeerID || existing.ExecutionsID != p.ExecutionsID || existing.IsDraft {
         fmt.Println("HandleRemovePurchase: auth mismatch, ignoring", p.ID)
         return
     }
-    purchase_resource.NewAccessor(adminReq).DeleteOne(p.ID)
+    d, _, _ := purchase_resource.NewAccessor(adminReq).DeleteOne(p.ID)
+    if config.GetConfig().IsNano && d != nil {
+        SendRemoveToMaster(d, tools.PURCHASE_RESOURCE) // TODO : ASK FOR RESPONSE...
+    }
 }

 // ---------------------------------------------------------------------------
diff --git a/infrastructure/session/session.go b/infrastructure/session/session.go
index 3ee9720..f113365 100644
--- a/infrastructure/session/session.go
+++ b/infrastructure/session/session.go
@@ -30,6 +30,48 @@ func NewSessionExecutionsService(sessionID string) *SessionExecutionsService {
     return &SessionExecutionsService{ExecutionsSessionID: sessionID}
 }

+// ---------------------------------------------------------------------------
+// Remote resource registry
+//
+// Bookings and purchases for remote peers are sent via NATS and stored only on
+// the remote peer — they never appear in local MongoDB. CleanupSession would
+// therefore miss them entirely. We keep a package-level in-memory registry
+// (executionsID → list) that is populated when PropagateCreate routes to a
+// remote peer, and consumed (cleared) by CleanupSession so it can emit the
+// corresponding REMOVE_RESOURCE NATS messages.
+// ---------------------------------------------------------------------------
+
+type remoteResourceEntry struct {
+    ID              string
+    SchedulerPeerID string
+    ExecutionsID    string
+    DT              tools.DataType
+}
+
+var remoteRegistryMu sync.Mutex
+var remoteRegistry = map[string][]remoteResourceEntry{}
+
+func trackRemoteResource(executionsID, id, schedulerPeerID string, dt tools.DataType) {
+    if id == "" {
+        return
+    }
+    remoteRegistryMu.Lock()
+    remoteRegistry[executionsID] = append(remoteRegistry[executionsID], remoteResourceEntry{
+        ID: id, SchedulerPeerID: schedulerPeerID, ExecutionsID: executionsID, DT: dt,
+    })
+    remoteRegistryMu.Unlock()
+}
+
+// consumeTrackedRemotes atomically returns and removes all tracked remote
+// resources for the given session.
+func consumeTrackedRemotes(executionsID string) []remoteResourceEntry {
+    remoteRegistryMu.Lock()
+    defer remoteRegistryMu.Unlock()
+    entries := remoteRegistry[executionsID]
+    delete(remoteRegistry, executionsID)
+    return entries
+}
+
 // ---------------------------------------------------------------------------
 // DB helpers
 // ---------------------------------------------------------------------------
@@ -164,6 +206,11 @@ func (s *SessionExecutionsService) upsertDrafts(
             scheduling_resources.GetService().PropagateCreate(
                 scheduling_resources.FromSchedulerDBObject(dt, bk), bk.GetDestPeer(), dt, request, errCh)
             <-errCh
+            // If this booking/purchase was routed to a remote peer (not stored in
+            // local DB), register it so CleanupSession can emit REMOVE_RESOURCE later.
+            if self != nil && bk.GetDestPeer() != self.GetID() {
+                trackRemoteResource(s.ExecutionsSessionID, bk.GetID(), bk.GetPeerSession(), dt)
+            }
         }
     }
 }
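A quick illustration of the registry lifecycle documented above — entries are recorded when a draft is routed to a remote peer and drained exactly once during cleanup (the snippet assumes it lives next to the registry, in the session package; IDs are illustrative):

func exampleRegistryLifecycle() {
    trackRemoteResource("sess-1", "bk-1", "peer-A", tools.BOOKING)
    trackRemoteResource("sess-1", "pr-1", "peer-B", tools.PURCHASE_RESOURCE)
    // consumeTrackedRemotes is atomic under the mutex: a second call for the
    // same session returns nothing, so cleanup stays idempotent.
    for _, e := range consumeTrackedRemotes("sess-1") {
        scheduling_resources.EmitNATSRemove(e.ID, e.SchedulerPeerID, e.ExecutionsID, e.DT)
    }
}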
@@ -184,6 +231,14 @@ func (s *SessionExecutionsService) CleanupSession(request *tools.APIRequest) {
         }
     }

+    // Emit NATS REMOVE_RESOURCE for bookings/purchases that were routed to
+    // remote peers and therefore never stored in local DB. loadSession above
+    // cannot find them, so we rely on the in-memory registry populated by
+    // upsertDrafts when PropagateCreate routes to a non-self peer.
+    for _, entry := range consumeTrackedRemotes(s.ExecutionsSessionID) {
+        scheduling_resources.EmitNATSRemove(entry.ID, entry.SchedulerPeerID, entry.ExecutionsID, entry.DT)
+    }
+
     for _, exec := range s.LoadSessionExecs() {
         execution.UnregisterExecLock(exec.GetID())
         workflow_execution.NewAccessor(adminReq).DeleteOne(exec.GetID())
@@ -227,6 +282,7 @@ func GenerateOrder(
 }

 func (s *SessionExecutionsService) ConfirmSession(request *tools.APIRequest) error {
+    adminReq := &tools.APIRequest{Admin: true}
     for _, dt := range []tools.DataType{tools.BOOKING, tools.PURCHASE_RESOURCE} {
         for _, bk := range s.loadSession(dt) {
             bk.SetIsDraft(false)
@@ -239,5 +295,9 @@ func (s *SessionExecutionsService) ConfirmSession(request *tools.APIRequest) err
             }
         }
     }
+    for _, exec := range s.LoadSessionExecs() {
+        exec.State = enum.SCHEDULED
+        utils.GenericRawUpdateOne(exec, exec.GetID(), workflow_execution.NewAccessor(adminReq))
+    }
     return nil
 }
diff --git a/infrastructure/utils/utils.go b/infrastructure/utils/utils.go
index 047b38a..5a58ac4 100644
--- a/infrastructure/utils/utils.go
+++ b/infrastructure/utils/utils.go
@@ -60,6 +60,10 @@ func CollectBookingResources(wf *workflow.Workflow, selectedInstances workflow.C
         if inst := r.GetSelectedInstance(idx); inst != nil {
             return inst.GetID()
         }
+    case *resources.ServiceResource:
+        if inst := r.GetSelectedInstance(idx); inst != nil {
+            return inst.GetID()
+        }
     }
     return ""
 }
@@ -106,6 +110,39 @@ func CollectBookingResources(wf *workflow.Workflow, selectedInstances workflow.C
         }
     }

+    // HOSTED services: capacity is capped by MaxConcurrent on the LiveService.
+    // The peer to watch is the creator (who operates the service).
+    // DEPLOYMENT services are covered through their linked compute unit.
+    for _, item := range wf.GetGraphItems(wf.Graph.IsService) {
+        _, res := item.GetResource()
+        if res == nil {
+            continue
+        }
+        svc := res.(*resources.ServiceResource)
+        idx := selectedInstances.Get(svc.GetID())
+        inst := svc.GetSelectedInstance(idx)
+        if inst == nil {
+            continue
+        }
+        if inst.(*resources.ServiceInstance).Mode != resources.HOSTED {
+            continue
+        }
+        id := svc.GetID()
+        if seen[id] {
+            continue
+        }
+        pid := resolvePID(svc.GetCreatorID())
+        if pid == "" {
+            continue
+        }
+        seen[id] = true
+        result[pid] = BookingResource{
+            ID:         id,
+            PeerPID:    pid,
+            InstanceID: resolveInstanceID(res),
+        }
+    }
+
     return result
 }
@@ -147,6 +184,35 @@ func GetWorkflowPeerIDs(wfID string, request *tools.APIRequest) ([]string, error
             peerIDs = append(peerIDs, id)
         }
     }
+    for _, item := range wf.GetGraphItems(wf.Graph.IsService) {
+        _, res := item.GetResource()
+        if res == nil {
+            continue
+        }
+        svc := res.(*resources.ServiceResource)
+        if len(svc.Instances) == 0 || svc.Instances[0].Mode != resources.HOSTED {
+            continue
+        }
+        if id := svc.GetCreatorID(); id != "" && !seen[id] {
+            seen[id] = true
+            peerIDs = append(peerIDs, id)
+        }
+    }
+    for _, item := range wf.GetGraphItems(wf.Graph.IsDynamic) {
+        _, res := item.GetResource()
+        if res == nil {
+            continue
+        }
+        d := res.(*resources.DynamicResource)
+        d.SetAllowedInstances(request)
+        for _, creatorID := range d.PeerIds {
+            if creatorID != "" && !seen[creatorID] {
+                seen[creatorID] = true
+                peerIDs = append(peerIDs, creatorID)
+            }
+        }
+    }
+
     realPeersID := []string{}
     access := oclib.NewRequestAdmin(oclib.LibDataEnum(tools.PEER), nil)
     for _, id := range peerIDs {
"cloud.o-forge.io/core/oc-lib/config" beego "github.com/beego/beego/v2/server/web" ) const appname = "oc-scheduler" func main() { - o := oclib.GetConfLoader(appname) conf.GetConfig().PrepLeadSeconds = o.GetIntDefault("PREP_LEAD_SECONDS", 120) conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", "kubernetes.default.svc.cluster.local") @@ -36,5 +36,7 @@ func main() { go infrastructure.RecoverDraftExecutions() go infrastructure.WatchExecutions() - beego.Run() + if config.GetConfig().IsApi { + beego.Run() + } } diff --git a/oc-scheduler b/oc-scheduler new file mode 100755 index 0000000..a3ac0a5 Binary files /dev/null and b/oc-scheduler differ diff --git a/prompt.yml b/prompt.yml new file mode 100644 index 0000000..5846ff0 --- /dev/null +++ b/prompt.yml @@ -0,0 +1,9 @@ +créer une api inspiré de celle courante : oc-scheduler, nommé oc-billing dans le folder .. +cette api dispose en controlleur seulement version.go, adapté. + +son infrastructure n'est pas du tout similaire à oc-scheduler considre là vide, lors de la copie. +Ensuite voici se qui est entendu. oc-billing communique par nats avec oc-scheduler. + +Lors d'un confirm oc-scheduler doit émettre sur nats sur un channel dédié, dans le message doit être inclus : + le workflow_scheduler validé. +A la reception oc-billing, celons le mode de payment attendu par resource (dans scheduler), drafté la bill à réglé "MAINTENANT". diff --git a/routers/commentsRouter.go b/routers/commentsRouter.go index 4f443cc..8234650 100644 --- a/routers/commentsRouter.go +++ b/routers/commentsRouter.go @@ -106,6 +106,15 @@ func init() { Filters: nil, Params: nil}) + beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowExecutionController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowExecutionController"], + beego.ControllerComments{ + Method: "Delete", + Router: `/:id`, + AllowHTTPMethods: []string{"delete"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowExecutionController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowExecutionController"], beego.ControllerComments{ Method: "Search", diff --git a/routers/router.go b/routers/router.go index 83102d4..2997b0a 100644 --- a/routers/router.go +++ b/routers/router.go @@ -52,4 +52,6 @@ func init() { // spurious WriteHeader that prevents the 101 Switching Protocols upgrade. 
beego.Handler("/oc/check/:id", http.HandlerFunc(controllers.CheckStreamHandler)) beego.Handler("/oc/logs/:id", http.HandlerFunc(controllers.LogsStreamHandler)) + beego.Handler("/oc/booking/stream", http.HandlerFunc(controllers.BookingStreamHandler)) + beego.Handler("/oc/execution/stream", http.HandlerFunc(controllers.ExecutionStreamHandler)) } diff --git a/swagger/swagger.json b/swagger/swagger.json index 2b72f8f..9647c56 100644 --- a/swagger/swagger.json +++ b/swagger/swagger.json @@ -13,7 +13,7 @@ "url": "https://www.gnu.org/licenses/agpl-3.0.html" } }, - "basePath": "/oc/", + "basePath": "/oc", "paths": { "/booking/": { "get": { @@ -264,6 +264,27 @@ "description": "{workflow} models.workflow" } } + }, + "delete": { + "tags": [ + "execution" + ], + "description": "find workflow by workflowid\n\u003cbr\u003e", + "operationId": "WorkflowExecutionController.Delete", + "parameters": [ + { + "in": "path", + "name": "id", + "description": "the workflowid you want to get", + "required": true, + "type": "string" + } + ], + "responses": { + "200": { + "description": "{workflow} models.workflow" + } + } } }, "/loki/{id}": { diff --git a/swagger/swagger.yml b/swagger/swagger.yml index 2b3847b..5eb3ef5 100644 --- a/swagger/swagger.yml +++ b/swagger/swagger.yml @@ -10,7 +10,7 @@ info: license: name: AGPL url: https://www.gnu.org/licenses/agpl-3.0.html -basePath: /oc/ +basePath: /oc paths: /{id}: delete: @@ -147,6 +147,22 @@ paths: responses: "200": description: '{workflow} models.workflow' + delete: + tags: + - execution + description: |- + find workflow by workflowid +
diff --git a/swagger/swagger.yml b/swagger/swagger.yml
index 2b3847b..5eb3ef5 100644
--- a/swagger/swagger.yml
+++ b/swagger/swagger.yml
@@ -10,7 +10,7 @@ info:
   license:
     name: AGPL
     url: https://www.gnu.org/licenses/agpl-3.0.html
-basePath: /oc/
+basePath: /oc
 paths:
   /{id}:
     delete:
@@ -147,6 +147,22 @@ paths:
       responses:
        "200":
          description: '{workflow} models.workflow'
+    delete:
+      tags:
+      - execution
+      description: |-
+        find workflow by workflowid
+
+      operationId: WorkflowExecutionController.Delete
+      parameters:
+      - in: path
+        name: id
+        description: the workflowid you want to get
+        required: true
+        type: string
+      responses:
+        "200":
+          description: '{workflow} models.workflow'
   /execution/search/{search}:
     get:
       tags:
diff --git a/ws.go b/ws.go
index 63e8c6a..e825d70 100644
--- a/ws.go
+++ b/ws.go
@@ -23,7 +23,7 @@ func main() {
     // ws://localhost:8090/oc//check
     // ws://localhost:8090/oc//check?as_possible=true
     // ws://localhost:8090/oc//check?as_possible=true&preemption=true
-    url := "ws://localhost:8090/oc/check/58314c99-c595-4ca2-8b5e-822a6774efed?as_possible=true"
+    url := "ws://localhost:8000/scheduler/check/58314c99-c595-4ca2-8b5e-822a6774efed?as_possible=true"
     token := ""
     // JSON body sent as the first WebSocket message (WorkflowSchedule).
     // Only start + duration_s are required when as_possible=true.
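A sketch of the check-stream handshake that ws.go exercises above: dial, then send the WorkflowSchedule body as the first WebSocket message. With as_possible=true only start and duration_s are required; the RFC3339 start format and the Authorization header are assumptions:

package main

import (
    "log"
    "net/http"
    "time"

    gorillaws "github.com/gorilla/websocket"
)

func main() {
    url := "ws://localhost:8000/scheduler/check/58314c99-c595-4ca2-8b5e-822a6774efed?as_possible=true"
    header := http.Header{}
    header.Set("Authorization", "Bearer <token>") // assumed auth scheme
    conn, _, err := gorillaws.DefaultDialer.Dial(url, header)
    if err != nil {
        log.Fatal("dial:", err)
    }
    defer conn.Close()
    // First message: the minimal WorkflowSchedule body (values illustrative).
    first := map[string]interface{}{
        "start":      time.Now().UTC().Add(10 * time.Minute).Format(time.RFC3339),
        "duration_s": 3600,
    }
    if err := conn.WriteJSON(first); err != nil {
        log.Fatal("write:", err)
    }
    // Read the check results / confirmation messages pushed by the server.
    for {
        var msg map[string]interface{}
        if err := conn.ReadJSON(&msg); err != nil {
            return
        }
        log.Println(msg)
    }
}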