1 Commits

Author SHA1 Message Date
mr
d267c88fb5 Event Scheduling/Listening 2026-01-14 15:16:19 +01:00
4 changed files with 74 additions and 14 deletions

View File

@@ -1,12 +1,14 @@
package daemons package daemons
import ( import (
"fmt"
"oc-schedulerd/conf" "oc-schedulerd/conf"
"time" "time"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"go.mongodb.org/mongo-driver/bson/primitive"
) )
var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}} var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}}
@@ -18,23 +20,53 @@ type ExecutionManager struct{}
func (em *ExecutionManager) RetrieveNextExecutions() { func (em *ExecutionManager) RetrieveNextExecutions() {
logger := oclib.GetLogger() logger := oclib.GetLogger()
for { for {
fmt.Println("Checking for executions", len(Executions.Execs))
Executions.Mu.Lock() Executions.Mu.Lock()
if len(Executions.Execs) > 0 { if len(Executions.Execs) > 0 {
executions := Executions.Execs executions := Executions.Execs
orderedExec := map[int]map[string]workflow_execution.WorkflowExecution{}
for execId, exec := range executions { for execId, exec := range executions {
if exec.ExecDate.Before(time.Now().UTC()) { if orderedExec[exec.Priority] == nil {
logger.Info().Msg("Will execute " + execId + " soon") orderedExec[exec.Priority] = map[string]workflow_execution.WorkflowExecution{}
go em.executeExecution(&exec) }
delete(executions, execId) orderedExec[exec.Priority][execId] = exec
}
for i := range []int{7, 6, 5, 4, 3, 2, 1, 0} { // priority in reversed
if orderedExec[i] == nil {
continue
}
for execId, exec := range orderedExec[i] {
if i == 0 && em.isAStartingExecutionBeforeEnd(&exec) { // BEST EFFORT exception
continue
}
if exec.ExecDate.Before(time.Now().UTC()) {
logger.Info().Msg("Will execute " + execId + " soon")
go em.executeExecution(&exec)
delete(executions, execId)
}
} }
} }
} }
Executions.Mu.Unlock() Executions.Mu.Unlock()
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} }
func (em *ExecutionManager) isAStartingExecutionBeforeEnd(execution *workflow_execution.WorkflowExecution) bool {
access := workflow_execution.NewAccessor(nil)
l, _, err := access.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"execution_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*execution.EndDate)}},
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
}, // TODO later should refine on each endpoint
}, "", false)
if err != nil && len(l) == 0 {
return false
}
return true
}
func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) { func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) {
// start execution // start execution
// create the yaml that describes the pod : filename, path/url to Loki // create the yaml that describes the pod : filename, path/url to Loki

View File

@@ -9,8 +9,11 @@ import (
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
@@ -67,9 +70,10 @@ func (s *ScheduleManager) ListenNATS() {
} }
defer nc.Close() defer nc.Close()
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(3)
go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), true, wg) go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), tools.REMOVE, wg)
go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), false, wg) go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), tools.CREATE, wg)
go s.listenForChange(nc, tools.WORKFLOW_EVENT.GenerateKey(oclib.WORKFLOW.String()), tools.WORKFLOW_EVENT, wg)
wg.Wait() wg.Wait()
break break
} }
@@ -78,7 +82,7 @@ func (s *ScheduleManager) ListenNATS() {
// Goroutine listening to a NATS server for updates // Goroutine listening to a NATS server for updates
// on workflows' scheduling. Messages must contain // on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos // workflow execution ID, to allow retrieval of execution infos
func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete bool, wg sync.WaitGroup) { func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, natsTools tools.NATSMethod, wg sync.WaitGroup) {
defer wg.Done() defer wg.Done()
ch := make(chan *nats.Msg, 64) ch := make(chan *nats.Msg, 64)
fmt.Println("Listening to " + chanName) fmt.Println("Listening to " + chanName)
@@ -92,14 +96,17 @@ func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete
map_mess := map[string]string{} map_mess := map[string]string{}
json.Unmarshal(msg.Data, &map_mess) json.Unmarshal(msg.Data, &map_mess)
str := "new" str := "new"
if delete { if natsTools == tools.NATSMethod(tools.DELETE) {
str = "deleted" str = "deleted"
} }
fmt.Println("Catching " + str + " workflow... " + map_mess["id"]) fmt.Println("Catching " + str + " workflow... " + map_mess["id"])
if delete { switch natsTools {
case tools.NATSMethod(tools.DELETE):
Executions.DeleteSchedules(map_mess["id"]) Executions.DeleteSchedules(map_mess["id"])
} else { case tools.NATSMethod(tools.CREATE):
s.getNextScheduledWorkflows(1) s.getNextScheduledWorkflows(1)
case tools.NATSMethod(tools.WORKFLOW_EVENT):
s.executeWorkflow(map_mess["workflow_id"])
} }
} }
} }
@@ -134,6 +141,21 @@ func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list
return return
} }
func (s *ScheduleManager) executeWorkflow(wfID string) {
res := resources.WorkflowResource{}
access := res.GetAccessor(&tools.APIRequest{})
if d, code, err := access.LoadOne(wfID); code == 200 && err == nil {
eventExec := &workflow_execution.WorkflowExecution{
WorkflowID: d.GetID(),
ExecDate: time.Now(),
ExecutionsID: uuid.New().String(),
State: enum.SCHEDULED,
}
Executions.AddSchedules([]*workflow_execution.WorkflowExecution{eventExec}, s.Logger)
}
}
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
start := time.Now().UTC() start := time.Now().UTC()
fmt.Println(s.getExecution( fmt.Println(s.getExecution(

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.23.0
toolchain go1.24.0 toolchain go1.24.0
require ( require (
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51
github.com/beego/beego v1.12.12 github.com/beego/beego v1.12.12
github.com/goraz/onion v0.1.3 github.com/goraz/onion v0.1.3
github.com/nats-io/nats.go v1.44.0 github.com/nats-io/nats.go v1.44.0

6
go.sum
View File

@@ -1,5 +1,11 @@
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de h1:s47eEnWRCjBMOxbec5ROHztuwu0Zo7MuXgqWizgkiXU= cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de h1:s47eEnWRCjBMOxbec5ROHztuwu0Zo7MuXgqWizgkiXU=
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260112132629-be770ec763b1 h1:MOllR71ruHvBZK3nxeOAnHa9xjnotnlngMEGZqEBp2g=
cloud.o-forge.io/core/oc-lib v0.0.0-20260112132629-be770ec763b1/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260112144037-c35b06e0bc3c h1:9lXrj1agE1clFfxOXRrVXi4PEvlAuWKb4z977c2uk4k=
cloud.o-forge.io/core/oc-lib v0.0.0-20260112144037-c35b06e0bc3c/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51 h1:jlSEprNaUBe628uP9a9TrJ16Q5Ej6OxHlAKNtrHrN2o=
cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=