Planner Improve
This commit is contained in:
434
models/booking/planner/planner.go
Normal file
434
models/booking/planner/planner.go
Normal file
@@ -0,0 +1,434 @@
|
|||||||
|
package planner
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/booking"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InstanceCapacity holds the maximum available resources of a single resource instance.
|
||||||
|
type InstanceCapacity struct {
|
||||||
|
CPUCores map[string]float64 `json:"cpu_cores,omitempty"` // model -> total cores
|
||||||
|
GPUMemGB map[string]float64 `json:"gpu_mem_gb,omitempty"` // model -> total memory GB
|
||||||
|
RAMGB float64 `json:"ram_gb,omitempty"` // total RAM GB
|
||||||
|
StorageGB float64 `json:"storage_gb,omitempty"` // total storage GB
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResourceRequest describes the resource amounts needed for a prospective booking.
|
||||||
|
// A nil map or nil pointer for a dimension means "use the full instance capacity" for that dimension.
|
||||||
|
type ResourceRequest struct {
|
||||||
|
CPUCores map[string]float64 // model -> cores needed (nil = max)
|
||||||
|
GPUMemGB map[string]float64 // model -> memory GB needed (nil = max)
|
||||||
|
RAMGB *float64 // GB needed (nil = max)
|
||||||
|
StorageGB *float64 // GB needed (nil = max)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PlannerSlot represents a single booking occupying a resource instance during a time window.
|
||||||
|
// Usage maps each resource dimension (cpu_<model>, gpu_<model>, ram, storage) to
|
||||||
|
// its percentage of consumption relative to the instance's maximum capacity (0–100).
|
||||||
|
type PlannerSlot struct {
|
||||||
|
Start time.Time `json:"start"`
|
||||||
|
End time.Time `json:"end"`
|
||||||
|
InstanceID string `json:"instance_id,omitempty"` // instance targeted by this booking
|
||||||
|
BookingID string `json:"booking_id,omitempty"` // empty in shallow mode
|
||||||
|
Usage map[string]float64 `json:"usage,omitempty"` // dimension -> % of max (0-100)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Planner is a volatile (non-persisted) object that organises bookings by resource.
|
||||||
|
// Only ComputeResource and StorageResource bookings appear in the schedule.
|
||||||
|
type Planner struct {
|
||||||
|
GeneratedAt time.Time `json:"generated_at"`
|
||||||
|
Schedule map[string][]*PlannerSlot `json:"schedule"` // resource_id -> slots
|
||||||
|
Capacities map[string]map[string]*InstanceCapacity `json:"capacities"` // resource_id -> instance_id -> max capacity
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate builds a full Planner from all active bookings.
|
||||||
|
// Each slot includes the booking ID, the instance ID, and the usage percentage of every resource dimension.
|
||||||
|
func Generate(request *tools.APIRequest) (*Planner, error) {
|
||||||
|
return generate(request, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GenerateShallow builds a Planner from all active bookings without booking IDs.
|
||||||
|
func GenerateShallow(request *tools.APIRequest) (*Planner, error) {
|
||||||
|
return generate(request, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
func generate(request *tools.APIRequest, shallow bool) (*Planner, error) {
|
||||||
|
accessor := booking.NewAccessor(request)
|
||||||
|
bookings, code, err := accessor.Search(nil, "*", false)
|
||||||
|
if code != 200 || err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
p := &Planner{
|
||||||
|
GeneratedAt: time.Now(),
|
||||||
|
Schedule: map[string][]*PlannerSlot{},
|
||||||
|
Capacities: map[string]map[string]*InstanceCapacity{},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, b := range bookings {
|
||||||
|
bk := b.(*booking.Booking)
|
||||||
|
|
||||||
|
// Only compute and storage resources are eligible
|
||||||
|
if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
end := bk.ExpectedEndDate
|
||||||
|
if end == nil {
|
||||||
|
e := bk.ExpectedStartDate.Add(time.Hour)
|
||||||
|
end = &e
|
||||||
|
}
|
||||||
|
|
||||||
|
instanceID, usage, cap := extractSlotData(bk, request)
|
||||||
|
|
||||||
|
if cap != nil && instanceID != "" {
|
||||||
|
if p.Capacities[bk.ResourceID] == nil {
|
||||||
|
p.Capacities[bk.ResourceID] = map[string]*InstanceCapacity{}
|
||||||
|
}
|
||||||
|
p.Capacities[bk.ResourceID][instanceID] = cap
|
||||||
|
}
|
||||||
|
|
||||||
|
slot := &PlannerSlot{
|
||||||
|
Start: bk.ExpectedStartDate,
|
||||||
|
End: *end,
|
||||||
|
InstanceID: instanceID,
|
||||||
|
Usage: usage,
|
||||||
|
}
|
||||||
|
if !shallow {
|
||||||
|
slot.BookingID = bk.GetID()
|
||||||
|
}
|
||||||
|
|
||||||
|
p.Schedule[bk.ResourceID] = append(p.Schedule[bk.ResourceID], slot)
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check reports whether the requested time window has enough remaining capacity
|
||||||
|
// on the specified instance of the given resource.
|
||||||
|
//
|
||||||
|
// req describes the amounts needed; nil fields default to the full instance capacity.
|
||||||
|
// If req itself is nil, the full capacity of every dimension is assumed.
|
||||||
|
// If end is nil, a 1-hour window from start is assumed.
|
||||||
|
//
|
||||||
|
// A slot that overlaps the requested window is acceptable if, for every requested
|
||||||
|
// dimension, existing usage + requested usage ≤ 100 %.
|
||||||
|
// Slots targeting other instances are ignored.
|
||||||
|
// If no capacity is known for this instance (never booked), it is fully available.
|
||||||
|
func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool {
|
||||||
|
if end == nil {
|
||||||
|
e := start.Add(time.Hour)
|
||||||
|
end = &e
|
||||||
|
}
|
||||||
|
|
||||||
|
cap := p.instanceCapacity(resourceID, instanceID)
|
||||||
|
reqPct := toPercentages(req, cap)
|
||||||
|
|
||||||
|
slots, ok := p.Schedule[resourceID]
|
||||||
|
if !ok {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, slot := range slots {
|
||||||
|
// Only consider slots on the same instance
|
||||||
|
if slot.InstanceID != instanceID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Only consider overlapping slots
|
||||||
|
if !slot.Start.Before(*end) || !slot.End.After(start) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Combined usage must not exceed 100 % for any requested dimension
|
||||||
|
for dim, needed := range reqPct {
|
||||||
|
if slot.Usage[dim]+needed > 100.0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// instanceCapacity returns the stored max capacity for a resource/instance pair.
|
||||||
|
// Returns an empty (but non-nil) capacity when the instance has never been booked.
|
||||||
|
func (p *Planner) instanceCapacity(resourceID, instanceID string) *InstanceCapacity {
|
||||||
|
if instances, ok := p.Capacities[resourceID]; ok {
|
||||||
|
if c, ok := instances[instanceID]; ok {
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return &InstanceCapacity{
|
||||||
|
CPUCores: map[string]float64{},
|
||||||
|
GPUMemGB: map[string]float64{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// toPercentages converts a ResourceRequest into a map of dimension -> percentage-of-max.
|
||||||
|
// nil fields in req (or nil req) are treated as requesting the full capacity (100 %).
|
||||||
|
func toPercentages(req *ResourceRequest, cap *InstanceCapacity) map[string]float64 {
|
||||||
|
pct := map[string]float64{}
|
||||||
|
|
||||||
|
if req == nil {
|
||||||
|
for model := range cap.CPUCores {
|
||||||
|
pct["cpu_"+model] = 100.0
|
||||||
|
}
|
||||||
|
for model := range cap.GPUMemGB {
|
||||||
|
pct["gpu_"+model] = 100.0
|
||||||
|
}
|
||||||
|
if cap.RAMGB > 0 {
|
||||||
|
pct["ram"] = 100.0
|
||||||
|
}
|
||||||
|
if cap.StorageGB > 0 {
|
||||||
|
pct["storage"] = 100.0
|
||||||
|
}
|
||||||
|
return pct
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.CPUCores == nil {
|
||||||
|
for model, maxCores := range cap.CPUCores {
|
||||||
|
if maxCores > 0 {
|
||||||
|
pct["cpu_"+model] = 100.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for model, needed := range req.CPUCores {
|
||||||
|
if maxCores, ok := cap.CPUCores[model]; ok && maxCores > 0 {
|
||||||
|
pct["cpu_"+model] = (needed / maxCores) * 100.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.GPUMemGB == nil {
|
||||||
|
for model, maxMem := range cap.GPUMemGB {
|
||||||
|
if maxMem > 0 {
|
||||||
|
pct["gpu_"+model] = 100.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for model, needed := range req.GPUMemGB {
|
||||||
|
if maxMem, ok := cap.GPUMemGB[model]; ok && maxMem > 0 {
|
||||||
|
pct["gpu_"+model] = (needed / maxMem) * 100.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.RAMGB == nil {
|
||||||
|
if cap.RAMGB > 0 {
|
||||||
|
pct["ram"] = 100.0
|
||||||
|
}
|
||||||
|
} else if cap.RAMGB > 0 {
|
||||||
|
pct["ram"] = (*req.RAMGB / cap.RAMGB) * 100.0
|
||||||
|
}
|
||||||
|
|
||||||
|
if req.StorageGB == nil {
|
||||||
|
if cap.StorageGB > 0 {
|
||||||
|
pct["storage"] = 100.0
|
||||||
|
}
|
||||||
|
} else if cap.StorageGB > 0 {
|
||||||
|
pct["storage"] = (*req.StorageGB / cap.StorageGB) * 100.0
|
||||||
|
}
|
||||||
|
|
||||||
|
return pct
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
// Internal helpers
|
||||||
|
// ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
// extractSlotData parses the booking's PricedItem, loads the corresponding resource,
|
||||||
|
// and returns the instance ID, usage percentages, and instance capacity in a single pass.
|
||||||
|
func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) {
|
||||||
|
usage = map[string]float64{}
|
||||||
|
if len(bk.PricedItem) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
b, err := json.Marshal(bk.PricedItem)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch bk.ResourceType {
|
||||||
|
case tools.COMPUTE_RESOURCE:
|
||||||
|
instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request)
|
||||||
|
case tools.STORAGE_RESOURCE:
|
||||||
|
instanceID, usage, cap = extractStorageSlot(b, bk.ResourceID, request)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractComputeSlot extracts the instance ID, usage percentages, and max capacity for a compute booking.
|
||||||
|
// Keys in usage: "cpu_<model>", "gpu_<model>", "ram".
|
||||||
|
func extractComputeSlot(pricedJSON []byte, resourceID string, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) {
|
||||||
|
usage = map[string]float64{}
|
||||||
|
|
||||||
|
var priced resources.PricedComputeResource
|
||||||
|
if err := json.Unmarshal(pricedJSON, &priced); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, _, err := (&resources.ComputeResource{}).GetAccessor(request).LoadOne(resourceID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
compute := res.(*resources.ComputeResource)
|
||||||
|
|
||||||
|
instance := findComputeInstance(compute, priced.InstancesRefs)
|
||||||
|
if instance == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
instanceID = instance.GetID()
|
||||||
|
|
||||||
|
// Build the instance's maximum capacity
|
||||||
|
cap = &InstanceCapacity{
|
||||||
|
CPUCores: map[string]float64{},
|
||||||
|
GPUMemGB: map[string]float64{},
|
||||||
|
RAMGB: totalRAM(instance),
|
||||||
|
}
|
||||||
|
for model := range instance.CPUs {
|
||||||
|
cap.CPUCores[model] = totalCPUCores(instance, model)
|
||||||
|
}
|
||||||
|
for model := range instance.GPUs {
|
||||||
|
cap.GPUMemGB[model] = totalGPUMemory(instance, model)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute usage as a percentage of the instance's maximum capacity
|
||||||
|
for model, usedCores := range priced.CPUsLocated {
|
||||||
|
if maxCores := cap.CPUCores[model]; maxCores > 0 {
|
||||||
|
usage["cpu_"+model] = (usedCores / maxCores) * 100.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for model, usedMem := range priced.GPUsLocated {
|
||||||
|
if maxMem := cap.GPUMemGB[model]; maxMem > 0 {
|
||||||
|
usage["gpu_"+model] = (usedMem / maxMem) * 100.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cap.RAMGB > 0 && priced.RAMLocated > 0 {
|
||||||
|
usage["ram"] = (priced.RAMLocated / cap.RAMGB) * 100.0
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// extractStorageSlot extracts the instance ID, usage percentages, and max capacity for a storage booking.
|
||||||
|
// Key in usage: "storage".
|
||||||
|
func extractStorageSlot(pricedJSON []byte, resourceID string, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) {
|
||||||
|
usage = map[string]float64{}
|
||||||
|
|
||||||
|
var priced resources.PricedStorageResource
|
||||||
|
if err := json.Unmarshal(pricedJSON, &priced); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, _, err := (&resources.StorageResource{}).GetAccessor(request).LoadOne(resourceID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
storage := res.(*resources.StorageResource)
|
||||||
|
|
||||||
|
instance := findStorageInstance(storage, priced.InstancesRefs)
|
||||||
|
if instance == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
instanceID = instance.GetID()
|
||||||
|
|
||||||
|
maxStorage := float64(instance.SizeGB)
|
||||||
|
cap = &InstanceCapacity{
|
||||||
|
CPUCores: map[string]float64{},
|
||||||
|
GPUMemGB: map[string]float64{},
|
||||||
|
StorageGB: maxStorage,
|
||||||
|
}
|
||||||
|
if maxStorage > 0 && priced.UsageStorageGB > 0 {
|
||||||
|
usage["storage"] = (priced.UsageStorageGB / maxStorage) * 100.0
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// findComputeInstance returns the instance referenced by the priced item's InstancesRefs,
|
||||||
|
// falling back to the first available instance.
|
||||||
|
func findComputeInstance(compute *resources.ComputeResource, refs map[string]string) *resources.ComputeResourceInstance {
|
||||||
|
for _, inst := range compute.Instances {
|
||||||
|
if _, ok := refs[inst.GetID()]; ok {
|
||||||
|
return inst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(compute.Instances) > 0 {
|
||||||
|
return compute.Instances[0]
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// findStorageInstance returns the instance referenced by the priced item's InstancesRefs,
|
||||||
|
// falling back to the first available instance.
|
||||||
|
func findStorageInstance(storage *resources.StorageResource, refs map[string]string) *resources.StorageResourceInstance {
|
||||||
|
for _, inst := range storage.Instances {
|
||||||
|
if _, ok := refs[inst.GetID()]; ok {
|
||||||
|
return inst
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(storage.Instances) > 0 {
|
||||||
|
return storage.Instances[0]
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// totalCPUCores returns the total number of cores for a given CPU model across all nodes.
|
||||||
|
// It multiplies the per-chip core count (from the instance's CPU spec) by the total
|
||||||
|
// number of chips of that model across all nodes (chip_count × node.Quantity).
|
||||||
|
// Falls back to the spec's core count if no nodes are defined.
|
||||||
|
func totalCPUCores(instance *resources.ComputeResourceInstance, model string) float64 {
|
||||||
|
spec, ok := instance.CPUs[model]
|
||||||
|
if !ok || spec == nil || spec.Cores == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if len(instance.Nodes) == 0 {
|
||||||
|
return float64(spec.Cores)
|
||||||
|
}
|
||||||
|
totalChips := int64(0)
|
||||||
|
for _, node := range instance.Nodes {
|
||||||
|
if chipCount, ok := node.CPUs[model]; ok {
|
||||||
|
totalChips += chipCount * max(node.Quantity, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if totalChips == 0 {
|
||||||
|
return float64(spec.Cores)
|
||||||
|
}
|
||||||
|
return float64(totalChips * int64(spec.Cores))
|
||||||
|
}
|
||||||
|
|
||||||
|
// totalGPUMemory returns the total GPU memory (GB) for a given model across all nodes.
|
||||||
|
// Falls back to the spec's memory if no nodes are defined.
|
||||||
|
func totalGPUMemory(instance *resources.ComputeResourceInstance, model string) float64 {
|
||||||
|
spec, ok := instance.GPUs[model]
|
||||||
|
if !ok || spec == nil || spec.MemoryGb == 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if len(instance.Nodes) == 0 {
|
||||||
|
return spec.MemoryGb
|
||||||
|
}
|
||||||
|
totalUnits := int64(0)
|
||||||
|
for _, node := range instance.Nodes {
|
||||||
|
if unitCount, ok := node.GPUs[model]; ok {
|
||||||
|
totalUnits += unitCount * max(node.Quantity, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if totalUnits == 0 {
|
||||||
|
return spec.MemoryGb
|
||||||
|
}
|
||||||
|
return float64(totalUnits) * spec.MemoryGb
|
||||||
|
}
|
||||||
|
|
||||||
|
// totalRAM returns the total RAM (GB) across all nodes of a compute instance.
|
||||||
|
func totalRAM(instance *resources.ComputeResourceInstance) float64 {
|
||||||
|
total := float64(0)
|
||||||
|
for _, node := range instance.Nodes {
|
||||||
|
if node.RAM != nil && node.RAM.SizeGb > 0 {
|
||||||
|
total += node.RAM.SizeGb * float64(max(node.Quantity, 1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return total
|
||||||
|
}
|
||||||
@@ -27,7 +27,7 @@ type NATSMethod int
|
|||||||
|
|
||||||
var meths = []string{"remove execution", "create execution", "discovery",
|
var meths = []string{"remove execution", "create execution", "discovery",
|
||||||
"workflow event", "create resource", "remove resource",
|
"workflow event", "create resource", "remove resource",
|
||||||
"propalgation event", "catalog search event",
|
"propalgation event", "search event", "get planner",
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -40,7 +40,8 @@ const (
|
|||||||
REMOVE_RESOURCE
|
REMOVE_RESOURCE
|
||||||
|
|
||||||
PROPALGATION_EVENT
|
PROPALGATION_EVENT
|
||||||
CATALOG_SEARCH_EVENT
|
SEARCH_EVENT
|
||||||
|
PLANNER_EVENT
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n NATSMethod) String() string {
|
func (n NATSMethod) String() string {
|
||||||
@@ -50,7 +51,7 @@ func (n NATSMethod) String() string {
|
|||||||
// NameToMethod returns the NATSMethod enum value from a string
|
// NameToMethod returns the NATSMethod enum value from a string
|
||||||
func NameToMethod(name string) NATSMethod {
|
func NameToMethod(name string) NATSMethod {
|
||||||
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECTUTION, DISCOVERY, WORKFLOW_EVENT,
|
for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECTUTION, DISCOVERY, WORKFLOW_EVENT,
|
||||||
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, CATALOG_SEARCH_EVENT} {
|
CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, PLANNER_EVENT} {
|
||||||
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
|
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user