Merge branch 'feature/admiralty'
This commit is contained in:
		| @@ -4,18 +4,25 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"slices" | ||||
| 	"time" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/logs" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/peer" | ||||
| 	"cloud.o-forge.io/core/oc-lib/tools" | ||||
| 	tools "cloud.o-forge.io/core/oc-lib/tools" | ||||
| ) | ||||
|  | ||||
|  | ||||
| type AdmiraltySetter struct { | ||||
| 	Id	string		// ID to identify the execution, correspond to workflow_executions id | ||||
| 	Id			string				// ID to identify the execution, correspond to workflow_executions id | ||||
| 	NodeName 	string	// Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { | ||||
| 	 | ||||
| 	logger = logs.GetLogger() | ||||
|  | ||||
| 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) | ||||
| 	if data.Code != 200 { | ||||
| 		logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID) | ||||
| @@ -51,22 +58,22 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st | ||||
| 	) | ||||
|  | ||||
| 	logger.Info().Msg(" Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(remotePeer, http.StatusCreated,caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil) | ||||
| 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) | ||||
| 	logger.Info().Msg(" Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id + "\n\n") | ||||
| 	kubeconfig := s.getKubeconfig(remotePeer, caller) | ||||
| 	logger.Info().Msg(" Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(localPeer, http.StatusCreated, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig) | ||||
| 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) | ||||
| 	logger.Info().Msg(" Creating the Admiralty Target on " + localPeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(localPeer,http.StatusCreated,caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil) | ||||
| 	_ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) | ||||
| 	logger.Info().Msg(" Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(localPeer,http.StatusOK,caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil) | ||||
| 	s.checkNodeStatus(localPeer,caller) | ||||
| 	 | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCaller) map[string]string { | ||||
| 	var kubedata map[string]string | ||||
| 	_ = s.callRemoteExecution(peer, http.StatusOK, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil) | ||||
| 	_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) | ||||
| 	if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { | ||||
| 		fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig") | ||||
| 		panic(0) | ||||
| @@ -80,7 +87,7 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle | ||||
| 	return kubedata | ||||
| } | ||||
|  | ||||
| func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}) *peer.PeerExecution { | ||||
| func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { | ||||
| 	resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("Error when executing on peer at", peer.Url) | ||||
| @@ -88,11 +95,49 @@ func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode int,ca | ||||
| 		panic(0) | ||||
| 	} | ||||
|  | ||||
| 	if caller.LastResults["code"].(int) != expectedCode { | ||||
| 	if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) { | ||||
| 		fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode) | ||||
| 		fmt.Println(string(caller.LastResults["body"].(byte))) | ||||
| 		panic(0) | ||||
| 		if _, ok := caller.LastResults["body"]; ok { | ||||
| 			logger.Info().Msg(string(caller.LastResults["body"].([]byte))) | ||||
| 			// fmt.Println(string(caller.LastResults["body"].([]byte))) | ||||
| 		} | ||||
| 		if panicCode { | ||||
| 			panic(0) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return resp | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ | ||||
| 	var data map[string]interface{} | ||||
| 	if resp, ok := caller.LastResults["body"]; ok { | ||||
| 		json.Unmarshal(resp.([]byte), &data) | ||||
| 	} | ||||
|  | ||||
| 	if node, ok := data["node"]; ok { | ||||
| 		metadata := node.(map[string]interface{})["metadata"] | ||||
| 		name := metadata.(map[string]interface{})["name"].(string) | ||||
| 		s.NodeName = name | ||||
| 	} else { | ||||
| 		fmt.Println("Could not retrieve data about the recently created node") | ||||
| 		panic(0) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ | ||||
| 	for i := range(5) { | ||||
| 		time.Sleep(5 * time.Second) // let some time for kube to generate the node | ||||
| 		_ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) | ||||
| 		if caller.LastResults["code"] == 200 { | ||||
| 			s.storeNodeName(caller) | ||||
| 			return | ||||
| 		} | ||||
| 		if i == 5 { | ||||
| 			logger.Error().Msg("Node on " + localPeer.Name + " was never found, panicking !") | ||||
| 			panic(0) | ||||
| 		} | ||||
| 		logger.Info().Msg("Could not verify that node is up. Retrying...") | ||||
| 	} | ||||
| 	 | ||||
| } | ||||
| @@ -14,6 +14,7 @@ import ( | ||||
| 	"time" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/logs" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/common/enum" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| @@ -52,16 +53,18 @@ func (b *Workflow) getDag() *Dag { | ||||
| } | ||||
|  | ||||
| type Spec struct { | ||||
| 	Entrypoint string                `yaml:"entrypoint"` | ||||
| 	Arguments  []Parameter           `yaml:"arguments,omitempty"` | ||||
| 	Volumes    []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` | ||||
| 	Templates  []Template            `yaml:"templates"` | ||||
| 	Timeout    int                   `yaml:"activeDeadlineSeconds,omitempty"` | ||||
| 	ServiceAccountName	string					`yaml:"serviceAccountName"` | ||||
| 	Entrypoint 			string                	`yaml:"entrypoint"` | ||||
| 	Arguments  			[]Parameter           	`yaml:"arguments,omitempty"` | ||||
| 	Volumes    			[]VolumeClaimTemplate 	`yaml:"volumeClaimTemplates,omitempty"` | ||||
| 	Templates  			[]Template            	`yaml:"templates"` | ||||
| 	Timeout    			int                   	`yaml:"activeDeadlineSeconds,omitempty"` | ||||
| } | ||||
|  | ||||
| // TODO: found on a processing instance linked to storage | ||||
| // add s3, gcs, azure, etc if needed on a link between processing and storage | ||||
| func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []string, []string, error) { | ||||
| func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { | ||||
| 	logger = logs.GetLogger() | ||||
| 	fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items) | ||||
| 	// handle services by checking if there is only one processing with hostname and port | ||||
| 	firstItems, lastItems, volumes := b.createTemplates(namespace) | ||||
| @@ -70,30 +73,16 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []st | ||||
| 	if b.Timeout > 0 { | ||||
| 		b.Workflow.Spec.Timeout = b.Timeout | ||||
| 	} | ||||
| 	b.Workflow.Spec.ServiceAccountName = "sa-"+namespace | ||||
| 	b.Workflow.Spec.Entrypoint = "dag" | ||||
| 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | ||||
| 	b.Workflow.Kind = "Workflow" | ||||
| 	if !write { | ||||
| 		return "", len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil | ||||
| 		return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil | ||||
| 	} | ||||
| 	random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) | ||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||
| 	logger = oclib.GetLogger() | ||||
| 	yamlified, err := yaml.Marshal(b.Workflow) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not transform object to yaml file") | ||||
| 		return "", 0, firstItems, lastItems, err | ||||
| 	} | ||||
| 	// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss | ||||
| 	current_timestamp := time.Now().Format("02_01_2006_150405") | ||||
| 	file_name := random_name + "_" + current_timestamp + ".yml" | ||||
| 	workflows_dir := "./argo_workflows/" | ||||
| 	err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not write the yaml file") | ||||
| 		return "", 0, firstItems, lastItems, err | ||||
| 	} | ||||
| 	return workflows_dir + file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil | ||||
| 	 | ||||
| 	 | ||||
| 	return  len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) { | ||||
| @@ -122,7 +111,7 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V | ||||
| 			continue | ||||
| 		} | ||||
| 		subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} | ||||
| 		_, _, fi, li, err := subBuilder.CreateDAG(namespace, false) | ||||
| 		_, fi, li, err := subBuilder.CreateDAG(namespace, false) | ||||
| 		if err != nil { | ||||
| 			logger.Error().Msg("Error creating the subworkflow : " + err.Error()) | ||||
| 			continue | ||||
| @@ -192,6 +181,7 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, | ||||
| 	template.CreateContainer(processing, b.Workflow.getDag()) | ||||
| 	if isReparted { | ||||
| 		b.RemotePeers = append(b.RemotePeers, peerId) | ||||
| 		template.AddAdmiraltyAnnotations(peerId) | ||||
| 	} | ||||
| 	// get datacenter from the processing | ||||
| 	if processing.IsService { | ||||
| @@ -433,12 +423,44 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu | ||||
|  | ||||
|  | ||||
| // Execute the last actions once the YAML file for the Argo Workflow is created | ||||
| func (b *ArgoBuilder) CompleteBuild(executionsId string) error { | ||||
| func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | ||||
| 	fmt.Println("DEV :: Completing build") | ||||
| 	setter := AdmiraltySetter{Id: executionsId} | ||||
| 	// Setup admiralty for each node | ||||
| 	for _, peer := range b.RemotePeers { | ||||
| 		fmt.Println("DEV :: Launching Admiralty Setup for ", peer) | ||||
| 		setter := AdmiraltySetter{Id: executionsId} | ||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) | ||||
| 	} | ||||
| 	return nil | ||||
|  | ||||
| 	// Update the name of the admiralty node to use  | ||||
| 	for _, template := range b.Workflow.Spec.Templates { | ||||
| 		if len(template.Metadata.Annotations) > 0 { | ||||
| 			if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { | ||||
| 				fmt.Println(resp) | ||||
| 				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Generate the YAML file | ||||
| 	random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) | ||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||
| 	logger = oclib.GetLogger() | ||||
| 	yamlified, err := yaml.Marshal(b.Workflow) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not transform object to yaml file") | ||||
| 		return "", err  | ||||
| 	} | ||||
| 	// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss | ||||
| 	current_timestamp := time.Now().Format("02_01_2006_150405") | ||||
| 	file_name := random_name + "_" + current_timestamp + ".yml" | ||||
| 	workflows_dir := "./argo_workflows/" | ||||
| 	err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) | ||||
| 	 | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not write the yaml file") | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| 	return workflows_dir + file_name, nil | ||||
| } | ||||
| @@ -41,20 +41,20 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | ||||
| 	return new_wf, nil | ||||
| } | ||||
|  | ||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder,string, int, error) { | ||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { | ||||
| 	logger := oclib.GetLogger() | ||||
| 	fmt.Println("Exporting to Argo", w.Workflow) | ||||
| 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | ||||
| 		return nil, "", 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 		return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 	} | ||||
|  | ||||
| 	argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} | ||||
| 	filename, stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) | ||||
| 	stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) | ||||
| 		return nil, "", 0, err | ||||
| 		return nil, 0, err | ||||
| 	} | ||||
| 	return &argoBuilder, filename, stepMax, nil | ||||
| 	return &argoBuilder, stepMax, nil | ||||
| } | ||||
|  | ||||
| // TODO implement this function | ||||
|   | ||||
		Reference in New Issue
	
	Block a user