// A class that translates the informations held in the graph object // via its lists of components into an argo file, using the a list of // link ID to build the dag package workflow_builder import ( . "oc-monitord/models" "os" "slices" "strconv" "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/resource_model" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" w "cloud.o-forge.io/core/oc-lib/models/workflow" "github.com/nwtgck/go-fakelish" "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/yaml.v3" ) var logger zerolog.Logger type ArgoBuilder struct { OriginWorkflow w.Workflow Workflow Workflow Services *Service Timeout int } type Workflow struct { ApiVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata struct { Name string `yaml:"name"` } `yaml:"metadata"` Spec Spec `yaml:"spec,omitempty"` } type Spec struct { Entrypoint string `yaml:"entrypoint"` Arguments []Parameter `yaml:"arguments,omitempty"` Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` Templates []Template `yaml:"templates"` Timeout int `yaml:"activeDeadlineSeconds,omitempty"` } func (b *ArgoBuilder) CreateDAG() (string, error) { // handle services by checking if there is only one processing with hostname and port b.createNginxVolumes() b.createTemplates() b.createDAGstep() b.createVolumes() if b.Timeout > 0 { b.Workflow.Spec.Timeout = b.Timeout } b.Workflow.Spec.Entrypoint = "dag" b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" random_name := generateWfName() b.Workflow.Metadata.Name = "oc-monitor-" + random_name logger = oclib.GetLogger() yamlified, err := yaml.Marshal(b.Workflow) if err != nil { logger.Error().Msg("Could not transform object to yaml file") return "", err } // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" workflows_dir := "./argo_workflows/" err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) if err != nil { logger.Error().Msg("Could not write the yaml file") return "", err } return file_name, nil } func (b *ArgoBuilder) createTemplates() { for _, comp := range b.getProcessings() { var command string var args string var env string comp_res := comp.Processing command = getStringValue(comp_res.AbstractResource, "command") args = getStringValue(comp_res.AbstractResource, "args") env = getStringValue(comp_res.AbstractResource, "env") image_name := strings.Split(command, " ")[0] // TODO : decide where to store the image name, GUI or models.computing.Image temp_container := Container{Image: image_name} // TODO : decide where to store the image name, GUI or models.computing.Image temp_container.Command = getComputingCommands(command) temp_container.Args = getComputingArgs(args, command) // Only for dev purpose, input_names := getComputingEnvironmentName(strings.Split(env, " ")) var inputs_container []Parameter for _, name := range input_names { inputs_container = append(inputs_container, Parameter{Name: name}) } argo_name := getArgoName(comp_res.GetName(), comp.ID) new_temp := Template{Name: argo_name, Container: temp_container} new_temp.Inputs.Parameters = inputs_container new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx if (b.isService(comp.ID)){ serv := b.CreateService(comp) b.createService(serv, argo_name, comp.ID) new_temp.Metadata.Labels = make(map[string]string) new_temp.Metadata.Labels["app"] = "oc-service" // Construct the template for the k8s service and add a link in graph between k8s service and processing // if err != nil { // // TODO // } } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) } if b.Services != nil { b.addServiceToArgo() } } func (b *ArgoBuilder) createDAGstep() { new_dag := Dag{} for _, comp := range b.getProcessings() { comp_res := comp.Processing env := getStringValue(comp_res.AbstractResource, "env") unique_name := getArgoName(comp_res.GetName(), comp.ID) step := Task{Name: unique_name, Template: unique_name} comp_envs := getComputingEnvironment(strings.Split(env, " ")) for name, value := range comp_envs { step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{Name: name, Value: value}) } // retrieves the name (computing.name-computing.ID) step.Dependencies = b.getDependency(comp.ID) // Error : we use the component ID instead of the GraphItem ID -> store objects new_dag.Tasks = append(new_dag.Tasks, step) } if b.Services != nil { new_dag.Tasks = append(new_dag.Tasks, Task{Name:"workflow-service-pod", Template: "workflow-service-pod"}) } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) } func (b *ArgoBuilder) createVolumes() { // For testing purposes we only declare one volume, mounted in each computing new_volume := VolumeClaimTemplate{} new_volume.Metadata.Name = "workdir" new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} new_volume.Spec.Resources.Requests.Storage = "1Gi" b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) } // For demo purposes, until we implement the use of storage ressources func (b *ArgoBuilder) createNginxVolumes() { new_volume := VolumeClaimTemplate{} new_volume.Metadata.Name = "nginx-demo" new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} new_volume.Spec.Resources.Requests.Storage = "1Gi" b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) } func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { for _, link := range b.OriginWorkflow.Graph.Links { if b.OriginWorkflow.Graph.Items[link.Source.ID].Processing == nil { continue } source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing if current_computing_id == link.Destination.ID && source != nil { dependency_name := getArgoName(source.GetName(), link.Source.ID) dependencies = append(dependencies, dependency_name) } } return } func getComputingCommands(user_input string) []string { user_input = removeImageName(user_input) if len(user_input) == 0 { return []string{} } return strings.Split(user_input, " ") } func getComputingArgs(user_input string, command string) (list_args []string) { if len(user_input) == 0 { return } args := strings.Split(user_input," ") // quickfix that might need improvement if strings.Contains(command, "sh -c") { list_args = append(list_args, strings.Join(args, " ")) return } list_args = append(list_args, args...) return } // Currently implements code to overcome problems in data structure func getComputingEnvironment(user_input []string) (map_env map[string]string) { logger := oclib.GetLogger() is_empty := len(user_input) == 0 is_empty_string := len(user_input) == 1 && user_input[0] == "" if is_empty || is_empty_string { return } if len(user_input) == 1 { user_input = strings.Split(user_input[0], ",") } map_env = make(map[string]string, 0) for _, str := range user_input { new_pair := strings.Split(str, "=") if len(new_pair) != 2 { logger.Error().Msg("Error extracting the environment variable from " + str) panic(0) } map_env[new_pair[0]] = new_pair[1] } return } func getComputingEnvironmentName(user_input []string) (list_names []string) { env_map := getComputingEnvironment(user_input) for name := range env_map { list_names = append(list_names, name) } return } func generateWfName() (Name string) { Name = fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) return } func getArgoName(raw_name string, component_id string) (formatedName string) { formatedName = strings.ReplaceAll(raw_name, " ", "-") formatedName += "-" + component_id formatedName = strings.ToLower(formatedName) return } func removeImageName(user_input string) string { // First command is the name of the container for now if len(strings.Split(user_input, " ")) == 1 { return "" } slice_input := strings.Split(user_input, " ") new_slice := slice_input[1:] user_input = strings.Join(new_slice, " ") return user_input } // Return the graphItem containing a Processing resource, so that we have access to the ID of the graphItem in order to identify it in the links func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) { for _, item := range b.OriginWorkflow.Graph.Items { if item.Processing != nil { list_computings = append(list_computings, item) } } return } // Pass a GraphItem's UUID and not the ID func (b *ArgoBuilder) IsProcessing(component_uuid string) bool { return slices.Contains(b.OriginWorkflow.Processings, component_uuid) } func getStringValue(comp resource_model.AbstractResource, key string) string { if res := comp.GetModelValue(key); res != nil { return res.(string) } return "" } func (b *ArgoBuilder) isService(id string) bool{ comp := b.OriginWorkflow.Graph.Items[id] if comp.Processing == nil { return false } _, is_exposed := comp.Processing.ResourceModel.Model["expose"] return is_exposed } func (b *ArgoBuilder) CreateService(processing graph.GraphItem) Service{ // model { // Type : "dict", // Value : { // "80" : { // "reverse" : "", // "PAT" : "34000" // }, // "344" : { // "reverse" : "", // "PAT" : "34400" // } // } // } new_service := Service{APIVersion: "v1", Kind: "Service", Metadata: Metadata{ Name: "workflow-service" , }, Spec: ServiceSpec{ Selector: map[string]string{"app": "oc-service"}, Ports: []ServicePort{ }, Type: "NodePort", }, } completeServicePorts(&new_service, processing) yamlified, _ := yaml.Marshal(new_service) x := string(yamlified) _ = x return new_service } func completeServicePorts(service *Service, processing graph.GraphItem) { contract := getExposeContract(processing.Processing.ResourceModel.Model["expose"]) for str_port,translation_dict := range contract{ port, err := strconv.ParseInt(str_port, 10, 64) if err != nil { logger.Error().Msg("Could not convert " + str_port + "to an int") return } if _, ok := translation_dict["PAT"]; ok{ port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64) if err != nil { logger.Error().Msg("Could not convert " + translation_dict["PAT"] + "to an int") return } new_port_translation := ServicePort{ Name: strings.ToLower(processing.Processing.Name) + processing.ID, Port: port_translation-30000, TargetPort: port, NodePort: port_translation, Protocol: "TCP", } service.Spec.Ports = append(service.Spec.Ports, new_port_translation) } } return } // TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code func getExposeContract(expose resource_model.Model) map[string]map[string]string { contract := make(map[string]map[string]string,0) mapped_info := bson.M{} // var contract PortTranslation _ , byt, _ := bson.MarshalValue(expose.Value) bson.Unmarshal(byt,&mapped_info) for _,v := range mapped_info { port := v.(primitive.M)["Key"].(string) // exposed_port := map[string]interface{}{data["Key"] : ""} port_translation := v.(primitive.M)["Value"] contract[port] = map[string]string{} for _,v2 := range port_translation.(primitive.A) { if v2.(primitive.M)["Key"] == "reverse" { contract[port]["reverse"] = v2.(primitive.M)["Value"].(string) } if v2.(primitive.M)["Key"] == "PAT" { contract[port]["PAT"] = v2.(primitive.M)["Value"].(string) } } } return contract } // func getPortsFromModel(model map[string]resource_model.Model) (data []int) { // defer func() { // recover the panic // if r := recover(); r != nil { // for _, v := range model["expose"].Value.(map[string]interface{}) { // subMap := v.(map[string]interface{}) // for k2, v2 := range subMap { // if k2 == "PAT" { // data = append(data, v2.(int)) // } // } // } // } // }() // expose := model["expose"].Value // // sub := expose.([]primitive.A) // for _, item := range expose.(primitive.A) { // if doc, ok := item.(primitive.D); ok { // for v,k := range doc{ // key := k.Key // valueMap := make(map[string]interface{}) // if nestedArray, ok := elem.Value.(primitive.A); ok { // for _, nestedItem := range nestedArray { // if nestedDoc, ok := nestedItem.(primitive.D); ok { // for _, nestedElem := range nestedDoc { // valueMap[nestedElem.Key] = nestedElem.Value // } // } // } // } // } // } // } // return // } func (b *ArgoBuilder) createService(service Service, processing_name string, processing_id string) { if b.Services != nil{ b.Services.Spec.Ports = append(b.Services.Spec.Ports, service.Spec.Ports...) }else { b.Services = &service } b.addLabel(processing_name,processing_id) } func (b *ArgoBuilder) addLabel(name string, id string) { argo_name := getArgoName(name,id) for _, template := range b.Workflow.Spec.Templates{ if template.Name == argo_name{ template.Metadata.Labels["app"] = "service-workflow" return } } } func (b *ArgoBuilder) addServiceToArgo() error { service_manifest, err := yaml.Marshal(b.Services) if err != nil { logger.Error().Msg("Could not marshal service manifest") return err } service_template := Template{Name: "workflow-service-pod", Resource: ServiceResource{ Action: "create", SuccessCondition: "status.succeeded > 0", FailureCondition: "status.failed > 3", SetOwnerReference: true, Manifest: string(service_manifest), }, } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) return nil }