add resource use
This commit is contained in:
parent
7fa115c5e1
commit
2b002152a4
@ -9,6 +9,7 @@ type Config struct {
|
|||||||
NatsURL string
|
NatsURL string
|
||||||
ExecutionID string
|
ExecutionID string
|
||||||
PeerID string
|
PeerID string
|
||||||
|
Groups []string
|
||||||
Timeout int
|
Timeout int
|
||||||
WorkflowID string
|
WorkflowID string
|
||||||
Logs string
|
Logs string
|
||||||
|
2
go.mod
2
go.mod
@ -5,7 +5,7 @@ go 1.23.1
|
|||||||
toolchain go1.23.3
|
toolchain go1.23.3
|
||||||
|
|
||||||
require (
|
require (
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637
|
||||||
github.com/akamensky/argparse v1.4.0
|
github.com/akamensky/argparse v1.4.0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/goraz/onion v0.1.3
|
github.com/goraz/onion v0.1.3
|
||||||
|
8
go.sum
8
go.sum
@ -6,6 +6,14 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+R
|
|||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c h1:k2y+ocElqwUK5yzyCf3rWrDUzPWbds4MbtG58+Szos0=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157 h1:853UvpMOM1QuWLrr/V8biDS8IcQcqHvoJsOT4epxDng=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e h1:Z5vLv+Wzzz58abmHRnovoqbkVlKHuC8u8/RLv7FjtZw=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637 h1:YiZbn6KmjgZ62uM+kH95Snd2nQliDKDnGMAxRr/VoUw=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
|
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
|
||||||
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
|
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
|
||||||
|
4
main.go
4
main.go
@ -208,6 +208,8 @@ func setConf(parser *argparse.Parser) {
|
|||||||
mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
|
mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
|
||||||
execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})
|
execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})
|
||||||
peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
|
peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
|
||||||
|
groups := parser.String("g", "groups", &argparse.Options{Required: false, Default: "", Help: "Groups of the peer to request from oc-catalog API"})
|
||||||
|
|
||||||
mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"})
|
mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"})
|
||||||
db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"})
|
db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"})
|
||||||
timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"})
|
timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"})
|
||||||
@ -234,7 +236,7 @@ func setConf(parser *argparse.Parser) {
|
|||||||
conf.GetConfig().Mode = *mode
|
conf.GetConfig().Mode = *mode
|
||||||
conf.GetConfig().ExecutionID = *execution
|
conf.GetConfig().ExecutionID = *execution
|
||||||
conf.GetConfig().PeerID = *peer
|
conf.GetConfig().PeerID = *peer
|
||||||
|
conf.GetConfig().Groups = strings.Split((*groups), ",")
|
||||||
conf.GetConfig().KubeHost = *host
|
conf.GetConfig().KubeHost = *host
|
||||||
conf.GetConfig().KubePort = *port
|
conf.GetConfig().KubePort = *port
|
||||||
|
|
||||||
|
@ -3,6 +3,7 @@ package models
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||||
@ -18,11 +19,60 @@ type Parameter struct {
|
|||||||
Value string `yaml:"value,omitempty"`
|
Value string `yaml:"value,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Bounds holds one set of resource quantities (CPU, memory and GPU) as the
// strings that get serialized to YAML. Each field is dropped from the output
// when it is empty.
type Bounds struct {
	// CPU is serialized under the "cpu" key.
	CPU string `yaml:"cpu,omitempty"`
	// Memory is serialized under the "memory" key.
	Memory string `yaml:"memory,omitempty"`
	// GPU is serialized under the "nvidia.com/gpu" key.
	GPU string `yaml:"nvidia.com/gpu,omitempty"`
}
|
||||||
|
|
||||||
|
func NewBounds() *Bounds {
|
||||||
|
return &Bounds{
|
||||||
|
CPU: "0",
|
||||||
|
Memory: "0",
|
||||||
|
GPU: "0",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *Bounds) Set(value float64, what string, isMin bool) bool {
|
||||||
|
i := float64(0)
|
||||||
|
switch what {
|
||||||
|
case "cpu":
|
||||||
|
if newI, err := strconv.ParseFloat(b.CPU, 64); err == nil {
|
||||||
|
i = newI
|
||||||
|
}
|
||||||
|
case "ram":
|
||||||
|
if newI, err := strconv.ParseFloat(b.Memory, 64); err == nil {
|
||||||
|
i = newI
|
||||||
|
}
|
||||||
|
case "gpu":
|
||||||
|
if newI, err := strconv.ParseFloat(b.GPU, 64); err == nil {
|
||||||
|
i = newI
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ok := (value > i && !isMin) || (value < i && isMin)
|
||||||
|
if ok {
|
||||||
|
switch what {
|
||||||
|
case "cpu":
|
||||||
|
b.CPU = fmt.Sprintf("%f", value)
|
||||||
|
return true
|
||||||
|
case "ram":
|
||||||
|
b.Memory = fmt.Sprintf("%fGi", value)
|
||||||
|
return true
|
||||||
|
case "gpu":
|
||||||
|
b.GPU = fmt.Sprintf("%f", value)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
type Container struct {
|
type Container struct {
|
||||||
Image string `yaml:"image"`
|
Image string `yaml:"image"`
|
||||||
Command []string `yaml:"command,omitempty,flow"`
|
Command []string `yaml:"command,omitempty,flow"`
|
||||||
Args []string `yaml:"args,omitempty,flow"`
|
Args []string `yaml:"args,omitempty,flow"`
|
||||||
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
|
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
|
||||||
|
Requests Bounds `yaml:"requests,omitempty"`
|
||||||
|
Limits Bounds `yaml:"limits,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount {
|
func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount {
|
||||||
@ -47,6 +97,7 @@ type Task struct {
|
|||||||
Name string `yaml:"name"`
|
Name string `yaml:"name"`
|
||||||
Template string `yaml:"template"`
|
Template string `yaml:"template"`
|
||||||
Dependencies []string `yaml:"dependencies,omitempty"`
|
Dependencies []string `yaml:"dependencies,omitempty"`
|
||||||
|
NodeSelector map[string]string `yaml:"nodeSelector,omitempty"`
|
||||||
Arguments struct {
|
Arguments struct {
|
||||||
Parameters []Parameter `yaml:"parameters,omitempty"`
|
Parameters []Parameter `yaml:"parameters,omitempty"`
|
||||||
} `yaml:"arguments,omitempty"`
|
} `yaml:"arguments,omitempty"`
|
||||||
|
@ -126,7 +126,7 @@ func (b *ArgoBuilder) createArgoTemplates(
|
|||||||
|
|
||||||
template.CreateContainer(processing, b.Workflow.GetDag())
|
template.CreateContainer(processing, b.Workflow.GetDag())
|
||||||
if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil {
|
if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil {
|
||||||
logger.Error().Msg(fmt.Sprint("Problem to sets up repartition expected %v", err.Error()))
|
logger.Error().Msg(fmt.Sprint("problem to sets up repartition expected %v", err.Error()))
|
||||||
return volumes, firstItems, lastItems
|
return volumes, firstItems, lastItems
|
||||||
}
|
}
|
||||||
// get datacenter from the processing
|
// get datacenter from the processing
|
||||||
@ -154,6 +154,17 @@ func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource,
|
|||||||
}
|
}
|
||||||
// Creates an accessor strictly for Peer Collection
|
// Creates an accessor strictly for Peer Collection
|
||||||
for _, related := range computeAttached {
|
for _, related := range computeAttached {
|
||||||
|
instance := related.Node.GetSelectedInstance().(*resources.ComputeResourceInstance)
|
||||||
|
partner := instance.GetSelectedPartnership()
|
||||||
|
if partner == nil {
|
||||||
|
logger.Error().Msg("can't proceed on datacenter because of missing pricing profiles " + related.Node.GetID())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
garanteed, allowed := b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), models.NewBounds(), models.NewBounds(), "gpu", partner)
|
||||||
|
garanteed, allowed = b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), garanteed, allowed, "cpu", partner)
|
||||||
|
garanteed.Set(float64(partner.(*resources.ComputeResourcePartnership).MinGaranteedRAMSize), "ram", false)
|
||||||
|
allowed.Set(float64(partner.(*resources.ComputeResourcePartnership).MaxAllowedRAMSize), "ram", false)
|
||||||
|
|
||||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID())
|
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID())
|
||||||
if res.Err != "" {
|
if res.Err != "" {
|
||||||
return errors.New(res.Err)
|
return errors.New(res.Err)
|
||||||
@ -172,6 +183,37 @@ func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource,
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *ArgoBuilder) setResourcesAllowedAndGaranteed(dag *Dag, minbound *models.Bounds, maxbound *models.Bounds, typ string, partner resources.ResourcePartnerITF) (*models.Bounds, *models.Bounds) {
|
||||||
|
selector := ""
|
||||||
|
values := map[string]float64{}
|
||||||
|
if typ == "gpu" {
|
||||||
|
values = partner.(*resources.ComputeResourcePartnership).MinGaranteedGPUsMemoryGB
|
||||||
|
} else {
|
||||||
|
values = partner.(*resources.ComputeResourcePartnership).MinGaranteedCPUsCores
|
||||||
|
}
|
||||||
|
for name, GPU := range values {
|
||||||
|
if minbound.Set(float64(GPU), typ, true) {
|
||||||
|
selector = name
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if selector != "" {
|
||||||
|
for _, t := range dag.Tasks {
|
||||||
|
t.NodeSelector[typ+"-type"] = selector
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if typ == "gpu" {
|
||||||
|
values = partner.(*resources.ComputeResourcePartnership).MaxAllowedGPUsMemoryGB
|
||||||
|
} else {
|
||||||
|
values = partner.(*resources.ComputeResourcePartnership).MaxAllowedCPUsCores
|
||||||
|
}
|
||||||
|
if max, ok := values[selector]; ok {
|
||||||
|
maxbound.Set(float64(max), typ, false)
|
||||||
|
} else {
|
||||||
|
maxbound.GPU = minbound.GPU
|
||||||
|
}
|
||||||
|
return minbound, maxbound
|
||||||
|
}
|
||||||
|
|
||||||
// Execute the last actions once the YAML file for the Argo Workflow is created
|
// Execute the last actions once the YAML file for the Argo Workflow is created
|
||||||
func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
|
func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
|
||||||
logger.Info().Msg("DEV :: Completing build")
|
logger.Info().Msg("DEV :: Completing build")
|
||||||
|
Loading…
Reference in New Issue
Block a user