Added methods to verify if workflow contains compute on other peers before setting up admiralty
This commit is contained in:
		| @@ -1,2 +1,53 @@ | ||||
| package workflow_builder | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/tools" | ||||
| ) | ||||
|  | ||||
| type AdmiraltySetter struct { | ||||
| 	Id	string | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { | ||||
| 	 | ||||
| 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) | ||||
| 	if data.Code != 200 { | ||||
| 		logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID) | ||||
| 		return fmt.Errorf(data.Err) | ||||
| 	} | ||||
| 	remotePeer := data.ToPeer() | ||||
|  | ||||
| 	caller := tools.NewHTTPCaller( | ||||
| 		map[tools.DataType]map[tools.METHOD]string{ | ||||
| 			tools.ADMIRALTY_SOURCE: map[tools.METHOD]string{ | ||||
| 				tools.POST : "/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_KUBECONFIG: map[tools.METHOD]string{ | ||||
| 				tools.POST: "/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_SECRET: map[tools.METHOD]string{ | ||||
| 				tools.POST: "/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_TARGET: map[tools.METHOD]string{ | ||||
| 				tools.POST: "/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_NODES: map[tools.METHOD]string{ | ||||
| 				tools.GET: "/id", | ||||
| 			}, | ||||
| 		}, | ||||
| 	) | ||||
| 	fmt.Println("Creating source in ") | ||||
| 	resp, err := remotePeer.LaunchPeerExecution(remotePeer.UUID,"toto-5",tools.ADMIRALTY_SOURCE,tools.POST,nil,caller) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("Error contacting remote peer") | ||||
| 		fmt.Println(err) | ||||
|    		panic(0) | ||||
| 	} | ||||
|  | ||||
| 	fmt.Println(resp) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
| @@ -17,7 +17,6 @@ import ( | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/common/enum" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| 	"cloud.o-forge.io/core/oc-lib/tools" | ||||
| 	"github.com/nwtgck/go-fakelish" | ||||
| 	"github.com/rs/zerolog" | ||||
| 	"gopkg.in/yaml.v3" | ||||
| @@ -26,10 +25,11 @@ import ( | ||||
| var logger zerolog.Logger | ||||
|  | ||||
| type ArgoBuilder struct { | ||||
| 	OriginWorkflow *w.Workflow | ||||
| 	Workflow       Workflow | ||||
| 	Services       []*Service | ||||
| 	Timeout        int | ||||
| 	OriginWorkflow 	*w.Workflow | ||||
| 	Workflow       	Workflow | ||||
| 	Services       	[]*Service | ||||
| 	Timeout        	int | ||||
| 	RemotePeers		[]string | ||||
| } | ||||
|  | ||||
| type Workflow struct { | ||||
| @@ -188,8 +188,11 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, | ||||
| 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) | ||||
| 	template := &Template{Name: getArgoName(processing.GetName(), id)} | ||||
| 	fmt.Println("Creating template for", template.Name) | ||||
| 	isReparted, url := b.isProcessingReparted(*processing) | ||||
| 	template.CreateContainer(processing, b.Workflow.getDag(), isReparted, url) | ||||
| 	isReparted, peerId := b.isProcessingReparted(*processing,id) | ||||
| 	template.CreateContainer(processing, b.Workflow.getDag()) | ||||
| 	if isReparted { | ||||
| 		b.RemotePeers = append(b.RemotePeers, peerId) | ||||
| 	} | ||||
| 	// get datacenter from the processing | ||||
| 	if processing.IsService { | ||||
| 		b.CreateService(id, processing) | ||||
| @@ -373,10 +376,15 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { | ||||
| } | ||||
|  | ||||
| // Verify if a processing resource is attached to another Compute than the one hosting | ||||
| // the current Open Cloud instance. If true return the URL to contact the remote instance | ||||
| // kube API | ||||
| func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource) (bool,string) { | ||||
| 	processCreator := processing.CreatorID | ||||
| // the current Open Cloud instance. If true return the peer ID to contact | ||||
| func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool,string) { | ||||
| 	computeAttached := b.retrieveProcessingCompute(graphID) | ||||
| 	if computeAttached == nil { | ||||
| 		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID ) | ||||
| 		panic(0) | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	// Creates an accessor srtictly for Peer Collection  | ||||
| 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil) | ||||
| 	if req == nil { | ||||
| @@ -384,79 +392,38 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResour | ||||
| 		return false, "" | ||||
| 	}  | ||||
|  | ||||
| 	res := req.LoadOne(processCreator) | ||||
| 	res := req.LoadOne(computeAttached.CreatorID) | ||||
| 	if res.Err != "" { | ||||
| 		fmt.Print("TODO : handle error when requesting PeerID") | ||||
| 		fmt.Print(res.Err) | ||||
| 		return false, "" | ||||
| 	} | ||||
| 	 | ||||
| 	peer := res.ToPeer() | ||||
| 	if peer == nil { | ||||
| 		fmt.Print("TODO : handle error when converting PeerID") | ||||
| 	} | ||||
| 	peer := *res.ToPeer() | ||||
|  | ||||
| 	isNotReparted, _ := peer.IsMySelf() | ||||
| 	if !isNotReparted { | ||||
| 		// remoteCompute := b.retrieveProcessingCompute(processing) | ||||
| 		// computeInstance := remoteCompute.GetSelectedInstance() | ||||
| 		// if computeInstance == nil { | ||||
| 		// 	fmt.Println("TODO: handle when retrieving instance") | ||||
| 		// 	return false, "" | ||||
| 		// } | ||||
|  | ||||
| 		// instance := computeInstance.(*resources.ComputeResourceInstance) | ||||
| 		// return true, instance.Source | ||||
| 		//dataID == executionID | ||||
| 		caller := tools.NewHTTPCaller( | ||||
| 			map[tools.DataType]map[tools.METHOD]string{ | ||||
| 				tools.ADMIRALTY_SOURCE: map[tools.METHOD]string{ | ||||
| 					tools.POST : "/:id", | ||||
| 				}, | ||||
| 				tools.ADMIRALTY_KUBECONFIG: map[tools.METHOD]string{ | ||||
| 					tools.POST: "/:id", | ||||
| 				}, | ||||
| 				tools.ADMIRALTY_SECRET: map[tools.METHOD]string{ | ||||
| 					tools.POST: "/:id", | ||||
| 				}, | ||||
| 				tools.ADMIRALTY_TARGET: map[tools.METHOD]string{ | ||||
| 					tools.POST: "/:id", | ||||
| 				}, | ||||
| 				tools.ADMIRALTY_NODES: map[tools.METHOD]string{ | ||||
| 					tools.GET: "/id", | ||||
| 				}, | ||||
| 			}, | ||||
| 		) | ||||
|  | ||||
| 		res, err := peer.LaunchPeerExecution(peer.UUID,"toto-5",tools.ADMIRALTY_SOURCE,tools.POST,nil,caller) | ||||
| 		if err != nil { | ||||
| 			fmt.Println("Error contacting remote peer") | ||||
| 			fmt.Println(err) | ||||
| 			panic(0) | ||||
| 		} | ||||
| 		fmt.Println(res) | ||||
| 		// peer.LaunchPeerExecution(peer.UUID,"toto-5",ADMIRALTY_TOKEN,tools.GET,nil,caller) | ||||
| 	} | ||||
| 	fmt.Println("Result IsMySelf for ", peer.UUID ," : ", isNotReparted) | ||||
| 	 | ||||
| 	return false, "" | ||||
| 	return !isNotReparted, peer.UUID | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) retrieveProcessingCompute(processing resources.ProcessingResource) *resources.ComputeResource { | ||||
| func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { | ||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||
| 		// If a link contains the id of the processing | ||||
| 		var oppositeId string  | ||||
| 		if link.Source.ID == processing.AbstractResource.UUID{ | ||||
| 		if link.Source.ID == graphID{ | ||||
| 			oppositeId = link.Destination.ID | ||||
| 		} else if(link.Destination.ID == processing.AbstractResource.UUID){ | ||||
| 		} else if(link.Destination.ID == graphID){ | ||||
| 			oppositeId = link.Source.ID | ||||
| 		} | ||||
|  | ||||
| 		fmt.Println("OppositeId : ", oppositeId) | ||||
| 		if oppositeId != "" { | ||||
| 			isCompute, object := isCompute(oppositeId) | ||||
| 			if !isCompute { | ||||
| 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) | ||||
| 			if dt == oclib.COMPUTE_RESOURCE { | ||||
| 				return res.(*resources.ComputeResource) | ||||
| 			} else { | ||||
| 				continue | ||||
| 			} | ||||
| 			return object | ||||
| 		} | ||||
|  | ||||
| 	} | ||||
| @@ -464,22 +431,14 @@ func (b *ArgoBuilder) retrieveProcessingCompute(processing resources.ProcessingR | ||||
| 	return nil  | ||||
| } | ||||
|  | ||||
| func isCompute(resourceId string) (bool, *resources.ComputeResource) { | ||||
| 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE),"","",nil,nil) | ||||
| 	if req == nil { | ||||
| 		fmt.Print("TODO : handle error when creating a NewRequest()") | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	res := req.LoadOne(resourceId) | ||||
| 		 | ||||
| 	if res.Err != "" { | ||||
| 		return false, nil | ||||
| 	} | ||||
| 	 | ||||
| 	compute := res.ToComputeResource() | ||||
| 	if compute == nil {	// Maybe we should add an Err returned by ToXXXXResource()  | ||||
| 		return false, nil | ||||
| 	}			 | ||||
|  | ||||
| 	return true, compute | ||||
| // Execute the last actions once the YAML file for the Argo Workflow is created | ||||
| func (b *ArgoBuilder) CompleteBuild(executionsId string) error { | ||||
| 	fmt.Println("DEV :: Completing build") | ||||
| 	for _, peer := range b.RemotePeers { | ||||
| 		fmt.Println("DEV :: Launching Admiralty Setup for ", peer) | ||||
| 		setter := AdmiraltySetter{Id: executionsId} | ||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| @@ -41,20 +41,20 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | ||||
| 	return new_wf, nil | ||||
| } | ||||
|  | ||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (string, int, error) { | ||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder,string, int, error) { | ||||
| 	logger := oclib.GetLogger() | ||||
| 	fmt.Println("Exporting to Argo", w.Workflow) | ||||
| 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | ||||
| 		return "", 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 		return nil, "", 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 	} | ||||
|  | ||||
| 	argo_builder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} | ||||
| 	filename, stepMax, _, _, err := argo_builder.CreateDAG(namespace, true) | ||||
| 	argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} | ||||
| 	filename, stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) | ||||
| 		return "", 0, err | ||||
| 		return nil, "", 0, err | ||||
| 	} | ||||
| 	return filename, stepMax, nil | ||||
| 	return &argoBuilder, filename, stepMax, nil | ||||
| } | ||||
|  | ||||
| // TODO implement this function | ||||
|   | ||||
		Reference in New Issue
	
	Block a user