From 91f421af1e88f74186c14730f64727e093572460 Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 12 Jun 2025 14:01:16 +0200 Subject: [PATCH] Refactor + Multi admiralty test --- Makefile | 11 +- go.mod | 2 +- go.sum | 4 + main.go | 38 +-- models/services.go | 20 ++ models/template.go | 177 ++++++++++- models/utils.go | 92 ++++++ models/volume.go | 26 ++ workflow_builder/admiralty_setter.go | 87 +++--- workflow_builder/argo_builder.go | 428 ++++++--------------------- workflow_builder/argo_services.go | 14 +- workflow_builder/graph.go | 3 +- 12 files changed, 460 insertions(+), 442 deletions(-) create mode 100644 models/utils.go diff --git a/Makefile b/Makefile index eb90ce8..6070f5e 100644 --- a/Makefile +++ b/Makefile @@ -3,8 +3,6 @@ build: clean go build . -dev: build - run: ./oc-monitord @@ -12,7 +10,7 @@ clean: rm -rf oc-monitord docker: - DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile . + DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile . --build-arg=HOST=$(HOST) docker tag oc/oc-monitord:0.0.1 oc/oc-monitord:latest docker tag oc/oc-monitord:0.0.1 oc-monitord:latest @@ -22,6 +20,11 @@ publish-kind: publish-registry: @echo "TODO" +docker-deploy: + docker compose up -d + +run-docker: docker publish-kind publish-registry docker-deploy + all: docker publish-kind publish-registry -.PHONY: build run clean docker publish-kind publish-registry +.PHONY: build run clean docker publish-kind publish-registry \ No newline at end of file diff --git a/go.mod b/go.mod index 3e0e1b7..99ea101 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.1 toolchain go1.23.3 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 + cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 github.com/akamensky/argparse v1.4.0 github.com/google/uuid v1.6.0 github.com/goraz/onion v0.1.3 diff --git a/go.sum b/go.sum index 4ad81c9..e1beb5c 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,11 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f h1:esLB0EAn8IuOChW35kcBrPaN80z4A4yYyz1mXT45GQo= +cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34= cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= +cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE= +cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= diff --git a/main.go b/main.go index 721a23b..21c2b96 100644 --- a/main.go +++ b/main.go @@ -14,6 +14,7 @@ import ( "oc-monitord/conf" l "oc-monitord/logger" + "oc-monitord/models" u "oc-monitord/utils" "oc-monitord/workflow_builder" @@ -53,7 +54,7 @@ func main() { os.Setenv("test_service", "true") // Only for service demo, delete before merging on main parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") - loadConfig(false, &parser) + setConf(&parser) oclib.InitDaemon("oc-monitord") oclib.SetConfig( @@ -71,6 +72,7 @@ func main() { exec := u.GetExecution(conf.GetConfig().ExecutionID) if exec == nil { logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID) + return } conf.GetConfig().WorkflowID = exec.WorkflowID @@ -110,20 +112,20 @@ func main() { // Executed in a k8s environment logger.Info().Msg("Executes inside a k8s") // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() - executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath) + executeInside(exec.ExecutionsID, argoFilePath) } } // So far we only log the output from -func executeInside(execID string, ns string, argo_file_path string) { +func executeInside(ns string, argo_file_path string) { t, err := tools2.NewService(conf.GetConfig().Mode) if err != nil { logger.Error().Msg("Could not create KubernetesTool") return } - + name, err := t.CreateArgoWorkflow(argo_file_path, ns) - // _ = name + // _ = name if err != nil { logger.Error().Msg("Could not create argo workflow : " + err.Error()) logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA)) @@ -146,26 +148,26 @@ func executeInside(execID string, ns string, argo_file_path string) { } -func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { +func executeOutside(argo_file_path string, workflow *models.Workflow) { var stdoutSubmit, stderrSubmit io.ReadCloser var stdoutLogs, stderrLogs io.ReadCloser var wg sync.WaitGroup var err error - logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID ) + logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID) cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID) if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil { wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) return } - - cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color") + + cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color") if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil { wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error()) return } - + var steps []string for _, template := range workflow.Spec.Templates { steps = append(steps, template.Name) @@ -186,7 +188,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { logger.Info().Msg("Running argo logs") if err := cmdLogs.Run(); err != nil { wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") - + wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text()) } @@ -201,18 +203,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { wg.Wait() } - -func loadConfig(is_k8s bool, parser *argparse.Parser) { - var o *onion.Onion - o = initOnion(o) - setConf(is_k8s, o, parser) - - // if !IsValidUUID(conf.GetConfig().ExecutionID) { - // logger.Fatal().Msg("Provided ID is not an UUID") - // } -} - -func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { +func setConf(parser *argparse.Parser) { url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"}) execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) @@ -305,7 +296,6 @@ func getContainerName(argo_file string) string { return container_name } - func updateStatus(status string, log string) { exec_id := conf.GetConfig().ExecutionID diff --git a/models/services.go b/models/services.go index 80465d1..a6bd083 100644 --- a/models/services.go +++ b/models/services.go @@ -1,5 +1,7 @@ package models +import "gopkg.in/yaml.v3" + type ServiceResource struct { Action string `yaml:"action,omitempty"` SuccessCondition string `yaml:"successCondition,omitempty"` @@ -15,6 +17,24 @@ type Service struct { Spec ServiceSpec `yaml:"spec"` } +func (s *Service) BindToArgo(workflow *Workflow) error { + service_manifest, err := yaml.Marshal(s) + if err != nil { + return err + } + service_template := Template{Name: "workflow-service-pod", + Resource: ServiceResource{ + Action: "create", + SuccessCondition: "status.succeeded > 0", + FailureCondition: "status.failed > 3", + SetOwnerReference: true, + Manifest: string(service_manifest), + }, + } + workflow.Spec.Templates = append(workflow.Spec.Templates, service_template) + return nil +} + type Metadata struct { Name string `yaml:"name"` } diff --git a/models/template.go b/models/template.go index f7a6019..bdd1fec 100644 --- a/models/template.go +++ b/models/template.go @@ -1,8 +1,14 @@ package models import ( + "encoding/json" + "fmt" "strings" + w "cloud.o-forge.io/core/oc-lib/models/workflow" + "cloud.o-forge.io/core/oc-lib/models/workflow/graph" + + "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/resources" ) @@ -37,12 +43,6 @@ func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMoun return volumes } -type VolumeMount struct { - Name string `yaml:"name"` - MountPath string `yaml:"mountPath"` - Storage *resources.StorageResource `yaml:"-"` -} - type Task struct { Name string `yaml:"name"` Template string `yaml:"template"` @@ -52,12 +52,77 @@ type Task struct { } `yaml:"arguments,omitempty"` } +func NewTask(processingName string, graphItemID string) *Task { + unique_name := GetArgoName(processingName, graphItemID) + return &Task{ + Name: unique_name, + Template: unique_name, + } +} + +func (t *Task) BindToArgo( + dag *Dag, + graphItemID string, + originWf *w.Workflow, + processing *resources.ProcessingResource, + firstItems, lastItems []string, +) (*Dag, []string, []string) { + if instance := processing.GetSelectedInstance(); instance != nil { + t.addParams(instance.(*resources.ProcessingInstance).Env) + t.addParams(instance.(*resources.ProcessingInstance).Inputs) + t.addParams(instance.(*resources.ProcessingInstance).Outputs) + } + t.Dependencies = TransformDepsToArgo(originWf.GetDependencies(graphItemID)) + name := "" + if originWf.Graph.Items[graphItemID].Processing != nil { + name = originWf.Graph.Items[graphItemID].Processing.GetName() + } + if originWf.Graph.Items[graphItemID].Workflow != nil { + name = originWf.Graph.Items[graphItemID].Workflow.GetName() + } + if len(t.Dependencies) == 0 && name != "" { + firstItems = append(firstItems, GetArgoName(name, graphItemID)) + } + if deps := originWf.IsDependancy(graphItemID); len(deps) == 0 && name != "" { + lastItems = append(lastItems, GetArgoName(name, graphItemID)) + } + dag.Tasks = append(dag.Tasks, *t) + return dag, firstItems, lastItems +} + +func (t *Task) addParams(params []models.Param) { + for _, value := range params { + t.Arguments.Parameters = append(t.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: value.Value, + }) + } +} + +func (t *Task) GetDeps(name string) (int, string) { + for i, deps := range t.Dependencies { + if strings.Contains(deps, name) { + return i, deps + } + } + return 0, "" +} + type Dag struct { Tasks []Task `yaml:"tasks,omitempty"` } +func (d *Dag) GetTask(taskName string) *Task { + for _, task := range d.Tasks { + if strings.Contains(task.Name, taskName) { + return &task + } + } + return nil +} + type TemplateMetadata struct { - Labels map[string]string `yaml:"labels,omitempty"` + Labels map[string]string `yaml:"labels,omitempty"` Annotations map[string]string `yaml:"annotations,omitempty"` } @@ -66,6 +131,10 @@ type Secret struct { Key string `yaml:"key"` } +func NewSecret(name string, key string) *Secret { + return &Secret{Name: name, Key: key + "-key"} +} + type Key struct { Key string `yaml:"key"` Bucket string `yaml:"bucket"` @@ -81,6 +150,59 @@ type Artifact struct { S3 *Key `yaml:"s3,omitempty"` } +func NewArtifact(name string, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) *Artifact { + if rw.Write { + name += "-" + rw.Destination + "-input-write" + } else { + name = "-" + rw.Destination + "-input-read" + } + return &Artifact{ + Name: name, + Path: template.ReplacePerEnv(rw.Source, params), + } +} + +func (a *Artifact) BindToArgo(storageType enum.StorageType, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) { + if rw.Write { + template.Outputs.Artifacts = append(template.Inputs.Artifacts, *a) + } else { + template.Inputs.Artifacts = append(template.Outputs.Artifacts, *a) + } +} + +func (a *Artifact) bindS3(rw graph.StorageProcessingGraphLink, params []models.Param, template Template) { + a.S3 = &Key{ + Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, params), + Insecure: true, // temporary + } + /* sel := storage.GetSelectedInstance() + if sel != nil { + if sel.(*resources.StorageResourceInstance).Credentials != nil { + tool, err := tools2.NewService(conf.GetConfig().Mode) + if err != nil || tool == nil { + logger.Error().Msg("Could not create the access secret") + } else { + id, err := tool.CreateAccessSecret(namespace, + sel.(*resources.StorageResourceInstance).Credentials.Login, + sel.(*resources.StorageResourceInstance).Credentials.Pass) + if err == nil { + a.S3.AccessKeySecret = NewSecret(id, "access") + a.S3.SecretKeySecret = NewSecret(id, "secret") + } + } + } + source := sel.(*resources.StorageResourceInstance).Source + a.S3.Key = strings.ReplaceAll(strings.ReplaceAll(a.S3.Key, source+"/", ""), source, "") + splits := strings.Split(a.S3.EndPoint, "/") + if len(splits) > 1 { + a.S3.Bucket = splits[0] + a.S3.EndPoint = strings.Join(splits[1:], "/") + } else { + a.S3.Bucket = splits[0] + } + } */ +} + type InOut struct { Parameters []Parameter `yaml:"parameters"` Artifacts []Artifact `yaml:"artifacts,omitempty"` @@ -143,10 +265,43 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string // Add the metadata that allow Admiralty to pick up an Argo Workflow that needs to be reparted // The value of "clustername" is the peerId, which must be replaced by the node name's for this specific execution -func (t *Template) AddAdmiraltyAnnotations(peerId string){ +func (t *Template) AddAdmiraltyAnnotations(peerID, namespace string) error { if t.Metadata.Annotations == nil { t.Metadata.Annotations = make(map[string]string) } - t.Metadata.Annotations["multicluster.admiralty.io/elect"] = "" - t.Metadata.Annotations["multicluster.admiralty.io/clustername"] = peerId -} \ No newline at end of file + + const key = "admiralty.io/multi-cluster-scheduler" + + var annotation SchedulerAnnotation + + // Parse existing annotation if it exists + if val, ok := t.Metadata.Annotations[key]; ok && val != "" { + if err := json.Unmarshal([]byte(val), &annotation); err != nil { + return fmt.Errorf("failed to parse existing scheduler annotation: %w", err) + } + } + + // Add new affinity + annotation.Affinities = append(annotation.Affinities, affinity{ + Cluster: "target-" + peerID + "-" + namespace, + Namespace: namespace, + }) + + // Encode back to JSON + bytes, err := json.Marshal(annotation) + if err != nil { + return fmt.Errorf("failed to encode scheduler annotation: %w", err) + } + + t.Metadata.Annotations[key] = string(bytes) + return nil +} + +type affinity struct { + Cluster string `json:"cluster"` + Namespace string `json:"namespace"` +} + +type SchedulerAnnotation struct { + Affinities []affinity `json:"affinities"` +} diff --git a/models/utils.go b/models/utils.go new file mode 100644 index 0000000..a300b3b --- /dev/null +++ b/models/utils.go @@ -0,0 +1,92 @@ +package models + +import ( + "strings" + + w "cloud.o-forge.io/core/oc-lib/models/workflow" +) + +type WorkflowsDependancies struct { + FirstWfTasks map[string][]string + RelatedWfTasks map[string][]string + LastWfTasks map[string][]string +} + +func NewWorkflowDependancies() *WorkflowsDependancies { + return &WorkflowsDependancies{ + FirstWfTasks: map[string][]string{}, + RelatedWfTasks: map[string][]string{}, + LastWfTasks: map[string][]string{}, + } +} + +func (w *WorkflowsDependancies) BindFirstTasks(depsFunc func(v string) []w.Deps, dag *Dag) { + for wfID, firstTasks := range w.FirstWfTasks { + deps := depsFunc(wfID) + if task := dag.GetTask(wfID); task != nil && len(deps) > 0 { + task.Dependencies = append(task.Dependencies, firstTasks...) + } + } +} + +func (w *WorkflowsDependancies) BindRelatedTasks(dag *Dag) { + for wfID, relatedWfTasks := range w.RelatedWfTasks { + for _, dep := range relatedWfTasks { + if task := dag.GetTask(dep); task != nil { + index := -1 + if i, deps := task.GetDeps(wfID); deps != "" { + index = i + } + if index != -1 { + task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...) + } + if w.LastWfTasks[wfID] != nil { + task.Dependencies = append(task.Dependencies, w.LastWfTasks[wfID]...) + } + } + } + } +} + +type Workflow struct { + ApiVersion string `yaml:"apiVersion"` + Kind string `yaml:"kind"` + Metadata struct { + Name string `yaml:"name"` + } `yaml:"metadata"` + Spec Spec `yaml:"spec,omitempty"` +} + +func (b *Workflow) GetDag() *Dag { + for _, t := range b.Spec.Templates { + if t.Name == "dag" { + return t.Dag + } + } + b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}}) + return b.Spec.Templates[len(b.Spec.Templates)-1].Dag +} + +type Spec struct { + ServiceAccountName string `yaml:"serviceAccountName"` + Entrypoint string `yaml:"entrypoint"` + Arguments []Parameter `yaml:"arguments,omitempty"` + Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` + Templates []Template `yaml:"templates"` + Timeout int `yaml:"activeDeadlineSeconds,omitempty"` +} + +func GetArgoName(raw_name string, component_id string) (formatedName string) { + formatedName = strings.ReplaceAll(raw_name, " ", "-") + formatedName += "-" + component_id + formatedName = strings.ToLower(formatedName) + return +} + +func TransformDepsToArgo(deps []w.Deps) []string { + argoDeps := []string{} + for _, dep := range deps { + argoDeps = append(argoDeps, GetArgoName(dep.Source, dep.Dest)) + } + return argoDeps +} diff --git a/models/volume.go b/models/volume.go index 8b30fff..fb55f2b 100644 --- a/models/volume.go +++ b/models/volume.go @@ -1,5 +1,12 @@ package models +import ( + "fmt" + "strings" + + "cloud.o-forge.io/core/oc-lib/models/resources" +) + type VolumeClaimTemplate struct { Metadata struct { Name string `yaml:"name"` @@ -15,3 +22,22 @@ type VolumeSpec struct { } `yaml:"requests"` } `yaml:"resources"` } + +type VolumeMount struct { + Name string `yaml:"name"` + MountPath string `yaml:"mountPath"` + Storage *resources.StorageResource `yaml:"-"` +} + +func (v *VolumeMount) BindToArgo(workflow *Workflow) { // TODO : one think about remote volume but TG + index := 0 + if v.Storage.SelectedInstanceIndex != nil && (*v.Storage.SelectedInstanceIndex) >= 0 { + index = *v.Storage.SelectedInstanceIndex + } + storage := v.Storage.Instances[index] + new_volume := VolumeClaimTemplate{} + new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(v.Name), " ", "-") + new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} + new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo() + workflow.Spec.Volumes = append(workflow.Spec.Volumes, new_volume) +} diff --git a/workflow_builder/admiralty_setter.go b/workflow_builder/admiralty_setter.go index b36cd72..26f9b1c 100644 --- a/workflow_builder/admiralty_setter.go +++ b/workflow_builder/admiralty_setter.go @@ -14,61 +14,60 @@ import ( tools "cloud.o-forge.io/core/oc-lib/tools" ) - type AdmiraltySetter struct { - Id string // ID to identify the execution, correspond to workflow_executions id - NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} + Id string // ID to identify the execution, correspond to workflow_executions id + NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} } -func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { - +func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string, remotePeerID string) error { + logger := logs.GetLogger() - data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) + data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(remotePeerID) if data.Code != 200 { logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID) return fmt.Errorf(data.Err) } remotePeer := data.ToPeer() - data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) - if data.Code != 200 { - logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) - return fmt.Errorf(data.Err) - } - localPeer := data.ToPeer() + data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(localPeerID) + if data.Code != 200 { + logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) + return fmt.Errorf(data.Err) + } + localPeer := data.ToPeer() - caller := tools.NewHTTPCaller( - map[tools.DataType]map[tools.METHOD]string{ - tools.ADMIRALTY_SOURCE: { - tools.POST :"/:id", - }, - tools.ADMIRALTY_KUBECONFIG: { - tools.GET:"/:id", - }, - tools.ADMIRALTY_SECRET: { - tools.POST:"/:id/" + remotePeerID, - }, - tools.ADMIRALTY_TARGET: { - tools.POST:"/:id/" + remotePeerID, - }, - tools.ADMIRALTY_NODES: { - tools.GET:"/:id/" + remotePeerID, - }, + caller := tools.NewHTTPCaller( + map[tools.DataType]map[tools.METHOD]string{ + tools.ADMIRALTY_SOURCE: { + tools.POST: "/:id", }, - ) - + tools.ADMIRALTY_KUBECONFIG: { + tools.GET: "/:id", + }, + tools.ADMIRALTY_SECRET: { + tools.POST: "/:id/" + remotePeerID, + }, + tools.ADMIRALTY_TARGET: { + tools.POST: "/:id/" + remotePeerID, + }, + tools.ADMIRALTY_NODES: { + tools.GET: "/:id/" + remotePeerID, + }, + }, + ) + logger.Info().Msg("\n\n Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id) - _ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) + _ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) logger.Info().Msg("\n\n Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id) kubeconfig := s.getKubeconfig(remotePeer, caller) logger.Info().Msg("\n\n Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id) - _ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) - logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id ) - _ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) + _ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller, s.Id, tools.ADMIRALTY_SECRET, tools.POST, kubeconfig, true) + logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id) + _ = s.callRemoteExecution(localPeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_TARGET, tools.POST, nil, true) logger.Info().Msg("\n\n Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id) - s.checkNodeStatus(localPeer,caller) - + s.checkNodeStatus(localPeer, caller) + return nil } @@ -90,7 +89,7 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle return kubedata } -func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { +func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { l := utils.GetLogger() resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) if err != nil { @@ -112,7 +111,7 @@ func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, return resp } -func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ +func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) { var data map[string]interface{} if resp, ok := caller.LastResults["body"]; ok { json.Unmarshal(resp.([]byte), &data) @@ -129,10 +128,10 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ } } -func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ - for i := range(5) { +func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller) { + for i := range 5 { time.Sleep(10 * time.Second) // let some time for kube to generate the node - _ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) + _ = s.callRemoteExecution(localPeer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_NODES, tools.GET, nil, false) if caller.LastResults["code"] == 200 { s.storeNodeName(caller) return @@ -143,5 +142,5 @@ func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HT } logger.Info().Msg("Could not verify that node is up. Retrying...") } - -} \ No newline at end of file + +} diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 03b9d38..fa855f3 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -5,17 +5,16 @@ package workflow_builder import ( + "errors" "fmt" "oc-monitord/conf" + "oc-monitord/models" . "oc-monitord/models" - tools2 "oc-monitord/tools" "os" - "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/logs" - "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/resources" w "cloud.o-forge.io/core/oc-lib/models/workflow" "github.com/nwtgck/go-fakelish" @@ -27,71 +26,37 @@ var logger zerolog.Logger type ArgoBuilder struct { OriginWorkflow *w.Workflow - Workflow Workflow + Workflow *models.Workflow Services []*Service Timeout int RemotePeers []string } -type Workflow struct { - ApiVersion string `yaml:"apiVersion"` - Kind string `yaml:"kind"` - Metadata struct { - Name string `yaml:"name"` - } `yaml:"metadata"` - Spec Spec `yaml:"spec,omitempty"` -} - -func (b *Workflow) getDag() *Dag { - for _, t := range b.Spec.Templates { - if t.Name == "dag" { - return t.Dag - } - } - b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}}) - return b.Spec.Templates[len(b.Spec.Templates)-1].Dag -} - -type Spec struct { - ServiceAccountName string `yaml:"serviceAccountName"` - Entrypoint string `yaml:"entrypoint"` - Arguments []Parameter `yaml:"arguments,omitempty"` - Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` - Templates []Template `yaml:"templates"` - Timeout int `yaml:"activeDeadlineSeconds,omitempty"` -} - // TODO: found on a processing instance linked to storage // add s3, gcs, azure, etc if needed on a link between processing and storage -func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { +func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []string, error) { logger = logs.GetLogger() logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) // handle services by checking if there is only one processing with hostname and port firstItems, lastItems, volumes := b.createTemplates(namespace) b.createVolumes(volumes) - if b.Timeout > 0 { b.Workflow.Spec.Timeout = b.Timeout } - b.Workflow.Spec.ServiceAccountName = "sa-"+namespace + b.Workflow.Spec.ServiceAccountName = "sa-" + namespace b.Workflow.Spec.Entrypoint = "dag" b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" - if !write { - return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil - } - - - return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil + return len(b.Workflow.GetDag().Tasks), firstItems, lastItems, nil } -func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) { - volumes := []VolumeMount{} +func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []models.VolumeMount) { + volumes := []models.VolumeMount{} firstItems := []string{} lastItems := []string{} items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) logger.Info().Msg(fmt.Sprint("Creating templates", len(items))) - for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { + for _, item := range items { instance := item.Processing.GetSelectedInstance() logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { @@ -101,89 +66,68 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V volumes, firstItems, lastItems = b.createArgoTemplates(namespace, item.ID, item.Processing, volumes, firstItems, lastItems) } - firstWfTasks := map[string][]string{} - latestWfTasks := map[string][]string{} - relatedWfTasks := map[string][]string{} - for _, wf := range b.OriginWorkflow.Workflows { - realWorkflow, code, err := w.NewAccessor(nil).LoadOne(wf) - if code != 200 { - logger.Error().Msg("Error loading the workflow : " + err.Error()) - continue - } - subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} - _, fi, li, err := subBuilder.CreateDAG(namespace, false) - if err != nil { - logger.Error().Msg("Error creating the subworkflow : " + err.Error()) - continue - } - firstWfTasks[wf] = fi - if ok, depsOfIds := subBuilder.isArgoDependancy(wf); ok { // IS BEFORE - latestWfTasks[wf] = li - relatedWfTasks[wf] = depsOfIds - } - subDag := subBuilder.Workflow.getDag() - d := b.Workflow.getDag() - d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow - b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) - b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) - b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) - b.Services = append(b.Services, subBuilder.Services...) - } - for wfID, depsOfIds := range relatedWfTasks { - for _, dep := range depsOfIds { - for _, task := range b.Workflow.getDag().Tasks { - if strings.Contains(task.Name, dep) { - index := -1 - for i, depp := range task.Dependencies { - if strings.Contains(depp, wfID) { - index = i - break - } - } - if index != -1 { - task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...) - } - task.Dependencies = append(task.Dependencies, latestWfTasks[wfID]...) - } - } - } - } - for wfID, fi := range firstWfTasks { - deps := b.getArgoDependencies(wfID) - if len(deps) > 0 { - for _, dep := range fi { - for _, task := range b.Workflow.getDag().Tasks { - if strings.Contains(task.Name, dep) { - task.Dependencies = append(task.Dependencies, deps...) - } - } - } - } + + wfDeps := models.NewWorkflowDependancies() + for _, workflowID := range b.OriginWorkflow.Workflows { + b.createWorkflowArgoTemplate(workflowID, namespace, wfDeps) } + wfDeps.BindRelatedTasks(b.Workflow.GetDag()) + wfDeps.BindFirstTasks(b.OriginWorkflow.GetDependencies, b.Workflow.GetDag()) + if b.Services != nil { - dag := b.Workflow.getDag() + dag := b.Workflow.GetDag() dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) b.addServiceToArgo() } return firstItems, lastItems, volumes } -func (b *ArgoBuilder) createArgoTemplates(namespace string, +func (b *ArgoBuilder) createWorkflowArgoTemplate( + workflowID string, + namespace string, + wfDeps *models.WorkflowsDependancies, +) { + realWorkflow, code, err := w.NewAccessor(nil).LoadOne(workflowID) + if code != 200 { + logger.Error().Msg("Error loading the workflow : " + err.Error()) + return + } + subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Workflow: &models.Workflow{}, Timeout: b.Timeout} + _, fi, li, err := subBuilder.CreateDAG(namespace, false) + if err != nil { + logger.Error().Msg("Error creating the subworkflow : " + err.Error()) + return + } + wfDeps.FirstWfTasks[workflowID] = fi + if depsOfIds := subBuilder.OriginWorkflow.IsDependancy(workflowID); len(depsOfIds) > 0 { // IS BEFORE + wfDeps.LastWfTasks[workflowID] = li + wfDeps.RelatedWfTasks[workflowID] = models.TransformDepsToArgo(depsOfIds) + } + subDag := subBuilder.Workflow.GetDag() + d := b.Workflow.GetDag() + d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) + b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) + b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) + b.Services = append(b.Services, subBuilder.Services...) +} + +func (b *ArgoBuilder) createArgoTemplates( + namespace string, id string, processing *resources.ProcessingResource, - volumes []VolumeMount, + volumes []models.VolumeMount, firstItems []string, - lastItems []string) ([]VolumeMount, []string, []string) { - _, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) - template := &Template{Name: getArgoName(processing.GetName(), id)} + lastItems []string, +) ([]models.VolumeMount, []string, []string) { + _, firstItems, lastItems = NewTask(processing.Name, id).BindToArgo(b.Workflow.GetDag(), id, b.OriginWorkflow, processing, firstItems, lastItems) + template := &Template{Name: models.GetArgoName(processing.GetName(), id)} logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) - isReparted, peerId := b.isProcessingReparted(*processing, id) - template.CreateContainer(processing, b.Workflow.getDag()) - - if isReparted { - logger.Debug().Msg("Reparted processing, on " + peerId) - b.RemotePeers = append(b.RemotePeers, peerId) - template.AddAdmiraltyAnnotations(peerId) + + template.CreateContainer(processing, b.Workflow.GetDag()) + if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil { + logger.Error().Msg(fmt.Sprint("Problem to sets up repartition expected %s", err.Error())) + return volumes, firstItems, lastItems } // get datacenter from the processing if processing.IsService { @@ -191,256 +135,52 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, template.Metadata.Labels = make(map[string]string) template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing } - related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) - for _, r := range related { - storage := r.Node.(*resources.StorageResource) - for _, linkToStorage := range r.Links { - for _, rw := range linkToStorage.StorageLinkInfos { - art := Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} - if rw.Write { - art.Name = storage.GetName() + "-" + rw.Destination + "-input-write" - } else { - art.Name = storage.GetName() + "-" + rw.Destination + "-input-read" - } - if storage.StorageType == enum.S3 { - art.S3 = &Key{ - Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env), - Insecure: true, // temporary - } - sel := storage.GetSelectedInstance() - if sel != nil { - if sel.(*resources.StorageResourceInstance).Credentials != nil { - tool, err := tools2.NewService(conf.GetConfig().Mode) - if err != nil || tool == nil { - logger.Error().Msg("Could not create the access secret") - } else { - id, err := tool.CreateAccessSecret(namespace, - sel.(*resources.StorageResourceInstance).Credentials.Login, - sel.(*resources.StorageResourceInstance).Credentials.Pass) - if err == nil { - art.S3.AccessKeySecret = &Secret{ - Name: id, - Key: "access-key", - } - art.S3.SecretKeySecret = &Secret{ - Name: id, - Key: "secret-key", - } - } - } - } - art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "") - art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "") - splits := strings.Split(art.S3.EndPoint, "/") - if len(splits) > 1 { - art.S3.Bucket = splits[0] - art.S3.EndPoint = strings.Join(splits[1:], "/") - } else { - art.S3.Bucket = splits[0] - } - } - } - if rw.Write { - template.Outputs.Artifacts = append(template.Inputs.Artifacts, art) - } else { - template.Inputs.Artifacts = append(template.Outputs.Artifacts, art) - } - } - } - index := 0 - if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 { - index = *storage.SelectedInstanceIndex - } - s := storage.Instances[index] - if s.Local { - volumes = template.Container.AddVolumeMount(VolumeMount{ - Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"), - MountPath: s.Source, - Storage: storage, - }, volumes) - } - } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) return volumes, firstItems, lastItems } -func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource, - firstItems []string, lastItems []string) (*Dag, []string, []string) { - unique_name := getArgoName(processing.GetName(), graphItemID) - step := Task{Name: unique_name, Template: unique_name} - instance := processing.GetSelectedInstance() - if instance != nil { - for _, value := range instance.(*resources.ProcessingInstance).Env { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ - Name: value.Name, - Value: value.Value, - }) - } - for _, value := range instance.(*resources.ProcessingInstance).Inputs { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ - Name: value.Name, - Value: value.Value, - }) - } - for _, value := range instance.(*resources.ProcessingInstance).Outputs { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ - Name: value.Name, - Value: value.Value, - }) - } - } - step.Dependencies = b.getArgoDependencies(graphItemID) - name := "" - if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil { - name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName() - } - if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil { - name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName() - } - if len(step.Dependencies) == 0 && name != "" { - firstItems = append(firstItems, getArgoName(name, graphItemID)) - } - if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" { - lastItems = append(lastItems, getArgoName(name, graphItemID)) - } - dag.Tasks = append(dag.Tasks, step) - return dag, firstItems, lastItems -} - -func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG +func (b *ArgoBuilder) createVolumes(volumes []models.VolumeMount) { // TODO : one think about remote volume but TG for _, volume := range volumes { - index := 0 - if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 { - index = *volume.Storage.SelectedInstanceIndex - } - storage := volume.Storage.Instances[index] - new_volume := VolumeClaimTemplate{} - new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-") - new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} - new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo() - b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) + volume.BindToArgo(b.Workflow) } } -func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { - dependancyOfIDs := []string{} - isDeps := false - for _, link := range b.OriginWorkflow.Graph.Links { - if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok { - logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID)) - continue - } - source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing - if id == link.Source.ID && source != nil { - isDeps = true - dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), link.Destination.ID)) - } - wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow - if id == link.Source.ID && wourceWF != nil { - isDeps = true - dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), link.Destination.ID)) - } - } - return isDeps, dependancyOfIDs -} - -func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { - for _, link := range b.OriginWorkflow.Graph.Links { - if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { - logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID)) - continue - } - source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing - if id == link.Destination.ID && source != nil { - dependency_name := getArgoName(source.GetName(), link.Source.ID) - dependencies = append(dependencies, dependency_name) - continue - } - } - return -} - -func getArgoName(raw_name string, component_id string) (formatedName string) { - formatedName = strings.ReplaceAll(raw_name, " ", "-") - formatedName += "-" + component_id - formatedName = strings.ToLower(formatedName) - return -} - // Verify if a processing resource is attached to another Compute than the one hosting // the current Open Cloud instance. If true return the peer ID to contact -func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) { - computeAttached := b.retrieveProcessingCompute(graphID) - if computeAttached == nil { - logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) - panic(0) +func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource, graphID string, template *models.Template, namespace string) error { + computeAttached := b.OriginWorkflow.GetByRelatedProcessing(processing.GetID(), b.OriginWorkflow.Graph.IsCompute) + if len(computeAttached) == 0 { + return errors.New("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) } - // Creates an accessor srtictly for Peer Collection - req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil) - if req == nil { - fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") - return false, "" - } - - res := req.LoadOne(computeAttached.CreatorID) - if res.Err != "" { - fmt.Print("TODO : handle error when requesting PeerID") - fmt.Print(res.Err) - return false, "" - } - - peer := *res.ToPeer() - - isNotReparted := peer.State == 1 - logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) - - return !isNotReparted, peer.UUID -} - -func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { - for _, link := range b.OriginWorkflow.Graph.Links { - // If a link contains the id of the processing - var oppositeId string - if link.Source.ID == graphID { - oppositeId = link.Destination.ID - } else if link.Destination.ID == graphID { - oppositeId = link.Source.ID - } - - if oppositeId != "" { - dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) - if dt == oclib.COMPUTE_RESOURCE { - return res.(*resources.ComputeResource) - } else { - continue - } + for _, related := range computeAttached { + res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID()) + if res.Err != "" { + return errors.New(res.Err) } - } + peer := *res.ToPeer() + isNotReparted := peer.State == 1 + logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) + if !(isNotReparted) { + logger.Debug().Msg("Reparted processing, on " + peer.UUID) + b.RemotePeers = append(b.RemotePeers, peer.UUID) + template.AddAdmiraltyAnnotations(peer.UUID, namespace) + } + } return nil } // Execute the last actions once the YAML file for the Argo Workflow is created -func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { - logger.Info().Msg(fmt.Sprint("DEV :: Completing build")) - setter := AdmiraltySetter{Id: executionsId} +func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) { + logger.Info().Msg("DEV :: Completing build") + setter := AdmiraltySetter{Id: namespace} // Setup admiralty for each node for _, peer := range b.RemotePeers { logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) - setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) + setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer) } - - // Update the name of the admiralty node to use - for _, template := range b.Workflow.Spec.Templates { - if len(template.Metadata.Annotations) > 0 { - if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { - template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + peerId + "-" + conf.GetConfig().ExecutionID - } - } - } - // Generate the YAML file random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) b.Workflow.Metadata.Name = "oc-monitor-" + random_name @@ -448,14 +188,14 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { yamlified, err := yaml.Marshal(b.Workflow) if err != nil { logger.Error().Msg("Could not transform object to yaml file") - return "", err + return "", err } // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" workflows_dir := "./argo_workflows/" err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) - + if err != nil { logger.Error().Msg("Could not write the yaml file") return "", err diff --git a/workflow_builder/argo_services.go b/workflow_builder/argo_services.go index f95740a..e1e1c87 100644 --- a/workflow_builder/argo_services.go +++ b/workflow_builder/argo_services.go @@ -5,7 +5,6 @@ import ( "strings" "cloud.o-forge.io/core/oc-lib/models/resources" - "gopkg.in/yaml.v3" ) func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) { @@ -47,20 +46,9 @@ func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, p func (b *ArgoBuilder) addServiceToArgo() error { for _, service := range b.Services { - service_manifest, err := yaml.Marshal(service) - if err != nil { + if err := service.BindToArgo(b.Workflow); err != nil { return err } - service_template := models.Template{Name: "workflow-service-pod", - Resource: models.ServiceResource{ - Action: "create", - SuccessCondition: "status.succeeded > 0", - FailureCondition: "status.failed > 3", - SetOwnerReference: true, - Manifest: string(service_manifest), - }, - } - b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) } return nil } diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go index edc0fc3..c71f1f9 100644 --- a/workflow_builder/graph.go +++ b/workflow_builder/graph.go @@ -3,6 +3,7 @@ package workflow_builder import ( "errors" "fmt" + "oc-monitord/models" oclib "cloud.o-forge.io/core/oc-lib" workflow "cloud.o-forge.io/core/oc-lib/models/workflow" @@ -48,7 +49,7 @@ func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, i return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") } - argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} + argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Workflow: &models.Workflow{}, Timeout: timeout} stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) if err != nil { logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)