package node import ( "context" "encoding/json" "fmt" "oc-discovery/daemons/node/common" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" pp "github.com/libp2p/go-libp2p/core/peer" ) func ListenNATS(n Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.CREATE_PEER: func(resp tools.NATSResponse) { if resp.FromApp == config.GetAppName() { return } logger := oclib.GetLogger() m := map[string]interface{}{} err := json.Unmarshal(resp.Payload, &m) if err != nil { logger.Err(err) return } p := &peer.Peer{} p = p.Deserialize(m, p).(*peer.Peer) ad, err := pp.AddrInfoFromString(p.PeerID) n.StreamService.Mu.Lock() defer n.StreamService.Mu.Unlock() if p.Relation == peer.PARTNER { n.StreamService.ConnectToPartner(ad.ID, ad) } else { ps := common.ProtocolStream{} for p, s := range n.StreamService.Streams { m := map[pp.ID]*common.Stream{} for k := range s { if ad.ID != k { m[k] = s[k] } else { s[k].Stream.Close() } } ps[p] = m } n.StreamService.Streams = ps } }, tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { var propalgation tools.PropalgationMessage err := json.Unmarshal(resp.Payload, &propalgation) var dt *tools.DataType if propalgation.DataType > 0 { dtt := tools.DataType(propalgation.DataType) dt = &dtt } if err == nil { switch propalgation.Action { case tools.PB_CREATE: case tools.PB_UPDATE: case tools.PB_DELETE: n.StreamService.ToPartnerPublishEvent( context.Background(), propalgation.Action, dt, resp.User, propalgation.Payload, ) case tools.PB_SEARCH: m := map[string]interface{}{} json.Unmarshal(propalgation.Payload, &m) n.PubSubService.SearchPublishEvent( context.Background(), dt, fmt.Sprintf("%v", m["type"]), resp.User, fmt.Sprintf("%v", m["search"]), ) } } }, }) }