package daemons import ( "encoding/json" "fmt" "net/url" "time" "oc-scheduler/logger" "oc-scheduler/models" "github.com/nats-io/nats.go" ) type ScheduleManager struct { Api_url string list models.ScheduledBooking ws HttpQuery } // Goroutine listening to a NATS server for updates // on workflows' scheduling. Messages must contain // workflow's name, startDate and stopDate while there // is no way to get scheduling infos for a specific workflow func (s *ScheduleManager) ListenWorkflowSubmissions(){ nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() ch := make(chan *nats.Msg, 64) subs , err := nc.ChanSubscribe("workflowsUpdate", ch) if err != nil { logger.Logger.Fatal().Msg("Error listening to NATS") } defer subs.Unsubscribe() for msg := range(ch){ fmt.Println("Waiting...") map_mess := retrieveMapFromSub(msg.Data) s.list.Mu.Lock() start, err := time.Parse(time.RFC3339,map_mess["startDate"]) if err != nil{ logger.Logger.Error().Msg(err.Error()) } stop, err := time.Parse(time.RFC3339,map_mess["stopDate"]) if err != nil{ logger.Logger.Error().Msg(err.Error()) } s.list.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: start, Stop: stop }) s.list.Mu.Unlock() } } func retrieveMapFromSub(message []byte) (result_map map[string]string) { json.Unmarshal(message, &result_map) return } // Used at launch of the component to retrieve the next scheduled workflows // and then every X minutes in case some workflows were scheduled before launch func (s *ScheduleManager) RetrieveScheduling (){ for(true){ err := s.getNextScheduledWorkflows(s.Api_url, 0.3) if err != nil { logger.Logger.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + s.Api_url) } logger.Logger.Info().Msg("Current list of schedules") fmt.Println(s.list.Bookings) time.Sleep(time.Minute * 5) } } func (s *ScheduleManager) getNextScheduledWorkflows(apiurl string, hours float64) (error) { s.ws.Init(apiurl) params := url.Values{} start := time.Now().UTC() params.Add("startDate", start.Format(time.RFC3339)) time_span := time.Hour * time.Duration(hours) params.Add("stopDate",start.Add(time_span).Format(time.RFC3339)) body, err := s.ws.Get("v1/schedule?" + params.Encode()) if err != nil { return err } var workflows []map[string]string json.Unmarshal(body,&workflows) s.list.Mu.Lock() defer s.list.Mu.Unlock() for _, workflow := range(workflows){ start, _ := time.Parse(time.RFC3339,workflow["StartDate"]) stop, _ := time.Parse(time.RFC3339,workflow["StopDate"]) s.list.AddSchedule(models.Booking{Workflow: workflow["Workflow"], Start: start, Stop: stop}) } return nil }