diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index fd8ac7c..fa26969 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -1,12 +1,68 @@ package daemons -import "oc-scheduler/models" +import ( + "oc-scheduler/logger" + "oc-scheduler/models" + "oc-scheduler/workflow_builder" + "time" +) type ExecutionManager struct { - Bookings models.ScheduledBooking + bookings *models.ScheduledBooking + executions []models.Booking } -func (em *ExecutionManager) test(){ - em.Bookings.Mu.Lock() - defer em.Bookings.Mu.Unlock() +func (em *ExecutionManager) SetBookings(b *models.ScheduledBooking){ + em.bookings = b +} + +// Loop every second on the booking's list and move the booking that must start to a new list +// that will be looped over to start them +func (em *ExecutionManager) RetrieveNextExecutions(){ + + if(em.bookings == nil){ + logger.Logger.Fatal().Msg("booking has not been set in the execution manager") + } + + for(true){ + logger.Logger.Debug().Msg("New loop") + em.bookings.Mu.Lock() + bookings := em.bookings.Bookings + if (len(bookings) > 0){ + for i := len( bookings) - 1 ; i >= 0 ; i--{ + logger.Logger.Debug().Msg("It should start at " + bookings[i].Start.String() + " and it is now " + time.Now().UTC() .String()) + if (bookings[i].Start.Before(time.Now().UTC())){ + logger.Logger.Info().Msg("Will execute " + bookings[i].Workflow + " soon") + go em.executeBooking(bookings[i]) + bookings = append(bookings[:i], bookings[i+1:]...) 
+ em.bookings.Bookings = bookings + } + } + } + em.bookings.Mu.Unlock() + time.Sleep(time.Second) + } + +} + +func (em *ExecutionManager) executeBooking(booking models.Booking){ + // create argo + new_graph := workflow_builder.Graph{} + + err := new_graph.LoadFrom(booking.Workflow) + if err != nil { + logger.Logger.Error().Msg("Could not retrieve workflow " + booking.Workflow + " from oc-catalog API") + } + + argo_file, err := new_graph.ExportToArgo() + if err != nil { + logger.Logger.Error().Msg("Could not create the Argo file for " + booking.Workflow ) + logger.Logger.Error().Msg(err.Error()) + } + + _ = argo_file + // start execution + // locally launch a pod that contains oc-monitor, give it the name of the workflow + // create the yaml that describes the pod : filename, path/url to Loki + // locally launch an argo workflow with the filename `argo submit PATH_TO_YAML --watch --serviceaccount=argo -n argo` } \ No newline at end of file diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go index bd679ba..a81ce6c 100644 --- a/daemons/schedule_manager.go +++ b/daemons/schedule_manager.go @@ -17,13 +17,15 @@ import ( type ScheduleManager struct { - Api_url string - list models.ScheduledBooking - ws HttpQuery + Api_url string + bookings *models.ScheduledBooking + ws models.HttpQuery } - +func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){ + s.bookings = b +} // Goroutine listening to a NATS server for updates // on workflows' scheduling. 
Messages must contain @@ -31,6 +33,10 @@ type ScheduleManager struct { // is no way to get scheduling infos for a specific workflow func (s *ScheduleManager) ListenWorkflowSubmissions(){ + if(s.bookings == nil){ + logger.Logger.Fatal().Msg("booking has not been set in the schedule manager") + } + nc, _ := nats.Connect(nats.DefaultURL) defer nc.Close() @@ -48,7 +54,7 @@ func (s *ScheduleManager) ListenWorkflowSubmissions(){ map_mess := retrieveMapFromSub(msg.Data) - s.list.Mu.Lock() + s.bookings.Mu.Lock() start, err := time.Parse(time.RFC3339,map_mess["start_date"]) if err != nil{ @@ -59,8 +65,8 @@ func (s *ScheduleManager) ListenWorkflowSubmissions(){ logger.Logger.Error().Msg(err.Error()) } - s.list.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: start, Stop: stop }) - s.list.Mu.Unlock() + s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: start, Stop: stop }) + s.bookings.Mu.Unlock() } } @@ -81,7 +87,7 @@ func (s *ScheduleManager) RetrieveScheduling (){ } logger.Logger.Info().Msg("Current list of schedules") - fmt.Println(s.list.Bookings) + fmt.Println(s.bookings.Bookings) time.Sleep(time.Minute * 5) } @@ -102,14 +108,14 @@ func (s *ScheduleManager) getNextScheduledWorkflows(apiurl string, hours float64 var workflows []map[string]string json.Unmarshal(body,&workflows) - s.list.Mu.Lock() - defer s.list.Mu.Unlock() + s.bookings.Mu.Lock() + defer s.bookings.Mu.Unlock() for _, workflow := range(workflows){ start, _ := time.Parse(time.RFC3339,workflow["start_date"]) stop, _ := time.Parse(time.RFC3339,workflow["stop_date"]) - s.list.AddSchedule(models.Booking{Workflow: workflow["Workflow"], Start: start, Stop: stop}) + s.bookings.AddSchedule(models.Booking{Workflow: workflow["Workflow"], Start: start, Stop: stop}) } return nil diff --git a/daemons/http.go b/models/http.go similarity index 98% rename from daemons/http.go rename to models/http.go index b34c6d3..4026d70 100644 --- a/daemons/http.go +++ b/models/http.go @@ -1,4 
+1,4 @@ -package daemons +package models import ( "io" diff --git a/argo_builder.go b/workflow_builder/argo_builder.go similarity index 98% rename from argo_builder.go rename to workflow_builder/argo_builder.go index 9833e3b..b58aaf7 100644 --- a/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -2,7 +2,7 @@ // via its lists of components into an argo file, using the a list of // link ID to build the dag -package main +package workflow_builder import ( "fmt" @@ -40,7 +40,7 @@ type Spec struct { Templates []Template `yaml:"templates"` } -func (b *ArgoBuilder) CreateDAG() bool { +func (b *ArgoBuilder) CreateDAG() (string, error) { fmt.Println("list of branches : ", b.branches) b.createTemplates() @@ -54,10 +54,9 @@ func (b *ArgoBuilder) CreateDAG() bool { b.Workflow.Metadata.GenerateName = "oc-test-" + random_name yamlified, err := yaml.Marshal(b.Workflow) - if err != nil { logs.Error("Could not transform object to yaml file") - return false + return "", err } // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss @@ -67,11 +66,10 @@ func (b *ArgoBuilder) CreateDAG() bool { err = os.WriteFile(workflows_dir + file_name , []byte(yamlified), 0660) if err != nil { logs.Error("Could not write the yaml file") - return false + return "",err } - fmt.Println("Created " + file_name) - return true + return file_name, nil } func (b *ArgoBuilder) createTemplates() { diff --git a/graph.go b/workflow_builder/graph.go similarity index 75% rename from graph.go rename to workflow_builder/graph.go index 8a7689f..b20fda6 100644 --- a/graph.go +++ b/workflow_builder/graph.go @@ -1,13 +1,15 @@ -package main +package workflow_builder import ( "encoding/json" "fmt" "maps" - "net/url" - "oc-scheduler/daemons" - "cloud.o-forge.io/core/oc-catalog/models" + "oc-scheduler/conf" + "oc-scheduler/logger" + models "oc-scheduler/models" + + catalog_models "cloud.o-forge.io/core/oc-catalog/models" // this will be replaced with oc-lib "github.com/beego/beego/v2/core/logs" 
"github.com/tidwall/gjson" @@ -16,12 +18,13 @@ import ( type Graph struct { - Datas []models.DataModel - Computings []models.ComputingModel - Datacenters []models.DatacenterModel - Storages []models.StorageModel - Links map[string]models.Link - ws daemons.HttpQuery + workflow_name string // used to test if the graph has been instantiated, private so can only be set by a graph's method + Datas []catalog_models.DataModel + Computings []catalog_models.ComputingModel + Datacenters []catalog_models.DatacenterModel + Storages []catalog_models.StorageModel + Links map[string]catalog_models.Link + ws models.HttpQuery } // Create a dictionnaries with each existing workflow from a workspace, associated to the JSON representation of its content @@ -40,21 +43,38 @@ func (g *Graph) GetGraphList(apiurl string) (map[string]string, error) { return workspaces, nil } +// Should the parameter be removed, since we have oc-catalog url in the conf ? +func (g *Graph) GetGraph(apiurl string, workflow string) (string, error) { + g.ws.Init(apiurl) + body, err := g.ws.Get("v1/workflow/" + workflow) + if err != nil { + return "", err + } + + graph := string(body) + + // result := gjson.Get(string(body), "Workflows") + // result.ForEach(func(key, value gjson.Result) bool { + // workspaces[key.Str] = value.String() + // return true // keep iterating + // }) + return graph, nil +} + + // Create the objects from the mxgraphxml stored in the workflow given as a parameter -func (g *Graph) LoadFrom(workspace string) error { +func (g *Graph) LoadFrom(workflow_name string) error { // Extract the xmlgraph from the given workspace - xml := gjson.Get(workspace, "MxgraphXML").String() - decodedValue, err := url.QueryUnescape(xml) + graph, err := g.GetGraph(conf.GetConfig().OcCatalogUrl,workflow_name) if err != nil { return err } - _ = decodedValue - // os.WriteFile("graph.xml", []byte(decodedValue), 0660) - g.GetWorkflowComponents(workspace) - g.GetLinks(workspace) + g.GetWorkflowComponents(graph) + 
g.GetLinks(graph) + g.workflow_name = workflow_name return nil } @@ -94,12 +114,12 @@ func (g *Graph) GetWorkflowComponents(workflow string){ } func (g *Graph) GetLinks(workflow string){ - g.Links = make(map[string]models.Link) + g.Links = make(map[string]catalog_models.Link) result := gjson.Get(workflow, "link") if (result.Type != gjson.Null) { result.ForEach(func(id, value gjson.Result) bool{ - var l models.Link + var l catalog_models.Link json.Unmarshal([]byte(value.Raw),&l) g.Links[id.Str] = l @@ -109,7 +129,7 @@ func (g *Graph) GetLinks(workflow string){ } func (g *Graph) AddDataModel(id string, user_input gjson.Result, wf_id string) error { - var d models.DataModel + var d catalog_models.DataModel resp, err := g.ws.Get("v1/data/" + id) if err != nil { return err @@ -122,7 +142,7 @@ func (g *Graph) AddDataModel(id string, user_input gjson.Result, wf_id string) e } func (g *Graph) AddDatacenterModel(id string, user_input gjson.Result, wf_id string) error { - var d models.DatacenterModel + var d catalog_models.DatacenterModel resp, err := g.ws.Get("v1/datacenter/" + id) if err != nil { return err @@ -135,7 +155,7 @@ func (g *Graph) AddDatacenterModel(id string, user_input gjson.Result, wf_id str } func (g *Graph) AddComputingModel(id string, user_input gjson.Result, wf_id string) error { - var c models.ComputingModel + var c catalog_models.ComputingModel resp, err := g.ws.Get("v1/computing/" + id) if err != nil { return err @@ -148,7 +168,7 @@ func (g *Graph) AddComputingModel(id string, user_input gjson.Result, wf_id stri } func (g *Graph) AddStorageModel(id string, user_input gjson.Result, wf_id string) error { - var s models.StorageModel + var s catalog_models.StorageModel resp, err := g.ws.Get("v1/storage/" + id) if err != nil { return err @@ -160,8 +180,11 @@ func (g *Graph) AddStorageModel(id string, user_input gjson.Result, wf_id string return nil } -func (g *Graph) ExportToArgo(id string) error { - end_links := make(map[string]models.Link) +func (g 
*Graph) ExportToArgo() (string, error) { + if len(g.workflow_name) == 0 { + return "",fmt.Errorf("can't export a graph that has not been loaded yet") + } + end_links := make(map[string]catalog_models.Link) for i, link := range g.Links { if (!link.DCLink && !g.isSource(link.Destination,i)){ @@ -184,20 +207,23 @@ func (g *Graph) ExportToArgo(id string) error { fmt.Println("Identified branches : ", list_branches) argo_builder := ArgoBuilder{graph : *g, branches: list_branches} - argo_builder.CreateDAG() - - return nil + filename, err := argo_builder.CreateDAG() + if err != nil { + logger.Logger.Error().Msg("Could not create the argo file for " + g.workflow_name) + return "", err + } + return filename, nil } // Return a list containing the IDs of each link that make up a branch in the graph -func (g *Graph) getListBranches(end_links map[string]models.Link, unvisited_links_list map[string]models.Link, current_branch []string) (list_branches [][]string) { +func (g *Graph) getListBranches(end_links map[string]catalog_models.Link, unvisited_links_list map[string]catalog_models.Link, current_branch []string) (list_branches [][]string) { if current_branch == nil { current_branch = make([]string, 0) } if unvisited_links_list == nil { - unvisited_links_list = make(map[string]models.Link,len(g.Links)) + unvisited_links_list = make(map[string]catalog_models.Link,len(g.Links)) maps.Copy(unvisited_links_list,g.Links) fmt.Println(unvisited_links_list) } @@ -215,7 +241,7 @@ func (g *Graph) getListBranches(end_links map[string]models.Link, unvisited_link current_branch = append([]string{link_id},current_branch...) 
delete(unvisited_links_list, link_id) // create a new branch for each previous link, appending the current path to this node to the created branch - new_end_link := make(map[string]models.Link,0) + new_end_link := make(map[string]catalog_models.Link,0) new_end_link[id_link] = g.Links[id_link] new_branches = g.getListBranches(new_end_link,unvisited_links_list,current_branch) @@ -264,7 +290,7 @@ func (g *Graph) isSource(comp_id string,link_id string) bool { // with the same Destination id that the Source id in g.Links[linkIndex] // or nil if not -func (g *Graph) getPreviousLink(link_id string,map_link map[string]models.Link) (previous_id []string) { +func (g *Graph) getPreviousLink(link_id string,map_link map[string]catalog_models.Link) (previous_id []string) { for k, link := range map_link{ if(k != link_id && link.Destination == g.Links[link_id].Source){ previous_id = append(previous_id, k) @@ -320,7 +346,7 @@ func (g *Graph) getComponentType(component_id string) string { // Returns a slice of id, in case the link is made of twice the same type of component -func (g *Graph) getComponentByType(compType string, link models.Link) (ids []string){ +func (g *Graph) getComponentByType(compType string, link catalog_models.Link) (ids []string){ if(g.getComponentType(link.Source) == compType){ ids = append(ids, link.Source) }