// A class that translates the informations held in the graph object // via its lists of components into an argo file, using the a list of // link ID to build the dag package workflow_builder import ( "oc-monitord/models" . "oc-monitord/models" "os" "slices" "strconv" "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/resource_model" "cloud.o-forge.io/core/oc-lib/models/resources/processing" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" w "cloud.o-forge.io/core/oc-lib/models/workflow" "github.com/nwtgck/go-fakelish" "github.com/rs/zerolog" "gopkg.in/yaml.v3" ) var logger zerolog.Logger type ServiceExposure int const ( PAT ServiceExposure = iota Reverse Both ) type ArgoBuilder struct { OriginWorkflow w.Workflow Workflow Workflow Services []Service Timeout int } type Workflow struct { Manifest Spec ArgoSpec `yaml:"spec,omitempty"` } type ArgoSpec struct { Entrypoint string `yaml:"entrypoint"` Arguments []Parameter `yaml:"arguments,omitempty"` Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` Templates []Template `yaml:"templates"` Timeout int `yaml:"activeDeadlineSeconds,omitempty"` } func (b *ArgoBuilder) CreateDAG() (string, error) { // handle services by checking if there is only one processing with hostname and port b.createTemplates() b.createDAGstep() b.createVolumes() if b.Timeout > 0 { b.Workflow.Spec.Timeout = b.Timeout } b.Workflow.Spec.Entrypoint = "dag" b.Workflow.Manifest.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" random_name := generateWfName() b.Workflow.Metadata.Name = "oc-monitor-" + random_name logger = oclib.GetLogger() yamlified, err := yaml.Marshal(b.Workflow) if err != nil { logger.Error().Msg("Could not transform object to yaml file") return "", err } // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" workflows_dir := "./argo_workflows/" err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) if err != nil { logger.Error().Msg("Could not write the yaml file") return "", err } return file_name, nil } func (b *ArgoBuilder) createTemplates() { for _, comp := range b.getProcessings() { var command string var args string var env string var serv models.Service comp_res := comp.Processing command = getStringValue(comp_res.AbstractResource, "command") args = getStringValue(comp_res.AbstractResource, "args") env = getStringValue(comp_res.AbstractResource, "env") image_name := strings.Split(command, " ")[0] // TODO : decide where to store the image name, GUI or models.computing.Image temp_container := Container{Image: image_name} // TODO : decide where to store the image name, GUI or models.computing.Image temp_container.Command = getComputingCommands(command) temp_container.Args = getComputingArgs(args, command) // Only for dev purpose, input_names := getComputingEnvironmentName(strings.Split(env, " ")) var inputs_container []Parameter for _, name := range input_names { inputs_container = append(inputs_container, Parameter{Name: name}) } argo_name := getArgoName(comp_res.GetName(), comp.ID) new_temp := Template{Name: argo_name, Container: temp_container} new_temp.Inputs.Parameters = inputs_container new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name if (b.isService(comp.ID)){ serv_type := getServiceExposure(*comp.Processing) if serv_type == PAT || serv_type == Both{ serv = b.CreateKubeService(comp, NodePort) b.addKubeServiceToWorkflow(serv, argo_name, comp.ID) new_temp.Metadata.Labels = make(map[string]string) new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"] // Construct the template for the k8s service and add a link in graph between k8s service and processing b.addServiceToArgo(serv) ingress := b.CreateIngress(comp,serv) b.addIngressToWorfklow(ingress, argo_name, comp.ID) } if serv_type == Reverse || serv_type == Both{ serv = b.CreateKubeService(comp, ClusterIP) // create ingress by passing the service and the processing (or reverse) b.addKubeServiceToWorkflow(serv, argo_name, comp.ID) new_temp.Metadata.Labels = make(map[string]string) new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"] // Construct the template for the k8s service and add a link in graph between k8s service and processing b.addServiceToArgo(serv) } // if err != nil { // // TODO // } } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) } } func (b *ArgoBuilder) createDAGstep() { new_dag := Dag{} for _, comp := range b.getProcessings() { comp_res := comp.Processing env := getStringValue(comp_res.AbstractResource, "env") unique_name := getArgoName(comp_res.GetName(), comp.ID) step := Task{Name: unique_name, Template: unique_name} comp_envs := getComputingEnvironment(strings.Split(env, " ")) for name, value := range comp_envs { step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{Name: name, Value: value}) } // retrieves the name (computing.name-computing.ID) step.Dependencies = b.getDependency(comp.ID) // Error : we use the component ID instead of the GraphItem ID -> store objects new_dag.Tasks = append(new_dag.Tasks, step) } for i, _ := range b.Services { name := "workflow-service-pod-"+strconv.Itoa(i + 1) new_dag.Tasks = append(new_dag.Tasks, Task{Name: name , Template: name}) } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) } func (b *ArgoBuilder) createVolumes() { // For testing purposes we only declare one volume, mounted in each computing new_volume := VolumeClaimTemplate{} new_volume.Metadata.Name = "workdir" new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} new_volume.Spec.Resources.Requests.Storage = "1Gi" b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) } func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { for _, link := range b.OriginWorkflow.Graph.Links { if b.OriginWorkflow.Graph.Items[link.Source.ID].Processing == nil { continue } source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing if current_computing_id == link.Destination.ID && source != nil { dependency_name := getArgoName(source.GetName(), link.Source.ID) dependencies = append(dependencies, dependency_name) } } return } func getComputingCommands(user_input string) []string { user_input = removeImageName(user_input) if len(user_input) == 0 { return []string{} } return strings.Split(user_input, " ") } func getComputingArgs(user_input string, command string) (list_args []string) { if len(user_input) == 0 { return } args := strings.Split(user_input," ") // quickfix that might need improvement if strings.Contains(command, "sh -c") { list_args = append(list_args, strings.Join(args, " ")) return } list_args = append(list_args, args...) return } // Currently implements code to overcome problems in data structure func getComputingEnvironment(user_input []string) (map_env map[string]string) { logger := oclib.GetLogger() is_empty := len(user_input) == 0 is_empty_string := len(user_input) == 1 && user_input[0] == "" if is_empty || is_empty_string { return } if len(user_input) == 1 { user_input = strings.Split(user_input[0], ",") } map_env = make(map[string]string, 0) for _, str := range user_input { new_pair := strings.Split(str, "=") if len(new_pair) != 2 { logger.Error().Msg("Error extracting the environment variable from " + str) panic(0) } map_env[new_pair[0]] = new_pair[1] } return } func getComputingEnvironmentName(user_input []string) (list_names []string) { env_map := getComputingEnvironment(user_input) for name := range env_map { list_names = append(list_names, name) } return } func generateWfName() (Name string) { Name = fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) return } func getArgoName(raw_name string, component_id string) (formatedName string) { formatedName = strings.ReplaceAll(raw_name, " ", "-") formatedName += "-" + component_id formatedName = strings.ToLower(formatedName) return } func removeImageName(user_input string) string { // First command is the name of the container for now if len(strings.Split(user_input, " ")) == 1 { return "" } slice_input := strings.Split(user_input, " ") new_slice := slice_input[1:] user_input = strings.Join(new_slice, " ") return user_input } // Return the graphItem containing a Processing resource, so that we have access to the ID of the graphItem in order to identify it in the links func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) { for _, item := range b.OriginWorkflow.Graph.Items { if item.Processing != nil { list_computings = append(list_computings, item) } } return } // Pass a GraphItem's UUID and not the ID func (b *ArgoBuilder) IsProcessing(component_uuid string) bool { return slices.Contains(b.OriginWorkflow.Processings, component_uuid) } func getStringValue(comp resource_model.AbstractResource, key string) string { if res := comp.GetModelValue(key); res != nil { return res.(string) } return "" } func (b *ArgoBuilder) isService(id string) bool{ comp := b.OriginWorkflow.Graph.Items[id] if comp.Processing == nil { return false } _, is_exposed := comp.Processing.ResourceModel.Model["expose"] return is_exposed } func getServiceExposure(service processing.ProcessingResource) ServiceExposure{ var exposure_type ServiceExposure contract := getExposeContract(service.ResourceModel.Model["expose"]) _, pat := contract["PAT"] _, reverse := contract["reverse"] if pat && reverse { exposure_type= Both } if pat { exposure_type = PAT } if reverse{ exposure_type = Reverse } return exposure_type } func (b *ArgoBuilder) CreateIngress(processing processing.ProcessingResource, service Service) Ingress{ contract := getExposeContract(processing.ResourceModel.Model["expose"]) new_ingress := models.NewIngress(contract,service.Metadata.Name) return new_ingress }