package planner
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"oc-scheduler/infrastructure/utils"
|
||
"slices"
|
||
"sync"
|
||
"time"
|
||
|
||
oclib "cloud.o-forge.io/core/oc-lib"
|
||
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
|
||
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
||
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
||
"cloud.o-forge.io/core/oc-lib/tools"
|
||
)
|
||
|
||
const (
	// checkWindowHours bounds how far ahead findNextSlot scans for a free slot.
	checkWindowHours = 5
	// checkStepMin is the time increment per scan step (minutes).
	checkStepMin = 15
	// plannerTTL is how long a cached planner snapshot lives before EvictAfter removes it.
	plannerTTL = 24 * time.Hour
)
// ---------------------------------------------------------------------------
// Planner cache — protected by PlannerService.Mu
// ---------------------------------------------------------------------------

// plannerEntry wraps a planner snapshot with refresh-ownership tracking.
// At most one check session may be the "refresh owner" of a given peer's
// planner at a time: it emits PB_PLANNER to request a fresh snapshot from
// oc-discovery and, on close (clean or forced), emits PB_CLOSE_PLANNER to
// release the stream. Any subsequent session that needs the same peer's
// planner will see Refreshing=true and skip the duplicate request.
type plannerEntry struct {
	Planner      *planner.Planner // latest snapshot; nil until the first one arrives
	Refreshing   bool             // true while a PB_PLANNER request is in flight
	RefreshOwner string           // session UUID that initiated the current refresh
}
// PlannerService caches planner snapshots per peer and fans out update
// notifications to subscribed check sessions.
type PlannerService struct {
	Mu            sync.RWMutex               // guards Cache and AddedAt
	Cache         map[string]*plannerEntry   // peerID → cached planner state
	SubMu         sync.RWMutex               // guards Subs
	Subs          map[string][]chan string   // peerID → planner-update subscriber channels
	AddedAt       map[string]time.Time       // peerID → first-insertion time (TTL bookkeeping)
	WorkflowSubMu sync.RWMutex               // guards WorkflowSubs
	WorkflowSubs  map[string][]chan struct{} // workflowID → workflow-update subscriber channels
}
// singleton is the process-wide PlannerService instance, set once by
// InitPlanner and read via GetPlannerService.
var singleton *PlannerService
// InitSelfPlanner bootstraps our own planner entry at startup.
|
||
// It waits (with 15-second retries) for our peer record to be present in the
|
||
// database before generating the first planner snapshot and broadcasting it
|
||
// on PB_PLANNER. This handles the race between oc-scheduler starting before
|
||
// oc-peer has fully registered our node.
|
||
func InitPlanner() {
|
||
singleton = &PlannerService{
|
||
AddedAt: map[string]time.Time{},
|
||
Subs: map[string][]chan string{},
|
||
Cache: map[string]*plannerEntry{},
|
||
WorkflowSubs: map[string][]chan struct{}{},
|
||
}
|
||
for {
|
||
self, err := oclib.GetMySelf()
|
||
if err != nil || self == nil {
|
||
fmt.Println("InitPlanner: self peer not found yet, retrying in 15s...")
|
||
time.Sleep(15 * time.Second)
|
||
continue
|
||
}
|
||
singleton.RefreshSelf(self.PeerID, &tools.APIRequest{Admin: true})
|
||
return
|
||
}
|
||
}
|
||
|
||
// GetPlannerService returns the process-wide PlannerService created by
// InitPlanner. It is nil until InitPlanner has run.
func GetPlannerService() *PlannerService {
	return singleton
}
func (s *PlannerService) HandleStore(resp tools.NATSResponse) {
|
||
m := map[string]interface{}{}
|
||
p := planner.Planner{}
|
||
if err := json.Unmarshal(resp.Payload, &m); err != nil {
|
||
return
|
||
}
|
||
if err := json.Unmarshal(resp.Payload, &p); err != nil {
|
||
return
|
||
}
|
||
s.Store(fmt.Sprintf("%v", m["peer_id"]), &p)
|
||
}
|
||
|
||
// missingPlannerPeers returns the peer IDs from res whose planner is absent
|
||
// or not yet populated in PlannerCache.
|
||
// func missingPlannerPeers(res map[string]bookingResource) []string {
|
||
func (s *PlannerService) MissingPeers(res map[string]utils.BookingResource) []string {
|
||
var out []string
|
||
for _, r := range res {
|
||
s.Mu.RLock()
|
||
entry := s.Cache[r.PeerPID]
|
||
s.Mu.RUnlock()
|
||
if entry == nil || entry.Planner == nil {
|
||
out = append(out, r.PeerPID)
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func (s *PlannerService) FindDate(wfID string, checkables map[string]utils.BookingResource, start time.Time, end *time.Time, preemption bool, asap bool) (time.Time, *time.Time, bool, bool, []string) {
|
||
var unavailable, warnings []string
|
||
// 4. Preemption: Planify ran (end is resolved), skip availability check.
|
||
if preemption {
|
||
return start, end, true, true, warnings
|
||
}
|
||
// 5b. For any peer whose planner is not yet cached, request it and wait
|
||
// briefly so the decision is based on real data rather than a blind
|
||
// "assume available". The wait is capped to avoid blocking the caller
|
||
// when oc-discovery is unreachable.
|
||
s.Fill(checkables, wfID)
|
||
|
||
unavailable, warnings = s.checkResourceAvailability(checkables, start, end)
|
||
|
||
if len(unavailable) == 0 {
|
||
//result.Available = true
|
||
return start, end, true, false, warnings
|
||
}
|
||
|
||
// 6. as_possible: find and commit to the next free slot.
|
||
if asap {
|
||
next := s.findNextSlot(checkables, start, end, checkWindowHours)
|
||
if next != nil {
|
||
start = *next
|
||
if end != nil {
|
||
shifted := next.Add(end.Sub(start))
|
||
end = &shifted
|
||
}
|
||
return start, end, true, false, warnings
|
||
} else {
|
||
return start, end, false, false, warnings
|
||
}
|
||
}
|
||
return start, end, false, false, warnings
|
||
}
|
||
|
||
func (s *PlannerService) Fill(checkables map[string]utils.BookingResource, wfID string) {
|
||
if missing := s.MissingPeers(checkables); len(missing) > 0 {
|
||
const plannerFetchTimeout = 2 * time.Second
|
||
tmpSession := "check-oneshot-" + wfID
|
||
ch, cancelSub := SubscribeUpdates(s.Subs, &s.SubMu, missing...)
|
||
owned := s.Refresh(missing, tmpSession)
|
||
select {
|
||
case <-ch:
|
||
case <-time.After(plannerFetchTimeout):
|
||
}
|
||
cancelSub()
|
||
s.ReleaseRefreshOwnership(owned, tmpSession)
|
||
}
|
||
}
|
||
|
||
// evictAfter waits ttl from first insertion then deletes the cache entry and
|
||
// emits PB_CLOSE_PLANNER so oc-discovery stops streaming for this peer.
|
||
// This is the only path that actually removes an entry from PlannerCache;
|
||
// session close (ReleaseRefreshOwnership) only resets ownership state.
|
||
func (s *PlannerService) EvictAfter(peerID string, ttl time.Duration) {
|
||
time.Sleep(ttl)
|
||
s.Mu.Lock()
|
||
_, exists := s.Cache[peerID]
|
||
if exists {
|
||
delete(s.Cache, peerID)
|
||
delete(s.AddedAt, peerID)
|
||
}
|
||
s.Mu.Unlock()
|
||
if exists {
|
||
utils.Notify(&s.SubMu, s.Subs, peerID, peerID)
|
||
utils.Propalgate(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER})
|
||
}
|
||
}
|
||
|
||
// SubscribePlannerUpdates registers interest in planner changes for the given
|
||
// peer IDs. The returned channel receives the peerID string (non-blocking) each
|
||
// time any of those planners is updated. Call cancel to unregister.
|
||
func SubscribeUpdates[T interface{}](subs map[string][]chan T, mu *sync.RWMutex, updates ...string) (<-chan T, func()) {
|
||
ch := make(chan T, 1)
|
||
mu.Lock()
|
||
for _, k := range updates {
|
||
subs[k] = append(subs[k], ch)
|
||
}
|
||
mu.Unlock()
|
||
cancel := func() {
|
||
mu.Lock()
|
||
for _, k := range updates {
|
||
subsk := subs[k]
|
||
for i, s := range subsk {
|
||
if s == ch {
|
||
subs[k] = append(subsk[:i], subsk[i+1:]...)
|
||
break
|
||
}
|
||
}
|
||
}
|
||
mu.Unlock()
|
||
}
|
||
return ch, cancel
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Cache helpers
|
||
// ---------------------------------------------------------------------------
|
||
|
||
func (s *PlannerService) Store(peerID string, p *planner.Planner) {
|
||
s.Mu.Lock()
|
||
entry := s.Cache[peerID]
|
||
isNew := entry == nil
|
||
if isNew {
|
||
entry = &plannerEntry{}
|
||
s.Cache[peerID] = entry
|
||
s.AddedAt[peerID] = time.Now().UTC()
|
||
go s.EvictAfter(peerID, plannerTTL)
|
||
}
|
||
entry.Planner = p
|
||
s.Mu.Unlock()
|
||
utils.Notify[string](&s.SubMu, s.Subs, peerID, peerID)
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Planner refresh / broadcast
|
||
// ---------------------------------------------------------------------------
|
||
|
||
// RequestPlannerRefresh asks oc-discovery for a fresh planner snapshot for
|
||
// each peer in peerIDs. Only the first session to request a given peer becomes
|
||
// its "refresh owner": subsequent sessions see Refreshing=true and skip the
|
||
// duplicate PB_PLANNER emission. Returns the subset of peerIDs for which this
|
||
// session claimed ownership (needed to release on close).
|
||
|
||
// RequestPlannerRefresh
|
||
func (s *PlannerService) Refresh(peerIDs []string, executionsID string) []string {
|
||
var owned []string
|
||
for _, peerID := range peerIDs {
|
||
s.Mu.Lock()
|
||
entry := s.Cache[peerID]
|
||
if entry == nil {
|
||
entry = &plannerEntry{}
|
||
s.Cache[peerID] = entry
|
||
s.AddedAt[peerID] = time.Now().UTC()
|
||
go s.EvictAfter(peerID, plannerTTL)
|
||
}
|
||
shouldRequest := !entry.Refreshing
|
||
if shouldRequest {
|
||
entry.Refreshing = true
|
||
entry.RefreshOwner = executionsID
|
||
}
|
||
s.Mu.Unlock()
|
||
if shouldRequest {
|
||
owned = append(owned, peerID)
|
||
if p, err := oclib.GetMySelf(); err == nil && p != nil && p.PeerID == peerID {
|
||
go s.RefreshSelf(peerID, &tools.APIRequest{Admin: true})
|
||
} else {
|
||
payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
|
||
utils.Propalgate(peerID, tools.PropalgationMessage{
|
||
Action: tools.PB_PLANNER,
|
||
Payload: payload,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
return owned
|
||
}
|
||
|
||
// ReleaseRefreshOwnership is called when a check session closes (clean or
|
||
// forced). For each peer this session owns, it resets the refresh state and
|
||
// emits PB_CLOSE_PLANNER so oc-discovery stops the planner stream.
|
||
// The planner data itself stays in the cache until TTL eviction.
|
||
func (s *PlannerService) ReleaseRefreshOwnership(peerIDs []string, executionsID string) {
|
||
for _, peerID := range peerIDs {
|
||
s.Mu.Lock()
|
||
if entry := s.Cache[peerID]; entry != nil && entry.RefreshOwner == executionsID {
|
||
entry.Refreshing = false
|
||
entry.RefreshOwner = ""
|
||
}
|
||
s.Mu.Unlock()
|
||
utils.Notify(&s.SubMu, s.Subs, peerID, peerID)
|
||
payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
|
||
utils.Propalgate(peerID, tools.PropalgationMessage{
|
||
Action: tools.PB_CLOSE_PLANNER,
|
||
Payload: payload,
|
||
})
|
||
}
|
||
}
|
||
|
||
// broadcastPlanner iterates the storage and compute peers of the given workflow
|
||
// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so
|
||
// downstream consumers (oc-discovery, other schedulers) refresh their state.
|
||
func (s *PlannerService) Broadcast(wf *workflow.Workflow) {
|
||
if wf.Graph == nil {
|
||
return
|
||
}
|
||
items := []graph.GraphItem{}
|
||
items = append(items, wf.GetGraphItems(wf.Graph.IsStorage)...)
|
||
items = append(items, wf.GetGraphItems(wf.Graph.IsCompute)...)
|
||
|
||
seen := []string{}
|
||
for _, item := range items {
|
||
_, res := item.GetResource()
|
||
if res == nil {
|
||
continue
|
||
}
|
||
creatorID := res.GetCreatorID()
|
||
if slices.Contains(seen, creatorID) {
|
||
continue
|
||
}
|
||
|
||
data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil).LoadOne(creatorID)
|
||
p := data.ToPeer()
|
||
if p == nil {
|
||
continue
|
||
}
|
||
|
||
s.Mu.RLock()
|
||
cached := s.Cache[p.PeerID]
|
||
s.Mu.RUnlock()
|
||
|
||
// Only request if no snapshot and no refresh already in flight.
|
||
if cached == nil || (cached.Planner == nil && !cached.Refreshing) {
|
||
payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID})
|
||
if err != nil {
|
||
continue
|
||
}
|
||
seen = append(seen, creatorID)
|
||
utils.Propalgate(p.PeerID, tools.PropalgationMessage{
|
||
Action: tools.PB_PLANNER,
|
||
Payload: payload,
|
||
})
|
||
}
|
||
}
|
||
}
|
||
|
||
// ---------------------------------------------------------------------------
|
||
// Self-planner refresh
|
||
// ---------------------------------------------------------------------------
|
||
|
||
func (s *PlannerService) RefreshSelf(peerID string, request *tools.APIRequest) {
|
||
p, err := planner.GenerateShallow(request)
|
||
if err != nil {
|
||
fmt.Println("refreshSelfPlanner: could not generate planner:", err)
|
||
return
|
||
}
|
||
// Update the local cache and notify any waiting CheckStream goroutines.
|
||
s.Store(peerID, p)
|
||
// Broadcast the updated planner so remote peers (and oc-discovery) can
|
||
// refresh their view of our availability.
|
||
type plannerWithPeer struct {
|
||
PeerID string `json:"peer_id"`
|
||
*planner.Planner
|
||
}
|
||
plannerPayload, err := json.Marshal(plannerWithPeer{PeerID: peerID, Planner: p})
|
||
if err != nil {
|
||
return
|
||
}
|
||
utils.Propalgate(peerID, tools.PropalgationMessage{
|
||
Action: tools.PB_PLANNER,
|
||
Payload: plannerPayload,
|
||
})
|
||
}
|
||
|
||
// findNextSlot scans forward from 'from' in checkStepMin increments for up to
|
||
// windowH hours and returns the first candidate start time at which all
|
||
// resources are simultaneously free.
|
||
func (s *PlannerService) findNextSlot(resources map[string]utils.BookingResource, from time.Time, originalEnd *time.Time, windowH int) *time.Time {
|
||
duration := 5 * time.Minute
|
||
if originalEnd != nil {
|
||
if d := originalEnd.Sub(from); d > 0 {
|
||
duration = d
|
||
}
|
||
}
|
||
step := time.Duration(checkStepMin) * time.Minute
|
||
limit := from.Add(time.Duration(windowH) * time.Hour)
|
||
for t := from.Add(step); t.Before(limit); t = t.Add(step) {
|
||
e := t.Add(duration)
|
||
if unavail, _ := s.checkResourceAvailability(resources, t, &e); len(unavail) == 0 {
|
||
return &t
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// checkResourceAvailability returns the IDs of unavailable resources and
|
||
// human-readable warning messages.
|
||
func (s *PlannerService) checkResourceAvailability(res map[string]utils.BookingResource, start time.Time, end *time.Time) (unavailable []string, warnings []string) {
|
||
for _, r := range res {
|
||
s.Mu.RLock()
|
||
entry := s.Cache[r.PeerPID]
|
||
s.Mu.RUnlock()
|
||
if entry == nil || entry.Planner == nil {
|
||
warnings = append(warnings, fmt.Sprintf(
|
||
"peer %s planner not in cache for resource %s – assuming available", r.PeerPID, r.ID))
|
||
continue
|
||
}
|
||
if !s.checkInstance(entry.Planner, r.ID, r.InstanceID, start, end) {
|
||
unavailable = append(unavailable, r.ID)
|
||
warnings = append(warnings, fmt.Sprintf(
|
||
"resource %s is not available in [%s – %s]",
|
||
r.ID, start.Format(time.RFC3339), utils.FormatOptTime(end)))
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// CheckResourceInstance checks whether a resource/instance is available on the
|
||
// local planner cache for the given peer. Called by scheduling_resources when
|
||
// validating an incoming booking creation.
|
||
func (s *PlannerService) CheckResourceInstance(peerID, resourceID, instanceID string, start time.Time, end *time.Time) bool {
|
||
s.Mu.RLock()
|
||
entry := s.Cache[peerID]
|
||
s.Mu.RUnlock()
|
||
if entry == nil || entry.Planner == nil {
|
||
return true // no planner cached → assume available
|
||
}
|
||
return s.checkInstance(entry.Planner, resourceID, instanceID, start, end)
|
||
}
|
||
|
||
// SubscribePlannerUpdates returns a channel that receives a peerID each time
// one of the given peers' planners is updated. Call the returned cancel
// func to unregister.
func (s *PlannerService) SubscribePlannerUpdates(peerIDs ...string) (<-chan string, func()) {
	return SubscribeUpdates[string](s.Subs, &s.SubMu, peerIDs...)
}
// SubscribeWorkflowUpdates returns a channel signalled when the workflow
// identified by wfID changes. Call the returned cancel func to unregister.
func (s *PlannerService) SubscribeWorkflowUpdates(wfID string) (<-chan struct{}, func()) {
	return SubscribeUpdates[struct{}](s.WorkflowSubs, &s.WorkflowSubMu, wfID)
}
// NotifyWorkflow signals all subscribers watching wfID (registered via
// SubscribeWorkflowUpdates).
func (s *PlannerService) NotifyWorkflow(wfID string) {
	utils.Notify[struct{}](&s.WorkflowSubMu, s.WorkflowSubs, wfID, struct{}{})
}
// checkInstance checks availability for the specific instance resolved by the
|
||
// scheduler. When instanceID is empty (no instance selected / none resolvable),
|
||
// it falls back to checking all instances known in the planner and returns true
|
||
// if any one has remaining capacity. Returns true when no capacity is recorded.
|
||
func (s *PlannerService) checkInstance(p *planner.Planner, resourceID string, instanceID string, start time.Time, end *time.Time) bool {
|
||
if instanceID != "" {
|
||
return p.Check(resourceID, instanceID, nil, start, end)
|
||
}
|
||
caps, ok := p.Capacities[resourceID]
|
||
if !ok || len(caps) == 0 {
|
||
return true
|
||
}
|
||
for id := range caps {
|
||
if p.Check(resourceID, id, nil, start, end) {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|