From 3fa2cd3336e29aa9d87bfbeb1b662684ee1f48b7 Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 28 Feb 2025 14:15:59 +0100 Subject: [PATCH] starting implementation of admiralty annotations --- models/template.go | 8 ++- workflow_builder/argo_builder.go | 88 +++++++++++++++++++++++++++++++- 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/models/template.go b/models/template.go index f5a1d65..75b1e12 100644 --- a/models/template.go +++ b/models/template.go @@ -85,7 +85,7 @@ type Template struct { Resource ServiceResource `yaml:"resource,omitempty"` } -func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag) { +func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag, isReparted bool, reparted_url string) { instance := processing.GetSelectedInstance() if instance == nil { return @@ -106,6 +106,7 @@ func (template *Template) CreateContainer(processing *resources.ProcessingResour template.Outputs.Parameters = append(template.Inputs.Parameters, Parameter{Name: v.Name}) } cmd := strings.ReplaceAll(inst.Access.Container.Command, container.Image, "") + for _, a := range strings.Split(cmd, " ") { container.Args = append(container.Args, template.ReplacePerEnv(a, inst.Env)) } @@ -113,6 +114,11 @@ func (template *Template) CreateContainer(processing *resources.ProcessingResour container.Args = append(container.Args, template.ReplacePerEnv(a, inst.Env)) } container.Args = []string{strings.Join(container.Args, " ")} + + if isReparted { + + } + template.Container = container } diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 51db038..014d99f 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -184,7 +184,8 @@ func (b *ArgoBuilder) createArgoTemplates(id string, _, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) template := &Template{Name: getArgoName(processing.GetName(), id)} fmt.Println("Creating template for", template.Name) - template.CreateContainer(processing, b.Workflow.getDag()) + isReparted, url := b.isProcessingReparted(*processing) + template.CreateContainer(processing, b.Workflow.getDag(), isReparted, url) // get datacenter from the processing if processing.IsService { b.CreateService(id, processing) @@ -230,6 +231,7 @@ func (b *ArgoBuilder) createArgoTemplates(id string, b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) return volumes, firstItems, lastItems } + func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource, firstItems []string, lastItems []string) (*Dag, []string, []string) { unique_name := getArgoName(processing.GetName(), graphItemID) @@ -332,3 +334,87 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { formatedName = strings.ToLower(formatedName) return } + +// Verify if a processing resource is attached to another Compute than the one hosting +// the current Open Cloud instance. If true return the URL to contact the remote instance +// kube API + +func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource) (bool,string) { + processCreator := processing.CreatorID + // Creates an accessor srtictly for Peer Collection + req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil) + if req == nil { + fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") + return false, "" + } + + res := req.LoadOne(processCreator) + if res.Err != "" { + fmt.Print("TODO : handle error when requesting PeerID") + fmt.Print(res.Err) + return false, "" + } + + peer := res.ToPeer() + if peer == nil { + fmt.Print("TODO : handle error when converting PeerID") + } + + isReparted, _ := peer.IsMySelf() + if isReparted { + remoteCompute := b.retrieveProcessingCompute(processing) + computeInstance := remoteCompute.GetSelectedInstance() + if computeInstance == nil { + fmt.Println("TODO: handle when retrieving instance") + return false, "" + } + + instance := computeInstance.(*resources.ComputeResourceInstance) + return true, instance.Source + } + + return false, "" +} + +func (b *ArgoBuilder) retrieveProcessingCompute(processing resources.ProcessingResource) *resources.ComputeResource { + for _, link := range b.OriginWorkflow.Graph.Links { + // If a link contains the id of the processing + var oppositeId string + if link.Source.ID == processing.AbstractResource.UUID{ + oppositeId = link.Destination.ID + } else if(link.Destination.ID == processing.AbstractResource.UUID){ + oppositeId = link.Source.ID + } + + if oppositeId != "" { + isCompute, object := isCompute(oppositeId) + if !isCompute { + continue + } + return object + } + + } + + return nil +} + +func isCompute(resourceId string) (bool, *resources.ComputeResource) { + req := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE),"","",nil,nil) + if req == nil { + fmt.Print("TODO : handle error when creating a NewRequest()") + return false, nil + } + res := req.LoadOne(resourceId) + + if res.Err != "" { + return false, nil + } + + compute := res.ToComputeResource() + if compute == nil { // Maybe we should add an Err returned by ToXXXXResource() + return false, nil + } + + return true, compute +} \ No newline at end of file