Minimize code + docker + monitord naming
This commit is contained in:
		| @@ -5,14 +5,12 @@ | ||||
| package workflow_builder | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"oc-monitor/conf" | ||||
| 	"oc-monitor/logger" | ||||
| 	. "oc-monitor/models" | ||||
| 	. "oc-monitord/models" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | ||||
| 	"github.com/nwtgck/go-fakelish" | ||||
| @@ -41,30 +39,29 @@ type Spec struct { | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) CreateDAG() (string, error) { | ||||
| 	 | ||||
|  | ||||
| 	b.createTemplates() | ||||
| 	b.createDAGstep() | ||||
| 	b.createVolumes() | ||||
| 	b.Workflow.Spec.Entrypoint = "dag" | ||||
|  | ||||
| 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | ||||
| 	b.Workflow.Kind = "Workflow" | ||||
| 	random_name := generateWfName() | ||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||
|  | ||||
| 	logger := oclib.GetLogger() | ||||
| 	yamlified, err := yaml.Marshal(b.Workflow) | ||||
| 	if err != nil { | ||||
| 		logger.Logger.Error().Msg("Could not transform object to yaml file") | ||||
| 		logger.Error().Msg("Could not transform object to yaml file") | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| 	// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss | ||||
| 	current_timestamp := time.Now().Format("02_01_2006_150405") | ||||
| 	file_name := random_name + "_" + current_timestamp + ".yml" | ||||
| 	workflows_dir := conf.GetConfig().MonitorDir+"/argo_workflows/" | ||||
| 	workflows_dir := "./argo_workflows/" | ||||
| 	err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) | ||||
| 	if err != nil { | ||||
| 		logger.Logger.Error().Msg("Could not write the yaml file") | ||||
| 		logger.Error().Msg("Could not write the yaml file") | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| @@ -72,11 +69,10 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createTemplates() { | ||||
|  | ||||
| 	for _, comp := range b.getProcessings() { | ||||
| 		var command string | ||||
| 		var args 	string | ||||
| 		var env 	string | ||||
| 		var args string | ||||
| 		var env string | ||||
|  | ||||
| 		comp_res := comp.Processing | ||||
|  | ||||
| @@ -107,13 +103,9 @@ func (b *ArgoBuilder) createTemplates() { | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createDAGstep() { | ||||
|  | ||||
| 	new_dag := Dag{} | ||||
|  | ||||
| 	for _, comp := range b.getProcessings() { | ||||
|  | ||||
| 		comp_res := comp.Processing | ||||
|  | ||||
| 		env := getStringValue(comp_res.AbstractResource, "env") | ||||
| 		unique_name := getArgoName(comp_res.GetName(), comp.ID) | ||||
| 		step := Task{Name: unique_name, Template: unique_name} | ||||
| @@ -124,12 +116,9 @@ func (b *ArgoBuilder) createDAGstep() { | ||||
| 		} | ||||
|  | ||||
| 		// retrieves the name (computing.name-computing.ID) | ||||
| 		step.Dependencies = b.getDependency(comp.ID)	// Error : we use the component ID instead of the GraphItem ID -> store objects  | ||||
| 		 | ||||
| 		step.Dependencies = b.getDependency(comp.ID) // Error : we use the component ID instead of the GraphItem ID -> store objects | ||||
| 		new_dag.Tasks = append(new_dag.Tasks, step) | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) | ||||
|  | ||||
| } | ||||
| @@ -140,69 +129,36 @@ func (b *ArgoBuilder) createVolumes() { | ||||
| 	new_volume.Metadata.Name = "workdir" | ||||
| 	new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} | ||||
| 	new_volume.Spec.Resources.Requests.Storage = "1Gi" | ||||
|  | ||||
| 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { | ||||
|  | ||||
| 	for _, link := range b.graph.Links { | ||||
| 		source := b.graph.Items[link.Source.ID].Processing  | ||||
| 		if !b.IsProcessing(link.Source.ID) || !b.IsProcessing(link.Destination.ID) { | ||||
| 			continue | ||||
| 		} | ||||
| 		source := b.graph.Items[link.Source.ID].Processing | ||||
| 		if current_computing_id == link.Destination.ID && source != nil { | ||||
| 			dependency_name := getArgoName(source.GetName(), link.Source.ID) | ||||
| 			dependencies = append(dependencies, dependency_name) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return | ||||
|  | ||||
| } | ||||
|  | ||||
| // func (b *ArgoBuilder) componentInBranch(component_id string, branch []string) bool { | ||||
| // 	for _, link := range branch { | ||||
| // 		if b.graph.Links[link].Source == component_id || b.graph.Links[link].Destination == component_id { | ||||
| // 			return true | ||||
| // 		} | ||||
| // 	} | ||||
| // 	return false | ||||
| // } | ||||
|  | ||||
| // func (b *ArgoBuilder) findPreviousComputing(computing_id string, branch []string, index int) string { | ||||
|  | ||||
| // 	for i := index; i >= 0 ; i-- { | ||||
| // 		previousLink := b.graph.Links[branch[i]] | ||||
|  | ||||
| // 		if previousLink.Source != computing_id && b.graph.GetComponentType(previousLink.Source) == "computing"{ | ||||
| // 			name := getArgoName(b.graph.getComponentName(previousLink.Source),previousLink.Source) | ||||
| // 			return name | ||||
| // 		} | ||||
| // 		if previousLink.Destination != computing_id && b.graph.GetComponentType(previousLink.Destination) == "computing"{ | ||||
| // 			name := getArgoName(b.graph.getComponentName(previousLink.Destination),previousLink.Destination) | ||||
| // 			return name | ||||
| // 		} | ||||
|  | ||||
| // 	} | ||||
| // 	return "" | ||||
| // } | ||||
|  | ||||
| func getComputingCommands(user_input string) (list_command []string) { | ||||
| func getComputingCommands(user_input string) []string { | ||||
| 	user_input = removeImageName(user_input) | ||||
| 	if len(user_input) == 0 { | ||||
| 		return | ||||
| 		return []string{} | ||||
| 	} | ||||
|  | ||||
| 	list_command = strings.Split(user_input, " ") | ||||
| 	for i := range list_command { | ||||
| 		list_command[i] = list_command[i] | ||||
| 	} | ||||
| 	return | ||||
| 	return strings.Split(user_input, " ") | ||||
| } | ||||
|  | ||||
| func getComputingArgs(user_input []string, command string) (list_args []string) { | ||||
| 	if len(user_input) == 0 { | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	// quickfix that might need improvement | ||||
| 	if strings.Contains(command, "sh -c") { | ||||
| 		list_args = append(list_args, strings.Join(user_input, " ")) | ||||
| @@ -210,13 +166,12 @@ func getComputingArgs(user_input []string, command string) (list_args []string) | ||||
| 	} | ||||
|  | ||||
| 	list_args = append(list_args, user_input...) | ||||
|  | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // Currently implements code to overcome problems in data structure | ||||
| func getComputingEnvironment(user_input []string) (map_env map[string]string) { | ||||
| 	 | ||||
| 	logger := oclib.GetLogger() | ||||
| 	is_empty := len(user_input) == 0 | ||||
| 	is_empty_string := len(user_input) == 1 && user_input[0] == "" | ||||
|  | ||||
| @@ -234,7 +189,7 @@ func getComputingEnvironment(user_input []string) (map_env map[string]string) { | ||||
| 		new_pair := strings.Split(str, "=") | ||||
|  | ||||
| 		if len(new_pair) != 2 { | ||||
| 			logger.Logger.Error().Msg("Error extracting the environment variable from " + str) | ||||
| 			logger.Error().Msg("Error extracting the environment variable from " + str) | ||||
| 			panic(0) | ||||
| 		} | ||||
|  | ||||
| @@ -265,15 +220,6 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func printYAML(data interface{}) { | ||||
| 	yamlData, err := yaml.Marshal(data) | ||||
| 	if err != nil { | ||||
| 		fmt.Printf("Error marshalling YAML: %v\n", err) | ||||
| 		return | ||||
| 	} | ||||
| 	fmt.Println(string(yamlData)) | ||||
| } | ||||
|  | ||||
| func removeImageName(user_input string) string { | ||||
| 	// First command is the name of the container for now | ||||
| 	if len(strings.Split(user_input, " ")) == 1 { | ||||
| @@ -297,10 +243,18 @@ func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) { | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func getStringValue(comp resource_model.AbstractResource, key string) string { | ||||
|     if res := comp.GetModelValue(key); res != nil { | ||||
|         return res.(string) | ||||
|     } | ||||
|     return "" | ||||
| func (b *ArgoBuilder) IsProcessing(id string) bool { | ||||
| 	for _, item := range b.graph.Items { | ||||
| 		if item.Processing != nil && item.Processing.GetID() == id { | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| func getStringValue(comp resource_model.AbstractResource, key string) string { | ||||
| 	if res := comp.GetModelValue(key); res != nil { | ||||
| 		return res.(string) | ||||
| 	} | ||||
| 	return "" | ||||
| } | ||||
|   | ||||
| @@ -4,156 +4,58 @@ import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
|  | ||||
| 	"oc-monitor/logger" | ||||
| 	models "oc-monitor/models" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | ||||
| 	workflow "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| ) | ||||
|  | ||||
|  | ||||
|  | ||||
| type WorflowDB struct { | ||||
| 	workflow_name	string							// used to test if the graph has been instatiated, private so can only be set by a graph's method | ||||
| 	graph 			*graph.Graph | ||||
| 	ws          	models.HttpQuery | ||||
| 	Workflow *workflow.Workflow | ||||
| } | ||||
|  | ||||
| // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter | ||||
| func (w *WorflowDB) LoadFrom(workflow_id string) error { | ||||
| 	 | ||||
| 	new_wf, err := w.getWorkflow(workflow_id) | ||||
| 	if err != nil { | ||||
| 	var err error | ||||
| 	if w.Workflow, err = w.getWorkflow(workflow_id); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | ||||
| 	w.graph = new_wf.Graph  | ||||
|  | ||||
| 	w.workflow_name = new_wf.Name | ||||
| 	 | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Use oclib to retrieve the graph contained in the workflow referenced | ||||
| func (w *WorflowDB) getWorkflow( workflow_id string) (workflow *workflow.Workflow, err error) { | ||||
| func (w *WorflowDB) getWorkflow(workflow_id string) (workflow *workflow.Workflow, err error) { | ||||
| 	logger := oclib.GetLogger() | ||||
|  | ||||
| 	lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW),workflow_id) | ||||
| 	lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW), workflow_id) | ||||
| 	if lib_data.Code != 200 { | ||||
| 		logger.Logger.Error().Msg("Error loading the graph") | ||||
| 		logger.Error().Msg("Error loading the graph") | ||||
| 		return workflow, errors.New(lib_data.Err) | ||||
| 	} | ||||
|  | ||||
| 	new_wf := lib_data.ToWorkflow() | ||||
| 	if new_wf == nil { | ||||
| 		logger.Logger.Error().Msg("WorflowDB object is empty for " + workflow_id ) | ||||
| 		logger.Error().Msg("WorflowDB object is empty for " + workflow_id) | ||||
| 		return workflow, errors.New("WorflowDB can't be empty") | ||||
| 	} | ||||
|  | ||||
| 	return new_wf, nil | ||||
| } | ||||
|  | ||||
|  | ||||
| func (w *WorflowDB) ExportToArgo() (string, error) { | ||||
| 	if len(w.workflow_name) == 0 || &w.graph == nil { | ||||
| 		return "",fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 	logger := oclib.GetLogger() | ||||
|  | ||||
| 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | ||||
| 		return "", fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 	} | ||||
| 	 | ||||
| 	 | ||||
| 	argo_builder := ArgoBuilder{graph : *w.graph} | ||||
|  | ||||
| 	argo_builder := ArgoBuilder{graph: *w.Workflow.Graph} | ||||
| 	filename, err := argo_builder.CreateDAG() | ||||
| 	if err != nil { | ||||
| 		logger.Logger.Error().Msg("Could not create the argo file for " + w.workflow_name) | ||||
| 		logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) | ||||
| 		return "", err | ||||
| 	} | ||||
|  	return filename, nil | ||||
| 	return filename, nil | ||||
| } | ||||
|  | ||||
| // TODO implement this function | ||||
| func (w *WorflowDB) ExportToHelm(id string) error { | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Return if it exists a link where Destination is the same as comp_id | ||||
| func (w *WorflowDB) isDestination(comp_id string,link_id int) bool { | ||||
| 	 | ||||
| 	for i, link := range w.graph.Links{ | ||||
| 		if(i !=link_id && link.Destination.ID == comp_id){ | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	 | ||||
| 	return false | ||||
| 	 | ||||
| } | ||||
|  | ||||
| // Return if it exists a link where Source is the same as comp_id | ||||
| func (w *WorflowDB) isSource(comp_id string, link_id int) bool { | ||||
| 	 | ||||
| 	for i, link := range  w.graph.Links{ | ||||
| 		if(i !=link_id && link.Source.ID == comp_id && !w.isDCLink(i)){ | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	 | ||||
| 	return false | ||||
| 	 | ||||
| } | ||||
|  | ||||
|  | ||||
|  | ||||
| // returns either computing, data or storage | ||||
| func GetComponentType(component_id string) string { | ||||
| 	if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.PROCESSING_RESOURCE),component_id); libdata.Code == 200{ | ||||
| 		return "computing" | ||||
| 	} | ||||
| 	 | ||||
| 	if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.DATA_RESOURCE),component_id); libdata.Code == 200{ | ||||
| 		return "data" | ||||
| 	} | ||||
|  | ||||
| 	if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.STORAGE_RESOURCE),component_id); libdata.Code == 200{ | ||||
| 		return "storage" | ||||
| 	} | ||||
|  | ||||
| 	if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.DATACENTER_RESOURCE),component_id); libdata.Code == 200{ | ||||
| 		return "datacenter" | ||||
| 	} | ||||
|  | ||||
| 	if libdata := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE),component_id); libdata.Code == 200{ | ||||
| 		return "workflow" | ||||
| 	} | ||||
|  | ||||
| 	return "" | ||||
| } | ||||
|  | ||||
| // Returns a slice of id, in case the link is made of twice the same type of component | ||||
|  | ||||
| func (w *WorflowDB) getComponentByType(compType string, link graph.GraphLink) (ids []string){ | ||||
| 	if(GetComponentType(link.Source.ID) == compType){ | ||||
| 		ids = append(ids, link.Source.ID) | ||||
| 	} | ||||
| 	if(GetComponentType(link.Destination.ID) == compType){ | ||||
| 		ids = append(ids, link.Destination.ID) | ||||
| 	} | ||||
|  | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (w *WorflowDB) isDCLink(link_id int) bool { | ||||
| 	link :=  w.graph.Links[link_id] | ||||
|  | ||||
| 	dest := w.graph.Items[link.Destination.ID] | ||||
| 	dest_id := dest.GetAbstractRessource().GetID() | ||||
| 	 | ||||
| 	source := w.graph.Items[link.Source.ID] | ||||
| 	source_id := source.GetAbstractRessource().GetID() | ||||
| 	 | ||||
| 	return IsDatacenter(dest_id) || IsDatacenter(source_id) | ||||
| } | ||||
|  | ||||
| func IsDatacenter(id string) bool { | ||||
| 	resource := oclib.LoadOne(oclib.LibDataEnum(oclib.DATACENTER_RESOURCE),id) | ||||
| 	 | ||||
| 	return resource.Code == 200  | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user