Simplify but Complete Catalog

This commit is contained in:
mr
2026-04-01 15:56:05 +02:00
parent eeb11a7b8b
commit 163a4165b8
18 changed files with 1252 additions and 2446 deletions

View File

@@ -2,12 +2,13 @@ package infrastructure
import (
"encoding/json"
"fmt"
"slices"
"sync"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -20,14 +21,16 @@ var ressourceCols = []oclib.LibDataEnum{
}
var SearchMu sync.RWMutex
var SearchStream = map[string]chan resources.ResourceInterface{}
var SearchStream = map[string]chan []byte{}
var SearchStreamSeen = map[string][]string{}
func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
b, _ := json.Marshal(message)
switch message.Action {
case tools.PB_SEARCH:
SearchMu.Lock()
SearchStream[user] = make(chan resources.ResourceInterface, 128)
SearchStream[user] = make(chan []byte, 128)
SearchStreamSeen[user] = make([]string, 128)
SearchMu.Unlock()
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-catalog",
@@ -65,13 +68,123 @@ func ListenNATS() {
return
}
p, err := resources.ToResource(int(resp.Datatype), resp.Payload)
if err == nil {
fmt.Println("SearchStream", p)
SearchMu.Lock()
fmt.Println(SearchStream, resp.User)
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?*
SearchMu.Unlock()
if err != nil {
return
}
// Exclude resources already present in the local catalog
if check := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil).LoadOne(p.GetID()); check.Data == nil {
p.SetNotInCatalog(true)
return
}
wrapped, merr := json.Marshal(map[string]interface{}{
"dtype": p.GetType(),
"data": p,
})
if merr != nil {
return
}
SearchMu.Lock()
if SearchStreamSeen[resp.User] != nil && slices.Contains(SearchStreamSeen[resp.User], p.GetID()) {
SearchStream[resp.User] <- wrapped // TODO when do we update it in our catalog ?
}
if SearchStreamSeen[resp.User] == nil {
SearchStreamSeen[resp.User] = []string{}
}
SearchStreamSeen[resp.User] = append(SearchStreamSeen[resp.User], p.GetID())
SearchMu.Unlock()
},
// ── WORKFLOW_STEP_DONE_EVENT ─────────────────────────────────────────
// Real-time update: one booking just completed → update its resource instance.
tools.WORKFLOW_STEP_DONE_EVENT: func(resp tools.NATSResponse) {
var evt tools.WorkflowLifecycleEvent
if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.BookingID == "" {
return
}
updateInstanceFromStep(tools.StepMetric{
BookingID: evt.BookingID,
State: evt.State,
RealStart: evt.RealStart,
RealEnd: evt.RealEnd,
})
},
// ── WORKFLOW_DONE_EVENT ──────────────────────────────────────────────
// Recap: apply all steps in case STEP_DONE events were missed while
// oc-catalog was down. Processing is idempotent (same duration wins
// when times are identical; running average converges anyway).
tools.WORKFLOW_DONE_EVENT: func(resp tools.NATSResponse) {
var evt tools.WorkflowLifecycleEvent
if err := json.Unmarshal(resp.Payload, &evt); err != nil {
return
}
for _, step := range evt.Steps {
updateInstanceFromStep(step)
}
},
})
}
// updateInstanceFromStep loads the booking identified by step.BookingID, then
// updates the AverageDuration of the resource instance only if this peer owns
// the resource (creator_id == self PeerID). Idempotent: safe to call twice.
func updateInstanceFromStep(step tools.StepMetric) {
if step.RealStart == nil || step.RealEnd == nil {
return
}
actualS := step.RealEnd.Sub(*step.RealStart).Seconds()
if actualS <= 0 {
return
}
adminReq := &tools.APIRequest{Admin: true}
// Resolve resource info from the booking.
bkRes, _, err := booking.NewAccessor(adminReq).LoadOne(step.BookingID)
if err != nil || bkRes == nil {
return
}
bk := bkRes.(*booking.Booking)
// Only update resources this peer owns.
self, selfErr := oclib.GetMySelf()
if selfErr != nil || self == nil {
return
}
switch bk.ResourceType {
case tools.COMPUTE_RESOURCE, tools.LIVE_DATACENTER:
res, _, err := (&resources.ComputeResource{}).GetAccessor(adminReq).LoadOne(bk.ResourceID)
if err != nil || res == nil {
return
}
compute := res.(*resources.ComputeResource)
if compute.GetCreatorID() != self.PeerID {
return
}
for _, inst := range compute.Instances {
if inst.GetID() == bk.InstanceID {
inst.UpdateAverageDuration(actualS)
break
}
}
utils.GenericRawUpdateOne(compute, compute.GetID(), compute.GetAccessor(adminReq))
case tools.STORAGE_RESOURCE, tools.LIVE_STORAGE:
res, _, err := (&resources.StorageResource{}).GetAccessor(adminReq).LoadOne(bk.ResourceID)
if err != nil || res == nil {
return
}
storage := res.(*resources.StorageResource)
if storage.GetCreatorID() != self.PeerID {
return
}
for _, inst := range storage.Instances {
if inst.GetID() == bk.InstanceID {
inst.UpdateAverageDuration(actualS)
break
}
}
utils.GenericRawUpdateOne(storage, storage.GetID(), storage.GetAccessor(adminReq))
}
}