Add verification

This commit is contained in:
mr
2026-03-12 12:05:52 +01:00
parent 98fe2600b3
commit b9df0b2731
8 changed files with 275 additions and 64 deletions

View File

@@ -29,8 +29,20 @@ const plannerTTL = 24 * time.Hour
// Planner cache — protected by plannerMu
// ---------------------------------------------------------------------------
// plannerEntry wraps a planner snapshot with refresh-ownership tracking.
// At most one check session may be the "refresh owner" of a given peer's
// planner at a time: it emits PB_PLANNER to request a fresh snapshot from
// oc-discovery and, on close (clean or forced), emits PB_CLOSE_PLANNER to
// release the stream. Any subsequent session that needs the same peer's
// planner will see Refreshing=true and skip the duplicate request.
type plannerEntry struct {
Planner *planner.Planner // latest snapshot; nil when the entry was created by RequestPlannerRefresh and no reply has been stored yet
Refreshing bool // true while a PB_PLANNER request is in flight
RefreshOwner string // session UUID that initiated the current refresh
}
var plannerMu sync.RWMutex
var PlannerCache = map[string]*planner.Planner{}
var PlannerCache = map[string]*plannerEntry{}
var plannerAddedAt = map[string]time.Time{} // peerID → first-seen timestamp
// ---------------------------------------------------------------------------
@@ -104,29 +116,39 @@ func notify(mu *sync.RWMutex, registry map[string][]chan struct{}, key string) {
// Cache helpers
// ---------------------------------------------------------------------------
// storePlanner inserts or updates a planner for peerID.
// storePlanner inserts or updates the planner snapshot for peerID.
// On first insertion it schedules an automatic eviction after plannerTTL.
// Existing refresh-ownership state (Refreshing / RefreshOwner) is preserved
// so that an in-flight request is not inadvertently reset.
// All subscribers interested in this peer are notified.
func storePlanner(peerID string, p *planner.Planner) {
plannerMu.Lock()
isNew := PlannerCache[peerID] == nil
PlannerCache[peerID] = p
entry := PlannerCache[peerID]
isNew := entry == nil
if isNew {
entry = &plannerEntry{}
PlannerCache[peerID] = entry
plannerAddedAt[peerID] = time.Now()
go evictAfter(peerID, plannerTTL)
}
entry.Planner = p
plannerMu.Unlock()
notifyPlannerWatchers(peerID)
}
// evictAfter waits ttl from the first-seen time for peerID then emits a
// PB_CLOSE_PLANNER event, which removes the entry from the cache and notifies
// NATS.
// evictAfter waits ttl from first insertion then deletes the cache entry and
// emits PB_CLOSE_PLANNER so oc-discovery stops streaming for this peer.
// This is the only path that actually removes an entry from PlannerCache;
// session close (ReleaseRefreshOwnership) only resets ownership state.
func evictAfter(peerID string, ttl time.Duration) {
time.Sleep(ttl)
plannerMu.RLock()
plannerMu.Lock()
_, exists := PlannerCache[peerID]
plannerMu.RUnlock()
if exists {
delete(PlannerCache, peerID)
delete(plannerAddedAt, peerID)
}
plannerMu.Unlock()
if exists {
EmitNATS(peerID, tools.PropalgationMessage{Action: tools.PB_CLOSE_PLANNER})
}
@@ -137,12 +159,11 @@ func evictAfter(peerID string, ttl time.Duration) {
// ---------------------------------------------------------------------------
func EmitNATS(peerID string, message tools.PropalgationMessage) {
// PB_CLOSE_PLANNER: notify local watchers so streams re-evaluate.
// Cache mutations (eviction or ownership reset) are the caller's
// responsibility — see evictAfter and ReleaseRefreshOwnership.
if message.Action == tools.PB_CLOSE_PLANNER {
plannerMu.Lock()
delete(PlannerCache, peerID)
delete(plannerAddedAt, peerID)
plannerMu.Unlock()
notifyPlannerWatchers(peerID) // let streams re-evaluate (will warn "no planner")
notifyPlannerWatchers(peerID)
}
b, _ := json.Marshal(message)
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
@@ -309,6 +330,17 @@ func ListenNATS() {
// considered in the target WorkflowExecution (BookingsState / PurchasesState).
// WORKFLOW_EXECUTION → the execution reached SCHEDULED; confirm all
// local draft bookings and purchases listed in its states.
// PLANNER_EXECUTION: oc-discovery pushed a planner snapshot. The payload is
// decoded twice: loosely into a map to extract the peer_id routing key, and
// strictly into a planner.Planner for caching.
tools.PLANNER_EXECUTION: func(resp tools.NATSResponse) {
	m := map[string]interface{}{}
	if err := json.Unmarshal(resp.Payload, &m); err != nil {
		return
	}
	p := planner.Planner{}
	if err := json.Unmarshal(resp.Payload, &p); err != nil {
		return
	}
	// Guard against a missing/nil peer_id: fmt.Sprintf("%v", nil) would
	// otherwise cache the snapshot under the literal key "<nil>".
	peerID, ok := m["peer_id"]
	if !ok || peerID == nil {
		return
	}
	storePlanner(fmt.Sprintf("%v", peerID), &p)
},
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
if resp.FromApp != "oc-discovery" {
return
@@ -318,16 +350,6 @@ func ListenNATS() {
return
}
switch prop.Action {
case tools.PB_PLANNER:
m := map[string]interface{}{}
p := planner.Planner{}
if err := json.Unmarshal(prop.Payload, &m); err != nil {
return
}
if err := json.Unmarshal(prop.Payload, &p); err != nil {
return
}
storePlanner(fmt.Sprintf("%v", m["peer_id"]), &p)
case tools.PB_CONSIDERS:
switch tools.DataType(prop.DataType) {
case tools.BOOKING, tools.PURCHASE_RESOURCE:
@@ -379,9 +401,9 @@ func ListenNATS() {
}
// Verify the slot is free in our planner (if we have one).
plannerMu.RLock()
p := PlannerCache[self.PeerID]
selfEntry := PlannerCache[self.PeerID]
plannerMu.RUnlock()
if p != nil && !checkInstance(p, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
if selfEntry != nil && selfEntry.Planner != nil && !checkInstance(selfEntry.Planner, bk.ResourceID, bk.InstanceID, bk.ExpectedStartDate, bk.ExpectedEndDate) {
fmt.Println("ListenNATS: booking conflicts with local planner, discarding")
return
}
@@ -598,6 +620,60 @@ func refreshSelfPlanner(peerID string, request *tools.APIRequest) {
// Planner broadcast
// ---------------------------------------------------------------------------
// RequestPlannerRefresh asks oc-discovery to stream a fresh planner snapshot
// for every peer in peerIDs. Ownership is exclusive: the first session to ask
// for a given peer claims the refresh and emits PB_PLANNER; any later session
// observes Refreshing=true and stays silent. The returned slice holds the
// peer IDs this session now owns; the caller must pass it back to
// ReleaseRefreshOwnership when the session closes.
func RequestPlannerRefresh(peerIDs []string, sessionID string) []string {
	var claimed []string
	for _, id := range peerIDs {
		plannerMu.Lock()
		e := PlannerCache[id]
		if e == nil {
			// First sighting of this peer: create the cache slot and arm
			// the TTL-based eviction, mirroring storePlanner.
			e = &plannerEntry{}
			PlannerCache[id] = e
			plannerAddedAt[id] = time.Now()
			go evictAfter(id, plannerTTL)
		}
		claiming := !e.Refreshing
		if claiming {
			e.Refreshing = true
			e.RefreshOwner = sessionID
		}
		plannerMu.Unlock()
		if !claiming {
			// Another session already has a request in flight for this peer.
			continue
		}
		claimed = append(claimed, id)
		body, _ := json.Marshal(map[string]any{"peer_id": id})
		EmitNATS(id, tools.PropalgationMessage{
			Action:  tools.PB_PLANNER,
			Payload: body,
		})
	}
	return claimed
}
// ReleaseRefreshOwnership is called when a check session closes (clean or
// forced). For each peer whose refresh this session actually owns, it resets
// the refresh state and emits PB_CLOSE_PLANNER so oc-discovery stops the
// planner stream. Peers whose entry was TTL-evicted and since re-claimed by
// another session are left untouched — emitting PB_CLOSE_PLANNER for them
// would tear down a stream this session no longer owns. The planner data
// itself stays in the cache until TTL eviction.
func ReleaseRefreshOwnership(peerIDs []string, sessionID string) {
	for _, peerID := range peerIDs {
		plannerMu.Lock()
		entry := PlannerCache[peerID]
		owned := entry != nil && entry.RefreshOwner == sessionID
		if owned {
			entry.Refreshing = false
			entry.RefreshOwner = ""
		}
		plannerMu.Unlock()
		if !owned {
			// Entry evicted (evictAfter already emitted PB_CLOSE_PLANNER)
			// or owned by a newer session — nothing to release here.
			continue
		}
		payload, _ := json.Marshal(map[string]any{"peer_id": peerID})
		EmitNATS(peerID, tools.PropalgationMessage{
			Action:  tools.PB_CLOSE_PLANNER,
			Payload: payload,
		})
	}
}
// broadcastPlanner iterates the storage and compute peers of the given workflow
// and, for each peer not yet in the cache, emits a PB_PLANNER propagation so
// downstream consumers (oc-discovery, other schedulers) refresh their state.
@@ -631,7 +707,8 @@ func broadcastPlanner(wf *workflow.Workflow) {
cached := PlannerCache[p.PeerID]
plannerMu.RUnlock()
if cached == nil {
// Only request if no snapshot and no refresh already in flight.
if cached == nil || (cached.Planner == nil && !cached.Refreshing) {
payload, err := json.Marshal(map[string]interface{}{"peer_id": p.PeerID})
if err != nil {
continue