diff --git a/models/booking/planner/planner.go b/models/booking/planner/planner.go new file mode 100644 index 0000000..8d83edd --- /dev/null +++ b/models/booking/planner/planner.go @@ -0,0 +1,434 @@ +package planner + +import ( + "encoding/json" + "time" + + "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/tools" +) + +// InstanceCapacity holds the maximum available resources of a single resource instance. +type InstanceCapacity struct { + CPUCores map[string]float64 `json:"cpu_cores,omitempty"` // model -> total cores + GPUMemGB map[string]float64 `json:"gpu_mem_gb,omitempty"` // model -> total memory GB + RAMGB float64 `json:"ram_gb,omitempty"` // total RAM GB + StorageGB float64 `json:"storage_gb,omitempty"` // total storage GB +} + +// ResourceRequest describes the resource amounts needed for a prospective booking. +// A nil map or nil pointer for a dimension means "use the full instance capacity" for that dimension. +type ResourceRequest struct { + CPUCores map[string]float64 // model -> cores needed (nil = max) + GPUMemGB map[string]float64 // model -> memory GB needed (nil = max) + RAMGB *float64 // GB needed (nil = max) + StorageGB *float64 // GB needed (nil = max) +} + +// PlannerSlot represents a single booking occupying a resource instance during a time window. +// Usage maps each resource dimension (cpu_, gpu_, ram, storage) to +// its percentage of consumption relative to the instance's maximum capacity (0–100). +type PlannerSlot struct { + Start time.Time `json:"start"` + End time.Time `json:"end"` + InstanceID string `json:"instance_id,omitempty"` // instance targeted by this booking + BookingID string `json:"booking_id,omitempty"` // empty in shallow mode + Usage map[string]float64 `json:"usage,omitempty"` // dimension -> % of max (0-100) +} + +// Planner is a volatile (non-persisted) object that organises bookings by resource. +// Only ComputeResource and StorageResource bookings appear in the schedule. +type Planner struct { + GeneratedAt time.Time `json:"generated_at"` + Schedule map[string][]*PlannerSlot `json:"schedule"` // resource_id -> slots + Capacities map[string]map[string]*InstanceCapacity `json:"capacities"` // resource_id -> instance_id -> max capacity +} + +// Generate builds a full Planner from all active bookings. +// Each slot includes the booking ID, the instance ID, and the usage percentage of every resource dimension. +func Generate(request *tools.APIRequest) (*Planner, error) { + return generate(request, false) +} + +// GenerateShallow builds a Planner from all active bookings without booking IDs. +func GenerateShallow(request *tools.APIRequest) (*Planner, error) { + return generate(request, true) +} + +func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { + accessor := booking.NewAccessor(request) + bookings, code, err := accessor.Search(nil, "*", false) + if code != 200 || err != nil { + return nil, err + } + + p := &Planner{ + GeneratedAt: time.Now(), + Schedule: map[string][]*PlannerSlot{}, + Capacities: map[string]map[string]*InstanceCapacity{}, + } + + for _, b := range bookings { + bk := b.(*booking.Booking) + + // Only compute and storage resources are eligible + if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE { + continue + } + + end := bk.ExpectedEndDate + if end == nil { + e := bk.ExpectedStartDate.Add(time.Hour) + end = &e + } + + instanceID, usage, cap := extractSlotData(bk, request) + + if cap != nil && instanceID != "" { + if p.Capacities[bk.ResourceID] == nil { + p.Capacities[bk.ResourceID] = map[string]*InstanceCapacity{} + } + p.Capacities[bk.ResourceID][instanceID] = cap + } + + slot := &PlannerSlot{ + Start: bk.ExpectedStartDate, + End: *end, + InstanceID: instanceID, + Usage: usage, + } + if !shallow { + slot.BookingID = bk.GetID() + } + + p.Schedule[bk.ResourceID] = append(p.Schedule[bk.ResourceID], slot) + } + + return p, nil +} + +// Check reports whether the requested time window has enough remaining capacity +// on the specified instance of the given resource. +// +// req describes the amounts needed; nil fields default to the full instance capacity. +// If req itself is nil, the full capacity of every dimension is assumed. +// If end is nil, a 1-hour window from start is assumed. +// +// A slot that overlaps the requested window is acceptable if, for every requested +// dimension, existing usage + requested usage ≤ 100 %. +// Slots targeting other instances are ignored. +// If no capacity is known for this instance (never booked), it is fully available. +func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool { + if end == nil { + e := start.Add(time.Hour) + end = &e + } + + cap := p.instanceCapacity(resourceID, instanceID) + reqPct := toPercentages(req, cap) + + slots, ok := p.Schedule[resourceID] + if !ok { + return true + } + + for _, slot := range slots { + // Only consider slots on the same instance + if slot.InstanceID != instanceID { + continue + } + // Only consider overlapping slots + if !slot.Start.Before(*end) || !slot.End.After(start) { + continue + } + // Combined usage must not exceed 100 % for any requested dimension + for dim, needed := range reqPct { + if slot.Usage[dim]+needed > 100.0 { + return false + } + } + } + return true +} + +// instanceCapacity returns the stored max capacity for a resource/instance pair. +// Returns an empty (but non-nil) capacity when the instance has never been booked. +func (p *Planner) instanceCapacity(resourceID, instanceID string) *InstanceCapacity { + if instances, ok := p.Capacities[resourceID]; ok { + if c, ok := instances[instanceID]; ok { + return c + } + } + return &InstanceCapacity{ + CPUCores: map[string]float64{}, + GPUMemGB: map[string]float64{}, + } +} + +// toPercentages converts a ResourceRequest into a map of dimension -> percentage-of-max. +// nil fields in req (or nil req) are treated as requesting the full capacity (100 %). +func toPercentages(req *ResourceRequest, cap *InstanceCapacity) map[string]float64 { + pct := map[string]float64{} + + if req == nil { + for model := range cap.CPUCores { + pct["cpu_"+model] = 100.0 + } + for model := range cap.GPUMemGB { + pct["gpu_"+model] = 100.0 + } + if cap.RAMGB > 0 { + pct["ram"] = 100.0 + } + if cap.StorageGB > 0 { + pct["storage"] = 100.0 + } + return pct + } + + if req.CPUCores == nil { + for model, maxCores := range cap.CPUCores { + if maxCores > 0 { + pct["cpu_"+model] = 100.0 + } + } + } else { + for model, needed := range req.CPUCores { + if maxCores, ok := cap.CPUCores[model]; ok && maxCores > 0 { + pct["cpu_"+model] = (needed / maxCores) * 100.0 + } + } + } + + if req.GPUMemGB == nil { + for model, maxMem := range cap.GPUMemGB { + if maxMem > 0 { + pct["gpu_"+model] = 100.0 + } + } + } else { + for model, needed := range req.GPUMemGB { + if maxMem, ok := cap.GPUMemGB[model]; ok && maxMem > 0 { + pct["gpu_"+model] = (needed / maxMem) * 100.0 + } + } + } + + if req.RAMGB == nil { + if cap.RAMGB > 0 { + pct["ram"] = 100.0 + } + } else if cap.RAMGB > 0 { + pct["ram"] = (*req.RAMGB / cap.RAMGB) * 100.0 + } + + if req.StorageGB == nil { + if cap.StorageGB > 0 { + pct["storage"] = 100.0 + } + } else if cap.StorageGB > 0 { + pct["storage"] = (*req.StorageGB / cap.StorageGB) * 100.0 + } + + return pct +} + +// --------------------------------------------------------------------------- +// Internal helpers +// --------------------------------------------------------------------------- + +// extractSlotData parses the booking's PricedItem, loads the corresponding resource, +// and returns the instance ID, usage percentages, and instance capacity in a single pass. +func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) { + usage = map[string]float64{} + if len(bk.PricedItem) == 0 { + return + } + + b, err := json.Marshal(bk.PricedItem) + if err != nil { + return + } + + switch bk.ResourceType { + case tools.COMPUTE_RESOURCE: + instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request) + case tools.STORAGE_RESOURCE: + instanceID, usage, cap = extractStorageSlot(b, bk.ResourceID, request) + } + return +} + +// extractComputeSlot extracts the instance ID, usage percentages, and max capacity for a compute booking. +// Keys in usage: "cpu_", "gpu_", "ram". +func extractComputeSlot(pricedJSON []byte, resourceID string, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) { + usage = map[string]float64{} + + var priced resources.PricedComputeResource + if err := json.Unmarshal(pricedJSON, &priced); err != nil { + return + } + + res, _, err := (&resources.ComputeResource{}).GetAccessor(request).LoadOne(resourceID) + if err != nil { + return + } + compute := res.(*resources.ComputeResource) + + instance := findComputeInstance(compute, priced.InstancesRefs) + if instance == nil { + return + } + instanceID = instance.GetID() + + // Build the instance's maximum capacity + cap = &InstanceCapacity{ + CPUCores: map[string]float64{}, + GPUMemGB: map[string]float64{}, + RAMGB: totalRAM(instance), + } + for model := range instance.CPUs { + cap.CPUCores[model] = totalCPUCores(instance, model) + } + for model := range instance.GPUs { + cap.GPUMemGB[model] = totalGPUMemory(instance, model) + } + + // Compute usage as a percentage of the instance's maximum capacity + for model, usedCores := range priced.CPUsLocated { + if maxCores := cap.CPUCores[model]; maxCores > 0 { + usage["cpu_"+model] = (usedCores / maxCores) * 100.0 + } + } + for model, usedMem := range priced.GPUsLocated { + if maxMem := cap.GPUMemGB[model]; maxMem > 0 { + usage["gpu_"+model] = (usedMem / maxMem) * 100.0 + } + } + if cap.RAMGB > 0 && priced.RAMLocated > 0 { + usage["ram"] = (priced.RAMLocated / cap.RAMGB) * 100.0 + } + + return +} + +// extractStorageSlot extracts the instance ID, usage percentages, and max capacity for a storage booking. +// Key in usage: "storage". +func extractStorageSlot(pricedJSON []byte, resourceID string, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) { + usage = map[string]float64{} + + var priced resources.PricedStorageResource + if err := json.Unmarshal(pricedJSON, &priced); err != nil { + return + } + + res, _, err := (&resources.StorageResource{}).GetAccessor(request).LoadOne(resourceID) + if err != nil { + return + } + storage := res.(*resources.StorageResource) + + instance := findStorageInstance(storage, priced.InstancesRefs) + if instance == nil { + return + } + instanceID = instance.GetID() + + maxStorage := float64(instance.SizeGB) + cap = &InstanceCapacity{ + CPUCores: map[string]float64{}, + GPUMemGB: map[string]float64{}, + StorageGB: maxStorage, + } + if maxStorage > 0 && priced.UsageStorageGB > 0 { + usage["storage"] = (priced.UsageStorageGB / maxStorage) * 100.0 + } + + return +} + +// findComputeInstance returns the instance referenced by the priced item's InstancesRefs, +// falling back to the first available instance. +func findComputeInstance(compute *resources.ComputeResource, refs map[string]string) *resources.ComputeResourceInstance { + for _, inst := range compute.Instances { + if _, ok := refs[inst.GetID()]; ok { + return inst + } + } + if len(compute.Instances) > 0 { + return compute.Instances[0] + } + return nil +} + +// findStorageInstance returns the instance referenced by the priced item's InstancesRefs, +// falling back to the first available instance. +func findStorageInstance(storage *resources.StorageResource, refs map[string]string) *resources.StorageResourceInstance { + for _, inst := range storage.Instances { + if _, ok := refs[inst.GetID()]; ok { + return inst + } + } + if len(storage.Instances) > 0 { + return storage.Instances[0] + } + return nil +} + +// totalCPUCores returns the total number of cores for a given CPU model across all nodes. +// It multiplies the per-chip core count (from the instance's CPU spec) by the total +// number of chips of that model across all nodes (chip_count × node.Quantity). +// Falls back to the spec's core count if no nodes are defined. +func totalCPUCores(instance *resources.ComputeResourceInstance, model string) float64 { + spec, ok := instance.CPUs[model] + if !ok || spec == nil || spec.Cores == 0 { + return 0 + } + if len(instance.Nodes) == 0 { + return float64(spec.Cores) + } + totalChips := int64(0) + for _, node := range instance.Nodes { + if chipCount, ok := node.CPUs[model]; ok { + totalChips += chipCount * max(node.Quantity, 1) + } + } + if totalChips == 0 { + return float64(spec.Cores) + } + return float64(totalChips * int64(spec.Cores)) +} + +// totalGPUMemory returns the total GPU memory (GB) for a given model across all nodes. +// Falls back to the spec's memory if no nodes are defined. +func totalGPUMemory(instance *resources.ComputeResourceInstance, model string) float64 { + spec, ok := instance.GPUs[model] + if !ok || spec == nil || spec.MemoryGb == 0 { + return 0 + } + if len(instance.Nodes) == 0 { + return spec.MemoryGb + } + totalUnits := int64(0) + for _, node := range instance.Nodes { + if unitCount, ok := node.GPUs[model]; ok { + totalUnits += unitCount * max(node.Quantity, 1) + } + } + if totalUnits == 0 { + return spec.MemoryGb + } + return float64(totalUnits) * spec.MemoryGb +} + +// totalRAM returns the total RAM (GB) across all nodes of a compute instance. +func totalRAM(instance *resources.ComputeResourceInstance) float64 { + total := float64(0) + for _, node := range instance.Nodes { + if node.RAM != nil && node.RAM.SizeGb > 0 { + total += node.RAM.SizeGb * float64(max(node.Quantity, 1)) + } + } + return total +} diff --git a/tools/nats_caller.go b/tools/nats_caller.go index 70745f2..ef22a12 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -27,7 +27,7 @@ type NATSMethod int var meths = []string{"remove execution", "create execution", "discovery", "workflow event", "create resource", "remove resource", - "propalgation event", "catalog search event", + "propalgation event", "search event", "get planner", } const ( @@ -40,7 +40,8 @@ const ( REMOVE_RESOURCE PROPALGATION_EVENT - CATALOG_SEARCH_EVENT + SEARCH_EVENT + PLANNER_EVENT ) func (n NATSMethod) String() string { @@ -50,7 +51,7 @@ func (n NATSMethod) String() string { // NameToMethod returns the NATSMethod enum value from a string func NameToMethod(name string) NATSMethod { for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECTUTION, DISCOVERY, WORKFLOW_EVENT, - CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, CATALOG_SEARCH_EVENT} { + CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, PLANNER_EVENT} { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { return v }