From 88d2e526283ba5240d7b1ff7f37e6c48140b35c8 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 20 Mar 2026 16:14:07 +0100 Subject: [PATCH] Correct --- models/booking/booking.go | 51 +++++------------ models/booking/planner/planner.go | 52 ++++++++++++----- models/common/planner.go | 10 +++- models/resources/storage.go | 2 - models/workflow/workflow.go | 95 +++++++++++++++++++++++++++++-- 5 files changed, 149 insertions(+), 61 deletions(-) diff --git a/models/booking/booking.go b/models/booking/booking.go index c7fa050..9e2141d 100644 --- a/models/booking/booking.go +++ b/models/booking/booking.go @@ -3,12 +3,10 @@ package booking import ( "time" - "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" - "go.mongodb.org/mongo-driver/bson/primitive" ) /* @@ -68,40 +66,15 @@ func (b *Booking) CalcDeltaOfExecution() map[string]map[string]models.MetricResu return m } -// CheckBooking checks if a booking is possible on a specific compute resource -func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelAllowed int) (bool, error) { - // check if - if end == nil { - // if no end... then Book like a savage - e := start.Add(5 * time.Minute) - end = &e - } - accessor := NewAccessor(nil) - res, code, err := accessor.Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date - "resource_id": {{Operator: dbs.EQUAL.String(), Value: id}}, - "state": {{Operator: dbs.EQUAL.String(), Value: enum.DRAFT.EnumIndex()}}, - "expected_start_date": { - {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*end)}, - {Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(start)}, - }, - }, - }, "", wfa.IsDraft) - if code != 200 { - return false, err - } - return len(res) <= parrallelAllowed, nil -} - func (d *Booking) GetDelayForLaunch() time.Duration { return d.RealStartDate.Sub(d.ExpectedStartDate) } func (d *Booking) GetDelayForFinishing() time.Duration { - if d.ExpectedEndDate == nil { + if d.ExpectedEndDate == nil || d.RealEndDate == nil { return time.Duration(0) } - return d.RealEndDate.Sub(d.ExpectedStartDate) + return d.RealEndDate.Sub(*d.ExpectedEndDate) } func (d *Booking) GetUsualDuration() time.Duration { @@ -133,14 +106,20 @@ func (r *Booking) StoreDraftDefault() { } func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { - if !r.IsDraft && r.State != set.(*Booking).State || r.RealStartDate != set.(*Booking).RealStartDate || r.RealEndDate != set.(*Booking).RealEndDate { - return true, &Booking{ - State: set.(*Booking).State, - RealStartDate: set.(*Booking).RealStartDate, - RealEndDate: set.(*Booking).RealEndDate, - } // only state can be updated + incoming := set.(*Booking) + if !r.IsDraft && r.State != incoming.State || r.RealStartDate != incoming.RealStartDate || r.RealEndDate != incoming.RealEndDate { + patch := &Booking{ + State: incoming.State, + RealStartDate: incoming.RealStartDate, + RealEndDate: incoming.RealEndDate, + } + // Auto-set RealStartDate when transitioning to STARTED and not already set + if r.State != enum.STARTED && incoming.State == enum.STARTED && patch.RealStartDate == nil { + now := time.Now() + patch.RealStartDate = &now + } + return true, patch } - // TODO : HERE WE CAN HANDLE THE CASE WHERE THE BOOKING IS DELAYED OR EXCEEDING OR ending sooner return r.IsDraft, set } diff --git a/models/booking/planner/planner.go b/models/booking/planner/planner.go index b42ee0f..c6a2e0c 100644 --- a/models/booking/planner/planner.go +++ b/models/booking/planner/planner.go @@ -2,7 +2,7 @@ package planner import ( "encoding/json" - "fmt" + "sort" "time" "cloud.o-forge.io/core/oc-lib/dbs" @@ -40,6 +40,12 @@ type PlannerSlot struct { Usage map[string]float64 `json:"usage,omitempty"` // dimension -> % of max (0-100) } +// PlannerITF is the interface used by Planify to check resource availability. +// *Planner satisfies this interface. +type PlannerITF interface { + NextAvailableStart(resourceID, instanceID string, start time.Time, d time.Duration) time.Time +} + // Planner is a volatile (non-persisted) object that organises bookings by resource. // Only ComputeResource and StorageResource bookings appear in the schedule. type Planner struct { @@ -79,7 +85,6 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { }, "*", true) bookings := append(confirmed, drafts...) - fmt.Println("BOOKS", len(bookings)) p := &Planner{ GeneratedAt: time.Now(), Schedule: map[string][]*PlannerSlot{}, @@ -97,7 +102,6 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { // Only compute and storage resources are eligible if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE { - fmt.Println("Not eligible") continue } @@ -157,30 +161,23 @@ func (p *Planner) Check(resourceID string, instanceID string, req *ResourceReque slots, ok := p.Schedule[resourceID] if !ok { - fmt.Println("CHECK1", true) return true } - fmt.Println("CHECK2", len(slots)) for _, slot := range slots { // Only consider slots on the same instance if slot.InstanceID != instanceID { - fmt.Println("CHECK3 MISS", slot.InstanceID, instanceID) continue } // Only consider overlapping slots if !slot.Start.Before(*end) || !slot.End.After(start) { - fmt.Println("CHECK4 MISS", slot.Start, slot.End, start, end) continue } - fmt.Println("CHECK5", reqPct) // If capacity is unknown (reqPct empty), any overlap blocks the slot. if len(reqPct) == 0 { return false } - // Combined usage must not exceed 100 % for any requested dimension for dim, needed := range reqPct { - fmt.Println("CHECK6", slot.Usage[dim]+needed) if slot.Usage[dim]+needed >= 100.0 { return false } @@ -287,7 +284,6 @@ func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID if err != nil { return } - fmt.Println("EXTRACT CLOT", bk.ResourceType) switch bk.ResourceType { case tools.COMPUTE_RESOURCE: instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request) @@ -304,20 +300,17 @@ func extractComputeSlot(pricedJSON []byte, resourceID string, request *tools.API var priced resources.PricedComputeResource if err := json.Unmarshal(pricedJSON, &priced); err != nil { - fmt.Println("extractComputeSlot", err) return } res, _, err := (&resources.ComputeResource{}).GetAccessor(request).LoadOne(resourceID) if err != nil { - fmt.Println("extractComputeSlot2", err) return } compute := res.(*resources.ComputeResource) instance := findComputeInstance(compute, priced.InstancesRefs) if instance == nil { - fmt.Println("extractComputeSlot no instance found", err) return } instanceID = instance.GetID() @@ -472,3 +465,34 @@ func totalRAM(instance *resources.ComputeResourceInstance) float64 { } return total } + +// NextAvailableStart returns the earliest time >= start when resourceID/instanceID has a +// free window of duration d. Slots are scanned in order so a single linear pass suffices. +// If the planner has no slots for this resource/instance, start is returned unchanged. +func (p *Planner) NextAvailableStart(resourceID, instanceID string, start time.Time, d time.Duration) time.Time { + slots := p.Schedule[resourceID] + if len(slots) == 0 { + return start + } + // Collect and sort slots for this instance by start time. + relevant := make([]*PlannerSlot, 0, len(slots)) + for _, s := range slots { + if s.InstanceID == instanceID { + relevant = append(relevant, s) + } + } + sort.Slice(relevant, func(i, j int) bool { return relevant[i].Start.Before(relevant[j].Start) }) + + end := start.Add(d) + for _, slot := range relevant { + if !slot.Start.Before(end) { + break // all remaining slots start after our window — done + } + if slot.End.After(start) { + // conflict: push start to after this slot + start = slot.End + end = start.Add(d) + } + } + return start +} diff --git a/models/common/planner.go b/models/common/planner.go index 0d60a0d..8b8cdf1 100755 --- a/models/common/planner.go +++ b/models/common/planner.go @@ -1,7 +1,6 @@ package common import ( - "fmt" "time" "cloud.o-forge.io/core/oc-lib/models/common/pricing" @@ -28,11 +27,16 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[stri return near } +// GetPlannerLongestTime returns the sum of all processing durations (conservative estimate). +// Returns -1 if any processing is a service (open-ended). func GetPlannerLongestTime(planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 { longestTime := float64(0) for _, priced := range planned[tools.PROCESSING_RESOURCE] { - longestTime += priced.GetExplicitDurationInS() - fmt.Println("Longest", longestTime) + d := priced.GetExplicitDurationInS() + if d < 0 { + return -1 // service present: booking is open-ended + } + longestTime += d } return longestTime } diff --git a/models/resources/storage.go b/models/resources/storage.go index 5dca1bb..bec2fe4 100755 --- a/models/resources/storage.go +++ b/models/resources/storage.go @@ -2,7 +2,6 @@ package resources import ( "errors" - "fmt" "time" "cloud.o-forge.io/core/oc-lib/models/common/enum" @@ -210,7 +209,6 @@ func (r *PricedStorageResource) GetPriceHT() (float64, error) { if r.BookingConfiguration == nil { r.BookingConfiguration = &BookingConfiguration{} } - fmt.Println("GetPriceHT", r.BookingConfiguration.UsageStart, r.BookingConfiguration.UsageEnd) now := time.Now() if r.BookingConfiguration.UsageStart == nil { r.BookingConfiguration.UsageStart = &now diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 0d2e43e..5aff6a8 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -11,6 +11,7 @@ import ( "time" "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/booking/planner" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/common" "cloud.o-forge.io/core/oc-lib/models/common/models" @@ -594,8 +595,36 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) { return true, nil } -func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) { +// preemptDelay is the minimum lead time granted before a preempted booking starts. +const preemptDelay = 30 * time.Second + +// Planify computes the scheduled start/end for every resource in the workflow. +// +// bookingMode controls availability checking when p (a live planner snapshot) is provided: +// - PREEMPTED : start from now+preemptDelay regardless of existing load. +// - WHEN_POSSIBLE: start from max(now, start); if a slot conflicts, slide to the next free window. +// - PLANNED : use start as-is; return an error if the slot is not available. +// +// Passing p = nil skips all availability checks (useful for sub-workflow recursion). +func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, p planner.PlannerITF, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) { + // 1. Adjust global start based on booking mode. + now := time.Now() + switch booking.BookingMode(bookingMode) { + case booking.PREEMPTED: + if earliest := now.Add(preemptDelay); start.Before(earliest) { + start = earliest + } + case booking.WHEN_POSSIBLE: + if start.Before(now) { + start = now + } + // PLANNED: honour the caller's start date as-is. + } + priceds := map[tools.DataType]map[string]pricing.PricedItemITF{} + var err error + + // 2. Plan processings first so we can derive the total workflow duration. ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) { d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), @@ -612,6 +641,11 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte if err != nil { return false, 0, priceds, nil, err } + + // Total workflow duration used as the booking window for compute/storage. + // Returns -1 if any processing is a service (open-ended). + workflowDuration := common.GetPlannerLongestTime(priceds) + if _, priceds, err = plan[resources.ResourceInterface](tools.NATIVE_TOOL, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsNativeTool, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) { return start, 0, nil @@ -628,11 +662,13 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte }); err != nil { return false, 0, priceds, nil, err } + + // 3. Compute/storage: duration = total workflow duration (conservative bound). for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage, tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} { if _, priceds, err = plan[resources.ResourceInterface](k, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) { - nearestStart, longestDuration, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) { + nearestStart, _, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) { if f(i) { _, r = i.GetResource() } @@ -640,17 +676,21 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte }, *instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()), bookingMode, request) if err != nil { - return start, longestDuration, err + return start, workflowDuration, err } - return start.Add(time.Duration(nearestStart) * time.Second), longestDuration, nil + return start.Add(time.Duration(nearestStart) * time.Second), workflowDuration, nil }, func(started time.Time, duration float64) (*time.Time, error) { + if duration < 0 { + return nil, nil // service: open-ended booking + } s := started.Add(time.Duration(duration) * time.Second) return &s, nil }); err != nil { return false, 0, priceds, nil, err } } - longest := common.GetPlannerLongestTime(priceds) + + longest := workflowDuration if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsWorkflow, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) { @@ -660,7 +700,7 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte if code != 200 || err != nil { return start, longest, err } - _, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, request) + _, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, nil, request) // should ... import priced if err != nil { return start, longest, err @@ -685,6 +725,19 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte }); err != nil { return false, 0, priceds, nil, err } + + // 4. Availability check against the live planner (skipped for PREEMPTED and sub-workflows). + if p != nil && booking.BookingMode(bookingMode) != booking.PREEMPTED { + slide, err := plannerAvailabilitySlide(p, priceds, booking.BookingMode(bookingMode)) + if err != nil { + return false, 0, priceds, nil, err + } + if slide > 0 { + // Re-plan from the corrected start; pass nil planner to avoid infinite recursion. + return wf.Planify(start.Add(slide), end, instances, partnerships, buyings, strategies, bookingMode, nil, request) + } + } + isPreemptible := true for _, first := range wf.GetFirstItems() { _, res := first.GetResource() @@ -696,6 +749,36 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte return isPreemptible, longest, priceds, wf, nil } +// plannerAvailabilitySlide checks all compute/storage resources in priceds against the planner. +// For PLANNED mode it returns an error immediately on the first conflict. +// For WHEN_POSSIBLE it returns the maximum slide (duration to add to global start) needed to +// clear all conflicts, or 0 if the plan is already conflict-free. +func plannerAvailabilitySlide(p planner.PlannerITF, priceds map[tools.DataType]map[string]pricing.PricedItemITF, mode booking.BookingMode) (time.Duration, error) { + maxSlide := time.Duration(0) + for _, dt := range []tools.DataType{tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE} { + for _, priced := range priceds[dt] { + locStart := priced.GetLocationStart() + locEnd := priced.GetLocationEnd() + if locStart == nil || locEnd == nil { + continue // open-ended: skip availability check + } + d := locEnd.Sub(*locStart) + next := p.NextAvailableStart(priced.GetID(), priced.GetInstanceID(), *locStart, d) + slide := next.Sub(*locStart) + if slide <= 0 { + continue + } + if mode == booking.PLANNED { + return 0, errors.New("requested slot is not available for resource " + priced.GetID()) + } + if slide > maxSlide { + maxSlide = slide + } + } + } + return maxSlide, nil +} + // Returns a map of DataType (processing,computing,data,storage,worfklow) where each resource (identified by its UUID) // is mapped to the list of its items (different appearance) in the graph // ex: if the same Minio storage is represented by several nodes in the graph, in [tools.STORAGE_RESSOURCE] its UUID will be mapped to