Files
2026-03-25 11:11:37 +01:00

454 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package planner
import (
"encoding/json"
"fmt"
"oc-scheduler/infrastructure/utils"
"slices"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/tools"
)
const (
	// checkWindowHours bounds how far ahead findNextSlot scans for a free slot.
	checkWindowHours = 5
	checkStepMin     = 15 // time increment per scan step (minutes)
	// plannerTTL is how long a cached planner snapshot lives before EvictAfter
	// removes it and closes the upstream stream.
	plannerTTL = 24 * time.Hour
)
// ---------------------------------------------------------------------------
// Planner cache — protected by plannerMu
// ---------------------------------------------------------------------------
// plannerEntry wraps a planner snapshot with refresh-ownership tracking.
// At most one check session may be the "refresh owner" of a given peer's
// planner at a time: it emits PB_PLANNER to request a fresh snapshot from
// oc-discovery and, on close (clean or forced), emits PB_CLOSE_PLANNER to
// release the stream. Any subsequent session that needs the same peer's
// planner will see Refreshing=true and skip the duplicate request.
type plannerEntry struct {
	Planner      *planner.Planner // latest snapshot for the peer; nil until the first PB_PLANNER reply is stored
	Refreshing   bool             // true while a PB_PLANNER request is in flight
	RefreshOwner string           // session UUID that initiated the current refresh
}
// PlannerService caches remote planner snapshots and fans out change
// notifications to interested check sessions.
type PlannerService struct {
	Mu            sync.RWMutex              // guards Cache and AddedAt
	Cache         map[string]*plannerEntry  // peerID -> cached planner state
	SubMu         sync.RWMutex              // guards Subs
	Subs          map[string][]chan string  // peerID -> planner-update subscriber channels
	AddedAt       map[string]time.Time      // peerID -> first-insertion time (TTL bookkeeping)
	WorkflowSubMu sync.RWMutex              // guards WorkflowSubs
	WorkflowSubs  map[string][]chan struct{} // workflowID -> workflow-update subscriber channels
}
var singleton *PlannerService
// InitPlanner bootstraps our own planner entry at startup.
// It waits (with 15-second retries) for our peer record to be present in the
// database before generating the first planner snapshot and broadcasting it
// on PB_PLANNER. This handles the race between oc-scheduler starting before
// oc-peer has fully registered our node.
func InitPlanner() {
	singleton = &PlannerService{
		Cache:        make(map[string]*plannerEntry),
		AddedAt:      make(map[string]time.Time),
		Subs:         make(map[string][]chan string),
		WorkflowSubs: make(map[string][]chan struct{}),
	}
	// Block until our own peer record is resolvable, then seed the first
	// planner snapshot for ourselves.
	for {
		self, err := oclib.GetMySelf()
		if err == nil && self != nil {
			singleton.RefreshSelf(self.PeerID, &tools.APIRequest{Admin: true})
			return
		}
		fmt.Println("InitPlanner: self peer not found yet, retrying in 15s...")
		time.Sleep(15 * time.Second)
	}
}
// GetPlannerService returns the process-wide PlannerService. It is nil until
// InitPlanner has run.
func GetPlannerService() *PlannerService {
	return singleton
}
// HandleStore ingests a PB_PLANNER payload (the plannerWithPeer shape emitted
// by RefreshSelf: planner fields plus a "peer_id" discriminator) and stores
// the snapshot in the cache under that peer ID. Malformed payloads are
// dropped silently, as before.
func (s *PlannerService) HandleStore(resp tools.NATSResponse) {
	// Extract the peer ID with a typed header instead of a generic map:
	// the old fmt.Sprintf("%v", m["peer_id"]) cached payloads with a
	// missing/odd peer_id under the literal key "<nil>".
	var hdr struct {
		PeerID string `json:"peer_id"`
	}
	if err := json.Unmarshal(resp.Payload, &hdr); err != nil {
		return
	}
	if hdr.PeerID == "" {
		return // no peer to index the snapshot under
	}
	p := planner.Planner{}
	if err := json.Unmarshal(resp.Payload, &p); err != nil {
		return
	}
	s.Store(hdr.PeerID, &p)
}
// MissingPeers returns the peer IDs from res whose planner is absent
// or not yet populated in the planner cache.
// func missingPlannerPeers(res map[string]bookingResource) []string {
// MissingPeers returns the peer IDs referenced by res whose planner is absent
// or not yet populated in the cache. Each peer ID appears at most once in the
// result (several resources may share a peer; the old version could return
// duplicates, causing redundant refresh bookkeeping downstream).
func (s *PlannerService) MissingPeers(res map[string]utils.BookingResource) []string {
	seen := map[string]struct{}{}
	var out []string
	// One read-lock for the whole scan instead of lock/unlock per resource.
	s.Mu.RLock()
	defer s.Mu.RUnlock()
	for _, r := range res {
		if _, dup := seen[r.PeerPID]; dup {
			continue
		}
		seen[r.PeerPID] = struct{}{}
		if entry := s.Cache[r.PeerPID]; entry == nil || entry.Planner == nil {
			out = append(out, r.PeerPID)
		}
	}
	return out
}
// FindDate decides whether the checkable resources can run at [start, end].
// Returns (start, end, available, preempted, warnings). With preemption the
// window is accepted as-is; with asap it searches forward for the next slot
// where every resource is simultaneously free and shifts the window there.
func (s *PlannerService) FindDate(wfID string, checkables map[string]utils.BookingResource, start time.Time, end *time.Time, preemption bool, asap bool) (time.Time, *time.Time, bool, bool, []string) {
	var unavailable, warnings []string
	// Preemption: Planify already resolved the window, skip availability checks.
	if preemption {
		return start, end, true, true, warnings
	}
	// For any peer whose planner is not yet cached, request it and wait
	// briefly so the decision is based on real data rather than a blind
	// "assume available". The wait is capped to avoid blocking the caller
	// when oc-discovery is unreachable.
	s.Fill(checkables, wfID)
	unavailable, warnings = s.checkResourceAvailability(checkables, start, end)
	if len(unavailable) == 0 {
		return start, end, true, false, warnings
	}
	// as_possible: find and commit to the next free slot.
	if asap {
		next := s.findNextSlot(checkables, start, end, checkWindowHours)
		if next == nil {
			return start, end, false, false, warnings
		}
		// BUGFIX: compute the end shift from the ORIGINAL start before
		// overwriting it. The previous code assigned start = *next first,
		// so end.Sub(start) was the distance from next to end and the
		// "shifted" end equaled the old end — the duration was not preserved.
		if end != nil {
			shifted := next.Add(end.Sub(start))
			end = &shifted
		}
		start = *next
		return start, end, true, false, warnings
	}
	return start, end, false, false, warnings
}
// Fill requests planner snapshots for every peer in checkables that is not
// yet cached, then waits (bounded) for the first update notification before
// releasing the one-shot refresh ownership.
func (s *PlannerService) Fill(checkables map[string]utils.BookingResource, wfID string) {
	missing := s.MissingPeers(checkables)
	if len(missing) == 0 {
		return
	}
	const plannerFetchTimeout = 2 * time.Second
	session := "check-oneshot-" + wfID
	updates, unsubscribe := SubscribeUpdates(s.Subs, &s.SubMu, missing...)
	claimed := s.Refresh(missing, session)
	// Wait for the first planner update, but never longer than the cap.
	select {
	case <-updates:
	case <-time.After(plannerFetchTimeout):
	}
	unsubscribe()
	s.ReleaseRefreshOwnership(claimed, session)
}
// EvictAfter waits ttl from first insertion then deletes the cache entry and
// emits PB_CLOSE_PLANNER so oc-discovery stops streaming for this peer.
// This is the only path that actually removes an entry from the cache;
// session close (ReleaseRefreshOwnership) only resets ownership state.
// EvictAfter sleeps for ttl, then removes the peer's cache entry (if still
// present), notifies subscribers, and tells oc-discovery to close the stream.
func (s *PlannerService) EvictAfter(peerID string, ttl time.Duration) {
	time.Sleep(ttl)
	removed := false
	s.Mu.Lock()
	if _, ok := s.Cache[peerID]; ok {
		delete(s.Cache, peerID)
		delete(s.AddedAt, peerID)
		removed = true
	}
	s.Mu.Unlock()
	if !removed {
		return
	}
	// Notify outside the lock, then release the upstream planner stream.
	utils.Notify(&s.SubMu, s.Subs, peerID, peerID)
	utils.Propalgate(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER})
}
// SubscribeUpdates registers interest in updates for the given keys. The
// returned channel (buffer of one; senders must not block) receives a value
// each time any of those keys is notified. Call cancel to unregister.
func SubscribeUpdates[T interface{}](subs map[string][]chan T, mu *sync.RWMutex, updates ...string) (<-chan T, func()) {
ch := make(chan T, 1)
mu.Lock()
for _, k := range updates {
subs[k] = append(subs[k], ch)
}
mu.Unlock()
cancel := func() {
mu.Lock()
for _, k := range updates {
subsk := subs[k]
for i, s := range subsk {
if s == ch {
subs[k] = append(subsk[:i], subsk[i+1:]...)
break
}
}
}
mu.Unlock()
}
return ch, cancel
}
// ---------------------------------------------------------------------------
// Cache helpers
// ---------------------------------------------------------------------------
// Store records the planner snapshot for peerID, creating the cache entry on
// first sight (which also arms the TTL eviction goroutine), then notifies
// subscribers waiting on this peer.
func (s *PlannerService) Store(peerID string, p *planner.Planner) {
	s.Mu.Lock()
	entry, ok := s.Cache[peerID]
	if !ok || entry == nil {
		entry = &plannerEntry{}
		s.Cache[peerID] = entry
		s.AddedAt[peerID] = time.Now().UTC()
		// Exactly one eviction goroutine per cache-entry lifetime.
		go s.EvictAfter(peerID, plannerTTL)
	}
	entry.Planner = p
	s.Mu.Unlock()
	utils.Notify[string](&s.SubMu, s.Subs, peerID, peerID)
}
// ---------------------------------------------------------------------------
// Planner refresh / broadcast
// ---------------------------------------------------------------------------
// Refresh asks oc-discovery for a fresh planner snapshot for each peer in
// peerIDs. Only the first session to request a given peer becomes its
// "refresh owner": subsequent sessions see Refreshing=true and skip the
// duplicate PB_PLANNER emission. Returns the subset of peerIDs for which this
// session claimed ownership (needed to release on close).
// Refresh requests a planner snapshot for each peer in peerIDs, claiming
// refresh ownership for executionsID on every peer not already refreshing.
// Our own peer is refreshed locally; remote peers get a PB_PLANNER emission.
// Returns the peers this session now owns (to release on close).
func (s *PlannerService) Refresh(peerIDs []string, executionsID string) []string {
	// Resolve our own peer ID once: GetMySelf hits the database and its
	// result cannot change between loop iterations, so the old per-peer
	// call was a loop-invariant DB load.
	selfID := ""
	if self, err := oclib.GetMySelf(); err == nil && self != nil {
		selfID = self.PeerID
	}
	var owned []string
	for _, peerID := range peerIDs {
		s.Mu.Lock()
		entry := s.Cache[peerID]
		if entry == nil {
			entry = &plannerEntry{}
			s.Cache[peerID] = entry
			s.AddedAt[peerID] = time.Now().UTC()
			go s.EvictAfter(peerID, plannerTTL)
		}
		claimed := !entry.Refreshing
		if claimed {
			// First requester becomes the refresh owner.
			entry.Refreshing = true
			entry.RefreshOwner = executionsID
		}
		s.Mu.Unlock()
		if !claimed {
			continue // another session already has a request in flight
		}
		owned = append(owned, peerID)
		if peerID == selfID {
			go s.RefreshSelf(peerID, &tools.APIRequest{Admin: true})
			continue
		}
		payload, err := json.Marshal(map[string]any{"peer_id": peerID})
		if err != nil {
			continue // cannot build the request payload; skip this peer
		}
		utils.Propalgate(peerID, tools.PropalgationMessage{
			Action:  tools.PB_PLANNER,
			Payload: payload,
		})
	}
	return owned
}
// ReleaseRefreshOwnership is called when a check session closes (clean or
// forced). For each peer this session owns, it resets the refresh state and
// emits PB_CLOSE_PLANNER so oc-discovery stops the planner stream.
// The planner data itself stays in the cache until TTL eviction.
// ReleaseRefreshOwnership resets refresh state for every peer this session
// owns and emits PB_CLOSE_PLANNER so oc-discovery stops the planner stream.
// Cached planner data is untouched; only TTL eviction removes it.
func (s *PlannerService) ReleaseRefreshOwnership(peerIDs []string, executionsID string) {
	for _, peerID := range peerIDs {
		s.Mu.Lock()
		entry := s.Cache[peerID]
		if entry != nil && entry.RefreshOwner == executionsID {
			// Only the owning session may clear the in-flight flag.
			entry.Refreshing = false
			entry.RefreshOwner = ""
		}
		s.Mu.Unlock()
		utils.Notify(&s.SubMu, s.Subs, peerID, peerID)
		payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
		utils.Propalgate(peerID, tools.PropalgationMessage{
			Action:  tools.PB_CLOSE_PLANNER,
			Payload: payload,
		})
	}
}
// Broadcast iterates the storage and compute peers of the given workflow
// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so
// downstream consumers (oc-discovery, other schedulers) refresh their state.
// Broadcast walks the workflow's storage and compute graph items and emits a
// PB_PLANNER request for every distinct creator peer whose planner snapshot
// is neither cached nor currently being refreshed.
func (s *PlannerService) Broadcast(wf *workflow.Workflow) {
	if wf.Graph == nil {
		return
	}
	items := []graph.GraphItem{}
	items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...)
	items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...)
	seen := []string{}
	for _, item := range items {
		_, res := item.GetResource()
		if res == nil {
			continue
		}
		creatorID := res.GetCreatorID()
		if slices.Contains(seen, creatorID) {
			continue
		}
		// Mark the creator handled immediately: previously `seen` was only
		// appended when a request was emitted, so a creator whose planner
		// was already cached had its peer record re-loaded from the DB for
		// every one of its graph items.
		seen = append(seen, creatorID)
		data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID)
		p := data.ToPeer()
		if p == nil {
			continue
		}
		s.Mu.RLock()
		cached := s.Cache[p.PeerID]
		s.Mu.RUnlock()
		// Only request if no snapshot and no refresh already in flight.
		if cached == nil || (cached.Planner == nil && !cached.Refreshing) {
			payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID})
			if err != nil {
				continue
			}
			utils.Propalgate(p.PeerID, tools.PropalgationMessage{
				Action:  tools.PB_PLANNER,
				Payload: payload,
			})
		}
	}
}
// ---------------------------------------------------------------------------
// Self-planner refresh
// ---------------------------------------------------------------------------
// RefreshSelf regenerates our own planner snapshot, stores it locally (which
// unblocks any waiting CheckStream goroutines), and broadcasts it tagged with
// our peer ID so remote peers and oc-discovery refresh their view.
func (s *PlannerService) RefreshSelf(peerID string, request *tools.APIRequest) {
	snapshot, err := planner.GenerateShallow(request)
	if err != nil {
		fmt.Println("refreshSelfPlanner: could not generate planner:", err)
		return
	}
	// Local cache first, then the network broadcast.
	s.Store(peerID, snapshot)
	// plannerWithPeer adds the "peer_id" discriminator HandleStore reads.
	type plannerWithPeer struct {
		PeerID string `json:"peer_id"`
		*planner.Planner
	}
	body, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: snapshot})
	if err != nil {
		return
	}
	utils.Propalgate(peerID, tools.PropalgationMessage{
		Action:  tools.PB_PLANNER,
		Payload: body,
	})
}
// findNextSlot scans forward from 'from' in checkStepMin increments for up to
// windowH hours and returns the first candidate start time at which all
// resources are simultaneously free.
// findNextSlot scans forward from 'from' in checkStepMin-minute increments
// for up to windowH hours and returns the first candidate start at which all
// resources are simultaneously free, or nil if none is found in the window.
func (s *PlannerService) findNextSlot(resources map[string]utils.BookingResource, from time.Time, originalEnd *time.Time, windowH int) *time.Time {
	// Slot length defaults to 5 minutes when no (positive) end was supplied.
	slotLen := 5 * time.Minute
	if originalEnd != nil && originalEnd.After(from) {
		slotLen = originalEnd.Sub(from)
	}
	step := time.Duration(checkStepMin) * time.Minute
	deadline := from.Add(time.Duration(windowH) * time.Hour)
	for candidate := from.Add(step); candidate.Before(deadline); candidate = candidate.Add(step) {
		candidateEnd := candidate.Add(slotLen)
		unavail, _ := s.checkResourceAvailability(resources, candidate, &candidateEnd)
		if len(unavail) == 0 {
			found := candidate
			return &found
		}
	}
	return nil
}
// checkResourceAvailability returns the IDs of unavailable resources and
// human-readable warning messages.
// checkResourceAvailability returns the IDs of unavailable resources plus
// human-readable warnings. A resource whose peer has no cached planner is
// assumed available (warning only).
func (s *PlannerService) checkResourceAvailability(res map[string]utils.BookingResource, start time.Time, end *time.Time) (unavailable []string, warnings []string) {
	for _, r := range res {
		s.Mu.RLock()
		entry := s.Cache[r.PeerPID]
		s.Mu.RUnlock()
		switch {
		case entry == nil || entry.Planner == nil:
			// No data for this peer: optimistic, but flagged.
			warnings = append(warnings, fmt.Sprintf(
				"peer %s planner not in cache for resource %s assuming available", r.PeerPID, r.ID))
		case !s.checkInstance(entry.Planner, r.ID, r.InstanceID, start, end):
			unavailable = append(unavailable, r.ID)
			warnings = append(warnings, fmt.Sprintf(
				"resource %s is not available in [%s %s]",
				r.ID, start.Format(time.RFC3339), utils.FormatOptTime(end)))
		}
	}
	return unavailable, warnings
}
// CheckResourceInstance checks whether a resource/instance is available on the
// local planner cache for the given peer. Called by scheduling_resources when
// validating an incoming booking creation.
// CheckResourceInstance reports whether a resource/instance is available on
// the locally cached planner of the given peer. Called by scheduling_resources
// when validating an incoming booking creation. With no cached planner the
// answer is optimistic (true).
func (s *PlannerService) CheckResourceInstance(peerID, resourceID, instanceID string, start time.Time, end *time.Time) bool {
	s.Mu.RLock()
	entry, ok := s.Cache[peerID]
	s.Mu.RUnlock()
	if !ok || entry == nil || entry.Planner == nil {
		// No planner cached → assume available.
		return true
	}
	return s.checkInstance(entry.Planner, resourceID, instanceID, start, end)
}
// SubscribePlannerUpdates returns a channel that receives a peerID each time
// one of the given peers' planners is updated.
// SubscribePlannerUpdates returns a channel that receives a peerID each time
// one of the given peers' planners is updated, plus a cancel function that
// unregisters the subscription.
func (s *PlannerService) SubscribePlannerUpdates(peerIDs ...string) (<-chan string, func()) {
	return SubscribeUpdates[string](s.Subs, &s.SubMu, peerIDs...)
}
// SubscribeWorkflowUpdates returns a channel signalled when the workflow changes.
// SubscribeWorkflowUpdates returns a channel signalled when the workflow
// identified by wfID changes, plus a cancel function to unregister.
func (s *PlannerService) SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
	return SubscribeUpdates[struct{}](s.WorkflowSubs, &s.WorkflowSubMu, wfID)
}
// NotifyWorkflow signals all subscribers watching wfID.
// NotifyWorkflow signals (non-blocking) all subscribers watching wfID.
func (s *PlannerService) NotifyWorkflow(wfID string) {
	utils.Notify[struct{}](&s.WorkflowSubMu, s.WorkflowSubs, wfID, struct{}{})
}
// checkInstance checks availability for the specific instance resolved by the
// scheduler. When instanceID is empty (no instance selected / none resolvable),
// it falls back to checking all instances known in the planner and returns true
// if any one has remaining capacity. Returns true when no capacity is recorded.
// checkInstance checks availability for the instance resolved by the
// scheduler. With an empty instanceID it falls back to scanning every
// instance recorded in the planner's capacities and succeeds if any one is
// free; a resource with no recorded capacity is treated as unconstrained.
func (s *PlannerService) checkInstance(p *planner.Planner, resourceID string, instanceID string, start time.Time, end *time.Time) bool {
	// A resolved instance is checked directly.
	if instanceID != "" {
		return p.Check(resourceID, instanceID, nil, start, end)
	}
	caps, known := p.Capacities[resourceID]
	if !known || len(caps) == 0 {
		// No capacity recorded → assume available.
		return true
	}
	for candidate := range caps {
		if p.Check(resourceID, candidate, nil, start, end) {
			return true
		}
	}
	return false
}