package infrastructure import ( "encoding/json" "fmt" "slices" "sync" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" ) var ressourceCols = []oclib.LibDataEnum{ oclib.LibDataEnum(oclib.PEER), } var SearchMu sync.RWMutex var SearchStreamAction = map[string][]*peer.Peer{} var SearchStream = map[string]chan WSMessage{} func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) if message.Action == tools.PB_SEARCH { SearchMu.Lock() SearchStream[user] = make(chan WSMessage, 128) SearchStreamAction[user] = []*peer.Peer{} fmt.Println("NEW PB") SearchMu.Unlock() } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-peer", Datatype: -1, User: user, Groups: groups, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } var self *peer.Peer func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ // ORG_PARTNER_EVENT is delivered by our local oc-discovery after receiving a // libp2p stream from the remote. The payload "type" field routes to one of two // sub-flows: // "check" — we are master: validate candidate, emit confirm via PROPALGATION // "confirm" — we are requester: upgrade (or discard) the candidate's relation tools.ORG_PARTNER_EVENT: func(resp tools.NATSResponse) { if resp.FromApp == config.GetAppName() { return } var msg struct { Type string `json:"type"` RequesterID string `json:"requester_id"` RequesterPeerID string `json:"requester_peer_id"` CandidateID string `json:"candidate_id"` MasterID string `json:"master_id"` Confirmed bool `json:"confirmed"` } if err := json.Unmarshal(resp.Payload, &msg); err != nil || msg.CandidateID == "" { return } switch msg.Type { case "check": self, _ := oclib.GetMySelf() if self == nil || self.GetID() != msg.MasterID { return } access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) candidate := access.LoadOne(msg.CandidateID) confirmed := candidate.Data != nil && candidate.ToPeer().Relation == peer.ORGANIZATION_MEMBER confirmPayload, _ := json.Marshal(map[string]interface{}{ "type": "confirm", "requester_peer_id": msg.RequesterPeerID, "candidate_id": msg.CandidateID, "confirmed": confirmed, }) propMsg := tools.PropalgationMessage{ Action: tools.PB_ORG_PARTNER, DataType: int(tools.PEER), Payload: confirmPayload, } b, _ := json.Marshal(propMsg) tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: config.GetAppName(), Datatype: tools.PEER, User: resp.User, Groups: resp.Groups, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) case "confirm": access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) relation := peer.NONE if msg.Confirmed { relation = peer.ORGANIZATION_PARTNER } access.UpdateOne(map[string]interface{}{ "relation": relation, "verify": false, }, msg.CandidateID) } }, tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { if resp.FromApp == config.GetAppName() || !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } self, _ := oclib.GetMySelf() if self != nil && self.IsNano { return } p := &peer.Peer{} if err := json.Unmarshal(resp.Payload, &p); err == nil { fmt.Println("CREATE_RESOURCE", p.GetID()) if self.GetID() == p.GetID() { fmt.Println("it's ourselve !") return } access := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil) if data := access.LoadOne(p.GetID()); data.Data != nil { if p.Relation == peer.PENDING_PARTNER || p.Relation == peer.PARTNER { if data.ToPeer().Verify { fmt.Println("UPDATE 2", p.GetID()) access.UpdateOne(map[string]interface{}{ "verify": false, "relation": peer.PARTNER, }, p.GetID()) } else { access.UpdateOne(map[string]interface{}{ "verify": true, "relation": peer.PENDING_PARTNER, }, p.GetID()) } } else if data.ToPeer().Relation == peer.NONE { access.UpdateOne(map[string]interface{}{ "verify": false, "relation": p.Relation, }, p.GetID()) } } else if p.Relation == peer.PENDING_NANO || p.Relation == peer.NANO { if data.ToPeer().Verify { access.UpdateOne(map[string]interface{}{ "verify": false, "relation": peer.MASTER, }, p.GetID()) } else { access.UpdateOne(map[string]interface{}{ "verify": true, "relation": peer.PENDING_MASTER, }, p.GetID()) } } else if p.Relation != peer.SELF && p.Relation != peer.BLACKLIST { if p.Relation == peer.PARTNER || p.Relation == peer.PENDING_PARTNER { p.Verify = true p.Relation = peer.PENDING_PARTNER } p.IsNano = config.GetConfig().IsNano // If the incoming peer shares our OrganizationMasterID, initiate org partner // verification with our master before accepting them as org partner. if self != nil && self.OrganizationMasterID != "" && p.OrganizationMasterID == self.OrganizationMasterID { go requestOrgPartnerVerification(self, p, resp.User, resp.Groups) return } access.StoreOne(p.Serialize(p)) } } }, tools.SEARCH_EVENT: func(resp tools.NATSResponse) { if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } p := &peer.Peer{} if err := json.Unmarshal(resp.Payload, p); err != nil { return } access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) fmt.Println("ADD in SEARCH STREAM", p.GetID()) if s := access.Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}}, }, }, "", false, 0, 1); len(s.Data) > 0 { p.Relation = s.Data[0].(*peer.Peer).Relation } else { p.NotInCatalog = true } // Stamp volatile online state from cache. p.Online = IsOnline(p.PeerID) now := time.Now() if p.Online { p.LastHeartbeat = &now } SearchMu.RLock() if ch, ok := SearchStream[resp.User]; ok { select { case ch <- WSMessage{Type: "peer", Peer: p}: default: } } SearchMu.RUnlock() }, // PEER_OBSERVE_RESPONSE_EVENT is emitted by oc-discovery when it // receives a heartbeat from an observed remote peer. tools.PEER_OBSERVE_RESPONSE_EVENT: func(resp tools.NATSResponse) { var batch struct { PeerIDs []string `json:"peer_ids"` Metrics map[string]*PeerConnectivityMetrics `json:"metrics"` } if err := json.Unmarshal(resp.Payload, &batch); err != nil { return } if len(batch.PeerIDs) == 0 { return } fmt.Println("METRICS", batch.Metrics) HandleHeartbeatBatch(batch.PeerIDs, batch.Metrics) }, }) } // requestOrgPartnerVerification stores the candidate as KNOWN and asks our // OrganizationMaster (via PROPALGATION_EVENT → oc-discovery → libp2p stream) // whether the candidate is really one of its ORGANIZATION_MEMBERs. // The ORG_PARTNER_EVENT "confirm" branch upgrades the relation once the master replies. func requestOrgPartnerVerification(self *peer.Peer, candidate *peer.Peer, user string, groups []string) { // Store candidate as NONE (path "known") — no privilege until master confirms. access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) candidate.Relation = peer.NONE candidate.Verify = false access.StoreOne(candidate.Serialize(candidate)) checkPayload, _ := json.Marshal(map[string]interface{}{ "type": "check", "requester_id": self.GetID(), "requester_peer_id": self.PeerID, "candidate_id": candidate.GetID(), "master_id": self.OrganizationMasterID, }) propMsg := tools.PropalgationMessage{ Action: tools.PB_ORG_PARTNER, DataType: int(tools.PEER), Payload: checkPayload, } b, _ := json.Marshal(propMsg) tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: config.GetAppName(), Datatype: tools.PEER, User: user, Groups: groups, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) }