package infrastructure import ( "encoding/json" "fmt" "slices" "sync" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" ) var ressourceCols = []oclib.LibDataEnum{ oclib.LibDataEnum(oclib.PEER), } var SearchMu sync.RWMutex var SearchStreamAction = map[string][]*peer.Peer{} var SearchStream = map[string]chan *peer.Peer{} func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) if message.Action == tools.PB_SEARCH { SearchMu.Lock() SearchStream[user] = make(chan *peer.Peer, 128) SearchStreamAction[user] = []*peer.Peer{} SearchMu.Unlock() } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-peer", Datatype: -1, User: user, Groups: groups, Method: int(tools.PROPALGATION_EVENT), Payload: b, }) } // un ressource quand on l'ajoute à notre catalogue elle nous est étrangère. // pour se la réaffecté à soit, on peut alors changer le créator ID. // pour protéger une ressource l'idée serait de la signée. // si on la stocke en base, elle va se dépréciée plus encore si le user n'est pas un partenaire. // elle ne sera pas maintenue à jour. Si c'est une ressource publique et qu'elle change // l'offre peut disparaitre mais subsisté chez nous. // alors si on en dispose et qu'on souhaite l'exploité. On doit en vérifier la validité... ou... // la mettre à jour. Le problème de la mise à jour c'est qu'on peut facilement // overflow.... de stream pour avoir à jour sa ressource. // donc l'idée est que la vérification soit manuelle... ou lors d'une vérification de dernière instance. // si une ressource est exploitée dans un workflow ou un shared workspace. // elle doit être vérifié par les pairs engagés. // si la donnée est déclaré comme donnée de l'emmetteur alors on vérifie que la signature est bien émise, par // l'emmetteur. Sinon... on doit interrogé le pair qui a émit la donnée. Est ce que la donnée est à jour. // lui va vérifier la signature de la ressource qu'il possède correspondante si elle existe, si non. AIE, // on met à jour mais on pète une erreur. var self *peer.Peer func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { if resp.FromApp == config.GetAppName() || !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } p := &peer.Peer{} if err := json.Unmarshal(resp.Payload, &p); err == nil { /*if err := verify(resp.Payload); err != nil { return // don't trust anyone... only friends and foes are privilege }*/ fmt.Println("CREATE_RESOURCE", p.GetID()) if ok, _ := oclib.IsMySelf(p.GetID()); ok { fmt.Println("it's ourselve !") return } access := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil) if data := access.LoadOne(p.GetID()); data.Data != nil { if p.Relation == peer.PENDING_PARTNER || p.Relation == peer.PARTNER { if data.ToPeer().Verify { fmt.Println("UPDATE 2", p.GetID()) access.UpdateOne(map[string]interface{}{ "verify": false, "relation": peer.PARTNER, }, p.GetID()) } else { access.UpdateOne(map[string]interface{}{ "verify": true, "relation": peer.PENDING_PARTNER, }, p.GetID()) } } else if data.ToPeer().Relation == peer.NONE { access.UpdateOne(map[string]interface{}{ "verify": false, "relation": p.Relation, }, p.GetID()) } } else if p.Relation != peer.SELF && p.Relation != peer.BLACKLIST { if p.Relation == peer.PARTNER || p.Relation == peer.PENDING_PARTNER { p.Verify = true p.Relation = peer.PENDING_PARTNER } access.StoreOne(p.Serialize(p)) } } }, tools.SEARCH_EVENT: func(resp tools.NATSResponse) { if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } p := &peer.Peer{} err := json.Unmarshal(resp.Payload, p) if err == nil { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) fmt.Println("ADD in SEARCH STREAM", p.GetID()) if s := access.Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}}, }, }, "", false); len(s.Data) > 0 { p.Relation = s.Data[0].(*peer.Peer).Relation } else { p.NotInCatalog = true } SearchMu.Lock() SearchStream[resp.User] <- p // TODO when do we update it in our catalog ? SearchMu.Unlock() } }, }) }