package daemons import ( "encoding/json" "fmt" "time" "oc-scheduler/logger" "oc-scheduler/models" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "github.com/nats-io/nats.go" "go.mongodb.org/mongo-driver/bson/primitive" ) // NATS daemon listens to subject " workflowsUpdate " // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' type ScheduleManager struct { Api_url string bookings *models.ScheduledBooking ws models.HttpQuery } func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){ s.bookings = b } // Goroutine listening to a NATS server for updates // on workflows' scheduling. Messages must contain // workflow execution ID, to allow retrieval of execution infos func (s *ScheduleManager) ListenForWorkflowSubmissions(){ if(s.bookings == nil){ logger.Logger.Error().Msg("booking has not been set in the schedule manager") } nc, err := nats.Connect(nats.DefaultURL) if err != nil { logger.Logger.Error().Msg("Could not connect to NATS") return } defer nc.Close() ch := make(chan *nats.Msg, 64) subs , err := nc.ChanSubscribe("workflowsUpdate", ch) if err != nil { logger.Logger.Error().Msg("Error listening to NATS") } defer subs.Unsubscribe() for msg := range(ch){ fmt.Println("Waiting...") map_mess := retrieveMapFromSub(msg.Data) s.bookings.Mu.Lock() wf_exec := getWorkflowExecution(map_mess["workflow"]) s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate }) s.bookings.Mu.Unlock() } } func getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution { res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec_id) if res.Code != 200 { logger.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) return nil } wf_exec := res.ToWorkflowExecution() return wf_exec } // At the moment very simplistic, but could be useful if we send bigger messages func retrieveMapFromSub(message []byte) (result_map map[string]string) { json.Unmarshal(message, &result_map) return } // Used at launch of the component to retrieve the next scheduled workflows // and then every X minutes in case some workflows were scheduled before launch func (s *ScheduleManager) SchedulePolling (){ var sleep_time float64 = 1 for(true){ s.getNextScheduledWorkflows(3) logger.Logger.Info().Msg("Current list of schedules") fmt.Println(s.bookings.Bookings) time.Sleep(time.Minute * time.Duration(sleep_time)) } } func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) { f := dbs.Filters{ And: map[string][]dbs.Filter{ "execution_date" : {{Operator : dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}}, "state": {{Operator: dbs.EQUAL.String(), Value: 1}}, }, } res := oclib.Search(&f,"",oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION)) if res.Code != 200 { logger.Logger.Error().Msg("Error loading") return nil, nil } for _, exec := range(res.Data){ lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec.GetID()) exec_obj := lib_data.ToWorkflowExecution() exec_list = append(exec_list, *exec_obj) } return exec_list, nil } // TODO : refactor to implement oclib.Search func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { start := time.Now().UTC() end := start.Add(time.Minute * time.Duration(minutes)).UTC() fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String()) next_wf_exec, err := s.getWorfklowExecution(start,end) if err != nil { logger.Logger.Error().Msg("Could not retrieve next schedules") return } s.bookings.Mu.Lock() defer s.bookings.Mu.Unlock() for _, exec := range(next_wf_exec){ exec_start := exec.ExecDate exec_stop := exec.EndDate s.bookings.AddSchedule(models.Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop}) } }