diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index 5c9fe28..ec062dc 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -1,6 +1,7 @@ package daemons import ( + "fmt" "oc-schedulerd/conf" "os/exec" @@ -20,15 +21,16 @@ func (lm *LocalMonitor) LaunchLocalMonitor() { } // For dev purposes, in prod KubeURL must be a kube API's URL - if lm.KubeURL == "localhost" { - lm.execLocalKube() - } else { + if lm.KubeURL != "localhost" { lm.execRemoteKube() + } else { + lm.execLocalKube() } } func (lm *LocalMonitor) execLocalKube() { cmd := exec.Command(conf.GetConfig().MonitorPath, "-w", lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName) + fmt.Println("CMD", cmd) err := cmd.Start() if err != nil { lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error()) diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index 547743d..80fa1af 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -8,9 +8,9 @@ import ( oclib "cloud.o-forge.io/core/oc-lib" ) -type ExecutionManager struct { - bookings ScheduledBooking -} +var Bookings = ScheduledBooking{Bookings: []Booking{}} + +type ExecutionManager struct{} // Loop every second on the booking's list and move the booking that must start to a new list // that will be looped over to start them @@ -18,20 +18,18 @@ func (em *ExecutionManager) RetrieveNextExecutions() { logger := oclib.GetLogger() for { logger.Debug().Msg("New loop") - em.bookings.Mu.Lock() - bookings := em.bookings.Bookings - if len(bookings) > 0 { - for i := len(bookings) - 1; i >= 0; i-- { - logger.Debug().Msg("It should start at " + bookings[i].Start.String() + " and it is now " + time.Now().UTC().String()) - if bookings[i].Start.Before(time.Now().UTC()) { - logger.Info().Msg("Will execute " + bookings[i].Workflow + " soon") - go em.executeBooking(bookings[i]) - bookings = append(bookings[:i], bookings[i+1:]...) - em.bookings.Bookings = bookings + Bookings.Mu.Lock() + if len(Bookings.Bookings) > 0 { + for i := len(Bookings.Bookings) - 1; i >= 0; i-- { + logger.Debug().Msg("It should start at " + Bookings.Bookings[i].Start.String() + " and it is now " + time.Now().UTC().String()) + if Bookings.Bookings[i].Start.Before(time.Now().UTC()) { + logger.Info().Msg("Will execute " + Bookings.Bookings[i].Workflow + " soon") + go em.executeBooking(Bookings.Bookings[i]) + Bookings.Bookings = append(Bookings.Bookings[:i], Bookings.Bookings[i+1:]...) } } } - em.bookings.Mu.Unlock() + Bookings.Mu.Unlock() time.Sleep(time.Second) } } @@ -48,7 +46,8 @@ func (em *ExecutionManager) executeBooking(booking Booking) { monitor := LocalMonitor{ Logger: logger, LokiURL: conf.GetConfig().LokiUrl, KubeURL: "localhost", - WorkflowName: booking.Workflow} + WorkflowName: booking.Workflow, + } monitor.LaunchLocalMonitor() } } diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go index f0fb5d7..aaac939 100644 --- a/daemons/schedule_manager.go +++ b/daemons/schedule_manager.go @@ -16,19 +16,27 @@ import ( ) type Booking struct { - Start time.Time - Stop time.Time + Start *time.Time + Stop *time.Time Duration uint Workflow string } func (s Booking) Equals(other Booking) bool { + fmt.Println(s, other) return s.Workflow == other.Workflow && s.Start == other.Start && s.Stop == other.Stop } func (b *Booking) String() string { - return fmt.Sprintf("{workflow : %s , start_date : %s , stop_date : %s }", b.Workflow, b.Start.Format(time.RFC3339), b.Stop.Format(time.RFC3339)) - + stop := "nil" + if b.Stop != nil { + stop = b.Stop.Format(time.RFC3339) + } + start := "nil" + if b.Start != nil { + start = b.Start.Format(time.RFC3339) + } + return fmt.Sprintf("{workflow : %s , start_date : %s , stop_date : %s }", b.Workflow, start, stop) } type ScheduledBooking struct { @@ -37,20 +45,17 @@ type ScheduledBooking struct { } func (sb *ScheduledBooking) AddSchedule(new_booking Booking, logger zerolog.Logger) { - if !sb.scheduleAlreadyExists(new_booking) { - sb.Bookings = append(sb.Bookings, new_booking) - logger.Info().Msg("Updated list schedules : \n " + sb.String()) - } - logger.Debug().Msg("Workflow received not added") -} - -func (sb *ScheduledBooking) scheduleAlreadyExists(new_booking Booking) bool { + found := false for _, booking := range sb.Bookings { if booking.Equals(new_booking) { - return true + found = true + break } } - return false + if !found { + sb.Bookings = append(sb.Bookings, new_booking) + logger.Info().Msg("Updated list schedules : \n " + new_booking.String()) + } } func (sb *ScheduledBooking) String() string { @@ -65,8 +70,7 @@ func (sb *ScheduledBooking) String() string { // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' type ScheduleManager struct { - Logger zerolog.Logger - bookings ScheduledBooking + Logger zerolog.Logger } // Goroutine listening to a NATS server for updates @@ -90,32 +94,12 @@ func (s *ScheduleManager) ListenForWorkflowSubmissions() { defer subs.Unsubscribe() for msg := range ch { - fmt.Println("Waiting...") - map_mess := retrieveMapFromSub(msg.Data) - - s.bookings.Mu.Lock() - - wf_exec := s.getWorkflowExecution(map_mess["workflow"]) - - s.bookings.AddSchedule(Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate}, s.Logger) - s.bookings.Mu.Unlock() - + fmt.Println("Catching new workflow... " + map_mess["workflow_id"]) + s.getNextScheduledWorkflows(1) } } -func (s *ScheduleManager) getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution { - res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec_id) - if res.Code != 200 { - s.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) - return nil - } - - wf_exec := res.ToWorkflowExecution() - - return wf_exec -} - // At the moment very simplistic, but could be useful if we send bigger messages func retrieveMapFromSub(message []byte) (result_map map[string]string) { json.Unmarshal(message, &result_map) @@ -130,7 +114,7 @@ func (s *ScheduleManager) SchedulePolling() { s.getNextScheduledWorkflows(3) s.Logger.Info().Msg("Current list of schedules") - fmt.Println(s.bookings.Bookings) + fmt.Println(Bookings.Bookings) time.Sleep(time.Minute * time.Duration(sleep_time)) } @@ -158,7 +142,7 @@ func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (ex func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { start := time.Now().UTC() - end := start.Add(time.Minute * time.Duration(minutes)).UTC() + end := start.Add(time.Minute * time.Duration(minutes+1)).UTC() fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String()) @@ -168,13 +152,10 @@ func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { return } - s.bookings.Mu.Lock() - defer s.bookings.Mu.Unlock() + Bookings.Mu.Lock() + defer Bookings.Mu.Unlock() for _, exec := range next_wf_exec { - exec_start := exec.ExecDate - exec_stop := exec.EndDate - - s.bookings.AddSchedule(Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop}, s.Logger) + Bookings.AddSchedule(Booking{Workflow: exec.UUID, Start: exec.ExecDate, Stop: exec.EndDate}, s.Logger) } } diff --git a/oc-schedulerd b/oc-schedulerd index c836e7a..0ed07a8 100755 Binary files a/oc-schedulerd and b/oc-schedulerd differ