// A class that translates the informations held in the graph object // via its lists of components into an argo file, using the a list of // link ID to build the dag package workflow_builder import ( "errors" "fmt" "oc-monitord/conf" "oc-monitord/models" . "oc-monitord/models" "os" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/resources" w "cloud.o-forge.io/core/oc-lib/models/workflow" "github.com/nwtgck/go-fakelish" "github.com/rs/zerolog" "gopkg.in/yaml.v3" ) var logger zerolog.Logger type ArgoBuilder struct { OriginWorkflow *w.Workflow Workflow *models.Workflow Services []*Service Timeout int RemotePeers []string } // TODO: found on a processing instance linked to storage // add s3, gcs, azure, etc if needed on a link between processing and storage func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []string, error) { logger = logs.GetLogger() logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) // handle services by checking if there is only one processing with hostname and port firstItems, lastItems, volumes := b.createTemplates(namespace) b.createVolumes(volumes) if b.Timeout > 0 { b.Workflow.Spec.Timeout = b.Timeout } b.Workflow.Spec.ServiceAccountName = "sa-" + namespace b.Workflow.Spec.Entrypoint = "dag" b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" return len(b.Workflow.GetDag().Tasks), firstItems, lastItems, nil } func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []models.VolumeMount) { volumes := []models.VolumeMount{} firstItems := []string{} lastItems := []string{} items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) logger.Info().Msg(fmt.Sprint("Creating templates", len(items))) for _, item := range items { instance := item.Processing.GetSelectedInstance() logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) return firstItems, lastItems, volumes } volumes, firstItems, lastItems = b.createArgoTemplates(namespace, item.ID, item.Processing, volumes, firstItems, lastItems) } wfDeps := models.NewWorkflowDependancies() for _, workflowID := range b.OriginWorkflow.Workflows { b.createWorkflowArgoTemplate(workflowID, namespace, wfDeps) } wfDeps.BindRelatedTasks(b.Workflow.GetDag()) wfDeps.BindFirstTasks(b.OriginWorkflow.GetDependencies, b.Workflow.GetDag()) if b.Services != nil { dag := b.Workflow.GetDag() dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) b.addServiceToArgo() } return firstItems, lastItems, volumes } func (b *ArgoBuilder) createWorkflowArgoTemplate( workflowID string, namespace string, wfDeps *models.WorkflowsDependancies, ) { realWorkflow, code, err := w.NewAccessor(nil).LoadOne(workflowID) if code != 200 { logger.Error().Msg("Error loading the workflow : " + err.Error()) return } subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Workflow: &models.Workflow{}, Timeout: b.Timeout} _, fi, li, err := subBuilder.CreateDAG(namespace, false) if err != nil { logger.Error().Msg("Error creating the subworkflow : " + err.Error()) return } wfDeps.FirstWfTasks[workflowID] = fi if depsOfIds := subBuilder.OriginWorkflow.IsDependancy(workflowID); len(depsOfIds) > 0 { // IS BEFORE wfDeps.LastWfTasks[workflowID] = li wfDeps.RelatedWfTasks[workflowID] = models.TransformDepsToArgo(depsOfIds) } subDag := subBuilder.Workflow.GetDag() d := b.Workflow.GetDag() d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) b.Services = append(b.Services, subBuilder.Services...) } func (b *ArgoBuilder) createArgoTemplates( namespace string, id string, processing *resources.ProcessingResource, volumes []models.VolumeMount, firstItems []string, lastItems []string, ) ([]models.VolumeMount, []string, []string) { _, firstItems, lastItems = NewTask(processing.Name, id).BindToArgo(b.Workflow.GetDag(), id, b.OriginWorkflow, processing, firstItems, lastItems) template := &Template{Name: models.GetArgoName(processing.GetName(), id)} logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) template.CreateContainer(processing, b.Workflow.GetDag()) if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil { logger.Error().Msg(fmt.Sprint("Problem to sets up repartition expected %v", err.Error())) return volumes, firstItems, lastItems } // get datacenter from the processing if processing.IsService { b.CreateService(id, processing) template.Metadata.Labels = make(map[string]string) template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) return volumes, firstItems, lastItems } func (b *ArgoBuilder) createVolumes(volumes []models.VolumeMount) { // TODO : one think about remote volume but TG for _, volume := range volumes { volume.BindToArgo(b.Workflow) } } // Verify if a processing resource is attached to another Compute than the one hosting // the current Open Cloud instance. If true return the peer ID to contact func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource, graphID string, template *models.Template, namespace string) error { computeAttached := b.OriginWorkflow.GetByRelatedProcessing(processing.GetID(), b.OriginWorkflow.Graph.IsCompute) if len(computeAttached) == 0 { return errors.New("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) } // Creates an accessor srtictly for Peer Collection for _, related := range computeAttached { res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID()) if res.Err != "" { return errors.New(res.Err) } peer := *res.ToPeer() isNotReparted := peer.State == 1 logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) if !(isNotReparted) { logger.Debug().Msg("Reparted processing, on " + peer.UUID) b.RemotePeers = append(b.RemotePeers, peer.UUID) template.AddAdmiraltyAnnotations(peer.UUID, namespace) } } return nil } // Execute the last actions once the YAML file for the Argo Workflow is created func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) { logger.Info().Msg("DEV :: Completing build") setter := AdmiraltySetter{Id: namespace} // Setup admiralty for each node for _, peer := range b.RemotePeers { logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer) } // Generate the YAML file random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) b.Workflow.Metadata.Name = "oc-monitor-" + random_name logger = oclib.GetLogger() yamlified, err := yaml.Marshal(b.Workflow) if err != nil { logger.Error().Msg("Could not transform object to yaml file") return "", err } // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" workflows_dir := "./argo_workflows/" err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) if err != nil { logger.Error().Msg("Could not write the yaml file") return "", err } return workflows_dir + file_name, nil }