package daemons

import (
	"encoding/json"
	"fmt"
	"oc-schedulerd/conf"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// Booking is one scheduled run: the identifier stored in Workflow together
// with the start and stop instants of the run.
type Booking struct {
	Start    time.Time
	Stop     time.Time
	Duration uint
	Workflow string
}

// Equals reports whether both bookings target the same workflow and cover the
// same instants. Instants are compared with time.Time.Equal so that two values
// describing the same moment in different locations are still considered equal.
func (b Booking) Equals(other Booking) bool {
	return b.Workflow == other.Workflow &&
		b.Start.Equal(other.Start) &&
		b.Stop.Equal(other.Stop)
}

// String renders the booking with RFC 3339 timestamps. It uses a value
// receiver so that Booking values (not only pointers) satisfy fmt.Stringer.
func (b Booking) String() string {
	return fmt.Sprintf("{workflow : %s , start_date : %s , stop_date : %s }", b.Workflow, b.Start.Format(time.RFC3339), b.Stop.Format(time.RFC3339))
}

// ScheduledBooking is the list of known bookings together with the mutex that
// guards it. The methods below do not lock Mu themselves: callers are expected
// to hold it while calling them.
type ScheduledBooking struct {
	Bookings []Booking
	Mu       sync.Mutex
}

// AddSchedule appends new_booking unless an equal booking is already present,
// and logs which of the two happened. The caller must hold sb.Mu.
func (sb *ScheduledBooking) AddSchedule(new_booking Booking, logger zerolog.Logger) {
	if sb.scheduleAlreadyExists(new_booking) {
		logger.Debug().Msg("Workflow received not added")
		return
	}
	sb.Bookings = append(sb.Bookings, new_booking)
	logger.Info().Msg("Updated list schedules : \n " + sb.String())
}

// scheduleAlreadyExists reports whether a booking equal to new_booking
// (see Booking.Equals) is already in the list.
func (sb *ScheduledBooking) scheduleAlreadyExists(new_booking Booking) bool {
	for _, booking := range sb.Bookings {
		if booking.Equals(new_booking) {
			return true
		}
	}
	return false
}

// String returns one line per booking, in list order.
func (sb *ScheduledBooking) String() string {
	var out strings.Builder
	for _, booking := range sb.Bookings {
		out.WriteString(booking.String())
		out.WriteByte('\n')
	}
	return out.String()
}

// ScheduleManager keeps the list of upcoming bookings up to date, both from
// NATS notifications on the "workflowsUpdate" subject and from periodic
// polling of the database.
//
// Only the "workflow" key of a workflowsUpdate message is read; it is used as
// a workflow execution ID, and the dates are loaded from the stored execution.
type ScheduleManager struct {
	Logger   zerolog.Logger
	bookings ScheduledBooking
}

// ListenForWorkflowSubmissions subscribes to the "workflowsUpdate" NATS
// subject and registers a booking for every message received. Messages must
// contain the workflow execution ID, to allow retrieval of execution infos.
//
// It blocks while the subscription channel is open, so it is meant to run in
// its own goroutine. It returns early if the connection or the subscription
// cannot be established.
func (s *ScheduleManager) ListenForWorkflowSubmissions() {
	nc, err := nats.Connect(conf.GetConfig().NatsUrl)
	if err != nil {
		s.Logger.Error().Err(err).Msg("Could not connect to NATS")
		return
	}
	defer nc.Close()

	ch := make(chan *nats.Msg, 64)
	subs, err := nc.ChanSubscribe("workflowsUpdate", ch)
	if err != nil {
		// Without a subscription nothing will ever be delivered on ch:
		// ranging over it would block forever.
		s.Logger.Error().Err(err).Msg("Error listening to NATS")
		return
	}
	defer subs.Unsubscribe()

	for msg := range ch {
		fmt.Println("Waiting...")
		s.scheduleFromMessage(msg.Data)
	}
}

// scheduleFromMessage decodes one workflowsUpdate payload, loads the matching
// workflow execution and adds the resulting booking to the list. Invalid or
// unresolvable messages are logged and dropped instead of crashing the listener.
func (s *ScheduleManager) scheduleFromMessage(data []byte) {
	execID := retrieveMapFromSub(data)["workflow"]
	if execID == "" {
		s.Logger.Error().Msg("Received a workflowsUpdate message without a workflow execution ID")
		return
	}

	// The load is done before taking the lock so the list is not blocked
	// while waiting for the database.
	wfExec := s.getWorkflowExecution(execID)
	booking, ok := bookingFromExecution(execID, wfExec)
	if !ok {
		s.Logger.Error().Msg("No usable execution date for workflow execution " + execID)
		return
	}

	s.bookings.Mu.Lock()
	defer s.bookings.Mu.Unlock()
	s.bookings.AddSchedule(booking, s.Logger)
}

// bookingFromExecution builds the Booking for exec under the given workflow
// identifier. It returns false when exec is nil or has no execution date.
// An execution without an end date yields a booking whose Stop is the zero time.
func bookingFromExecution(workflow string, exec *workflow_execution.WorkflowExecution) (Booking, bool) {
	if exec == nil || exec.ExecDate == nil {
		return Booking{}, false
	}
	booking := Booking{Workflow: workflow, Start: *exec.ExecDate}
	if exec.EndDate != nil {
		booking.Stop = *exec.EndDate
	}
	return booking, true
}

// getWorkflowExecution loads the workflow execution identified by exec_id.
// It logs and returns nil when the load does not answer with code 200, so
// callers must check the result before using it.
func (s *ScheduleManager) getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution {
	res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec_id)
	if res.Code != 200 {
		s.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
		return nil
	}
	return res.ToWorkflowExecution()
}

// retrieveMapFromSub decodes a message payload as a JSON object of strings.
// At the moment very simplistic, but could be useful if we send bigger messages.
func retrieveMapFromSub(message []byte) (result_map map[string]string) {
	// Deliberately best effort: on a decoding error the map may be nil or
	// only partially populated, and reading a missing key yields "".
	// Callers validate the keys they need.
	_ = json.Unmarshal(message, &result_map)
	return result_map
}

// SchedulePolling is used at launch of the component to retrieve the next
// scheduled workflows, and then every minute in case some workflows were
// scheduled before launch. It never returns.
func (s *ScheduleManager) SchedulePolling() {
	const (
		pollInterval     = time.Minute
		lookAheadMinutes = 3
	)

	for {
		s.getNextScheduledWorkflows(lookAheadMinutes)
		s.Logger.Info().Msg("Current list of schedules")

		// Copy the list under the lock: the NATS listener appends to it
		// from another goroutine.
		s.bookings.Mu.Lock()
		snapshot := append([]Booking(nil), s.bookings.Bookings...)
		s.bookings.Mu.Unlock()
		fmt.Println(snapshot)

		time.Sleep(pollInterval)
	}
}

// getWorfklowExecution searches the workflow executions whose execution_date
// lies in [from, to] and whose state equals 1, then loads each of them.
//
// It returns an error when the search does not answer with code 200.
// Executions that cannot be loaded individually are logged by
// getWorkflowExecution and left out of the result.
//
// NOTE(review): the name carries a typo ("Worfklow"); it is kept because
// renaming would break any caller outside this view.
func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) {
	f := dbs.Filters{
		And: map[string][]dbs.Filter{
			"execution_date": {
				{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)},
				{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)},
			},
			"state": {{Operator: dbs.EQUAL.String(), Value: 1}},
		},
	}

	res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION))
	if res.Code != 200 {
		return nil, fmt.Errorf("searching workflow executions from %s to %s: unexpected code %v",
			from.Format(time.RFC3339), to.Format(time.RFC3339), res.Code)
	}

	for _, exec := range res.Data {
		wfExec := s.getWorkflowExecution(exec.GetID())
		if wfExec == nil {
			continue
		}
		exec_list = append(exec_list, *wfExec)
	}
	return exec_list, nil
}

// getNextScheduledWorkflows adds to the booking list every workflow execution
// starting within the next `minutes` minutes. Fractional values are honoured.
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
	start := time.Now().UTC()
	// Convert in floating point first so that e.g. 0.5 means 30 seconds
	// instead of being truncated to 0.
	end := start.Add(time.Duration(minutes * float64(time.Minute)))
	fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String())

	nextExecs, err := s.getWorfklowExecution(start, end)
	if err != nil {
		s.Logger.Error().Err(err).Msg("Could not retrieve next schedules")
		return
	}

	s.bookings.Mu.Lock()
	defer s.bookings.Mu.Unlock()

	for i := range nextExecs {
		exec := &nextExecs[i]
		booking, ok := bookingFromExecution(exec.UUID, exec)
		if !ok {
			s.Logger.Error().Msg("No usable execution date for workflow execution " + exec.UUID)
			continue
		}
		s.bookings.AddSchedule(booking, s.Logger)
	}
}