diff --git a/conf/conf.go b/conf/conf.go
index 0893916..e1e439f 100644
--- a/conf/conf.go
+++ b/conf/conf.go
@@ -9,6 +9,7 @@ type Config struct {
 	NatsURL     string
 	ExecutionID string
 	PeerID      string
+	Groups      []string
 	Timeout     int
 	WorkflowID  string
 	Logs        string
@@ -18,7 +19,7 @@ type Config struct {
 	KubeCA   string
 	KubeCert string
 	KubeData string
-	ArgoHost string // when executed in a container will replace addresses with "localhost" in their url
+	ArgoHost string  // when executed in a container will replace addresses with "localhost" in their url
 }
 
 var instance *Config
diff --git a/go.mod b/go.mod
index 99ea101..73f245c 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@ go 1.23.1
 toolchain go1.23.3
 
 require (
-	cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963
+	cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637
 	github.com/akamensky/argparse v1.4.0
 	github.com/google/uuid v1.6.0
 	github.com/goraz/onion v0.1.3
diff --git a/go.sum b/go.sum
index e1beb5c..c11d2a0 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,14 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+R
 cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
 cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE=
 cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c h1:k2y+ocElqwUK5yzyCf3rWrDUzPWbds4MbtG58+Szos0=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157 h1:853UvpMOM1QuWLrr/V8biDS8IcQcqHvoJsOT4epxDng=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e h1:Z5vLv+Wzzz58abmHRnovoqbkVlKHuC8u8/RLv7FjtZw=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637 h1:YiZbn6KmjgZ62uM+kH95Snd2nQliDKDnGMAxRr/VoUw=
+cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
 github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
diff --git a/main.go b/main.go
index 21c2b96..79c14fc 100644
--- a/main.go
+++ b/main.go
@@ -208,6 +208,8 @@ func setConf(parser *argparse.Parser) {
 	mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
 	execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})
 	peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
+	groups := parser.String("g", "groups", &argparse.Options{Required: false, Default: "", Help: "Groups of the peer to request from oc-catalog API"})
+
 	mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"})
 	db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"})
 	timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"})
@@ -234,7 +236,7 @@ func setConf(parser *argparse.Parser) {
 	conf.GetConfig().Mode = *mode
 	conf.GetConfig().ExecutionID = *execution
 	conf.GetConfig().PeerID = *peer
-
+	conf.GetConfig().Groups = strings.Split((*groups), ",")
 	conf.GetConfig().KubeHost = *host
 	conf.GetConfig().KubePort = *port
 
diff --git a/models/template.go b/models/template.go
index bdd1fec..f182f0b 100644
--- a/models/template.go
+++ b/models/template.go
@@ -3,6 +3,7 @@ package models
 import (
 	"encoding/json"
 	"fmt"
+	"strconv"
 	"strings"
 
 	w "cloud.o-forge.io/core/oc-lib/models/workflow"
@@ -18,11 +19,60 @@ type Parameter struct {
 	Value string `yaml:"value,omitempty"`
 }
 
+type Bounds struct {
+	CPU    string `yaml:"cpu,omitempty"`
+	Memory string `yaml:"memory,omitempty"`
+	GPU    string `yaml:"nvidia.com/gpu,omitempty"`
+}
+
+func NewBounds() *Bounds {
+	return &Bounds{
+		CPU:    "0",
+		Memory: "0",
+		GPU:    "0",
+	}
+}
+
+func (b *Bounds) Set(value float64, what string, isMin bool) bool {
+	i := float64(0)
+	switch what {
+	case "cpu":
+		if newI, err := strconv.ParseFloat(b.CPU, 64); err == nil {
+			i = newI
+		}
+	case "ram":
+		if newI, err := strconv.ParseFloat(b.Memory, 64); err == nil {
+			i = newI
+		}
+	case "gpu":
+		if newI, err := strconv.ParseFloat(b.GPU, 64); err == nil {
+			i = newI
+		}
+	}
+	ok := (value > i && !isMin) || (value < i && isMin)
+	if ok {
+		switch what {
+		case "cpu":
+			b.CPU = fmt.Sprintf("%f", value)
+			return true
+		case "ram":
+			b.Memory = fmt.Sprintf("%fGi", value)
+			return true
+		case "gpu":
+			b.GPU = fmt.Sprintf("%f", value)
+			return true
+		}
+	}
+	return false
+}
+
 type Container struct {
 	Image        string        `yaml:"image"`
 	Command      []string      `yaml:"command,omitempty,flow"`
 	Args         []string      `yaml:"args,omitempty,flow"`
 	VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
+	Requests     Bounds        `yaml:"requests,omitempty"`
+	Limits       Bounds        `yaml:"limits,omitempty"`
 }
 
 func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount {
@@ -44,9 +94,10 @@ func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMoun
 }
 
 type Task struct {
-	Name         string   `yaml:"name"`
-	Template     string   `yaml:"template"`
-	Dependencies []string `yaml:"dependencies,omitempty"`
+	Name         string            `yaml:"name"`
+	Template     string            `yaml:"template"`
+	Dependencies []string          `yaml:"dependencies,omitempty"`
+	NodeSelector map[string]string `yaml:"nodeSelector,omitempty"`
 	Arguments    struct {
 		Parameters []Parameter `yaml:"parameters,omitempty"`
 	} `yaml:"arguments,omitempty"`
diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go
index 57fc772..b53a0f5 100644
--- a/workflow_builder/argo_builder.go
+++ b/workflow_builder/argo_builder.go
@@ -126,7 +126,7 @@ func (b *ArgoBuilder) createArgoTemplates(
 
 	template.CreateContainer(processing, b.Workflow.GetDag())
 	if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil {
-		logger.Error().Msg(fmt.Sprint("Problem to sets up repartition expected %v", err.Error()))
+		logger.Error().Msg(fmt.Sprintf("problem setting up repartition: %v", err.Error()))
 		return volumes, firstItems, lastItems
 	}
 	// get datacenter from the processing
@@ -154,6 +154,17 @@ func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource,
 	}
 	// Creates an accessor srtictly for Peer Collection
 	for _, related := range computeAttached {
+		instance := related.Node.GetSelectedInstance().(*resources.ComputeResourceInstance)
+		partner := instance.GetSelectedPartnership()
+		if partner == nil {
+			logger.Error().Msg("can't proceed on datacenter because of missing pricing profiles " + related.Node.GetID())
+			continue
+		}
+		garanteed, allowed := b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), models.NewBounds(), models.NewBounds(), "gpu", partner)
+		garanteed, allowed = b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), garanteed, allowed, "cpu", partner)
+		garanteed.Set(float64(partner.(*resources.ComputeResourcePartnership).MinGaranteedRAMSize), "ram", false)
+		allowed.Set(float64(partner.(*resources.ComputeResourcePartnership).MaxAllowedRAMSize), "ram", false)
+
 		res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID())
 		if res.Err != "" {
 			return errors.New(res.Err)
@@ -172,6 +183,37 @@ func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource,
 	return nil
 }
 
+func (b *ArgoBuilder) setResourcesAllowedAndGaranteed(dag *Dag, minbound *models.Bounds, maxbound *models.Bounds, typ string, partner resources.ResourcePartnerITF) (*models.Bounds, *models.Bounds) {
+	selector := ""
+	values := map[string]float64{}
+	if typ == "gpu" {
+		values = partner.(*resources.ComputeResourcePartnership).MinGaranteedGPUsMemoryGB
+	} else {
+		values = partner.(*resources.ComputeResourcePartnership).MinGaranteedCPUsCores
+	}
+	for name, GPU := range values {
+		if minbound.Set(float64(GPU), typ, true) {
+			selector = name
+		}
+	}
+	if selector != "" {
+		for _, t := range dag.Tasks {
+			t.NodeSelector[typ+"-type"] = selector
+		}
+	}
+	if typ == "gpu" {
+		values = partner.(*resources.ComputeResourcePartnership).MaxAllowedGPUsMemoryGB
+	} else {
+		values = partner.(*resources.ComputeResourcePartnership).MaxAllowedCPUsCores
+	}
+	if max, ok := values[selector]; ok {
+		maxbound.Set(float64(max), typ, false)
+	} else {
+		maxbound.GPU = minbound.GPU
+	}
+	return minbound, maxbound
+}
+
 // Execute the last actions once the YAML file for the Argo Workflow is created
 func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
 	logger.Info().Msg("DEV :: Completing build")
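
A note on the new `models.Bounds` type added in `models/template.go`: quantities are stored as strings so they can be written straight into the generated Argo YAML, and `Set` only overwrites the stored value when the candidate is larger (`isMin == false`) or smaller (`isMin == true`) than the current one. Below is a minimal sketch of that behaviour; the `oc-monitord/models` import path and the `main` wrapper are assumptions for illustration only.

```go
package main

import (
	"fmt"

	"oc-monitord/models" // assumed import path for the models package in this repo
)

func main() {
	b := models.NewBounds() // CPU, Memory and GPU all start at "0"

	b.Set(2, "cpu", false) // accepted: 2 > 0, CPU becomes "2.000000"
	b.Set(1, "cpu", false) // rejected: 1 < 2 and isMin is false
	b.Set(4, "ram", false) // accepted: Memory becomes "4.000000Gi"

	// Caveat visible in the diff: once Memory carries the "Gi" suffix,
	// strconv.ParseFloat inside Set fails and the comparison falls back
	// to 0, so a smaller value can still overwrite the larger one.
	b.Set(2, "ram", false) // accepted: Memory becomes "2.000000Gi"

	fmt.Println(b.CPU, b.Memory, b.GPU) // 2.000000 2.000000Gi 0
}
```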
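
One caveat on the `--groups` flag wired into `setConf`: `strings.Split("", ",")` returns a one-element slice containing an empty string, so when the flag is omitted `conf.GetConfig().Groups` ends up as `[]string{""}` rather than an empty slice. The sketch below shows the difference; `splitGroups` is an illustrative helper, not part of the diff.

```go
package main

import (
	"fmt"
	"strings"
)

// splitGroups mirrors the parsing added in setConf, but drops the empty
// entry produced when the flag is left at its default "".
func splitGroups(raw string) []string {
	if raw == "" {
		return nil
	}
	groups := []string{}
	for _, g := range strings.Split(raw, ",") {
		if g = strings.TrimSpace(g); g != "" {
			groups = append(groups, g)
		}
	}
	return groups
}

func main() {
	fmt.Println(len(strings.Split("", ","))) // 1: a single empty entry, not zero groups
	fmt.Println(splitGroups(""))             // []
	fmt.Println(splitGroups("admin, dev,"))  // [admin dev]
}
```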