Merge branch 'main' of https://cloud.o-forge.io/core/oc-monitord
This commit is contained in:
		| @@ -30,32 +30,38 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st | ||||
| 	} | ||||
| 	remotePeer := data.ToPeer() | ||||
|  | ||||
| 	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) | ||||
| 	if data.Code != 200 { | ||||
| 		logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) | ||||
| 		return fmt.Errorf(data.Err) | ||||
| 	} | ||||
| 	localPeer := data.ToPeer() | ||||
| 			data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) | ||||
| 			if data.Code != 200 { | ||||
| 				logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) | ||||
| 				return fmt.Errorf(data.Err) | ||||
| 			} | ||||
| 			localPeer := data.ToPeer() | ||||
|  | ||||
| 	caller := tools.NewHTTPCaller( | ||||
| 		map[tools.DataType]map[tools.METHOD]string{ | ||||
| 			tools.ADMIRALTY_SOURCE: { | ||||
| 				tools.POST :"/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_KUBECONFIG: { | ||||
| 				tools.GET:"/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_SECRET: { | ||||
| 				tools.POST:"/:id", | ||||
| 			}, | ||||
| 			caller := tools.NewHTTPCaller( | ||||
| 				map[tools.DataType]map[tools.METHOD]string{ | ||||
| 					tools.ADMIRALTY_SOURCE: { | ||||
| 						tools.POST :"/:id", | ||||
| 					}, | ||||
| 					tools.ADMIRALTY_KUBECONFIG: { | ||||
| 						tools.GET:"/:id", | ||||
| 					}, | ||||
| 					tools.ADMIRALTY_SECRET: { | ||||
| 						tools.POST:"/:id", | ||||
| 					}, | ||||
| 					tools.ADMIRALTY_TARGET: { | ||||
| 						tools.POST:"/:id", | ||||
| 					}, | ||||
| 					tools.ADMIRALTY_NODES: { | ||||
| 						tools.GET:"/:id", | ||||
| 					}, | ||||
| 			tools.ADMIRALTY_TARGET: { | ||||
| 				tools.POST:"/:id", | ||||
| 					tools.POST:"/:id", | ||||
| 				}, | ||||
| 				tools.ADMIRALTY_NODES: { | ||||
| 					tools.GET:"/:id", | ||||
| 				}, | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_NODES: { | ||||
| 				tools.GET:"/:id", | ||||
| 			}, | ||||
| 		}, | ||||
| 	) | ||||
| 		) | ||||
|  | ||||
| 	logger.Info().Msg(" Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) | ||||
|   | ||||
| @@ -9,28 +9,24 @@ import ( | ||||
| 	"oc-monitord/conf" | ||||
| 	. "oc-monitord/models" | ||||
| 	tools2 "oc-monitord/tools" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/logs" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/common/enum" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| 	"github.com/nwtgck/go-fakelish" | ||||
| 	"github.com/rs/zerolog" | ||||
| 	"gopkg.in/yaml.v3" | ||||
| ) | ||||
|  | ||||
| var logger zerolog.Logger | ||||
|  | ||||
| type ArgoBuilder struct { | ||||
| 	OriginWorkflow 	*w.Workflow | ||||
| 	Workflow       	Workflow | ||||
| 	Services       	[]*Service | ||||
| 	Timeout        	int | ||||
| 	RemotePeers		[]string | ||||
| 	OriginWorkflow *w.Workflow | ||||
| 	Workflow       Workflow | ||||
| 	Services       []*Service | ||||
| 	Timeout        int | ||||
| 	RemotePeers    []string | ||||
| } | ||||
|  | ||||
| type Workflow struct { | ||||
| @@ -177,7 +173,7 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, | ||||
| 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) | ||||
| 	template := &Template{Name: getArgoName(processing.GetName(), id)} | ||||
| 	fmt.Println("Creating template for", template.Name) | ||||
| 	isReparted, peerId := b.isProcessingReparted(*processing,id) | ||||
| 	isReparted, peerId := b.isProcessingReparted(*processing, id) | ||||
| 	template.CreateContainer(processing, b.Workflow.getDag()) | ||||
| 	if isReparted { | ||||
| 		b.RemotePeers = append(b.RemotePeers, peerId) | ||||
| @@ -367,20 +363,19 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { | ||||
|  | ||||
| // Verify if a processing resource is attached to another Compute than the one hosting | ||||
| // the current Open Cloud instance. If true return the peer ID to contact | ||||
| func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool,string) { | ||||
| func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) { | ||||
| 	computeAttached := b.retrieveProcessingCompute(graphID) | ||||
| 	if computeAttached == nil { | ||||
| 		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID ) | ||||
| 		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) | ||||
| 		panic(0) | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	// Creates an accessor srtictly for Peer Collection  | ||||
| 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil) | ||||
| 	// Creates an accessor srtictly for Peer Collection | ||||
| 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil) | ||||
| 	if req == nil { | ||||
| 		fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") | ||||
| 		return false, "" | ||||
| 	}  | ||||
| 	} | ||||
|  | ||||
| 	res := req.LoadOne(computeAttached.CreatorID) | ||||
| 	if res.Err != "" { | ||||
| @@ -388,22 +383,22 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResour | ||||
| 		fmt.Print(res.Err) | ||||
| 		return false, "" | ||||
| 	} | ||||
| 	 | ||||
|  | ||||
| 	peer := *res.ToPeer() | ||||
|  | ||||
| 	isNotReparted, _ := peer.IsMySelf() | ||||
| 	fmt.Println("Result IsMySelf for ", peer.UUID ," : ", isNotReparted) | ||||
| 	 | ||||
| 	isNotReparted := peer.State == 1 | ||||
| 	fmt.Println("Result IsMySelf for ", peer.UUID, " : ", isNotReparted) | ||||
|  | ||||
| 	return !isNotReparted, peer.UUID | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { | ||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||
| 		// If a link contains the id of the processing | ||||
| 		var oppositeId string  | ||||
| 		if link.Source.ID == graphID{ | ||||
| 		var oppositeId string | ||||
| 		if link.Source.ID == graphID { | ||||
| 			oppositeId = link.Destination.ID | ||||
| 		} else if(link.Destination.ID == graphID){ | ||||
| 		} else if link.Destination.ID == graphID { | ||||
| 			oppositeId = link.Source.ID | ||||
| 		} | ||||
| 		fmt.Println("OppositeId : ", oppositeId) | ||||
| @@ -417,10 +412,9 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| 		 | ||||
| 	return nil  | ||||
| } | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| // Execute the last actions once the YAML file for the Argo Workflow is created | ||||
| func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | ||||
| @@ -429,38 +423,8 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | ||||
| 	// Setup admiralty for each node | ||||
| 	for _, peer := range b.RemotePeers { | ||||
| 		fmt.Println("DEV :: Launching Admiralty Setup for ", peer) | ||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) | ||||
| 		setter := AdmiraltySetter{Id: executionsId} | ||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer) | ||||
| 	} | ||||
|  | ||||
| 	// Update the name of the admiralty node to use  | ||||
| 	for _, template := range b.Workflow.Spec.Templates { | ||||
| 		if len(template.Metadata.Annotations) > 0 { | ||||
| 			if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { | ||||
| 				fmt.Println(resp) | ||||
| 				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Generate the YAML file | ||||
| 	random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) | ||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||
| 	logger = oclib.GetLogger() | ||||
| 	yamlified, err := yaml.Marshal(b.Workflow) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not transform object to yaml file") | ||||
| 		return "", err  | ||||
| 	} | ||||
| 	// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss | ||||
| 	current_timestamp := time.Now().Format("02_01_2006_150405") | ||||
| 	file_name := random_name + "_" + current_timestamp + ".yml" | ||||
| 	workflows_dir := "./argo_workflows/" | ||||
| 	err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) | ||||
| 	 | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not write the yaml file") | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| 	return workflows_dir + file_name, nil | ||||
| } | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user