Scheduler + Observe
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/config"
|
||||
beego "github.com/beego/beego/v2/server/web"
|
||||
@@ -97,53 +98,49 @@ func (o *LokiController) GetLogs() {
|
||||
// The server connects to Loki's /loki/api/v1/tail WebSocket endpoint and
|
||||
// forwards every message it receives until the client disconnects.
|
||||
func LogsStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("LogsStreamHandler")
|
||||
execID := strings.TrimSuffix(
|
||||
strings.TrimPrefix(r.URL.Path, "/oc/logs/"),
|
||||
"",
|
||||
)
|
||||
conn, err := wsUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
fmt.Println("LogsStreamHandler", err)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
/*
|
||||
var query map[string]interface{}
|
||||
if err := conn.ReadJSON(&query); err != nil {
|
||||
fmt.Println("LogsStreamHandler ReadJSON", err)
|
||||
return
|
||||
}
|
||||
*/
|
||||
|
||||
var query map[string]interface{}
|
||||
if err := conn.ReadJSON(&query); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
start := fmt.Sprintf("%v", query["start"])
|
||||
if len(start) > 10 {
|
||||
start = start[:10]
|
||||
}
|
||||
|
||||
start := time.Now().UTC().UnixNano()
|
||||
labels := []string{
|
||||
"workflow_execution_id=\"" + execID + "\"",
|
||||
}
|
||||
for k, v := range query {
|
||||
if k == "start" || k == "end" {
|
||||
continue
|
||||
}
|
||||
labels = append(labels, fmt.Sprintf("%v=\"%v\"", k, v))
|
||||
}
|
||||
|
||||
if len(labels) == 0 || len(start) < 10 {
|
||||
_ = conn.WriteJSON(map[string]string{"error": "missing start or query labels"})
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println("LOKI START", start, labels)
|
||||
// Build Loki tail WS URL (http→ws, https→wss).
|
||||
lokiBase := config.GetConfig().LokiUrl
|
||||
lokiBase = strings.Replace(lokiBase, "https://", "wss://", 1)
|
||||
lokiBase = strings.Replace(lokiBase, "http://", "ws://", 1)
|
||||
|
||||
lokiURL := lokiBase + "/loki/api/v1/tail?" + url.Values{
|
||||
"query": {"{" + strings.Join(labels, ", ") + "}"},
|
||||
"start": {start + "000000000"}, // seconds → nanoseconds
|
||||
"start": {fmt.Sprintf("%v", start)},
|
||||
}.Encode()
|
||||
|
||||
lokiConn, _, err := gorillaws.DefaultDialer.Dial(lokiURL, nil)
|
||||
headers := http.Header{}
|
||||
headers.Set("X-Scope-OrgID", "1")
|
||||
|
||||
lokiConn, resp, err := gorillaws.DefaultDialer.Dial(lokiURL, headers)
|
||||
fmt.Println("LOKI LISTEN", lokiBase, err)
|
||||
if err != nil {
|
||||
if resp != nil {
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
fmt.Printf("Handshake failed: status=%d body=%s", resp.StatusCode, string(body))
|
||||
}
|
||||
_ = conn.WriteJSON(map[string]string{"error": "loki: " + err.Error()})
|
||||
return
|
||||
}
|
||||
@@ -161,6 +158,7 @@ func LogsStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
var result map[string]interface{}
|
||||
if json.Unmarshal(msg, &result) == nil {
|
||||
fmt.Println(result)
|
||||
if err := conn.WriteJSON(result); err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
|
||||
@@ -71,6 +71,7 @@ func realPushCheckfunc(ctx context.Context, conn *gorillaws.Conn, req *tools.API
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
infrastructure.CleanupSession(executionsID, req)
|
||||
// Session closed before timer fired — nothing to do, CleanupSession
|
||||
// has already run (or will run) in the defer of CheckStreamHandler.
|
||||
return
|
||||
@@ -87,6 +88,7 @@ func realPushCheckfunc(ctx context.Context, conn *gorillaws.Conn, req *tools.API
|
||||
// CheckStreamHandler is the WebSocket handler for slot availability checking.
|
||||
// Query params: as_possible=true, preemption=true
|
||||
func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("qksbdkqbsdkh")
|
||||
var err error
|
||||
wfID := strings.TrimSuffix(
|
||||
strings.TrimPrefix(r.URL.Path, "/oc/check/"),
|
||||
@@ -97,7 +99,7 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
asap := q.Get("as_possible") == "true"
|
||||
preemption := q.Get("preemption") == "true"
|
||||
|
||||
user, peerID, groups := oclib.ExtractTokenInfo(*r)
|
||||
user, peerID, groups := oclib.ExtractTokenInfoWs(*r)
|
||||
req := &tools.APIRequest{
|
||||
Username: user,
|
||||
PeerID: peerID,
|
||||
@@ -205,6 +207,7 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
workflowScheduler.UUID = executionsID
|
||||
_, _, _, schedErr := infrastructure.Schedule(&workflowScheduler, wfID, req)
|
||||
if schedErr != nil {
|
||||
infrastructure.CleanupSession(executionsID, req)
|
||||
_ = conn.WriteJSON(map[string]interface{}{
|
||||
"error": schedErr.Error(),
|
||||
})
|
||||
@@ -213,8 +216,10 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
fmt.Println("UPDATE CONFIRM — waiting for execution confirmation")
|
||||
select {
|
||||
case <-confirmCh:
|
||||
fmt.Println("UPDATE CONFIRM done")
|
||||
confirmed = true
|
||||
_ = conn.WriteJSON(map[string]interface{}{
|
||||
"confirmed": true,
|
||||
"confirmed": true,
|
||||
"scheduling_id": executionsID,
|
||||
})
|
||||
case <-time.After(60 * time.Second):
|
||||
@@ -225,8 +230,10 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
case <-ctx.Done():
|
||||
// client disconnected before confirmation
|
||||
}
|
||||
confirmed = true
|
||||
fmt.Println("UPDATE CONFIRM done")
|
||||
if !confirmed {
|
||||
infrastructure.CleanupSession(executionsID, req)
|
||||
fmt.Println("UPDATE CONFIRM not done")
|
||||
}
|
||||
return
|
||||
}
|
||||
// Detect mode change before updating local vars.
|
||||
@@ -249,7 +256,8 @@ func CheckStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
!reflect.DeepEqual(updated.SelectedInstances, workflowScheduler.SelectedInstances) ||
|
||||
!reflect.DeepEqual(updated.SelectedPartnerships, workflowScheduler.SelectedPartnerships) ||
|
||||
!reflect.DeepEqual(updated.SelectedBuyings, workflowScheduler.SelectedBuyings) ||
|
||||
!reflect.DeepEqual(updated.SelectedStrategies, workflowScheduler.SelectedStrategies)
|
||||
!reflect.DeepEqual(updated.SelectedStrategies, workflowScheduler.SelectedStrategies) ||
|
||||
!reflect.DeepEqual(updated.SelectedEmbeddedStorages, workflowScheduler.SelectedEmbeddedStorages)
|
||||
|
||||
infrastructure.CleanupSession(executionsID, req)
|
||||
|
||||
|
||||
254
controllers/streams.go
Normal file
254
controllers/streams.go
Normal file
@@ -0,0 +1,254 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/booking"
|
||||
libutils "cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
// streamMsg is the envelope pushed over every stream WebSocket.
|
||||
type streamMsg struct {
|
||||
Type string `json:"type"` // "snapshot" | "update" | "delete"
|
||||
Data interface{} `json:"data,omitempty"`
|
||||
Deleted bool `json:"deleted,omitempty"`
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Booking stream
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// BookingStreamHandler opens a WebSocket that:
|
||||
// 1. sends an immediate snapshot of matching bookings ("snapshot")
|
||||
// 2. pushes each subsequent create/update/delete as an individual "update" or
|
||||
// "delete" message.
|
||||
//
|
||||
// Query params (all optional):
|
||||
//
|
||||
// executions_id — filter to a specific scheduling session
|
||||
// is_draft — "true" | "false" (omit = non-draft)
|
||||
// start_date — YYYY-MM-DD (expected_start_date >=)
|
||||
// end_date — YYYY-MM-DD (expected_start_date <=)
|
||||
func BookingStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
user, peerID, groups := oclib.ExtractTokenInfoWs(*r)
|
||||
|
||||
q := r.URL.Query()
|
||||
executionID := q.Get("execution_id")
|
||||
executionsID := q.Get("executions_id")
|
||||
isDraftStr := q.Get("is_draft")
|
||||
onlyDraft := isDraftStr == "true"
|
||||
filterDraft := isDraftStr != "" // whether the caller wants draft filtering at all
|
||||
startDate, _ := time.ParseInLocation("2006-01-02", q.Get("start_date"), time.UTC)
|
||||
endDate, _ := time.ParseInLocation("2006-01-02", q.Get("end_date"), time.UTC)
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
matchesFilter := func(b *booking.Booking) bool {
|
||||
if executionID != "" && b.GetID() != executionID {
|
||||
return false
|
||||
}
|
||||
if executionsID != "" && b.ExecutionsID != executionsID {
|
||||
return false
|
||||
}
|
||||
if filterDraft && b.IsDraft != onlyDraft {
|
||||
return false
|
||||
}
|
||||
if !startDate.IsZero() && b.ExpectedStartDate.Before(startDate) {
|
||||
return false
|
||||
}
|
||||
if !endDate.IsZero() && b.ExpectedStartDate.After(endDate) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Build snapshot filters
|
||||
andF := map[string][]dbs.Filter{}
|
||||
if executionID != "" {
|
||||
andF["id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionID}}
|
||||
}
|
||||
if executionsID != "" {
|
||||
andF["executions_id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionsID}}
|
||||
}
|
||||
if !startDate.IsZero() {
|
||||
andF["expected_start_date"] = append(andF["expected_start_date"],
|
||||
dbs.Filter{Operator: "gte", Value: primitive.NewDateTimeFromTime(startDate)})
|
||||
}
|
||||
if !endDate.IsZero() {
|
||||
andF["expected_start_date"] = append(andF["expected_start_date"],
|
||||
dbs.Filter{Operator: "lte", Value: primitive.NewDateTimeFromTime(endDate)})
|
||||
}
|
||||
var snapshotFilter *dbs.Filters
|
||||
if len(andF) > 0 {
|
||||
snapshotFilter = &dbs.Filters{And: andF}
|
||||
}
|
||||
|
||||
snapshot := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), user, peerID, groups, nil).
|
||||
Search(snapshotFilter, "", onlyDraft, 0, 10000)
|
||||
if err := conn.WriteJSON(streamMsg{Type: "snapshot", Data: snapshot.Data}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
changeCh, unsub := libutils.SubscribeChanges(tools.BOOKING)
|
||||
defer unsub()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// detect client disconnect
|
||||
closeCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(closeCh)
|
||||
for {
|
||||
if _, _, err := conn.ReadMessage(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-changeCh:
|
||||
b, ok := evt.Object.(*booking.Booking)
|
||||
if !ok || !matchesFilter(b) {
|
||||
continue
|
||||
}
|
||||
if evt.Deleted {
|
||||
_ = conn.WriteJSON(streamMsg{Type: "delete", Data: b, Deleted: true})
|
||||
} else {
|
||||
_ = conn.WriteJSON(streamMsg{Type: "update", Data: b})
|
||||
}
|
||||
case <-closeCh:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// WorkflowExecution stream
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// ExecutionStreamHandler opens a WebSocket that:
|
||||
// 1. sends an immediate snapshot of matching executions ("snapshot")
|
||||
// 2. pushes each subsequent create/update/delete as "update" or "delete".
|
||||
//
|
||||
// Query params (all optional):
|
||||
//
|
||||
// executions_id — filter to a specific scheduling session
|
||||
// is_draft — "true" | "false" (omit = non-draft)
|
||||
// start_date — YYYY-MM-DD (execution_date >=)
|
||||
// end_date — YYYY-MM-DD (execution_date <=)
|
||||
func ExecutionStreamHandler(w http.ResponseWriter, r *http.Request) {
|
||||
user, peerID, groups := oclib.ExtractTokenInfoWs(*r)
|
||||
|
||||
q := r.URL.Query()
|
||||
executionID := q.Get("execution_id")
|
||||
executionsID := q.Get("executions_id")
|
||||
isDraftStr := q.Get("is_draft")
|
||||
onlyDraft := isDraftStr == "true"
|
||||
filterDraft := isDraftStr != ""
|
||||
startDate, _ := time.ParseInLocation("2006-01-02", q.Get("start_date"), time.UTC)
|
||||
endDate, _ := time.ParseInLocation("2006-01-02", q.Get("end_date"), time.UTC)
|
||||
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
matchesFilter := func(e *workflow_execution.WorkflowExecution) bool {
|
||||
if executionID != "" && e.GetID() != executionID {
|
||||
return false
|
||||
}
|
||||
if executionsID != "" && e.ExecutionsID != executionsID {
|
||||
return false
|
||||
}
|
||||
if filterDraft && e.IsDraft != onlyDraft {
|
||||
return false
|
||||
}
|
||||
if !startDate.IsZero() && e.ExecDate.Before(startDate) {
|
||||
return false
|
||||
}
|
||||
if !endDate.IsZero() && e.ExecDate.After(endDate) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Build snapshot filters
|
||||
andF := map[string][]dbs.Filter{}
|
||||
if executionID != "" {
|
||||
andF["id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionID}}
|
||||
}
|
||||
if executionsID != "" {
|
||||
andF["executions_id"] = []dbs.Filter{{Operator: dbs.EQUAL.String(), Value: executionsID}}
|
||||
}
|
||||
if !startDate.IsZero() {
|
||||
andF["execution_date"] = append(andF["execution_date"],
|
||||
dbs.Filter{Operator: "gte", Value: primitive.NewDateTimeFromTime(startDate)})
|
||||
}
|
||||
if !endDate.IsZero() {
|
||||
andF["execution_date"] = append(andF["execution_date"],
|
||||
dbs.Filter{Operator: "lte", Value: primitive.NewDateTimeFromTime(endDate)})
|
||||
}
|
||||
var snapshotFilter *dbs.Filters
|
||||
if len(andF) > 0 {
|
||||
snapshotFilter = &dbs.Filters{And: andF}
|
||||
}
|
||||
|
||||
snapshot := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), user, peerID, groups, nil).
|
||||
Search(snapshotFilter, "", onlyDraft, 0, 10000)
|
||||
if err := conn.WriteJSON(streamMsg{Type: "snapshot", Data: snapshot.Data}); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
changeCh, unsub := libutils.SubscribeChanges(tools.WORKFLOW_EXECUTION)
|
||||
defer unsub()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
closeCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(closeCh)
|
||||
for {
|
||||
if _, _, err := conn.ReadMessage(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt := <-changeCh:
|
||||
e, ok := evt.Object.(*workflow_execution.WorkflowExecution)
|
||||
fmt.Println("CHANGE!", e, ok, matchesFilter(e))
|
||||
if !ok || !matchesFilter(e) {
|
||||
continue
|
||||
}
|
||||
if evt.Deleted {
|
||||
_ = conn.WriteJSON(streamMsg{Type: "delete", Data: e, Deleted: true})
|
||||
} else {
|
||||
_ = conn.WriteJSON(streamMsg{Type: "update", Data: e})
|
||||
}
|
||||
case <-closeCh:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user