From cd66000bb95ff6e87b97e464e10f4ab12f899e56 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 11 Jul 2024 18:25:40 +0200 Subject: [PATCH] restructuration --- daemons/execution_manager.go | 12 ++++ http.go => daemons/http.go | 2 +- daemons/schedule_manager.go | 116 +++++++++++++++++++++++++++++++++++ logger/logger.go | 15 +++++ main.go | 49 ++++++++------- 5 files changed, 172 insertions(+), 22 deletions(-) create mode 100644 daemons/execution_manager.go rename http.go => daemons/http.go (98%) create mode 100644 daemons/schedule_manager.go create mode 100644 logger/logger.go diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go new file mode 100644 index 0000000..fd8ac7c --- /dev/null +++ b/daemons/execution_manager.go @@ -0,0 +1,12 @@ +package daemons + +import "oc-scheduler/models" + +type ExecutionManager struct { + Bookings models.ScheduledBooking +} + +func (em *ExecutionManager) test(){ + em.Bookings.Mu.Lock() + defer em.Bookings.Mu.Unlock() +} \ No newline at end of file diff --git a/http.go b/daemons/http.go similarity index 98% rename from http.go rename to daemons/http.go index 9df57e9..b34c6d3 100644 --- a/http.go +++ b/daemons/http.go @@ -1,4 +1,4 @@ -package main +package daemons import ( "io" diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go new file mode 100644 index 0000000..ac5248f --- /dev/null +++ b/daemons/schedule_manager.go @@ -0,0 +1,116 @@ +package daemons + +import ( + "encoding/json" + "fmt" + "net/url" + "time" + + "oc-scheduler/logger" + "oc-scheduler/models" + + "github.com/nats-io/nats.go" +) + + +type ScheduleManager struct { + Api_url string + list models.ScheduledBooking + ws HttpQuery + +} + + + +// Goroutine listening to a NATS server for updates +// on workflows' scheduling. Messages must contain +// workflow's name, startDate and stopDate while there +// is no way to get scheduling infos for a specific workflow +func (s *ScheduleManager) ListenWorkflowSubmissions(){ + + nc, _ := nats.Connect(nats.DefaultURL) + defer nc.Close() + + + ch := make(chan *nats.Msg, 64) + + subs , err := nc.ChanSubscribe("workflowsUpdate", ch) + if err != nil { + logger.Logger.Fatal().Msg("Error listening to NATS") + } + defer subs.Unsubscribe() + + for msg := range(ch){ + fmt.Println("Waiting...") + + map_mess := retrieveMapFromSub(msg.Data) + + s.list.Mu.Lock() + + start, err := time.Parse(time.RFC3339,map_mess["startDate"]) + if err != nil{ + logger.Logger.Error().Msg(err.Error()) + } + stop, err := time.Parse(time.RFC3339,map_mess["stopDate"]) + if err != nil{ + logger.Logger.Error().Msg(err.Error()) + } + + s.list.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: start, Stop: stop }) + s.list.Mu.Unlock() + + } +} + + +func retrieveMapFromSub(message []byte) (result_map map[string]string) { + json.Unmarshal(message, &result_map) + return +} + +// Used at launch of the component to retrieve the next scheduled workflows +// and then every X minutes in case some workflows were scheduled before launch +func (s *ScheduleManager) RetrieveScheduling (){ + for(true){ + err := s.getNextScheduledWorkflows(s.Api_url, 0.3) + if err != nil { + logger.Logger.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + s.Api_url) + } + + logger.Logger.Info().Msg("Current list of schedules") + fmt.Println(s.list.Bookings) + + time.Sleep(time.Minute * 5) + } +} + +func (s *ScheduleManager) getNextScheduledWorkflows(apiurl string, hours float64) (error) { + s.ws.Init(apiurl) + params := url.Values{} + start := time.Now().UTC() + params.Add("startDate", start.Format(time.RFC3339)) + time_span := time.Hour * time.Duration(hours) + params.Add("stopDate",start.Add(time_span).Format(time.RFC3339)) + body, err := s.ws.Get("v1/schedule?" + params.Encode()) + + if err != nil { + return err + } + var workflows []map[string]string + json.Unmarshal(body,&workflows) + + s.list.Mu.Lock() + defer s.list.Mu.Unlock() + + for _, workflow := range(workflows){ + start, _ := time.Parse(time.RFC3339,workflow["StartDate"]) + stop, _ := time.Parse(time.RFC3339,workflow["StopDate"]) + + s.list.AddSchedule(models.Booking{Workflow: workflow["Workflow"], Start: start, Stop: stop}) + } + + + + return nil +} + diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..3bcaef1 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,15 @@ +package logger + +import ( + "os" + "time" + + "github.com/rs/zerolog" +) + +var Logger zerolog.Logger + +func init() { + output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339} + Logger = zerolog.New(output).With().Timestamp().Logger() +} \ No newline at end of file diff --git a/main.go b/main.go index 33b5646..e1927d7 100644 --- a/main.go +++ b/main.go @@ -3,43 +3,50 @@ package main import ( "fmt" "os" - "time" conf "oc-scheduler/conf" - "github.com/rs/zerolog" + "oc-scheduler/daemons" + "oc-scheduler/logger" ) -var log zerolog.Logger +// var log zerolog.Logger func main() { - output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339} - log = zerolog.New(output).With().Timestamp().Logger() + // output := zerolog.ConsoleWriter{Out: os.Stdout, TimeFormat: time.RFC3339} + // log = zerolog.New(output).With().Timestamp().Logger() app_conf := conf.GetConfig() apiurl := app_conf.OcCatalogUrl - var g Graph - - list, err := g.GetGraphList(apiurl) - if err != nil { - log.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + apiurl) - } - - println("Available workspaces :") - for workspace, _ := range list { - println(workspace) - } - if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) { os.Mkdir("./argo_workflows/",0755) - log.Info().Msg("Created argo_workflows/") + logger.Logger.Info().Msg("Created argo_workflows/") } - g.LoadFrom(list["test-alpr"]) - g.ExportToArgo("test-alpr") + mngr := daemons.ScheduleManager{Api_url: apiurl} - for(1 == 1){ + go mngr.RetrieveScheduling() + go mngr.ListenWorkflowSubmissions() + + // method in Schedule manager that checks the first Schedule object for its start date and exe + + // var g Graph + + // list, err := g.GetGraphList(apiurl) + // if err != nil { + // log.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + apiurl) + // } + + // println("Available workspaces :") + // for workspace, _ := range list { + // println(workspace) + // } + + + // g.LoadFrom(list["test-alpr"]) + // g.ExportToArgo("test-alpr") + for(true){ fmt.Print("") } fmt.Print("stop")