Event Scheduling/Listening

This commit is contained in:
mr
2026-01-14 15:16:19 +01:00
parent 8bbbd1dcfb
commit d267c88fb5
4 changed files with 74 additions and 14 deletions

View File

@@ -9,8 +9,11 @@ import (
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -67,9 +70,10 @@ func (s *ScheduleManager) ListenNATS() {
}
defer nc.Close()
var wg sync.WaitGroup
wg.Add(2)
go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), true, wg)
go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), false, wg)
wg.Add(3)
go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), tools.REMOVE, wg)
go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), tools.CREATE, wg)
go s.listenForChange(nc, tools.WORKFLOW_EVENT.GenerateKey(oclib.WORKFLOW.String()), tools.WORKFLOW_EVENT, wg)
wg.Wait()
break
}
@@ -78,7 +82,7 @@ func (s *ScheduleManager) ListenNATS() {
// Goroutine listening to a NATS server for updates
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete bool, wg sync.WaitGroup) {
func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, natsTools tools.NATSMethod, wg sync.WaitGroup) {
defer wg.Done()
ch := make(chan *nats.Msg, 64)
fmt.Println("Listening to " + chanName)
@@ -92,14 +96,17 @@ func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete
map_mess := map[string]string{}
json.Unmarshal(msg.Data, &map_mess)
str := "new"
if delete {
if natsTools == tools.NATSMethod(tools.DELETE) {
str = "deleted"
}
fmt.Println("Catching " + str + " workflow... " + map_mess["id"])
if delete {
switch natsTools {
case tools.NATSMethod(tools.DELETE):
Executions.DeleteSchedules(map_mess["id"])
} else {
case tools.NATSMethod(tools.CREATE):
s.getNextScheduledWorkflows(1)
case tools.NATSMethod(tools.WORKFLOW_EVENT):
s.executeWorkflow(map_mess["workflow_id"])
}
}
}
@@ -134,6 +141,21 @@ func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list
return
}
func (s *ScheduleManager) executeWorkflow(wfID string) {
res := resources.WorkflowResource{}
access := res.GetAccessor(&tools.APIRequest{})
if d, code, err := access.LoadOne(wfID); code == 200 && err == nil {
eventExec := &workflow_execution.WorkflowExecution{
WorkflowID: d.GetID(),
ExecDate: time.Now(),
ExecutionsID: uuid.New().String(),
State: enum.SCHEDULED,
}
Executions.AddSchedules([]*workflow_execution.WorkflowExecution{eventExec}, s.Logger)
}
}
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
start := time.Now().UTC()
fmt.Println(s.getExecution(