package controllers

import (
	"context"
	"fmt"
	"net/http"
	"oc-scheduler/infrastructure"
	"reflect"
	"strconv"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/tools"
	beego "github.com/beego/beego/v2/server/web"
	"github.com/google/uuid"
	gorillaws "github.com/gorilla/websocket"
)

var orderCollection = oclib.LibDataEnum(oclib.ORDER)
var logger = oclib.GetLogger()

// Operations about workflow
type WorkflowSchedulerController struct {
	beego.Controller
}

// wsUpgrader upgrades HTTP requests to WebSocket connections.
// NOTE(review): CheckOrigin accepts every origin; confirm that origin
// filtering is enforced elsewhere (reverse proxy, auth layer).
var wsUpgrader = gorillaws.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

// schedulerMu guards scheduler, which holds the last schedule received from
// each user over a check stream.
// NOTE(review): entries are keyed by user name and are never removed in this
// file; confirm whether they should be dropped when a session ends.
var schedulerMu sync.RWMutex
var scheduler = map[string]*infrastructure.WorkflowSchedule{}

// wsWriteLockKey is the context key under which CheckStreamHandler stores the
// mutex serializing writes on its WebSocket connection.
type wsWriteLockKey struct{}

// writeSessionJSON writes v as JSON on conn. When ctx carries a session write
// lock (see wsWriteLockKey) the write is performed while holding it, so the
// handler loop and the re-check timer goroutine never write concurrently.
// Without such a lock it is a plain conn.WriteJSON.
func writeSessionJSON(ctx context.Context, conn *gorillaws.Conn, v interface{}) error {
	if ctx != nil {
		if mu, ok := ctx.Value(wsWriteLockKey{}).(*sync.Mutex); ok && mu != nil {
			mu.Lock()
			defer mu.Unlock()
		}
	}
	return conn.WriteJSON(v)
}

// sessionSchedule returns a copy of the schedule last stored for user in the
// scheduler map, or fallback when nothing has been stored for that user.
func sessionSchedule(user string, fallback infrastructure.WorkflowSchedule) infrastructure.WorkflowSchedule {
	schedulerMu.RLock()
	defer schedulerMu.RUnlock()
	if stored := scheduler[user]; stored != nil {
		return *stored
	}
	return fallback
}

// realPushCheckfunc runs an availability check for workflow wfID and writes
// the result, tagged with executionsID, on conn.
//
// The schedule checked is the one last stored for user, or ws when none is
// stored. When reschedule is true and the slot is available, the session
// drafts are upserted and a timer is armed to re-run the check one minute
// before the slot start; the timer is abandoned when ctx is cancelled.
//
// It returns the updated "scheduled" flag (whether drafts are currently held
// for the session) and the first error from the check, the draft generation
// or the WebSocket write.
func realPushCheckfunc(ctx context.Context, conn *gorillaws.Conn, req *tools.APIRequest, user string,
	ws infrastructure.WorkflowSchedule, executionsID string, wfID string, scheduled bool, asap bool,
	preemption bool, reschedule bool) (bool, error) {
	// If we already have draft bookings for this session and we're about to
	// re-check (timer refresh or planner update), remove the old drafts first
	// so the planner doesn't treat our own previous reservations as conflicts.
	if reschedule && scheduled {
		infrastructure.CleanupSession(executionsID, req)
		scheduled = false
	}

	workflowScheduler := sessionSchedule(user, ws)

	result, checkErr := workflowScheduler.Check(wfID, asap, preemption, req)
	fmt.Println("CHECK", checkErr)
	if checkErr != nil {
		return scheduled, checkErr
	}

	if result.Available && reschedule {
		workflowScheduler.Start = result.Start
		if result.End != nil {
			workflowScheduler.End = result.End
		}
		_, _, execs, purchases, bookings, err := workflowScheduler.GetBuyAndBook(wfID, req)
		if err != nil {
			fmt.Println("GetBuyAndBook", err)
			return scheduled, err
		}
		infrastructure.UpsertSessionDrafts(executionsID, execs, purchases, bookings, req)
		scheduled = true

		// Re-check one minute before the slot starts.
		// NOTE(review): when Start is less than a minute away the delay is
		// non-positive and the timer fires immediately; confirm Check cannot
		// keep returning such a slot, otherwise this re-arms in a tight loop.
		delay := time.Until(workflowScheduler.Start.Add(-time.Minute))
		go func() {
			select {
			case <-ctx.Done():
				// Session closed before timer fired — nothing to do, CleanupSession
				// has already run (or will run) in the defer of CheckStreamHandler.
				return
			case <-time.After(delay):
				// Best-effort refresh: nobody is waiting on this goroutine, so
				// the returned flag and error have no receiver.
				_, _ = realPushCheckfunc(ctx, conn, req, user, ws, executionsID, wfID, scheduled, asap, preemption, true)
			}
		}()
	}

	result.SchedulingID = executionsID
	fmt.Println(result)
	return scheduled, writeSessionJSON(ctx, conn, result)
}

// CheckStreamHandler is the WebSocket handler for slot availability checking.
// Query params: as_possible=true, preemption=true
//
// The workflow ID is the part of the URL path after "/oc/check/". After the
// upgrade the first JSON message is read as the initial schedule; the handler
// then pushes a check result and keeps pushing new ones whenever the client
// sends an updated schedule, a watched planner changes, or the workflow
// changes. A message with Confirm set schedules the workflow and ends the
// session. Unless the session was confirmed, its drafts are cleaned up on exit.
func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
	var err error
	wfID := strings.TrimPrefix(r.URL.Path, "/oc/check/")

	q := r.URL.Query()
	asap := q.Get("as_possible") == "true"
	preemption := q.Get("preemption") == "true"

	user, peerID, groups := oclib.ExtractTokenInfo(*r)
	req := &tools.APIRequest{
		Username: user,
		PeerID:   peerID,
		Groups:   groups,
		Caller:   nil,
		Admin:    true,
	}

	watchedPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req)
	fmt.Println("Watched peers for workflow", wfID, ":", watchedPeers, err)
	if err != nil {
		http.Error(w, `{"code":404,"error":"`+err.Error()+`"}`, http.StatusNotFound)
		return
	}

	conn, err := wsUpgrader.Upgrade(w, r, nil)
	fmt.Println("Upgrade :", err)
	if err != nil {
		return
	}

	var ws infrastructure.WorkflowSchedule
	if err := conn.ReadJSON(&ws); err != nil {
		fmt.Println("ReadJSON :", err)
		conn.Close()
		return
	}

	// Allow the initial JSON to override the query-param mode.
	if ws.Asap != nil {
		asap = *ws.Asap
	}
	if ws.Preemption != nil {
		preemption = *ws.Preemption
	}

	// Resolve the local peer BEFORE subscribing or requesting a refresh: a
	// failure here returns early, and nothing acquired so far needs releasing
	// other than the connection itself.
	self, err := oclib.GetMySelf()
	if err != nil || self == nil {
		logger.Err(err).Msg("could not resolve self peer")
		conn.Close()
		return
	}
	selfPeerID := self.PeerID

	plannerCh, plannerUnsub := infrastructure.SubscribePlannerUpdates(watchedPeers)
	wfCh, wfUnsub := infrastructure.SubscribeWorkflowUpdates(wfID)

	executionsID := uuid.New().String()
	ownedPeers := infrastructure.RequestPlannerRefresh(watchedPeers, executionsID)

	scheduled := false
	confirmed := false

	// The context carries the per-session write lock so that every writer on
	// this connection (this loop and the re-check timer) is serialized.
	ctx, cancel := context.WithCancel(
		context.WithValue(context.Background(), wsWriteLockKey{}, &sync.Mutex{}),
	)
	defer func() {
		cancel()
		conn.Close()
		plannerUnsub()
		wfUnsub()
		infrastructure.ReleaseRefreshOwnership(ownedPeers, executionsID)
		if !confirmed {
			infrastructure.CleanupSession(executionsID, req)
		}
	}()

	// pushCheck runs a check with the current session state and records the
	// resulting "scheduled" flag.
	pushCheck := func(reschedule bool) error {
		var checkErr error
		scheduled, checkErr = realPushCheckfunc(ctx, conn, req, user, ws, executionsID, wfID,
			scheduled, asap, preemption, reschedule)
		return checkErr
	}

	if err = pushCheck(true); err != nil {
		fmt.Println("UPDATE CONFIRM FIRST scheduled", err)
		return
	}

	// Reader goroutine: forwards client messages, keeping only the most recent
	// one pending, and closes closeCh when the connection can no longer be read.
	updateCh := make(chan infrastructure.WorkflowSchedule, 1)
	closeCh := make(chan struct{})
	go func() {
		defer close(closeCh)
		for {
			var updated infrastructure.WorkflowSchedule
			if err := conn.ReadJSON(&updated); err != nil {
				return
			}
			select {
			case updateCh <- updated:
			default:
				// Buffer full: discard the stale update. The drain must not
				// block, because the handler loop may have consumed the
				// pending value in the meantime.
				select {
				case <-updateCh:
				default:
				}
				// This goroutine is the only sender, so the slot is free now.
				updateCh <- updated
			}
		}
	}()

	for {
		select {
		case updated := <-updateCh:
			fmt.Println("updated FOUND ", updated)
			workflowScheduler := sessionSchedule(user, ws)

			if updated.Confirm {
				// Subscribe BEFORE calling Schedule to avoid missing the notification.
				confirmCh, confirmUnsub := infrastructure.SubscribeSessionConfirmation(executionsID)
				defer confirmUnsub()

				workflowScheduler.UUID = executionsID
				_, _, _, schedErr := infrastructure.Schedule(&workflowScheduler, wfID, req)
				if schedErr != nil {
					_ = writeSessionJSON(ctx, conn, map[string]interface{}{
						"error": schedErr.Error(),
					})
					return
				}
				fmt.Println("UPDATE CONFIRM — waiting for execution confirmation")
				select {
				case <-confirmCh:
					_ = writeSessionJSON(ctx, conn, map[string]interface{}{
						"confirmed":     true,
						"scheduling_id": executionsID,
					})
				case <-time.After(60 * time.Second):
					_ = writeSessionJSON(ctx, conn, map[string]interface{}{
						"confirmed": false,
						"error":     "confirmation timeout: scheduling accepted but peers did not confirm in time",
					})
				case <-closeCh:
					// client disconnected before confirmation
				case <-ctx.Done():
					// session context cancelled before confirmation
				}
				// Schedule succeeded, so the exit cleanup must keep the session.
				confirmed = true
				fmt.Println("UPDATE CONFIRM done")
				return
			}

			// Detect mode change before updating local vars.
			modeChanged := (updated.Asap != nil && *updated.Asap != asap) ||
				(updated.Preemption != nil && *updated.Preemption != preemption)
			if updated.Asap != nil {
				asap = *updated.Asap
			}
			if updated.Preemption != nil {
				preemption = *updated.Preemption
			}

			// Compare the update against the schedule previously in effect.
			changed := modeChanged ||
				updated.Cron != workflowScheduler.Cron ||
				!updated.Start.Equal(workflowScheduler.Start) ||
				updated.DurationS != workflowScheduler.DurationS ||
				(updated.End == nil) != (workflowScheduler.End == nil) ||
				(updated.End != nil && workflowScheduler.End != nil && !updated.End.Equal(*workflowScheduler.End)) ||
				updated.BookingMode != workflowScheduler.BookingMode ||
				!reflect.DeepEqual(updated.SelectedBillingStrategy, workflowScheduler.SelectedBillingStrategy) ||
				!reflect.DeepEqual(updated.SelectedInstances, workflowScheduler.SelectedInstances) ||
				!reflect.DeepEqual(updated.SelectedPartnerships, workflowScheduler.SelectedPartnerships) ||
				!reflect.DeepEqual(updated.SelectedBuyings, workflowScheduler.SelectedBuyings) ||
				!reflect.DeepEqual(updated.SelectedStrategies, workflowScheduler.SelectedStrategies)

			// NOTE(review): the drafts are removed here without resetting
			// "scheduled"; when nothing changed the following check does not
			// re-create them — confirm this is intended.
			infrastructure.CleanupSession(executionsID, req)
			schedulerMu.Lock()
			scheduler[user] = &updated
			schedulerMu.Unlock()

			if err = pushCheck(changed || !scheduled); err != nil {
				fmt.Println("UPDATE SCHEDULERD", err)
				return
			}

		case remotePeerID := <-plannerCh:
			if remotePeerID == selfPeerID {
				// Update of our own planner: if drafts are already held there
				// is nothing to push; otherwise report availability without
				// booking anything.
				if scheduled {
					continue
				}
				workflowScheduler := sessionSchedule(user, ws)
				result, checkErr := workflowScheduler.Check(wfID, asap, preemption, req)
				if checkErr == nil {
					result.SchedulingID = executionsID
					_ = writeSessionJSON(ctx, conn, result)
				}
				continue
			}
			if err = pushCheck(scheduled); err != nil {
				fmt.Println("UPDATE SCHEDULERD PLAN", err)
				return
			}

		case <-wfCh:
			// The workflow changed: re-resolve the peers it involves and move
			// the planner subscription over to them.
			if newPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req); err == nil {
				plannerUnsub()
				watchedPeers = newPeers
				plannerCh, plannerUnsub = infrastructure.SubscribePlannerUpdates(newPeers)
				newOwned := infrastructure.RequestPlannerRefresh(newPeers, executionsID)
				ownedPeers = append(ownedPeers, newOwned...)
			}
			if err = pushCheck(false); err != nil {
				fmt.Println("UPDATE WORKFLOW", err)
				return
			}

		case <-closeCh:
			fmt.Println("UPDATE Close ? ")
			return
		}
	}
}

// @Title UnSchedule
// @Description unschedule a workflow execution: deletes its bookings on all peers then deletes the execution.
// @Param	id		path 	string	true		"execution id"
// @Success 200 {object} map[string]interface{}
// @router /:id [delete]
func (o *WorkflowSchedulerController) UnSchedule() {
	user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
	executionID := o.Ctx.Input.Param(":id")
	req := &tools.APIRequest{
		Username: user,
		PeerID:   peerID,
		Groups:   groups,
		Admin:    true,
	}

	// The outcome is reported through the "code" field of the JSON body.
	resp := map[string]interface{}{"code": 200, "error": ""}
	if err := infrastructure.UnscheduleExecution(executionID, req); err != nil {
		resp = map[string]interface{}{"code": 404, "error": err.Error()}
	}
	o.Data["json"] = resp
	o.ServeJSON()
}

// @Title SearchScheduledDraftOrder
// @Description search draft order for a workflow
// @Param	id		path 	string	true		"id execution"
// @Param	offset	query	string	false
// @Param	limit	query	string	false
// @Success 200 {workspace} models.workspace
// @router /order/:id [get]
func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
	// Missing or non-numeric paging values fall back to 0 (Atoi's result on error).
	offset, _ := strconv.Atoi(o.Ctx.Input.Query("offset"))
	limit, _ := strconv.Atoi(o.Ctx.Input.Query("limit"))

	_, peerID, _ := oclib.ExtractTokenInfo(*o.Ctx.Request)
	id := o.Ctx.Input.Param(":id")

	// Orders whose workflow_id is the path id and whose order_by is the caller's peer.
	filter := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"workflow_id": {{Operator: dbs.EQUAL.String(), Value: id}},
			"order_by":    {{Operator: dbs.EQUAL.String(), Value: peerID}},
		},
	}
	o.Data["json"] = oclib.NewRequestAdmin(orderCollection, nil).Search(filter, "", true, int64(offset), int64(limit))
	o.ServeJSON()
}