oc-scheduler -> scheduling + logs
This commit is contained in:
@@ -1,11 +1,15 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"oc-scheduler/infrastructure"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
@@ -27,12 +31,66 @@ var wsUpgrader = gorillaws.Upgrader{
|
||||
CheckOrigin: func(r *http.Request) bool { return true },
|
||||
}
|
||||
|
||||
var schedulerMu sync.RWMutex
|
||||
var scheduler = map[string]*infrastructure.WorkflowSchedule{}
|
||||
|
||||
func realPushCheckfunc(ctx context.Context, conn *gorillaws.Conn, req *tools.APIRequest, user string, ws infrastructure.WorkflowSchedule,
|
||||
executionsID string, wfID string, scheduled bool, asap bool, preemption bool, reschedule bool) (bool, error) {
|
||||
// If we already have draft bookings for this session and we're about to
|
||||
// re-check (timer refresh or planner update), remove the old drafts first
|
||||
// so the planner doesn't treat our own previous reservations as conflicts.
|
||||
if reschedule && scheduled {
|
||||
infrastructure.CleanupSession(executionsID, req)
|
||||
scheduled = false
|
||||
}
|
||||
workflowScheduler := ws
|
||||
|
||||
schedulerMu.Lock()
|
||||
if scheduler[user] != nil {
|
||||
workflowScheduler = *scheduler[user]
|
||||
}
|
||||
schedulerMu.Unlock()
|
||||
result, checkErr := workflowScheduler.Check(wfID, asap, preemption, req)
|
||||
fmt.Println("CHECK", checkErr)
|
||||
if checkErr != nil {
|
||||
return scheduled, checkErr
|
||||
}
|
||||
if result.Available && reschedule {
|
||||
workflowScheduler.Start = result.Start
|
||||
if result.End != nil {
|
||||
workflowScheduler.End = result.End
|
||||
}
|
||||
_, _, execs, purchases, bookings, err := workflowScheduler.GetBuyAndBook(wfID, req)
|
||||
if err != nil {
|
||||
fmt.Println("GetBuyAndBook", err)
|
||||
return scheduled, err
|
||||
}
|
||||
infrastructure.UpsertSessionDrafts(executionsID, execs, purchases, bookings, req)
|
||||
scheduled = true
|
||||
delay := workflowScheduler.Start.UTC().Add(-(1 * time.Minute)).Sub(time.Now().UTC())
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// Session closed before timer fired — nothing to do, CleanupSession
|
||||
// has already run (or will run) in the defer of CheckStreamHandler.
|
||||
return
|
||||
case <-time.After(delay):
|
||||
realPushCheckfunc(ctx, conn, req, user, ws, executionsID, wfID, scheduled, asap, preemption, true)
|
||||
}
|
||||
}()
|
||||
}
|
||||
result.SchedulingID = executionsID
|
||||
fmt.Println(result)
|
||||
return scheduled, conn.WriteJSON(result)
|
||||
}
|
||||
|
||||
// CheckStreamHandler is the WebSocket handler for slot availability checking.
|
||||
// Query params: as_possible=true, preemption=true
|
||||
func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var err error
|
||||
wfID := strings.TrimSuffix(
|
||||
strings.TrimPrefix(r.URL.Path, "/oc/"),
|
||||
"/check",
|
||||
strings.TrimPrefix(r.URL.Path, "/oc/check/"),
|
||||
"",
|
||||
)
|
||||
|
||||
q := r.URL.Query()
|
||||
@@ -49,22 +107,31 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
watchedPeers, err := infrastructure.GetWorkflowPeerIDs(wfID, req)
|
||||
fmt.Println("Watched peers for workflow", wfID, ":", watchedPeers)
|
||||
fmt.Println("Watched peers for workflow", wfID, ":", watchedPeers, err)
|
||||
if err != nil {
|
||||
http.Error(w, `{"code":404,"error":"`+err.Error()+`"}`, http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
conn, err := wsUpgrader.Upgrade(w, r, nil)
|
||||
fmt.Println("Upgrade :", err)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var ws infrastructure.WorkflowSchedule
|
||||
if err := conn.ReadJSON(&ws); err != nil {
|
||||
fmt.Println("ReadJSON :", err)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
// Allow the initial JSON to override the query-param mode.
|
||||
if ws.Asap != nil {
|
||||
asap = *ws.Asap
|
||||
}
|
||||
if ws.Preemption != nil {
|
||||
preemption = *ws.Preemption
|
||||
}
|
||||
|
||||
plannerCh, plannerUnsub := infrastructure.SubscribePlannerUpdates(watchedPeers)
|
||||
wfCh, wfUnsub := infrastructure.SubscribeWorkflowUpdates(wfID)
|
||||
@@ -83,7 +150,9 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
scheduled := false
|
||||
confirmed := false
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer func() {
|
||||
cancel()
|
||||
conn.Close()
|
||||
plannerUnsub()
|
||||
wfUnsub()
|
||||
@@ -93,28 +162,9 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}()
|
||||
|
||||
pushCheck := func(reschedule bool) error {
|
||||
result, checkErr := ws.Check(wfID, asap, preemption, req)
|
||||
if checkErr != nil {
|
||||
return checkErr
|
||||
}
|
||||
if result.Available && reschedule {
|
||||
ws.Start = result.Start
|
||||
if result.End != nil {
|
||||
ws.End = result.End
|
||||
}
|
||||
_, _, execs, purchases, bookings, err := ws.GetBuyAndBook(wfID, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
infrastructure.UpsertSessionDrafts(executionsID, execs, purchases, bookings, req)
|
||||
scheduled = true
|
||||
}
|
||||
result.SchedulingID = executionsID
|
||||
return conn.WriteJSON(result)
|
||||
}
|
||||
|
||||
if err := pushCheck(true); err != nil {
|
||||
pushCheck := realPushCheckfunc
|
||||
if scheduled, err = pushCheck(ctx, conn, req, user, ws, executionsID, wfID, scheduled, asap, preemption, true); err != nil {
|
||||
fmt.Println("UPDATE CONFIRM FIRST scheduled", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -139,9 +189,17 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
for {
|
||||
select {
|
||||
case updated := <-updateCh:
|
||||
fmt.Println("updated FOUND ", updated)
|
||||
workflowScheduler := ws
|
||||
schedulerMu.Lock()
|
||||
if scheduler[user] != nil {
|
||||
workflowScheduler = *scheduler[user]
|
||||
}
|
||||
schedulerMu.Unlock()
|
||||
|
||||
if updated.Confirm {
|
||||
ws.UUID = executionsID
|
||||
_, _, _, schedErr := infrastructure.Schedule(&ws, wfID, req)
|
||||
workflowScheduler.UUID = executionsID
|
||||
_, _, _, schedErr := infrastructure.Schedule(&workflowScheduler, wfID, req)
|
||||
if schedErr != nil {
|
||||
_ = conn.WriteJSON(map[string]interface{}{
|
||||
"error": schedErr.Error(),
|
||||
@@ -149,38 +207,62 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
confirmed = true
|
||||
fmt.Println("UPDATE CONFIRM")
|
||||
return
|
||||
}
|
||||
changed := updated.Cron != ws.Cron ||
|
||||
!updated.Start.Equal(ws.Start) ||
|
||||
updated.DurationS != ws.DurationS ||
|
||||
(updated.End == nil) != (ws.End == nil) ||
|
||||
(updated.End != nil && ws.End != nil && !updated.End.Equal(*ws.End)) ||
|
||||
updated.BookingMode != ws.BookingMode ||
|
||||
!reflect.DeepEqual(updated.SelectedBillingStrategy, ws.SelectedBillingStrategy) ||
|
||||
!reflect.DeepEqual(updated.SelectedInstances, ws.SelectedInstances) ||
|
||||
!reflect.DeepEqual(updated.SelectedPartnerships, ws.SelectedPartnerships) ||
|
||||
!reflect.DeepEqual(updated.SelectedBuyings, ws.SelectedBuyings) ||
|
||||
!reflect.DeepEqual(updated.SelectedStrategies, ws.SelectedStrategies)
|
||||
// Detect mode change before updating local vars.
|
||||
modeChanged := (updated.Asap != nil && *updated.Asap != asap) ||
|
||||
(updated.Preemption != nil && *updated.Preemption != preemption)
|
||||
if updated.Asap != nil {
|
||||
asap = *updated.Asap
|
||||
}
|
||||
if updated.Preemption != nil {
|
||||
preemption = *updated.Preemption
|
||||
}
|
||||
changed := modeChanged ||
|
||||
updated.Cron != workflowScheduler.Cron ||
|
||||
!updated.Start.Equal(workflowScheduler.Start) ||
|
||||
updated.DurationS != workflowScheduler.DurationS ||
|
||||
(updated.End == nil) != (workflowScheduler.End == nil) ||
|
||||
(updated.End != nil && workflowScheduler.End != nil && !updated.End.Equal(*workflowScheduler.End)) ||
|
||||
updated.BookingMode != workflowScheduler.BookingMode ||
|
||||
!reflect.DeepEqual(updated.SelectedBillingStrategy, workflowScheduler.SelectedBillingStrategy) ||
|
||||
!reflect.DeepEqual(updated.SelectedInstances, workflowScheduler.SelectedInstances) ||
|
||||
!reflect.DeepEqual(updated.SelectedPartnerships, workflowScheduler.SelectedPartnerships) ||
|
||||
!reflect.DeepEqual(updated.SelectedBuyings, workflowScheduler.SelectedBuyings) ||
|
||||
!reflect.DeepEqual(updated.SelectedStrategies, workflowScheduler.SelectedStrategies)
|
||||
|
||||
infrastructure.CleanupSession(executionsID, req)
|
||||
ws = updated
|
||||
if err := pushCheck(changed || !scheduled); err != nil {
|
||||
|
||||
schedulerMu.Lock()
|
||||
scheduler[user] = &updated
|
||||
schedulerMu.Unlock()
|
||||
|
||||
if scheduled, err = pushCheck(ctx, conn, req, user, ws, executionsID, wfID, scheduled, asap, preemption, changed || !scheduled); err != nil {
|
||||
fmt.Println("UPDATE SCHEDULERD", err)
|
||||
return
|
||||
}
|
||||
|
||||
case remotePeerID := <-plannerCh:
|
||||
workflowScheduler := ws
|
||||
schedulerMu.Lock()
|
||||
if scheduler[user] != nil {
|
||||
workflowScheduler = *scheduler[user]
|
||||
}
|
||||
schedulerMu.Unlock()
|
||||
if remotePeerID == selfPeerID {
|
||||
if scheduled {
|
||||
continue
|
||||
}
|
||||
result, checkErr := ws.Check(wfID, asap, preemption, req)
|
||||
result, checkErr := workflowScheduler.Check(wfID, asap, preemption, req)
|
||||
if checkErr == nil {
|
||||
result.SchedulingID = executionsID
|
||||
_ = conn.WriteJSON(result)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if err := pushCheck(scheduled); err != nil {
|
||||
if scheduled, err = pushCheck(ctx, conn, req, user, ws, executionsID, wfID, scheduled, asap, preemption, scheduled); err != nil {
|
||||
fmt.Println("UPDATE SCHEDULERD PLAN", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -192,11 +274,13 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
newOwned := infrastructure.RequestPlannerRefresh(newPeers, executionsID)
|
||||
ownedPeers = append(ownedPeers, newOwned...)
|
||||
}
|
||||
if err := pushCheck(false); err != nil {
|
||||
if scheduled, err = pushCheck(ctx, conn, req, user, ws, executionsID, wfID, scheduled, asap, preemption, false); err != nil {
|
||||
fmt.Println("UPDATE WORKFLOW", err)
|
||||
return
|
||||
}
|
||||
|
||||
case <-closeCh:
|
||||
fmt.Println("UPDATE Close ? ")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -227,9 +311,13 @@ func (o *WorkflowSchedulerController) UnSchedule() {
|
||||
// @Title SearchScheduledDraftOrder
|
||||
// @Description search draft order for a workflow
|
||||
// @Param id path string true "id execution"
|
||||
// @Param offset query string false
|
||||
// @Param limit query string false
|
||||
// @Success 200 {workspace} models.workspace
|
||||
// @router /:id/order [get]
|
||||
// @router /order/:id [get]
|
||||
func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
|
||||
offset, _ := strconv.Atoi(o.Ctx.Input.Query("offset"))
|
||||
limit, _ := strconv.Atoi(o.Ctx.Input.Query("limit"))
|
||||
_, peerID, _ := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
filter := &dbs.Filters{
|
||||
@@ -238,6 +326,6 @@ func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
|
||||
"order_by": {{Operator: dbs.EQUAL.String(), Value: peerID}},
|
||||
},
|
||||
}
|
||||
o.Data["json"] = oclib.NewRequestAdmin(orderCollection, nil).Search(filter, "", true)
|
||||
o.Data["json"] = oclib.NewRequestAdmin(orderCollection, nil).Search(filter, "", true, int64(offset), int64(limit))
|
||||
o.ServeJSON()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user