package daemons import ( "encoding/json" "fmt" "net/url" "time" "oc-scheduler/logger" "oc-scheduler/models" "github.com/nats-io/nats.go" ) // NATS daemon listens to subject " workflowsUpdate " // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' type ScheduleManager struct { Api_url string bookings *models.ScheduledBooking ws models.HttpQuery } func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){ s.bookings = b } // Goroutine listening to a NATS server for updates // on workflows' scheduling. Messages must contain // workflow's name, start_date and stop_date while there // is no way to get scheduling infos for a specific workflow func (s *ScheduleManager) ListenForWorkflowSubmissions(){ if(s.bookings == nil){ logger.Logger.Fatal().Msg("booking has not been set in the schedule manager") } nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() ch := make(chan *nats.Msg, 64) subs , err := nc.ChanSubscribe("workflowsUpdate", ch) if err != nil { logger.Logger.Fatal().Msg("Error listening to NATS") } defer subs.Unsubscribe() for msg := range(ch){ fmt.Println("Waiting...") map_mess := retrieveMapFromSub(msg.Data) s.bookings.Mu.Lock() start, err := time.Parse(time.RFC3339,map_mess["start_date"]) if err != nil{ logger.Logger.Error().Msg(err.Error()) } stop, err := time.Parse(time.RFC3339,map_mess["stop_date"]) if err != nil{ logger.Logger.Error().Msg(err.Error()) } s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: start, Stop: stop }) s.bookings.Mu.Unlock() } } // At the moment very simplistic, but could be useful if we send bigger messages func retrieveMapFromSub(message []byte) (result_map map[string]string) { json.Unmarshal(message, &result_map) return } // Used at launch of the component to retrieve the next scheduled workflows // and then every X minutes in case some workflows were scheduled before launch func (s *ScheduleManager) SchedulePolling (){ for(true){ err := s.getNextScheduledWorkflows(s.Api_url, 0.3) if err != nil { logger.Logger.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + s.Api_url) } logger.Logger.Info().Msg("Current list of schedules") fmt.Println(s.bookings.Bookings) time.Sleep(time.Minute * 5) } } func (s *ScheduleManager) getNextScheduledWorkflows(apiurl string, hours float64) (error) { s.ws.Init(apiurl) params := url.Values{} start := time.Now().UTC() params.Add("start_date", start.Format(time.RFC3339)) time_span := time.Hour * time.Duration(hours) params.Add("stop_date",start.Add(time_span).Format(time.RFC3339)) body, err := s.ws.Get("v1/schedule?" + params.Encode()) if err != nil { return err } var workflows []map[string]string json.Unmarshal(body,&workflows) s.bookings.Mu.Lock() defer s.bookings.Mu.Unlock() for _, workflow := range(workflows){ start, _ := time.Parse(time.RFC3339,workflow["start_date"]) stop, _ := time.Parse(time.RFC3339,workflow["stop_date"]) s.bookings.AddSchedule(models.Booking{Workflow: workflow["Workflow"], Start: start, Stop: stop}) } return nil }