diff --git a/main.go b/main.go index 4e417e5..a316003 100644 --- a/main.go +++ b/main.go @@ -88,32 +88,33 @@ func main() { logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") } - builder, argo_file_path, stepMax, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) + builder, stepMax, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) if err != nil { logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg(err.Error()) } - logger.Debug().Msg("Created :" + argo_file_path) + + + argo_file_path, err := builder.CompleteBuild(exec.ExecutionsID) + if err != nil { + logger.Error().Msg(err.Error()) + } workflowName = getContainerName(argo_file_path) wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() wf_logger.Debug().Msg("Testing argo name") - - err = builder.CompleteBuild(exec.ExecutionsID) - if err != nil { - logger.Error().Msg(err.Error()) - } _ = stepMax - // if conf.GetConfig().KubeHost == "" { - // // Not in a k8s environment, get conf from parameters - // fmt.Println("Executes outside of k8s") - // executeOutside(argo_file_path, stepMax) - // } else { - // // Executed in a k8s environment - // fmt.Println("Executes inside a k8s") - // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) - // } + + if conf.GetConfig().KubeHost == "" { + // Not in a k8s environment, get conf from parameters + fmt.Println("Executes outside of k8s") + executeOutside(argo_file_path, stepMax) + } else { + // Executed in a k8s environment + fmt.Println("Executes inside a k8s") + executeInside(exec.GetID(), "argo", argo_file_path, stepMax) + } } // So far we only log the output from diff --git a/models/template.go b/models/template.go index 8398a6c..f7a6019 100644 --- a/models/template.go +++ b/models/template.go @@ -58,6 
+58,7 @@ type Dag struct { type TemplateMetadata struct { Labels map[string]string `yaml:"labels,omitempty"` + Annotations map[string]string `yaml:"annotations,omitempty"` } type Secret struct { @@ -139,3 +140,13 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string } return arg } + +// Add the metadata that allow Admiralty to pick up an Argo Workflow that needs to be reparted +// The value of "clustername" is the peerId, which must be replaced by the node name's for this specific execution +func (t *Template) AddAdmiraltyAnnotations(peerId string){ + if t.Metadata.Annotations == nil { + t.Metadata.Annotations = make(map[string]string) + } + t.Metadata.Annotations["multicluster.admiralty.io/elect"] = "" + t.Metadata.Annotations["multicluster.admiralty.io/clustername"] = peerId +} \ No newline at end of file diff --git a/workflow_builder/admiralty_setter.go b/workflow_builder/admiralty_setter.go index fff8c25..f4af8ef 100644 --- a/workflow_builder/admiralty_setter.go +++ b/workflow_builder/admiralty_setter.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/peer" @@ -11,7 +12,8 @@ import ( ) type AdmiraltySetter struct { - Id string // ID to identify the execution, correspond to workflow_executions id + Id string // ID to identify the execution, correspond to workflow_executions id + NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} } func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { @@ -50,18 +52,19 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st }, ) fmt.Println("Creating source in", remotePeerID, " ns-" + s.Id) - _ = s.callRemoteExecution(remotePeer, http.StatusCreated,caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil) + _ = s.callRemoteExecution(remotePeer, http.StatusCreated,caller, s.Id, 
tools.ADMIRALTY_SOURCE, tools.POST, nil, true) kubeconfig := s.getKubeconfig(remotePeer, caller) - _ = s.callRemoteExecution(localPeer, http.StatusCreated, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig) - _ = s.callRemoteExecution(localPeer,http.StatusCreated,caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil) - _ = s.callRemoteExecution(localPeer,http.StatusOK,caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil) + _ = s.callRemoteExecution(localPeer, http.StatusCreated, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) + _ = s.callRemoteExecution(localPeer,http.StatusCreated,caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) + + s.checkNodeStatus(localPeer,caller) return nil } func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCaller) map[string]string { var kubedata map[string]string - _ = s.callRemoteExecution(peer, http.StatusOK, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil) + _ = s.callRemoteExecution(peer, http.StatusOK, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig") panic(0) @@ -75,7 +78,7 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle return kubedata } -func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}) *peer.PeerExecution { +func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) if err != nil { fmt.Println("Error when executing on peer at", peer.Url) @@ -85,9 +88,47 @@ func (*AdmiraltySetter) 
callRemoteExecution(peer *peer.Peer, expectedCode int,ca if caller.LastResults["code"].(int) != expectedCode { fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode) - fmt.Println(string(caller.LastResults["body"].(byte))) - panic(0) + if _, ok := caller.LastResults["body"]; ok { + logger.Info().Msg(string(caller.LastResults["body"].([]byte))) + // fmt.Println(string(caller.LastResults["body"].([]byte))) + } + if panicCode { + panic(0) + } } return resp +} + +func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ + var data map[string]interface{} + if resp, ok := caller.LastResults["body"]; ok { + json.Unmarshal(resp.([]byte), &data) + } + + if node, ok := data["node"]; ok { + metadata := node.(map[string]interface{})["metadata"] + name := metadata.(map[string]interface{})["name"].(string) + s.NodeName = name + } else { + fmt.Println("Could not retrieve data about the recently created node") + panic(0) + } +} + +func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ + for i := range(5) { + time.Sleep(5 * time.Second) // let some time for kube to generate the node + _ = s.callRemoteExecution(localPeer,http.StatusOK,caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) + if caller.LastResults["code"] == 200 { + s.storeNodeName(caller) + return + } + if i == 4 { + logger.Error().Msg("Node on " + localPeer.Name + " was never found, panicking !") + panic(0) + } + logger.Info().Msg("Could not verify that node is up. 
Retrying...") + } + } \ No newline at end of file diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index dd90272..9cf71a8 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -61,7 +61,7 @@ type Spec struct { // TODO: found on a processing instance linked to storage // add s3, gcs, azure, etc if needed on a link between processing and storage -func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []string, []string, error) { +func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items) // handle services by checking if there is only one processing with hostname and port firstItems, lastItems, volumes := b.createTemplates(namespace) @@ -74,26 +74,11 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []st b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" if !write { - return "", len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil + return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } - random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) - b.Workflow.Metadata.Name = "oc-monitor-" + random_name - logger = oclib.GetLogger() - yamlified, err := yaml.Marshal(b.Workflow) - if err != nil { - logger.Error().Msg("Could not transform object to yaml file") - return "", 0, firstItems, lastItems, err - } - // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss - current_timestamp := time.Now().Format("02_01_2006_150405") - file_name := random_name + "_" + current_timestamp + ".yml" - workflows_dir := "./argo_workflows/" - err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) - if err != nil { - logger.Error().Msg("Could not write the yaml file") - return "", 0, firstItems, lastItems, err - } - return workflows_dir + file_name, 
len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil + + + return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) { @@ -122,7 +107,7 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V continue } subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} - _, _, fi, li, err := subBuilder.CreateDAG(namespace, false) + _, fi, li, err := subBuilder.CreateDAG(namespace, false) if err != nil { logger.Error().Msg("Error creating the subworkflow : " + err.Error()) continue @@ -192,6 +177,7 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, template.CreateContainer(processing, b.Workflow.getDag()) if isReparted { b.RemotePeers = append(b.RemotePeers, peerId) + template.AddAdmiraltyAnnotations(peerId) } // get datacenter from the processing if processing.IsService { @@ -433,12 +419,44 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu // Execute the last actions once the YAML file for the Argo Workflow is created -func (b *ArgoBuilder) CompleteBuild(executionsId string) error { +func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { fmt.Println("DEV :: Completing build") + setter := AdmiraltySetter{Id: executionsId} + // Setup admiralty for each node for _, peer := range b.RemotePeers { fmt.Println("DEV :: Launching Admiralty Setup for ", peer) - setter := AdmiraltySetter{Id: executionsId} setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) } - return nil + + // Update the name of the admiralty node to use + for _, template := range b.Workflow.Spec.Templates { + if len(template.Metadata.Annotations) > 0 { + if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { + fmt.Println(resp) + template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = setter.NodeName + } + } + } + + // Generate 
the YAML file + random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) + b.Workflow.Metadata.Name = "oc-monitor-" + random_name + logger = oclib.GetLogger() + yamlified, err := yaml.Marshal(b.Workflow) + if err != nil { + logger.Error().Msg("Could not transform object to yaml file") + return "", err + } + // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss + current_timestamp := time.Now().Format("02_01_2006_150405") + file_name := random_name + "_" + current_timestamp + ".yml" + workflows_dir := "./argo_workflows/" + err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) + + if err != nil { + logger.Error().Msg("Could not write the yaml file") + return "", err + } + + return workflows_dir + file_name, nil } \ No newline at end of file diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go index 5716ee7..6cb688e 100644 --- a/workflow_builder/graph.go +++ b/workflow_builder/graph.go @@ -41,20 +41,20 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo return new_wf, nil } -func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder,string, int, error) { +func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { logger := oclib.GetLogger() fmt.Println("Exporting to Argo", w.Workflow) if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { - return nil, "", 0, fmt.Errorf("can't export a graph that has not been loaded yet") + return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") } argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} - filename, stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) + stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) if err != nil { logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) - return nil, "", 0, err + return nil, 0, err } - return &argoBuilder, filename, stepMax, nil + return 
&argoBuilder, stepMax, nil } // TODO implement this function