package daemons

import (
	"encoding/json"
	"fmt"
	"oc-schedulerd/conf"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/nats-io/nats.go"
	"github.com/rs/zerolog"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

// ScheduledExecution is a list of workflow executions together with the
// mutex its methods hold while reading or modifying that list.
type ScheduledExecution struct {
	Execs []*workflow_execution.WorkflowExecution
	Mu    sync.Mutex
}

// DeleteSchedules removes every execution whose WorkflowID equals workflow_id.
//
// The lock is taken before Execs is read so that the filtering and the
// replacement of the slice happen atomically with respect to AddSchedules.
func (sb *ScheduledExecution) DeleteSchedules(workflow_id string) {
	sb.Mu.Lock()
	defer sb.Mu.Unlock()

	// Build a fresh slice rather than filtering in place, so that a caller
	// still holding the previous slice does not see it rewritten.
	kept := make([]*workflow_execution.WorkflowExecution, 0, len(sb.Execs))
	for _, exec := range sb.Execs {
		if exec.WorkflowID != workflow_id {
			kept = append(kept, exec)
		}
	}
	sb.Execs = kept
}

// AddSchedules appends each execution of new_executions that is not already
// present in the list (as decided by execIsSet).
//
// logger is accepted for interface compatibility; it is not used here.
func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution.WorkflowExecution, logger zerolog.Logger) {
	sb.Mu.Lock()
	defer sb.Mu.Unlock()

	for _, exec := range new_executions {
		// Scan the list once and reuse the answer for both the trace and the
		// decision.
		isNew := !sb.execIsSet(exec)
		fmt.Println("Adding "+exec.UUID, isNew)
		if isNew {
			sb.Execs = append(sb.Execs, exec)
		}
	}
}

// execIsSet reports whether an execution equal to exec (per Equals) is
// already in the list. It does not lock; callers must hold sb.Mu.
func (sb *ScheduledExecution) execIsSet(exec *workflow_execution.WorkflowExecution) bool {
	for _, known := range sb.Execs {
		if known.Equals(exec) {
			return true
		}
	}
	return false
}

// ScheduleManager keeps the package-level Executions list up to date, both by
// listening to NATS for workflow creation/removal messages and by polling the
// database for upcoming scheduled executions.
//
// NATS messages are decoded as a JSON object of string values; only the "id"
// key is read.
type ScheduleManager struct {
	Logger zerolog.Logger
}

// ListenNATS connects to the NATS server configured in conf and starts one
// listener goroutine for workflow removals and one for workflow creations.
// It blocks until both listeners have returned, then closes the connection.
func (s *ScheduleManager) ListenNATS() {
	nc, err := nats.Connect(conf.GetConfig().NatsUrl)
	if err != nil {
		s.Logger.Error().Err(err).Msg("Could not connect to NATS")
		return
	}
	defer nc.Close()

	// The WaitGroup is shared by pointer: a sync.WaitGroup must not be copied,
	// otherwise Done() in the listeners would never be seen by Wait() here.
	var wg sync.WaitGroup
	wg.Add(2)
	go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), true, &wg)
	go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), false, &wg)
	wg.Wait()
}

// listenForChange subscribes to the NATS subject chanName and handles every
// message received on it until the subscription channel is closed.
//
// When delete is true, the schedules of the workflow named by the message's
// "id" are removed from Executions; otherwise the upcoming scheduled
// executions are reloaded from the database. wg.Done is called on return.
func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete bool, wg *sync.WaitGroup) {
	defer wg.Done()

	ch := make(chan *nats.Msg, 64)
	fmt.Println("Listening to " + chanName)
	subs, err := nc.ChanSubscribe(chanName, ch)
	if err != nil {
		s.Logger.Error().Msg("Error listening to NATS : " + err.Error())
		// Without a subscription nothing will ever be sent on ch; returning
		// avoids blocking this goroutine forever.
		return
	}
	defer subs.Unsubscribe()

	action := "new"
	if delete {
		action = "deleted"
	}

	for msg := range ch {
		payload := map[string]string{}
		if err := json.Unmarshal(msg.Data, &payload); err != nil {
			// Unmarshal still fills in the string fields it could decode, so
			// report the problem and carry on with what is available.
			s.Logger.Warn().Err(err).Msg("Could not fully decode NATS message on " + chanName)
		}
		id := payload["id"]
		fmt.Println("Catching " + action + " workflow... " + id)

		if !delete {
			s.getNextScheduledWorkflows(1)
			continue
		}
		if id == "" {
			// An empty id would match executions that have no WorkflowID.
			s.Logger.Warn().Msg("Ignoring deletion message without id on " + chanName)
			continue
		}
		Executions.DeleteSchedules(id)
	}
}

// SchedulePolling loads the executions scheduled for the coming minute, logs
// the current number of known schedules, sleeps one minute and starts over.
// It never returns.
func (s *ScheduleManager) SchedulePolling() {
	const pollInterval = time.Minute

	for {
		s.getNextScheduledWorkflows(1)

		// Read the length under the lock: the NATS listeners may be modifying
		// the list concurrently.
		Executions.Mu.Lock()
		count := len(Executions.Execs)
		Executions.Mu.Unlock()

		s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", count))
		time.Sleep(pollInterval)
	}
}

// getExecution searches for the workflow executions in state SCHEDULED whose
// execution_date lies between from and to (both bounds included).
//
// It returns a non-nil error when the search response code is not 200.
// Results that are not *workflow_execution.WorkflowExecution are skipped with
// a warning instead of causing a panic.
func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) {
	fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String())

	f := dbs.Filters{
		And: map[string][]dbs.Filter{
			"execution_date": {
				{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)},
				{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)},
			},
			"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
		},
	}
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false)
	if res.Code != 200 {
		return nil, fmt.Errorf("searching scheduled workflow executions: unexpected response code %v", res.Code)
	}

	for _, item := range res.Data {
		exec, ok := item.(*workflow_execution.WorkflowExecution)
		if !ok {
			s.Logger.Warn().Msgf("Skipping search result of unexpected type %T", item)
			continue
		}
		exec_list = append(exec_list, exec)
	}
	fmt.Println("Found "+fmt.Sprintf("%v", len(exec_list))+" workflows", res)
	return exec_list, nil
}

// getNextScheduledWorkflows loads the executions scheduled between one second
// ago and `minutes` minutes from now (UTC) and adds them to Executions.
// Fractional values of minutes are honoured.
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
	now := time.Now().UTC()
	from := now.Add(-time.Second)
	// Convert to nanoseconds before truncating so that e.g. 0.5 means 30s
	// rather than 0.
	to := now.Add(time.Duration(minutes * float64(time.Minute)))

	execs, err := s.getExecution(from, to)
	if err != nil {
		s.Logger.Error().Err(err).Msg("Could not retrieve next schedules")
		return
	}
	Executions.AddSchedules(execs, s.Logger)
}