This commit is contained in:
mr
2024-09-27 10:10:00 +02:00
parent 47570d9423
commit 6ab737c915
5 changed files with 52 additions and 17 deletions

View File

@@ -37,15 +37,29 @@ func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.Work
Bookings.Mu.Lock()
defer Bookings.Mu.Unlock()
for _, exec := range new_bookings {
sb.Bookings = append(sb.Bookings , exec)
if !sb.execIsSet(exec) {
sb.Bookings = append(sb.Bookings, exec)
}
}
}
func (sb *ScheduledBooking) execIsSet(exec *workflow_execution.WorkflowExecution) bool {
for _, b := range sb.Bookings {
if b.Equals(exec) {
return true
}
}
return false
}
// NATS daemon listens to subject " workflowsUpdate "
// workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }'
type ScheduleManager struct {
Logger zerolog.Logger
}
// Goroutine listening to a NATS server for updates
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
@@ -92,6 +106,7 @@ func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete
}
}
}
// Used at launch of the component to retrieve the next scheduled workflows
// and then every X minutes in case some workflows were scheduled before launch
func (s *ScheduleManager) SchedulePolling() {
@@ -124,8 +139,8 @@ func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
start := time.Now().UTC()
if next_wf_exec, err := s.getExecution(
start.Add(time.Second * time.Duration(-1)).UTC(),
start.Add(time.Minute * time.Duration(minutes)).UTC(),
start.Add(time.Second*time.Duration(-1)).UTC(),
start.Add(time.Minute*time.Duration(minutes)).UTC(),
); err != nil {
s.Logger.Error().Msg("Could not retrieve next schedules")
} else {