package daemons import ( "encoding/json" "fmt" "oc-schedulerd/conf" "sync" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" "github.com/nats-io/nats.go" "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson/primitive" ) type ScheduledBooking struct { Bookings []*workflow_execution.WorkflowExecution Mu sync.Mutex } func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) { toNotDelete := []*workflow_execution.WorkflowExecution{} for _, b := range sb.Bookings { if b.WorkflowID != workflow_id { toNotDelete = append(toNotDelete, b) } } Bookings.Mu.Lock() defer Bookings.Mu.Unlock() sb.Bookings = toNotDelete } func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) { Bookings.Mu.Lock() defer Bookings.Mu.Unlock() for _, exec := range new_bookings { if !sb.execIsSet(exec) { sb.Bookings = append(sb.Bookings, exec) } } } func (sb *ScheduledBooking) execIsSet(exec *workflow_execution.WorkflowExecution) bool { for _, b := range sb.Bookings { if b.Equals(exec) { return true } } return false } // NATS daemon listens to subject " workflowsUpdate " // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' type ScheduleManager struct { Logger zerolog.Logger } // Goroutine listening to a NATS server for updates // on workflows' scheduling. Messages must contain // workflow execution ID, to allow retrieval of execution infos func (s *ScheduleManager) ListenNATS() { nc, err := nats.Connect(conf.GetConfig().NatsUrl) if err != nil { s.Logger.Error().Msg("Could not connect to NATS") return } defer nc.Close() var wg sync.WaitGroup wg.Add(2) go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), true, wg) go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), false, wg) wg.Wait() } // Goroutine listening to a NATS server for updates // on workflows' scheduling. Messages must contain // workflow execution ID, to allow retrieval of execution infos func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete bool, wg sync.WaitGroup) { defer wg.Done() ch := make(chan *nats.Msg, 64) fmt.Println("Listening to " + chanName) subs, err := nc.ChanSubscribe(chanName, ch) if err != nil { s.Logger.Error().Msg("Error listening to NATS : " + err.Error()) } defer subs.Unsubscribe() for msg := range ch { map_mess := map[string]string{} json.Unmarshal(msg.Data, &map_mess) str := "new" if delete { str = "deleted" } fmt.Println("Catching " + str + " workflow... " + map_mess["id"]) if delete { Bookings.DeleteSchedules(map_mess["id"]) } else { s.getNextScheduledWorkflows(1) } } } // Used at launch of the component to retrieve the next scheduled workflows // and then every X minutes in case some workflows were scheduled before launch func (s *ScheduleManager) SchedulePolling() { var sleep_time float64 = 1 for { s.getNextScheduledWorkflows(1) s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Bookings.Bookings))) time.Sleep(time.Minute * time.Duration(sleep_time)) } } func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) { fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String()) f := dbs.Filters{ And: map[string][]dbs.Filter{ "execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}}, "state": {{Operator: dbs.EQUAL.String(), Value: 1}}, }, } res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION)) if res.Code != 200 { s.Logger.Error().Msg("Error loading") return } for _, exec := range res.Data { exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecution)) } return } func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { start := time.Now().UTC() if next_wf_exec, err := s.getExecution( start.Add(time.Second*time.Duration(-1)).UTC(), start.Add(time.Minute*time.Duration(minutes)).UTC(), ); err != nil { s.Logger.Error().Msg("Could not retrieve next schedules") } else { Bookings.AddSchedules(next_wf_exec, s.Logger) } }