diff --git a/controllers/workflow_sheduler.go b/controllers/workflow_sheduler.go index 93c2c02..1319ba5 100644 --- a/controllers/workflow_sheduler.go +++ b/controllers/workflow_sheduler.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "oc-scheduler/infrastructure" + "strings" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" @@ -86,29 +87,19 @@ var wsUpgrader = gorillaws.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true }, } -// @Title CheckStream -// @Description WebSocket stream of slot availability for a workflow. -// After the handshake the client sends one JSON frame containing the -// WorkflowSchedule parameters (start, end, booking_mode, duration_s, …). -// The server responds with a CheckResult frame immediately and again each time -// a planner for one of the workflow's storage/compute peers is updated. -// When the stream is interrupted the cache entries for those peers are evicted -// and a PB_CLOSE_PLANNER event is emitted on NATS. -// Query params: -// - as_possible=true ignore start date, search from now -// - preemption=true validate anyway, raise warnings -// -// @Param id path string true "workflow id" -// @Param as_possible query bool false "find nearest free slot from now" -// @Param preemption query bool false "validate anyway, raise warnings" -// @Success 101 -// @router /:id/check [get] -func (o *WorkflowSchedulerController) CheckStream() { - wfID := o.Ctx.Input.Param(":id") - asap, _ := o.GetBool("as_possible", false) - preemption, _ := o.GetBool("preemption", false) +// CheckStreamHandler is a plain http.HandlerFunc (registered via beego.Handler +// to avoid Beego's WriteHeader interference with the WebSocket upgrade). +// Path: /oc/:id/check → parts = ["", "oc", "", "check"] +// Query params: as_possible=true, preemption=true +func CheckStreamHandler(w http.ResponseWriter, r *http.Request) { + parts := strings.Split(strings.TrimSuffix(r.URL.Path, "/"), "/") + wfID := parts[len(parts)-2] // second-to-last segment - user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + q := r.URL.Query() + asap := q.Get("as_possible") == "true" + preemption := q.Get("preemption") == "true" + + user, peerID, groups := oclib.ExtractTokenInfo(*r) req := &tools.APIRequest{ Username: user, PeerID: peerID, @@ -120,15 +111,16 @@ func (o *WorkflowSchedulerController) CheckStream() { // Resolve the peer IDs concerned by this workflow before upgrading so we // can abort cleanly with a plain HTTP error if the workflow is not found. watchedPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req) + fmt.Println("Here my watched peers involved in workflow", watchedPeers) if err != nil { - o.Data["json"] = map[string]interface{}{"code": 404, "error": err.Error()} - o.ServeJSON() + http.Error(w, `{"code":404,"error":"`+err.Error()+`"}`, http.StatusNotFound) return } // Upgrade to WebSocket. - conn, err := wsUpgrader.Upgrade(o.Ctx.ResponseWriter, o.Ctx.Request, nil) + conn, err := wsUpgrader.Upgrade(w, r, nil) if err != nil { + // gorilla already wrote the error response return } @@ -162,6 +154,7 @@ func (o *WorkflowSchedulerController) CheckStream() { push := func() error { result, checkErr := ws.Check(wfID, asap, preemption, req) + fmt.Println(result, checkErr) if checkErr != nil { return checkErr } diff --git a/go.mod b/go.mod index 1466789..0d26377 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-scheduler go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260224093610-a9ebad78f3a8 + cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa github.com/beego/beego/v2 v2.3.8 github.com/google/uuid v1.6.0 github.com/robfig/cron v1.2.0 diff --git a/go.sum b/go.sum index 0cf63bc..50aecff 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260224092928-54aef164ba10 h1:9i8fDtGjg3JDn cloud.o-forge.io/core/oc-lib v0.0.0-20260224092928-54aef164ba10/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260224093610-a9ebad78f3a8 h1:xoC5PAz1469QxrNm8rrsq5+BtwshEt+L2Nhf90MrqrM= cloud.o-forge.io/core/oc-lib v0.0.0-20260224093610-a9ebad78f3a8/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa h1:1wCpI4dwN1pj6MlpJ7/WifhHVHmCE4RU+9klwqgo/bk= +cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc= github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg= diff --git a/infrastructure/scheduler.go b/infrastructure/scheduler.go index a4de7fa..c1f9ff6 100644 --- a/infrastructure/scheduler.go +++ b/infrastructure/scheduler.go @@ -199,7 +199,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* } fmt.Println("Schedules") - wf.GetAccessor(&tools.APIRequest{Admin: true}).UpdateOne(wf, wf.GetID()) + wf.GetAccessor(&tools.APIRequest{Admin: true}).UpdateOne(wf.Serialize(wf), wf.GetID()) return ws, wf, executions, nil } @@ -414,10 +414,10 @@ func (ws *WorkflowSchedule) Check(wfID string, asap bool, preemption bool, reque // 4. Extract booking-relevant (storage + compute) resources from the graph, // resolving the selected instance for each resource. checkables := collectBookingResources(wf, ws.SelectedInstances) - + fmt.Println(checkables) // 5. Check every resource against its peer's planner unavailable, warnings := checkResourceAvailability(checkables, start, end) - + fmt.Println(unavailable, warnings) result := &CheckResult{ Start: start, End: end, diff --git a/oc-scheduler b/oc-scheduler index 3d4b43f..f133e3e 100755 Binary files a/oc-scheduler and b/oc-scheduler differ diff --git a/routers/commentsRouter.go b/routers/commentsRouter.go index cd5890c..2484017 100644 --- a/routers/commentsRouter.go +++ b/routers/commentsRouter.go @@ -7,6 +7,42 @@ import ( func init() { + beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"], + beego.ControllerComments{ + Method: "GetAll", + Router: `/`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + + beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"], + beego.ControllerComments{ + Method: "Get", + Router: `/:id`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + + beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"], + beego.ControllerComments{ + Method: "Search", + Router: `/search/:start_date/:end_date`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + + beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:BookingController"], + beego.ControllerComments{ + Method: "ExecutionSearch", + Router: `/search/execution/:id`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + beego.GlobalControllerRouter["oc-scheduler/controllers:LokiController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:LokiController"], beego.ControllerComments{ Method: "GetLogs", @@ -88,6 +124,15 @@ func init() { Filters: nil, Params: nil}) + beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowSchedulerController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowSchedulerController"], + beego.ControllerComments{ + Method: "CheckStream", + Router: `/:id/check`, + AllowHTTPMethods: []string{"get"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowSchedulerController"] = append(beego.GlobalControllerRouter["oc-scheduler/controllers:WorkflowSchedulerController"], beego.ControllerComments{ Method: "SearchScheduledDraftOrder", diff --git a/routers/router.go b/routers/router.go index ad6d463..914dd00 100644 --- a/routers/router.go +++ b/routers/router.go @@ -8,6 +8,7 @@ package routers import ( + "net/http" "oc-scheduler/controllers" beego "github.com/beego/beego/v2/server/web" @@ -41,4 +42,7 @@ func init() { ) beego.AddNamespace(ns) + + // Route WebSocket hors du pipeline Beego pour éviter le WriteHeader parasite + beego.Handler("/oc/:id/check", http.HandlerFunc(controllers.CheckStreamHandler)) } diff --git a/swagger/swagger.json b/swagger/swagger.json index 88fdeec..2bf0b47 100644 --- a/swagger/swagger.json +++ b/swagger/swagger.json @@ -15,6 +15,116 @@ }, "basePath": "/oc/", "paths": { + "/booking/": { + "get": { + "tags": [ + "booking" + ], + "description": "find booking by id\n\u003cbr\u003e", + "operationId": "BookingController.GetAll", + "parameters": [ + { + "in": "query", + "name": "is_draft", + "description": "draft wished", + "type": "string" + } + ], + "responses": { + "200": { + "description": "{booking} models.booking" + } + } + } + }, + "/booking/search/execution/{id}": { + "get": { + "tags": [ + "booking" + ], + "description": "search bookings by execution\n\u003cbr\u003e", + "operationId": "BookingController.Search", + "parameters": [ + { + "in": "path", + "name": "id", + "description": "id execution", + "required": true, + "type": "string" + }, + { + "in": "query", + "name": "is_draft", + "description": "draft wished", + "type": "string" + } + ], + "responses": { + "200": { + "description": "{workspace} models.workspace" + } + } + } + }, + "/booking/search/{start_date}/{end_date}": { + "get": { + "tags": [ + "booking" + ], + "description": "search bookings\n\u003cbr\u003e", + "operationId": "BookingController.Search", + "parameters": [ + { + "in": "path", + "name": "start_date", + "description": "the word search you want to get", + "required": true, + "type": "string" + }, + { + "in": "path", + "name": "end_date", + "description": "the word search you want to get", + "required": true, + "type": "string" + }, + { + "in": "query", + "name": "is_draft", + "description": "draft wished", + "type": "string" + } + ], + "responses": { + "200": { + "description": "{workspace} models.workspace" + } + } + } + }, + "/booking/{id}": { + "get": { + "tags": [ + "booking" + ], + "description": "find booking by id\n\u003cbr\u003e", + "operationId": "BookingController.Get", + "parameters": [ + { + "in": "path", + "name": "id", + "description": "the id you want to get", + "required": true, + "type": "string" + } + ], + "responses": { + "200": { + "description": "{booking} models.booking" + } + } + } + }, "/execution/": { "get": { "tags": [ @@ -240,6 +350,41 @@ } } }, + "/{id}/check": { + "get": { + "tags": [ + "oc-scheduler/controllersWorkflowSchedulerController" + ], + "description": "WebSocket stream of slot availability for a workflow.\n\u003cbr\u003e", + "operationId": "WorkflowSchedulerController.CheckStream", + "parameters": [ + { + "in": "path", + "name": "id", + "description": "workflow id", + "required": true, + "type": "string" + }, + { + "in": "query", + "name": "as_possible", + "description": "find nearest free slot from now", + "type": "boolean" + }, + { + "in": "query", + "name": "preemption", + "description": "validate anyway, raise warnings", + "type": "boolean" + } + ], + "responses": { + "101": { + "description": "" + } + } + } + }, "/{id}/order": { "get": { "tags": [ @@ -279,6 +424,10 @@ "name": "loki", "description": "Operations about workflow\n" }, + { + "name": "booking", + "description": "Operations about workspace\n" + }, { "name": "execution", "description": "Operations about workflow\n" diff --git a/swagger/swagger.yml b/swagger/swagger.yml index a9d4a41..726e6c1 100644 --- a/swagger/swagger.yml +++ b/swagger/swagger.yml @@ -57,6 +57,31 @@ paths: responses: "200": description: '{workspace} models.workspace' + /{id}/check: + get: + tags: + - oc-scheduler/controllersWorkflowSchedulerController + description: |- + WebSocket stream of slot availability for a workflow. +
+ operationId: WorkflowSchedulerController.CheckStream + parameters: + - in: path + name: id + description: workflow id + required: true + type: string + - in: query + name: as_possible + description: find nearest free slot from now + type: boolean + - in: query + name: preemption + description: validate anyway, raise warnings + type: boolean + responses: + "101": + description: "" /{id}/order: get: tags: @@ -74,6 +99,86 @@ paths: responses: "200": description: '{workspace} models.workspace' + /booking/: + get: + tags: + - booking + description: |- + find booking by id +
+ operationId: BookingController.GetAll + parameters: + - in: query + name: is_draft + description: draft wished + type: string + responses: + "200": + description: '{booking} models.booking' + /booking/{id}: + get: + tags: + - booking + description: |- + find booking by id +
+ operationId: BookingController.Get + parameters: + - in: path + name: id + description: the id you want to get + required: true + type: string + responses: + "200": + description: '{booking} models.booking' + /booking/search/{start_date}/{end_date}: + get: + tags: + - booking + description: |- + search bookings +
+ operationId: BookingController.Search + parameters: + - in: path + name: start_date + description: the word search you want to get + required: true + type: string + - in: path + name: end_date + description: the word search you want to get + required: true + type: string + - in: query + name: is_draft + description: draft wished + type: string + responses: + "200": + description: '{workspace} models.workspace' + /booking/search/execution/{id}: + get: + tags: + - booking + description: |- + search bookings by execution +
+ operationId: BookingController.Search + parameters: + - in: path + name: id + description: id execution + required: true + type: string + - in: query + name: is_draft + description: draft wished + type: string + responses: + "200": + description: '{workspace} models.workspace' /execution/: get: tags: @@ -205,6 +310,9 @@ tags: - name: loki description: | Operations about workflow +- name: booking + description: | + Operations about workspace - name: execution description: | Operations about workflow diff --git a/ws.go b/ws.go new file mode 100644 index 0000000..77e530f --- /dev/null +++ b/ws.go @@ -0,0 +1,115 @@ +//go:build ignore + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "os" + "os/signal" + "time" + + "golang.org/x/net/websocket" +) + +func main() { + timeout := flag.Int("timeout", 30, "secondes sans message avant de quitter") + flag.Parse() + + args := flag.Args() + // Exemples de routes WS disponibles : + // ws://localhost:8090/oc//check + // ws://localhost:8090/oc//check?as_possible=true + // ws://localhost:8090/oc//check?as_possible=true&preemption=true + url := "ws://localhost:8090/oc/WORKFLOW_ID/check?as_possible=true" + token := "" + // Body JSON envoyé comme premier message WebSocket (WorkflowSchedule). + // Seuls start + duration_s sont requis si as_possible=true. + body := `{"start":"` + time.Now().UTC().Format(time.RFC3339) + `","duration_s":3600}` + + if len(args) >= 1 { + url = args[0] + } + if len(args) >= 2 { + token = args[1] + } + if len(args) >= 3 { + body = args[2] + } + + origin := "http://localhost/" + config, err := websocket.NewConfig(url, origin) + if err != nil { + log.Fatalf("Config invalide : %v", err) + } + if token != "" { + config.Header.Set("Authorization", "Bearer "+token) + fmt.Printf("Token : %s...\n", token[:min(20, len(token))]) + } + + fmt.Printf("Connexion à : %s\n", url) + ws, err := websocket.DialConfig(config) + if err != nil { + log.Fatalf("Impossible de se connecter : %v", err) + } + defer ws.Close() + fmt.Println("Connecté — envoi du body initial...") + + // Envoi du WorkflowSchedule comme premier message. + if err := websocket.Message.Send(ws, body); err != nil { + log.Fatalf("Impossible d'envoyer le body initial : %v", err) + } + fmt.Printf("Body envoyé : %s\n\nEn attente de messages...\n\n", body) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt) + + msgs := make(chan string) + errs := make(chan error, 1) + + go func() { + for { + var raw string + if err := websocket.Message.Receive(ws, &raw); err != nil { + errs <- err + return + } + msgs <- raw + } + }() + + idleTimer := time.NewTimer(time.Duration(*timeout) * time.Second) + defer idleTimer.Stop() + + for { + select { + case <-stop: + fmt.Println("\nInterruption — fermeture.") + return + case err := <-errs: + fmt.Printf("Connexion fermée : %v\n", err) + return + case <-idleTimer.C: + fmt.Printf("Timeout (%ds) — aucun message reçu, fermeture.\n", *timeout) + return + case raw := <-msgs: + idleTimer.Reset(time.Duration(*timeout) * time.Second) + var data any + if err := json.Unmarshal([]byte(raw), &data); err == nil { + b, _ := json.MarshalIndent(data, "", " ") + fmt.Println(string(b)) + } else { + fmt.Printf("Message brut : %s\n", raw) + } + } + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +}