package infrastructure import ( "encoding/json" "slices" "sync" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" ) var ressourceCols = []oclib.LibDataEnum{ oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE), } var SearchMu sync.RWMutex var SearchStream = map[string]chan []byte{} var SearchStreamSeen = map[string][]string{} func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) switch message.Action { case tools.PB_SEARCH: SearchMu.Lock() SearchStream[user] = make(chan []byte, 128) SearchStreamSeen[user] = make([]string, 128) SearchMu.Unlock() tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-catalog", Datatype: -1, User: user, Groups: groups, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } } // un ressource quand on l'ajoute à notre catalogue elle nous est étrangère. // pour se la réaffecté à soit, on peut alors changer le créator ID. // pour protéger une ressource l'idée serait de la signée. // si on la stocke en base, elle va se dépréciée plus encore si le user n'est pas un partenaire. // elle ne sera pas maintenue à jour. Si c'est une ressource publique et qu'elle change // l'offre peut disparaitre mais subsisté chez nous. // alors si on en dispose et qu'on souhaite l'exploité. On doit en vérifier la validité... ou... // la mettre à jour. Le problème de la mise à jour c'est qu'on peut facilement // overflow.... de stream pour avoir à jour sa ressource. // donc l'idée est que la vérification soit manuelle... ou lors d'une vérification de dernière instance. // si une ressource est exploitée dans un workflow ou un shared workspace. // elle doit être vérifié par les pairs engagés. // si la donnée est déclaré comme donnée de l'emmetteur alors on vérifie que la signature est bien émise, par // l'emmetteur. Sinon... on doit interrogé le pair qui a émit la donnée. Est ce que la donnée est à jour. // lui va vérifier la signature de la ressource qu'il possède correspondante si elle existe, si non. AIE, // on met à jour mais on pète une erreur. func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.SEARCH_EVENT: func(resp tools.NATSResponse) { if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } p, err := resources.ToResource(int(resp.Datatype), resp.Payload) if err != nil { return } // Exclude resources already present in the local catalog if check := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil).LoadOne(p.GetID()); check.Data == nil { p.SetNotInCatalog(true) return } wrapped, merr := json.Marshal(map[string]interface{}{ "dtype": p.GetType(), "data": p, }) if merr != nil { return } SearchMu.Lock() if SearchStreamSeen[resp.User] != nil && slices.Contains(SearchStreamSeen[resp.User], p.GetID()) { SearchStream[resp.User] <- wrapped // TODO when do we update it in our catalog ? } if SearchStreamSeen[resp.User] == nil { SearchStreamSeen[resp.User] = []string{} } SearchStreamSeen[resp.User] = append(SearchStreamSeen[resp.User], p.GetID()) SearchMu.Unlock() }, // ── WORKFLOW_STEP_DONE_EVENT ───────────────────────────────────────── // Real-time update: one booking just completed → update its resource instance. tools.WORKFLOW_STEP_DONE_EVENT: func(resp tools.NATSResponse) { var evt tools.WorkflowLifecycleEvent if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.BookingID == "" { return } updateInstanceFromStep(tools.StepMetric{ BookingID: evt.BookingID, State: evt.State, RealStart: evt.RealStart, RealEnd: evt.RealEnd, }) }, // ── WORKFLOW_DONE_EVENT ────────────────────────────────────────────── // Recap: apply all steps in case STEP_DONE events were missed while // oc-catalog was down. Processing is idempotent (same duration wins // when times are identical; running average converges anyway). tools.WORKFLOW_DONE_EVENT: func(resp tools.NATSResponse) { var evt tools.WorkflowLifecycleEvent if err := json.Unmarshal(resp.Payload, &evt); err != nil { return } for _, step := range evt.Steps { updateInstanceFromStep(step) } }, }) } // updateInstanceFromStep loads the booking identified by step.BookingID, then // updates the AverageDuration of the resource instance only if this peer owns // the resource (creator_id == self PeerID). Idempotent: safe to call twice. func updateInstanceFromStep(step tools.StepMetric) { if step.RealStart == nil || step.RealEnd == nil { return } actualS := step.RealEnd.Sub(*step.RealStart).Seconds() if actualS <= 0 { return } adminReq := &tools.APIRequest{Admin: true} // Resolve resource info from the booking. bkRes, _, err := booking.NewAccessor(adminReq).LoadOne(step.BookingID) if err != nil || bkRes == nil { return } bk := bkRes.(*booking.Booking) // Only update resources this peer owns. self, selfErr := oclib.GetMySelf() if selfErr != nil || self == nil { return } switch bk.ResourceType { case tools.COMPUTE_RESOURCE, tools.LIVE_DATACENTER: res, _, err := (&resources.ComputeResource{}).GetAccessor(adminReq).LoadOne(bk.ResourceID) if err != nil || res == nil { return } compute := res.(*resources.ComputeResource) if compute.GetCreatorID() != self.PeerID { return } for _, inst := range compute.Instances { if inst.GetID() == bk.InstanceID { inst.UpdateAverageDuration(actualS) break } } utils.GenericRawUpdateOne(compute, compute.GetID(), compute.GetAccessor(adminReq)) case tools.STORAGE_RESOURCE, tools.LIVE_STORAGE: res, _, err := (&resources.StorageResource{}).GetAccessor(adminReq).LoadOne(bk.ResourceID) if err != nil || res == nil { return } storage := res.(*resources.StorageResource) if storage.GetCreatorID() != self.PeerID { return } for _, inst := range storage.Instances { if inst.GetID() == bk.InstanceID { inst.UpdateAverageDuration(actualS) break } } utils.GenericRawUpdateOne(storage, storage.GetID(), storage.GetAccessor(adminReq)) } }