From aaac37a8834481f4687ca9b86cbbf1baf2ff7d43 Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 2 Aug 2024 13:34:39 +0200 Subject: [PATCH] implemented oclib and generates argo yaml --- .../chalitic-scobeele_01_08_2024_183304.yml | 46 +++ .../colintic-schicke_01_08_2024_183508.yml | 46 +++ .../sismon-oximous_02_08_2024_131457.yml | 39 +++ .../spresis-aplaront_01_08_2024_182306.yml | 46 +++ conf/docker_ocmonitor_conf.json | 3 + conf/local_ocmonitor_conf.json | 3 + logger/logger.go | 15 + models/http.go | 53 +++ models/schedule.go | 74 ++++ models/template_models.go | 40 +++ models/volume_models.go | 19 ++ workflow_builder/argo_builder.go | 315 ++++++++++++++++++ workflow_builder/graph.go | 246 ++++++++++++++ workflow_builder/graph_tests.go | 10 + 14 files changed, 955 insertions(+) create mode 100644 argo_workflows/chalitic-scobeele_01_08_2024_183304.yml create mode 100644 argo_workflows/colintic-schicke_01_08_2024_183508.yml create mode 100644 argo_workflows/sismon-oximous_02_08_2024_131457.yml create mode 100644 argo_workflows/spresis-aplaront_01_08_2024_182306.yml create mode 100644 conf/docker_ocmonitor_conf.json create mode 100644 conf/local_ocmonitor_conf.json create mode 100644 logger/logger.go create mode 100644 models/http.go create mode 100644 models/schedule.go create mode 100644 models/template_models.go create mode 100644 models/volume_models.go create mode 100644 workflow_builder/argo_builder.go create mode 100644 workflow_builder/graph.go create mode 100644 workflow_builder/graph_tests.go diff --git a/argo_workflows/chalitic-scobeele_01_08_2024_183304.yml b/argo_workflows/chalitic-scobeele_01_08_2024_183304.yml new file mode 100644 index 0000000..3e98cc5 --- /dev/null +++ b/argo_workflows/chalitic-scobeele_01_08_2024_183304.yml @@ -0,0 +1,46 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: oc-test-chalitic-scobeele +spec: + entrypoint: dag + volumeClaimTemplates: + - metadata: + name: workdir + spec: + accessModes: [ReadWriteOnce] + resources: + requests: + storage: 1Gi + templates: + - name: "-0" + container: + image: "" + args: [""] + volumeMounts: + - name: workdir + mountPath: /mnt/vol + - name: curl-1 + container: + image: curlimages/curl:7.88.1 + args: [-SL, 'https://toulousefc.com', -o, /mnt/vol/tfc.hmtl] + volumeMounts: + - name: workdir + mountPath: /mnt/vol + - name: alpine-2 + container: + image: alpine:3.7 + command: [sh, -c] + args: ['grep '' store objects + + new_dag.Tasks = append(new_dag.Tasks, step) + + } + + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) + +} + +func (b *ArgoBuilder) createVolumes() { + // For testing purposes we only declare one volume, mounted in each computing + new_volume := VolumeClaimTemplate{} + new_volume.Metadata.Name = "workdir" + new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} + new_volume.Spec.Resources.Requests.Storage = "1Gi" + + b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) +} + +func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { + var dependencies_id []string + + for _, link := range b.graph.Links { + source := b.graph.Items[link.Source.ID].Processing // Instead of searching for the AbstractResource we load the Processing pointer and test if it's nil to know if the item is a processing + if current_computing_id == link.Destination.ID && source != nil && !slices.Contains(dependencies_id, link.Source.ID) { + dependencies_id = append(dependencies_id, link.Source.ID) + } + } + + for _, dependency := range dependencies_id { + source := b.graph.Items[dependency].Processing + dependency_name := getArgoName(source.GetName(), dependency) + dependencies = append(dependencies, dependency_name) + } + + return + +} + +// func (b *ArgoBuilder) componentInBranch(component_id string, branch []string) bool { +// for _, link := range branch { +// if b.graph.Links[link].Source == component_id || b.graph.Links[link].Destination == component_id { +// return true +// } +// } +// return false +// } + +// func (b *ArgoBuilder) findPreviousComputing(computing_id string, branch []string, index int) string { + +// for i := index; i >= 0 ; i-- { +// previousLink := b.graph.Links[branch[i]] + +// if previousLink.Source != computing_id && b.graph.GetComponentType(previousLink.Source) == "computing"{ +// name := getArgoName(b.graph.getComponentName(previousLink.Source),previousLink.Source) +// return name +// } +// if previousLink.Destination != computing_id && b.graph.GetComponentType(previousLink.Destination) == "computing"{ +// name := getArgoName(b.graph.getComponentName(previousLink.Destination),previousLink.Destination) +// return name +// } + +// } +// return "" +// } + +func getComputingCommands(user_input string) (list_command []string) { + user_input = removeImageName(user_input) + if len(user_input) == 0 { + return + } + + list_command = strings.Split(user_input, " ") + for i := range list_command { + list_command[i] = list_command[i] + } + return +} + +func getComputingArgs(user_input []string, command string) (list_args []string) { + if len(user_input) == 0 { + return + } + + // quickfix that might need improvement + if strings.Contains(command, "sh -c") { + list_args = append(list_args, strings.Join(user_input, " ")) + return + } + + for _, arg := range user_input { + list_args = append(list_args, arg) + } + + return +} + +// Currently implements code to overcome problems in data structure +func getComputingEnvironment(user_input []string) (map_env map[string]string) { + + is_empty := len(user_input) == 0 + is_empty_string := len(user_input) == 1 && user_input[0] == "" + + if is_empty || is_empty_string { + return + } + + if len(user_input) == 1 { + user_input = strings.Split(user_input[0], ",") + } + + map_env = make(map[string]string, 0) + + for _, str := range user_input { + new_pair := strings.Split(str, "=") + + if len(new_pair) != 2 { + logger.Logger.Error().Msg("Error extracting the environment variable from " + str) + panic(0) + } + + map_env[new_pair[0]] = new_pair[1] + } + + return +} + +func getComputingEnvironmentName(user_input []string) (list_names []string) { + env_map := getComputingEnvironment(user_input) + for name := range env_map { + list_names = append(list_names, name) + } + + return +} + +func generateWfName() (Name string) { + Name = fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) + return +} + +func getArgoName(raw_name string, component_id string) (formatedName string) { + formatedName = strings.ReplaceAll(raw_name, " ", "-") + formatedName += "-" + component_id + formatedName = strings.ToLower(formatedName) + return +} + +func printYAML(data interface{}) { + yamlData, err := yaml.Marshal(data) + if err != nil { + fmt.Printf("Error marshalling YAML: %v\n", err) + return + } + fmt.Println(string(yamlData)) +} + +func removeImageName(user_input string) string { + // First command is the name of the container for now + if len(strings.Split(user_input, " ")) == 1 { + return "" + } + + slice_input := strings.Split(user_input, " ") + new_slice := slice_input[1:] + user_input = strings.Join(new_slice, " ") + + return user_input +} + +// Return the graphItem containing a Processing resource, so that we have access to the ID of the graphItem in order to identify it in the links +func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) { + for _, item := range b.graph.Items { + if item.Processing != nil { + list_computings = append(list_computings, item) + } + } + return +} + +func getStringValue(comp resource_model.AbstractResource, key string) string { + if res := comp.GetModelValue(key); res != nil { + return res.(string) + } + return "" +} \ No newline at end of file diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go new file mode 100644 index 0000000..ef65832 --- /dev/null +++ b/workflow_builder/graph.go @@ -0,0 +1,246 @@ +package workflow_builder + +import ( + "errors" + "fmt" + "maps" + + "oc-monitor/logger" + models "oc-monitor/models" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" + workflow "cloud.o-forge.io/core/oc-lib/models/workflow" +) + + + +type WorflowDB struct { + workflow_name string // used to test if the graph has been instatiated, private so can only be set by a graph's method + graph *graph.Graph + links map[int]graph.GraphLink + ws models.HttpQuery +} + +// Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter +func (w *WorflowDB) LoadFrom(workflow_id string) error { + + new_wf, err := w.getWorkflow(workflow_id) + if err != nil { + return err + } + + w.graph = new_wf.Graph + w.links = w.getLinks() + + w.workflow_name = new_wf.Name + + return nil +} + +// Use oclib to retrieve the graph contained in the workflow referenced +func (w *WorflowDB) getWorkflow( workflow_id string) (workflow *workflow.Workflow, err error) { + + lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW),workflow_id) + if lib_data.Code != 200 { + logger.Logger.Error().Msg("Error loading the graph") + return workflow, errors.New(lib_data.Err) + } + + new_wf := lib_data.ToWorkflow() + if new_wf == nil { + logger.Logger.Error().Msg("WorflowDB object is empty for " + workflow_id ) + return workflow, errors.New("WorflowDB can't be empty") + } + + return new_wf, nil +} + + + +func (w *WorflowDB) getLinks() map[int]graph.GraphLink { + links := make(map[int]graph.GraphLink) + for i, link := range(w.graph.Links) { + links[i] = link + } + + return links +} + +func (w *WorflowDB) ExportToArgo() (string, error) { + if len(w.workflow_name) == 0 || &w.graph == nil { + return "",fmt.Errorf("can't export a graph that has not been loaded yet") + } + end_links := make(map[int]graph.GraphLink) + + for i, link := range w.links { + if (!w.isDCLink(i) && !w.isSource(link.Destination.ID,i)){ + end_links[i] = link + } + } + + // index_list := make([]int, len(w.links)) + // list_branches := make([][]string,0) + list_branches := w.getListBranches(end_links, nil,nil) + + // for _, branch := range list_branches{ + // str := "" + // for _, link := range branch{ + // str = str + " --> " + w.getComponentName(w.graph.Links[link].Source) + " linked with " + w.getComponentName(w.links[link].Destination) + // } + + // fmt.Println(str) + // } + + fmt.Println("Identified branches : ", list_branches) + argo_builder := ArgoBuilder{graph : *w.graph, branches: list_branches} + filename, err := argo_builder.CreateDAG() + if err != nil { + logger.Logger.Error().Msg("Could not create the argo file for " + w.workflow_name) + return "", err + } + return filename, nil +} + +// Return a list containing the IDs of each link that make up a branch in the graph +func (w *WorflowDB) getListBranches(end_links map[int]graph.GraphLink, unvisited_links_list map[int]graph.GraphLink, current_branch []int) (list_branches [][]int) { + + if current_branch == nil { + current_branch = make([]int, 0) + } + + if unvisited_links_list == nil { + unvisited_links_list = make(map[int]graph.GraphLink,len(w.graph.Links)) + maps.Copy(unvisited_links_list,w.links) + fmt.Println(unvisited_links_list) + } + + for link_id, _ := range end_links { + j := link_id + new_branches := make([][]int,0) + + previous_index := w.getPreviousLink(j, unvisited_links_list) + if len(previous_index) == 0 { + list_branches = append(list_branches, []int{link_id}) + } + + for _, id_link := range previous_index { + current_branch = append([]int{link_id},current_branch...) + delete(unvisited_links_list, link_id) + // create a new branch for each previous link, appending the current path to this node to the created branch + new_end_link := make(map[int]graph.GraphLink,0) + new_end_link[id_link] = w.links[id_link] + new_branches = w.getListBranches(new_end_link,unvisited_links_list,current_branch) + + for _, new_branch := range new_branches{ + current_branch = append(new_branch,link_id) + list_branches = append(list_branches, current_branch) + } + } + } + + return +} + +func (w *WorflowDB) ExportToHelm(id string) error { + + return nil +} + +// Return if it exists a link where Destination is the same as comp_id +func (w *WorflowDB) isDestination(comp_id string,link_id int) bool { + + for i, link := range w.links{ + if(i !=link_id && link.Destination.ID == comp_id){ + return true + } + } + + return false + +} + +// Return if it exists a link where Source is the same as comp_id +func (w *WorflowDB) isSource(comp_id string, link_id int) bool { + + for i, link := range w.links{ + if(i !=link_id && link.Source.ID == comp_id && !w.isDCLink(i)){ + return true + } + } + + return false + +} + +// Returns an index number if their is a link in w.links +// with the same Destination id that the Source id in w.links[linkIndex] +// or nil if not + +func (w *WorflowDB) getPreviousLink(link_id int,map_link map[int]graph.GraphLink) (previous_id []int) { + for k, link := range map_link{ + if(k != link_id && link.Destination == w.links[link_id].Source){ + previous_id = append(previous_id, k) + } + } + + return +} + + + +// returns either computing, data or storage +func GetComponentType(component_id string) string { + if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.PROCESSING_RESOURCE),component_id); libdata.Code == 200{ + return "computing" + } + + if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.DATA_RESOURCE),component_id); libdata.Code == 200{ + return "data" + } + + if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.STORAGE_RESOURCE),component_id); libdata.Code == 200{ + return "storage" + } + + if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.DATACENTER_RESOURCE),component_id); libdata.Code == 200{ + return "datacenter" + } + + if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE),component_id); libdata.Code == 200{ + return "workflow" + } + + return "" +} + +// Returns a slice of id, in case the link is made of twice the same type of component + +func (w *WorflowDB) getComponentByType(compType string, link graph.GraphLink) (ids []string){ + if(GetComponentType(link.Source.ID) == compType){ + ids = append(ids, link.Source.ID) + } + if(GetComponentType(link.Destination.ID) == compType){ + ids = append(ids, link.Destination.ID) + } + + return +} + +func (w *WorflowDB) isDCLink(link_id int) bool { + link := w.links[link_id] + + dest := w.graph.Items[link.Destination.ID] + dest_id := dest.GetAbstractRessource().GetID() + + source := w.graph.Items[link.Source.ID] + source_id := source.GetAbstractRessource().GetID() + + return IsDatacenter(dest_id) || IsDatacenter(source_id) +} + +func IsDatacenter(id string) bool { + resource := oclib.LoadOne(oclib.LibDataEnum(oclib.DATACENTER_RESOURCE),id) + + return resource.Code == 200 +} diff --git a/workflow_builder/graph_tests.go b/workflow_builder/graph_tests.go new file mode 100644 index 0000000..fea3a23 --- /dev/null +++ b/workflow_builder/graph_tests.go @@ -0,0 +1,10 @@ +package workflow_builder + +import ( + "testing" +) + +func TestGetGraph(t *testing.T){ + w := WorflowDB{} + w.LoadFrom("test-log") +}