From 1e4011d5b194b57e28710a711f786c7df9e547c2 Mon Sep 17 00:00:00 2001
From: pb
Date: Mon, 16 Jun 2025 18:22:58 +0200
Subject: [PATCH] refactored and added some values for dev purposes, modified
 the values set for path and key depending on the read or write condition

---
 workflow_builder/argo_builder.go | 123 ++++++++++++++++++++-----------
 1 file changed, 82 insertions(+), 41 deletions(-)

diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go
index 03b9d38..4bb0ae2 100644
--- a/workflow_builder/argo_builder.go
+++ b/workflow_builder/argo_builder.go
@@ -8,6 +8,7 @@ import (
 	"fmt"
 	"oc-monitord/conf"
 	. "oc-monitord/models"
+	tools2 "oc-monitord/tools"
 	"os"
 	"strings"
 
@@ -17,7 +18,9 @@ import (
 	"cloud.o-forge.io/core/oc-lib/logs"
 	"cloud.o-forge.io/core/oc-lib/models/common/enum"
 	"cloud.o-forge.io/core/oc-lib/models/resources"
+	"cloud.o-forge.io/core/oc-lib/models/utils"
 	w "cloud.o-forge.io/core/oc-lib/models/workflow"
+	"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
 	"github.com/nwtgck/go-fakelish"
 	"github.com/rs/zerolog"
 	"gopkg.in/yaml.v3"
@@ -73,7 +76,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
 	if b.Timeout > 0 {
 		b.Workflow.Spec.Timeout = b.Timeout
 	}
-	b.Workflow.Spec.ServiceAccountName = "sa-"+namespace
+	// b.Workflow.Spec.ServiceAccountName = "sa-"+namespace
 	b.Workflow.Spec.Entrypoint = "dag"
 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
 	b.Workflow.Kind = "Workflow"
@@ -191,54 +194,23 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string,
 		template.Metadata.Labels = make(map[string]string)
 		template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
 	}
-	related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage)
+
+	related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage nodes linked to the processing for which we create the template
 	for _, r := range related {
 		storage := r.Node.(*resources.StorageResource)
 		for _, linkToStorage := range r.Links {
 			for _, rw := range linkToStorage.StorageLinkInfos {
-				art := Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)}
+				var art Artifact
+				artifactBaseName := strings.Join(strings.Split(storage.GetName(), " "), "-") + "-" + strings.Replace(rw.FileName, ".", "-", -1) // Parameter/Artifact name must consist of alpha-numeric characters, '_' or '-'
 				if rw.Write {
-					art.Name = storage.GetName() + "-" + rw.Destination + "-input-write"
+					art = Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} // When writing to S3, the Path element is the path to the file in the pod
+					art.Name = artifactBaseName + "-input-write"
 				} else {
-					art.Name = storage.GetName() + "-" + rw.Destination + "-input-read"
+					art = Artifact{Path: template.ReplacePerEnv(rw.Destination + "/" + rw.FileName, linkToStorage.Env)} // When reading from S3, the Path element in the pod should be the destination of the file
+					art.Name = artifactBaseName + "-input-read"
 				}
 				if storage.StorageType == enum.S3 {
-					art.S3 = &Key{
-						Key:      template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
-						Insecure: true, // temporary
-					}
-					sel := storage.GetSelectedInstance()
-					if sel != nil {
-						if sel.(*resources.StorageResourceInstance).Credentials != nil {
-							tool, err := tools2.NewService(conf.GetConfig().Mode)
-							if err != nil || tool == nil {
-								logger.Error().Msg("Could not create the access secret")
-							} else {
-								id, err := tool.CreateAccessSecret(namespace,
-									sel.(*resources.StorageResourceInstance).Credentials.Login,
-									sel.(*resources.StorageResourceInstance).Credentials.Pass)
-								if err == nil {
-									art.S3.AccessKeySecret = &Secret{
-										Name: id,
-										Key:  "access-key",
-									}
-									art.S3.SecretKeySecret = &Secret{
-										Name: id,
-										Key:  "secret-key",
-									}
-								}
-							}
-						}
-						art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
-						art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
-						splits := strings.Split(art.S3.EndPoint, "/")
-						if len(splits) > 1 {
-							art.S3.Bucket = splits[0]
-							art.S3.EndPoint = strings.Join(splits[1:], "/")
-						} else {
-							art.S3.Bucket = splits[0]
-						}
-					}
+					b.addS3annotations(&art, template, rw, linkToStorage, storage, namespace)
 				}
 				if rw.Write {
 					template.Outputs.Artifacts = append(template.Inputs.Artifacts, art)
@@ -264,6 +236,75 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string,
 	return volumes, firstItems, lastItems
 }
 
+func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
+	art.S3 = &Key{
+		// Key:      template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
+		Insecure: true, // temporary
+	}
+	if rw.Write {
+		art.S3.Key = rw.Destination + "/" + rw.FileName
+	} else {
+		art.S3.Key = rw.Source
+	}
+	sel := storage.GetSelectedInstance()
+	// v0.1 : add the storage.Source to the s3 object
+	// v0.2 : test if the storage.Source exists in the configMap and quit if not
+	// v1 : v0.2 + if it doesn't exist, edit/create the configMap with the response from the API call
+	if sel != nil {
+		b.addAuthInformation(sel, namespace, art)
+		art.S3.Bucket = "oc-bucket" // DEFAULT : will need to update this to create a unique bucket
+		art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
+	}
+}
+
+func (*ArgoBuilder) addAuthInformation(sel utils.DBObject, namespace string, art *Artifact) {
+	if sel.(*resources.StorageResourceInstance).Credentials != nil {
+		tool, err := tools2.NewService(conf.GetConfig().Mode)
+		if err != nil || tool == nil {
+			logger.Error().Msg("Could not create the access secret")
+		} else {
+			id, err := tool.CreateAccessSecret(namespace,
+				sel.(*resources.StorageResourceInstance).Credentials.Login,
+				sel.(*resources.StorageResourceInstance).Credentials.Pass)
+			if err == nil {
+				art.S3.AccessKeySecret = &Secret{
+					Name: id,
+					Key:  "access-key",
+				}
+				art.S3.SecretKeySecret = &Secret{
+					Name: id,
+					Key:  "secret-key",
+				}
+			}
+		}
+	}

+
+	// DELETE THIS AFTER FEATURE IMPLEMENTATION
+	// CODE TO TEST THE BUCKET WITH THE SAME CREDENTIALS
+
+	art.S3.AccessKeySecret = &Secret{
+		Name: "argo-minio-secret",
+		Key:  "accessKeySecret",
+	}
+
+	art.S3.SecretKeySecret = &Secret{
+		Name: "argo-minio-secret",
+		Key:  "secretKeySecret",
+	}
+	// DELETE THIS AFTER FEATURE IMPLEMENTATION
+	// CODE TO TEST THE BUCKET WITH THE SAME CREDENTIALS
+
+	art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
+	art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
+	splits := strings.Split(art.S3.EndPoint, "/")
+	if len(splits) > 1 {
+		art.S3.Bucket = splits[0]
+		art.S3.EndPoint = strings.Join(splits[1:], "/")
+	} else {
+		art.S3.Bucket = splits[0]
+	}
+}
+
 func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string,
 	processing *resources.ProcessingResource, firstItems []string, lastItems []string) (*Dag, []string, []string) {
 	unique_name := getArgoName(processing.GetName(), graphItemID)
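
For quick reference, the standalone Go sketch below reproduces only the read/write branching that this patch introduces across createArgoTemplates and addS3annotations. The types Artifact, Key and StorageLinkInfo, the helpers artifactBaseName and buildArtifact, and the example paths are simplified stand-ins and assumptions, not the real oc-lib/oc-monitord definitions; the ReplacePerEnv substitution and the secret handling are left out.

package main

import (
	"fmt"
	"strings"
)

// Simplified stand-ins for the oc-lib/oc-monitord structs; illustrative only.
type Key struct {
	Key      string
	Insecure bool
}

type Artifact struct {
	Name string
	Path string
	S3   *Key
}

type StorageLinkInfo struct {
	Write       bool
	Source      string
	Destination string
	FileName    string
}

// artifactBaseName mirrors the sanitisation in the patch: Argo parameter and
// artifact names must consist of alphanumeric characters, '_' or '-'.
func artifactBaseName(storageName, fileName string) string {
	return strings.Join(strings.Split(storageName, " "), "-") + "-" + strings.Replace(fileName, ".", "-", -1)
}

// buildArtifact reproduces the read/write branching of the patch: a write link
// exposes the pod-side file (rw.Source) and stores it under Destination/FileName
// in S3; a read link fetches the S3 object at rw.Source and lands it at
// Destination/FileName in the pod.
func buildArtifact(storageName string, rw StorageLinkInfo) Artifact {
	art := Artifact{S3: &Key{Insecure: true}}
	base := artifactBaseName(storageName, rw.FileName)
	if rw.Write {
		art.Name = base + "-input-write"
		art.Path = rw.Source
		art.S3.Key = rw.Destination + "/" + rw.FileName
	} else {
		art.Name = base + "-input-read"
		art.Path = rw.Destination + "/" + rw.FileName
		art.S3.Key = rw.Source
	}
	return art
}

func main() {
	links := []StorageLinkInfo{
		{Write: true, Source: "/tmp/out/result.csv", Destination: "results", FileName: "result.csv"},
		{Write: false, Source: "inputs/data.csv", Destination: "/tmp/in", FileName: "data.csv"},
	}
	for _, rw := range links {
		art := buildArtifact("minio storage", rw)
		fmt.Printf("%s: pod path=%q, s3 key=%q\n", art.Name, art.Path, art.S3.Key)
	}
}

Running it prints the pod-side path and the S3 key chosen for each direction, which is the behaviour the hunks above encode.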