diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index f87f04e..9edbc33 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -11,13 +11,13 @@ import ( type LocalMonitor struct { LokiURL string KubeURL string - WorkflowName string + ExecutionID string Duration int Logger zerolog.Logger } func (lm *LocalMonitor) LaunchLocalMonitor() { - if lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == "" { + if lm.LokiURL == "" || lm.KubeURL == "" || lm.ExecutionID == "" { lm.Logger.Error().Msg("Missing parameter in LocalMonitor") } @@ -30,7 +30,7 @@ func (lm *LocalMonitor) LaunchLocalMonitor() { } func (lm *LocalMonitor) execLocalKube() { - args := []string{"-w", lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} + args := []string{"-w", lm.ExecutionID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} if lm.Duration > 0 { args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) } @@ -38,7 +38,7 @@ func (lm *LocalMonitor) execLocalKube() { fmt.Println("CMD", cmd) err := cmd.Start() if err != nil { - lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error()) + lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) } } diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index fd1d77c..f43dc64 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -6,9 +6,10 @@ import ( "time" oclib "cloud.o-forge.io/core/oc-lib" + workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" ) -var Bookings = ScheduledBooking{Bookings: []Booking{}} +var Bookings = ScheduledBooking{Bookings: map[string]*workflow_execution.WorkflowExecution{}} type ExecutionManager struct{} @@ -20,12 +21,11 @@ func (em *ExecutionManager) RetrieveNextExecutions() { logger.Debug().Msg("New loop") Bookings.Mu.Lock() if len(Bookings.Bookings) > 0 { - for i := len(Bookings.Bookings) - 1; i >= 0; i-- { - logger.Debug().Msg("It should start at " + Bookings.Bookings[i].Start.String() + " and it is now " + time.Now().UTC().String()) - if Bookings.Bookings[i].Start.Before(time.Now().UTC()) { - logger.Info().Msg("Will execute " + Bookings.Bookings[i].Workflow + " soon") - go em.executeBooking(Bookings.Bookings[i]) - Bookings.Bookings = append(Bookings.Bookings[:i], Bookings.Bookings[i+1:]...) + for k, v := range Bookings.Bookings { + if v.ExecDate.Before(time.Now().UTC()) { + logger.Info().Msg("Will execute " + k + " soon") + go em.executeBooking(v) + delete(Bookings.Bookings, k) } } } @@ -34,7 +34,7 @@ func (em *ExecutionManager) RetrieveNextExecutions() { } } -func (em *ExecutionManager) executeBooking(booking Booking) { +func (em *ExecutionManager) executeBooking(booking *workflow_execution.WorkflowExecution) { // start execution // create the yaml that describes the pod : filename, path/url to Loki exec_method := os.Getenv("MONITOR_METHOD") @@ -44,15 +44,15 @@ func (em *ExecutionManager) executeBooking(booking Booking) { } else { logger.Debug().Msg("Executing oc-monitor localy") duration := 0 - if booking.Stop != nil && booking.Start != nil { - duration = int(booking.Stop.Sub(*booking.Start).Seconds()) + if booking.EndDate != nil && booking.ExecDate != nil { + duration = int(booking.EndDate.Sub(*booking.ExecDate).Seconds()) } monitor := LocalMonitor{ Logger: logger, Duration: duration, LokiURL: conf.GetConfig().LokiUrl, KubeURL: "localhost", - WorkflowName: booking.Workflow, + ExecutionID: booking.UUID, } monitor.LaunchLocalMonitor() } diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go index c06be42..52e78be 100644 --- a/daemons/schedule_manager.go +++ b/daemons/schedule_manager.go @@ -6,7 +6,7 @@ import ( "oc-schedulerd/conf" "sync" "time" - + "cloud.o-forge.io/core/oc-lib/tools" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" @@ -15,108 +15,92 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) -type Booking struct { - Start *time.Time - Stop *time.Time - Duration uint - Workflow string -} - -func (s Booking) Equals(other Booking) bool { - return s.Workflow == other.Workflow && s.Start == other.Start -} - -func (b *Booking) String() string { - stop := "nil" - if b.Stop != nil { - stop = b.Stop.Format(time.RFC3339) - } - start := "nil" - if b.Start != nil { - start = b.Start.Format(time.RFC3339) - } - return fmt.Sprintf("{workflow : %s , start_date : %s , stop_date : %s }", b.Workflow, start, stop) -} - type ScheduledBooking struct { - Bookings []Booking + Bookings map[string]*workflow_execution.WorkflowExecution Mu sync.Mutex } -func (sb *ScheduledBooking) AddSchedule(new_booking Booking, logger zerolog.Logger) { - found := false - for _, booking := range sb.Bookings { - if booking.Equals(new_booking) { - found = true - break +func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) { + toDelete := []string{} + for k, b := range sb.Bookings { + if b.WorkflowID == workflow_id { + toDelete = append(toDelete, k) } } - if !found { - sb.Bookings = append(sb.Bookings, new_booking) - logger.Info().Msg("Updated list schedules : \n " + new_booking.String()) + Bookings.Mu.Lock() + defer Bookings.Mu.Unlock() + for _, k := range toDelete { + delete(sb.Bookings, k) } } -func (sb *ScheduledBooking) String() string { - var str string - for _, booking := range sb.Bookings { - str += fmt.Sprintf("%s\n", booking.String()) +func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) { + Bookings.Mu.Lock() + defer Bookings.Mu.Unlock() + for _, exec := range new_bookings { + sb.Bookings[exec.GetID()] = exec } - return str } - // NATS daemon listens to subject " workflowsUpdate " // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' type ScheduleManager struct { Logger zerolog.Logger } - // Goroutine listening to a NATS server for updates // on workflows' scheduling. Messages must contain // workflow execution ID, to allow retrieval of execution infos -func (s *ScheduleManager) ListenForWorkflowSubmissions() { +func (s *ScheduleManager) ListenNATS() { nc, err := nats.Connect(conf.GetConfig().NatsUrl) if err != nil { s.Logger.Error().Msg("Could not connect to NATS") return } - defer nc.Close() + var wg sync.WaitGroup + wg.Add(2) + go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), true, wg) + go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), false, wg) + wg.Wait() +} + +// Goroutine listening to a NATS server for updates +// on workflows' scheduling. Messages must contain +// workflow execution ID, to allow retrieval of execution infos +func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete bool, wg sync.WaitGroup) { + defer wg.Done() ch := make(chan *nats.Msg, 64) - - subs, err := nc.ChanSubscribe("workflowsUpdate", ch) + fmt.Println("Listening to " + chanName) + subs, err := nc.ChanSubscribe(chanName, ch) if err != nil { - s.Logger.Error().Msg("Error listening to NATS") + s.Logger.Error().Msg("Error listening to NATS : " + err.Error()) } defer subs.Unsubscribe() for msg := range ch { - map_mess := retrieveMapFromSub(msg.Data) - fmt.Println("Catching new workflow... " + map_mess["workflow_id"]) - s.getNextScheduledWorkflows(1) + map_mess := map[string]string{} + json.Unmarshal(msg.Data, &map_mess) + fmt.Println("Catching new workflow... " + map_mess["id"]) + if delete { + Bookings.DeleteSchedules(map_mess["id"]) + } else { + s.getNextScheduledWorkflows(1) + } } } - -// At the moment very simplistic, but could be useful if we send bigger messages -func retrieveMapFromSub(message []byte) (result_map map[string]string) { - json.Unmarshal(message, &result_map) - return -} - // Used at launch of the component to retrieve the next scheduled workflows // and then every X minutes in case some workflows were scheduled before launch func (s *ScheduleManager) SchedulePolling() { var sleep_time float64 = 1 for { s.getNextScheduledWorkflows(1) - s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Bookings.Bookings))) time.Sleep(time.Minute * time.Duration(sleep_time)) } } -func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) { +func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) { + fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String()) f := dbs.Filters{ And: map[string][]dbs.Filter{ "execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}}, @@ -126,33 +110,22 @@ func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (ex res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION)) if res.Code != 200 { s.Logger.Error().Msg("Error loading") - return nil, nil + return } - for _, exec := range res.Data { - lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec.GetID()) - exec_obj := lib_data.ToWorkflowExecution() - exec_list = append(exec_list, *exec_obj) + exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecution)) } - return exec_list, nil + return } func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { start := time.Now().UTC() - end := start.Add(time.Minute * time.Duration(minutes)).UTC() - start = start.Add(time.Second * time.Duration(-1)).UTC() - fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String()) - - next_wf_exec, err := s.getWorfklowExecution(start, end) - if err != nil { + if next_wf_exec, err := s.getExecution( + start.Add(time.Second * time.Duration(-1)).UTC(), + start.Add(time.Minute * time.Duration(minutes)).UTC(), + ); err != nil { s.Logger.Error().Msg("Could not retrieve next schedules") - return - } - - Bookings.Mu.Lock() - defer Bookings.Mu.Unlock() - - for _, exec := range next_wf_exec { - Bookings.AddSchedule(Booking{Workflow: exec.UUID, Start: exec.ExecDate, Stop: exec.EndDate}, s.Logger) + } else { + Bookings.AddSchedules(next_wf_exec, s.Logger) } } diff --git a/go.mod b/go.mod index 91879bc..5b8b474 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/beego/beego v1.12.12 github.com/beego/beego/v2 v2.2.2 github.com/goraz/onion v0.1.3 - github.com/nats-io/nats.go v1.9.1 + github.com/nats-io/nats.go v1.37.0 github.com/nwtgck/go-fakelish v0.1.3 github.com/rs/zerolog v1.33.0 github.com/tidwall/gjson v1.17.1 @@ -17,7 +17,7 @@ require ( ) require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 // indirect + cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 // indirect github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect github.com/antihax/optional v1.0.0 // indirect github.com/aws/aws-sdk-go v1.36.29 // indirect @@ -56,7 +56,7 @@ require ( github.com/montanaflynn/stats v0.7.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/jwt v0.3.2 // indirect - github.com/nats-io/nkeys v0.1.3 // indirect + github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/prometheus/client_golang v1.19.0 // indirect diff --git a/go.sum b/go.sum index 96f8895..911c1e4 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20240808075405-f45ad91687c4 h1:3xqz2s6r/PONq cloud.o-forge.io/core/oc-lib v0.0.0-20240808075405-f45ad91687c4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 h1:fdxRsT4eR4v1DM3FpTPi9AKxB5oIw3XgLu9ByNipj4I= cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= +cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 h1:sYveE1C/0mpSr+ZmOYxuZ3fTWID7mr5hPiq0jQenv3Q= +cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 h1:n0MD6UkwbgGHtXsmfgVzC2+ZbHzIsScpbq9ZGI18074= github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7/go.mod h1:xdrQDwHlKUmv8yiElMx6W0W10cLkqpeSEUUib8KGtv4= @@ -370,9 +372,13 @@ github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDH github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= +github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= +github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= +github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= +github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= diff --git a/main.go b/main.go index 3eb18cc..481c502 100644 --- a/main.go +++ b/main.go @@ -5,18 +5,18 @@ import ( conf "oc-schedulerd/conf" "oc-schedulerd/daemons" - + "cloud.o-forge.io/core/oc-lib/tools" oclib "cloud.o-forge.io/core/oc-lib" ) func main() { - oclib.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC") + tools.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC", "") oclib.Init("oc-schedulerd") sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()} exe_mngr := daemons.ExecutionManager{} - go sch_mngr.ListenForWorkflowSubmissions() + go sch_mngr.ListenNATS() go sch_mngr.SchedulePolling() exe_mngr.RetrieveNextExecutions() diff --git a/oc-schedulerd b/oc-schedulerd index 11b5b43..1e01344 100755 Binary files a/oc-schedulerd and b/oc-schedulerd differ