diff --git a/go.mod b/go.mod
index 74abb7d..ab7db4e 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.23.1
 toolchain go1.23.3
 
 require (
-	cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de
+	cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d
 	github.com/akamensky/argparse v1.4.0
 	github.com/google/uuid v1.6.0
 	github.com/goraz/onion v0.1.3
diff --git a/go.sum b/go.sum
index c7ec012..788f750 100644
--- a/go.sum
+++ b/go.sum
@@ -14,6 +14,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5 h1:bmEG0M99WXWCH
 cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
 cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de h1:s47eEnWRCjBMOxbec5ROHztuwu0Zo7MuXgqWizgkiXU=
 cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51 h1:jlSEprNaUBe628uP9a9TrJ16Q5Ej6OxHlAKNtrHrN2o=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE=
+cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
 github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
diff --git a/main.go b/main.go
index 721a23b..116dbaf 100644
--- a/main.go
+++ b/main.go
@@ -89,7 +89,7 @@ func main() {
 		logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
 	}
 
-	builder, _, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore
+	builder, _, err := new_wf.ExportToArgo(exec, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore
 	if err != nil {
 		logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID)
 		logger.Error().Msg(err.Error())
@@ -110,20 +110,20 @@ func main() {
 		// Executed in a k8s environment
 		logger.Info().Msg("Executes inside a k8s")
 		// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
-		executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath)
+		executeInside(exec.ExecutionsID, argoFilePath)
 	}
 }
 
 // So far we only log the output from
-func executeInside(execID string, ns string, argo_file_path string) {
+func executeInside(ns string, argo_file_path string) {
 	t, err := tools2.NewService(conf.GetConfig().Mode)
 	if err != nil {
 		logger.Error().Msg("Could not create KubernetesTool")
 		return
 	}
-
+
 	name, err := t.CreateArgoWorkflow(argo_file_path, ns)
-	// _ = name
+	// _ = name
 	if err != nil {
 		logger.Error().Msg("Could not create argo workflow : " + err.Error())
 		logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
@@ -152,20 +152,20 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 	var wg sync.WaitGroup
 	var err error
 
-	logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID )
+	logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID)
 	cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID)
 	if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil {
 		wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
 		return
 	}
-
-	cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color")
+
+	cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color")
 	if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil {
 		wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error())
 		return
 	}
-
+
 	var steps []string
 	for _, template := range workflow.Spec.Templates {
 		steps = append(steps, template.Name)
@@ -186,7 +186,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 	logger.Info().Msg("Running argo logs")
 	if err := cmdLogs.Run(); err != nil {
 		wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
-
+
 		wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text())
 	}
 
@@ -201,7 +201,6 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 	wg.Wait()
 }
 
-
 func loadConfig(is_k8s bool, parser *argparse.Parser) {
 	var o *onion.Onion
 	o = initOnion(o)
@@ -305,7 +304,6 @@ func getContainerName(argo_file string) string {
 	return container_name
 }
 
-
 func updateStatus(status string, log string) {
 	exec_id := conf.GetConfig().ExecutionID
 
diff --git a/models/template.go b/models/template.go
index 3b5ea3d..19b4d9e 100644
--- a/models/template.go
+++ b/models/template.go
@@ -1,10 +1,16 @@
 package models
 
 import (
+	"encoding/json"
+	"fmt"
+	"oc-monitord/conf"
 	"strings"
 
 	"cloud.o-forge.io/core/oc-lib/models/common/models"
 	"cloud.o-forge.io/core/oc-lib/models/resources"
+	"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
+	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
+	"cloud.o-forge.io/core/oc-lib/tools"
 )
 
 type Parameter struct {
@@ -96,8 +102,34 @@ type Template struct {
 	Resource ServiceResource `yaml:"resource,omitempty"`
 }
 
-func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag) {
-	instance := processing.GetSelectedInstance()
+func (template *Template) CreateEventContainer(exec *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) {
+	container := Container{Image: "natsio/nats-box"}
+	container.Command = []string{"sh", "-c"} // everything is run through the shell
+
+	var event *native_tools.WorkflowEventParams
+	b, err := json.Marshal(nt.Params)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+	err = json.Unmarshal(b, &event)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+	if event != nil {
+		container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey(tools.WORKFLOW_EVENT.String())+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'")
+		container.Args = []string{strings.Join(container.Args, " ")}
+		template.Container = container
+	}
+}
+
+func (template *Template) CreateContainer(exec *workflow_execution.WorkflowExecution, processing *resources.ProcessingResource, dag *Dag) {
+	index := 0
+	if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
+		index = d
+	}
+	instance := processing.GetSelectedInstance(&index)
 	if instance == nil {
 		return
 	}
diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go
index 780b677..27fbb3e 100644
--- a/workflow_builder/argo_builder.go
+++ b/workflow_builder/argo_builder.go
@@ -18,8 +18,11 @@ import (
 	"cloud.o-forge.io/core/oc-lib/logs"
 	"cloud.o-forge.io/core/oc-lib/models/common/enum"
 	"cloud.o-forge.io/core/oc-lib/models/resources"
+	"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
 	w "cloud.o-forge.io/core/oc-lib/models/workflow"
 	"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
+	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
+	"cloud.o-forge.io/core/oc-lib/tools"
 	"github.com/nwtgck/go-fakelish"
 	"github.com/rs/zerolog"
 	"gopkg.in/yaml.v3"
@@ -55,22 +58,22 @@ func (b *Workflow) getDag() *Dag {
 }
 
 type Spec struct {
-	ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
-	Entrypoint string `yaml:"entrypoint"`
-	Arguments []Parameter `yaml:"arguments,omitempty"`
-	Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
-	Templates []Template `yaml:"templates"`
-	Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
+	ServiceAccountName string                `yaml:"serviceAccountName,omitempty"`
+	Entrypoint         string                `yaml:"entrypoint"`
+	Arguments          []Parameter           `yaml:"arguments,omitempty"`
+	Volumes            []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
+	Templates          []Template            `yaml:"templates"`
+	Timeout            int                   `yaml:"activeDeadlineSeconds,omitempty"`
 }
 
 // TODO: found on a processing instance linked to storage
 // add s3, gcs, azure, etc if needed on a link between processing and storage
-func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
+func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, namespace string, write bool) (int, []string, []string, error) {
 	logger = logs.GetLogger()
 	logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
 	// handle services by checking if there is only one processing with hostname and port
-	firstItems, lastItems, volumes := b.createTemplates(namespace)
-	b.createVolumes(volumes)
+	firstItems, lastItems, volumes := b.createTemplates(exec, namespace)
+	b.createVolumes(exec, volumes)
 
 	if b.Timeout > 0 {
 		b.Workflow.Spec.Timeout = b.Timeout
@@ -82,27 +85,44 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
 	if !write {
 		return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
 	}
-
-
-	return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
+
+	return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
 }
-
-func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
+
+func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution, namespace string) ([]string, []string, []VolumeMount) {
 	volumes := []VolumeMount{}
 	firstItems := []string{}
 	lastItems := []string{}
-	items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
-	logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
 	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
-		instance := item.Processing.GetSelectedInstance()
+		index := 0
+		_, res := item.GetResource()
+		if d, ok := exec.SelectedInstances[res.GetID()]; ok {
+			index = d
+		}
+		instance := item.Processing.GetSelectedInstance(&index)
 		logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
 		if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
 			logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
 			return firstItems, lastItems, volumes
 		}
-		volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
+		volumes, firstItems, lastItems = b.createArgoTemplates(exec,
+			namespace,
 			item.ID, item.Processing, volumes, firstItems, lastItems)
 	}
+	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) {
+		if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) {
+			continue
+		}
+		index := 0
+		_, res := item.GetResource()
+		if d, ok := exec.SelectedInstances[res.GetID()]; ok {
+			index = d
+		}
+		instance := item.NativeTool.GetSelectedInstance(&index)
+		logger.Info().Msg(fmt.Sprint("Creating template for", item.NativeTool.GetName(), instance))
+		volumes, firstItems, lastItems = b.createArgoTemplates(exec,
+			namespace, item.ID, item.NativeTool, volumes, firstItems, lastItems)
+	}
 	firstWfTasks := map[string][]string{}
 	latestWfTasks := map[string][]string{}
 	relatedWfTasks := map[string][]string{}
@@ -113,7 +133,7 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
 			continue
 		}
 		subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
-		_, fi, li, err := subBuilder.CreateDAG(namespace, false)
+		_, fi, li, err := subBuilder.CreateDAG(exec, namespace, false)
 		if err != nil {
 			logger.Error().Msg("Error creating the subworkflow : " + err.Error())
 			continue
@@ -170,36 +190,42 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
 	return firstItems, lastItems, volumes
 }
 
-func (b *ArgoBuilder) createArgoTemplates(namespace string,
+func (b *ArgoBuilder) createArgoTemplates(
+	exec *workflow_execution.WorkflowExecution,
+	namespace string,
 	id string,
-	processing *resources.ProcessingResource,
+	processing resources.ResourceInterface,
 	volumes []VolumeMount,
 	firstItems []string,
 	lastItems []string) ([]VolumeMount, []string, []string) {
-	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
+	_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, processing, firstItems, lastItems)
 	template := &Template{Name: getArgoName(processing.GetName(), id)}
 	logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
-	isReparted, peerId := b.isProcessingReparted(*processing, id)
-	template.CreateContainer(processing, b.Workflow.getDag())
-
+	isReparted, peerId := b.isProcessingReparted(processing, id)
+	if processing.GetType() == tools.PROCESSING_RESOURCE.String() {
+		template.CreateContainer(exec, processing.(*resources.ProcessingResource), b.Workflow.getDag())
+	} else if processing.GetType() == tools.NATIVE_TOOL.String() {
+		template.CreateEventContainer(exec, processing.(*resources.NativeTool), b.Workflow.getDag())
+	}
+
 	if isReparted {
 		logger.Debug().Msg("Reparted processing, on " + peerId)
 		b.RemotePeers = append(b.RemotePeers, peerId)
 		template.AddAdmiraltyAnnotations(peerId)
 	}
 	// get datacenter from the processing
-	if processing.IsService {
-		b.CreateService(id, processing)
+	if processing.GetType() == tools.PROCESSING_RESOURCE.String() && processing.(*resources.ProcessingResource).IsService {
+		b.CreateService(exec, id, processing)
 		template.Metadata.Labels = make(map[string]string)
 		template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
 	}
-	volumes = b.addStorageAnnotations(id, template, namespace, volumes)
+	volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes)
 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
 	return volumes, firstItems, lastItems
 }
 
-func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
+func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExecution, id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
 	related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage)
 	// Retrieve all of the storage node linked to the processing for which we create the template
 	for _, r := range related {
@@ -218,7 +244,7 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
 
 			if storage.StorageType == enum.S3 {
-				b.addS3annotations(&art, template, rw, linkToStorage, storage, namespace)
+				b.addS3annotations(exec, &art, template, rw, linkToStorage, storage, namespace)
 			}
 
 			if rw.Write {
@@ -229,8 +255,8 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
 			}
 		}
 		index := 0
-		if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
-			index = *storage.SelectedInstanceIndex
+		if s, ok := exec.SelectedInstances[storage.GetID()]; ok {
+			index = s
 		}
 		s := storage.Instances[index]
 		if s.Local {
@@ -242,10 +268,10 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
 		}
 	}
 	return volumes
-}
+}
+
+func (b *ArgoBuilder) addS3annotations(exec *workflow_execution.WorkflowExecution, art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
 
-func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
-
 	art.S3 = &Key{
 		// Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
 		Insecure: true, // temporary
@@ -253,58 +279,65 @@ func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw gra
 	if rw.Write {
 		art.S3.Key = rw.Destination + "/" + rw.FileName
 	} else {
-		art.S3.Key = rw.Source 
+		art.S3.Key = rw.Source
 	}
-	sel := storage.GetSelectedInstance()
+	index := 0
+	if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
+		index = d
+	}
+	sel := storage.GetSelectedInstance(&index)
 	// v0.1 : add the storage.Source to the s3 object
 	// v0.2 : test if the storage.Source exists in the configMap and quit if not
 	// v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call
 	if sel != nil {
-		b.addAuthInformation(storage, namespace, art)
-		art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique 
-		art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
+		b.addAuthInformation(exec, storage, namespace, art)
+		art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
+		art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
 	}
 }
+func (b *ArgoBuilder) addAuthInformation(exec *workflow_execution.WorkflowExecution, storage *resources.StorageResource, namespace string, art *Artifact) {
+	index := 0
+	if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
+		index = d
+	}
+
+	sel := storage.GetSelectedInstance(&index)
 
-func (b *ArgoBuilder) addAuthInformation(storage *resources.StorageResource, namespace string, art *Artifact) {
-
-	sel := storage.GetSelectedInstance()
-
 	tool, err := tools2.NewService(conf.GetConfig().Mode)
 	if err != nil || tool == nil {
 		logger.Fatal().Msg("Could not create the access secret :" + err.Error())
-	}
-
-	secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
-
-	if err == nil {
-		art.S3.AccessKeySecret = &Secret{
-			Name: secretName,
-			Key:  "access-key",
-		}
-		art.S3.SecretKeySecret = &Secret{
-			Name: secretName,
-			Key:  "secret-key",
+	}
+
+	secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
+
+	if err == nil {
+		art.S3.AccessKeySecret = &Secret{
+			Name: secretName,
+			Key:  "access-key",
+		}
+		art.S3.SecretKeySecret = &Secret{
+			Name: secretName,
+			Key:  "secret-key",
 		}
 	}
-
+
 	art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
 	art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
 	splits := strings.Split(art.S3.EndPoint, "/")
 	if len(splits) > 1 {
 		art.S3.Bucket = splits[0]
 		art.S3.EndPoint = strings.Join(splits[1:], "/")
-	} else { 
-		art.S3.Bucket = splits[0] 
+	} else {
+		art.S3.Bucket = splits[0]
 	}
 }
 
 func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) {
 	s := tool.GetS3Secret(storage.UUID, namespace)
 	// var s *v1.Secret
-	accessKey, secretKey := retrieveMinioCredential("peer",namespace)
-
+	accessKey, secretKey := retrieveMinioCredential("peer", namespace)
+
 	if s == nil {
 		id, err := tool.CreateAccessSecret(
 			accessKey,
@@ -319,22 +352,24 @@ func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, nam
 		return id, nil
 	}
-
-	// s.Name = "toto"
 	return s.Name, nil
-
+
 }
 
 // This method needs to evolve to an API call to the peer passed as a parameter
-func retrieveMinioCredential(peer string, namespace string) (string,string) {
+func retrieveMinioCredential(peer string, namespace string) (string, string) {
 	return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW"
 }
 
-func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource,
+func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, dag *Dag, graphItemID string, processing resources.ResourceInterface,
 	firstItems []string, lastItems []string) (*Dag, []string, []string) {
-	unique_name := getArgoName(processing.GetName(), graphItemID)
-	step := Task{Name: unique_name, Template: unique_name}
-	instance := processing.GetSelectedInstance()
+	unique_name := getArgoName(processing.GetName(), graphItemID)
+	step := Task{Name: unique_name, Template: unique_name}
+	index := 0
+	if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
+		index = d
+	}
+	instance := processing.GetSelectedInstance(&index)
 	if instance != nil {
 		for _, value := range instance.(*resources.ProcessingInstance).Env {
 			step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
@@ -373,11 +408,11 @@ func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *re
 	return dag, firstItems, lastItems
 }
 
-func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
+func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { // TODO : one think about remote volume but TG
 	for _, volume := range volumes {
 		index := 0
-		if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
-			index = *volume.Storage.SelectedInstanceIndex
+		if s, ok := exec.SelectedInstances[volume.Storage.GetID()]; ok {
+			index = s
 		}
 		storage := volume.Storage.Instances[index]
 		new_volume := VolumeClaimTemplate{}
@@ -435,10 +470,10 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
 
 // Verify if a processing resource is attached to another Compute than the one hosting
 // the current Open Cloud instance. If true return the peer ID to contact
-func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) {
+func (b *ArgoBuilder) isProcessingReparted(processing resources.ResourceInterface, graphID string) (bool, string) {
 	computeAttached := b.retrieveProcessingCompute(graphID)
 	if computeAttached == nil {
-		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
+		logger.Error().Msg("No compute was found attached to processing " + processing.GetName() + " : " + processing.GetID())
 		panic(0)
 	}
 
@@ -473,7 +508,7 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu
 		} else if link.Destination.ID == graphID {
 			oppositeId = link.Source.ID
 		}
-
+
 		if oppositeId != "" {
 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
 			if dt == oclib.COMPUTE_RESOURCE {
@@ -495,10 +530,10 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
 	// Setup admiralty for each node
 	for _, peer := range b.RemotePeers {
 		logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
-		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
+		setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer)
 	}
 
-	// Update the name of the admiralty node to use 
+	// Update the name of the admiralty node to use
 	for _, template := range b.Workflow.Spec.Templates {
 		if len(template.Metadata.Annotations) > 0 {
 			if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
@@ -514,14 +549,14 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
 	yamlified, err := yaml.Marshal(b.Workflow)
 	if err != nil {
 		logger.Error().Msg("Could not transform object to yaml file")
-		return "", err 
+		return "", err
 	}
 
 	// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
 	current_timestamp := time.Now().Format("02_01_2006_150405")
 	file_name := random_name + "_" + current_timestamp + ".yml"
 	workflows_dir := "./argo_workflows/"
 	err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
-
+
 	if err != nil {
 		logger.Error().Msg("Could not write the yaml file")
 		return "", err
diff --git a/workflow_builder/argo_services.go b/workflow_builder/argo_services.go
index f95740a..0961686 100644
--- a/workflow_builder/argo_services.go
+++ b/workflow_builder/argo_services.go
@@ -5,10 +5,11 @@ import (
 	"strings"
 
 	"cloud.o-forge.io/core/oc-lib/models/resources"
+	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
 	"gopkg.in/yaml.v3"
 )
 
-func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) {
+func (b *ArgoBuilder) CreateService(exec *workflow_execution.WorkflowExecution, id string, processing resources.ResourceInterface) {
 	new_service := models.Service{
 		APIVersion: "v1",
 		Kind:       "Service",
@@ -24,17 +25,21 @@ func (b *ArgoBuilder) CreateService(exec *workflow_execution.WorkflowExecution, id string, processing *resources.ProcessingR
 	if processing == nil {
 		return
 	}
-	b.completeServicePorts(&new_service, id, processing)
+	b.completeServicePorts(exec, &new_service, id, processing)
 	b.Services = append(b.Services, &new_service)
 }
 
-func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, processing *resources.ProcessingResource) {
-	instance := processing.GetSelectedInstance()
+func (b *ArgoBuilder) completeServicePorts(exec *workflow_execution.WorkflowExecution, service *models.Service, id string, processing resources.ResourceInterface) {
+	index := 0
+	if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
+		index = d
+	}
+	instance := processing.GetSelectedInstance(&index)
 	if instance != nil && instance.(*resources.ProcessingInstance).Access != nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
 		for _, execute := range instance.(*resources.ProcessingInstance).Access.Container.Exposes {
 			if execute.PAT != 0 {
 				new_port_translation := models.ServicePort{
-					Name:       strings.ToLower(processing.Name) + id,
+					Name:       strings.ToLower(processing.GetName()) + id,
 					Port:       execute.Port,
 					TargetPort: execute.PAT,
 					Protocol:   "TCP",
diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go
index edc0fc3..bcbc73e 100644
--- a/workflow_builder/graph.go
+++ b/workflow_builder/graph.go
@@ -6,6 +6,7 @@ import (
 
 	oclib "cloud.o-forge.io/core/oc-lib"
 	workflow "cloud.o-forge.io/core/oc-lib/models/workflow"
+	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
 )
 
 type WorflowDB struct {
@@ -41,7 +42,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
 	return new_wf, nil
 }
 
-func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) {
+func (w *WorflowDB) ExportToArgo(exec *workflow_execution.WorkflowExecution, timeout int) (*ArgoBuilder, int, error) {
 	logger := oclib.GetLogger()
 	logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow))
 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
@@ -49,7 +50,7 @@ func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, i
 	}
 
 	argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout}
-	stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true)
+	stepMax, _, _, err := argoBuilder.CreateDAG(exec, exec.ExecutionsID, true)
 	if err != nil {
 		logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)
 		return nil, 0, err