diff --git a/daemons/node/nats.go b/daemons/node/nats.go index 1f0bb30..8483570 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -4,12 +4,51 @@ import ( "context" "encoding/json" "fmt" + "oc-discovery/daemons/node/common" + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/config" + "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" + pp "github.com/libp2p/go-libp2p/core/peer" ) func ListenNATS(n Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ + tools.CREATE_PEER: func(resp tools.NATSResponse) { + if resp.FromApp == config.GetAppName() { + return + } + logger := oclib.GetLogger() + m := map[string]interface{}{} + err := json.Unmarshal(resp.Payload, &m) + if err != nil { + logger.Err(err) + return + } + p := &peer.Peer{} + p = p.Deserialize(m, p).(*peer.Peer) + + ad, err := pp.AddrInfoFromString(p.PeerID) + n.StreamService.Mu.Lock() + defer n.StreamService.Mu.Unlock() + + if p.Relation == peer.PARTNER { + n.StreamService.ConnectToPartner(ad.ID, ad) + } else { + ps := common.ProtocolStream{} + for p, s := range n.StreamService.Streams { + m := map[pp.ID]*common.Stream{} + for k := range s { + if ad.ID != k { + m[k] = s[k] + } + } + ps[p] = m + } + n.StreamService.Streams = ps + } + }, tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { var propalgation tools.PropalgationMessage err := json.Unmarshal(resp.Payload, &propalgation) diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index d3b88dd..a588aa9 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -91,8 +91,8 @@ func (ps *StreamService) ToPartnerPublishEvent( if err != nil { return err } - ps.mu.Lock() - defer ps.mu.Unlock() + ps.Mu.Lock() + defer ps.Mu.Unlock() if p.Relation == peer.PARTNER { if ps.Streams[ProtocolHeartbeatPartner] == nil { ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} @@ -138,8 +138,8 @@ func (s *StreamService) write( if dt != nil { name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String() } - s.mu.Lock() - defer s.mu.Unlock() + s.Mu.Lock() + defer s.Mu.Unlock() if s.Streams[proto] == nil { s.Streams[proto] = map[pp.ID]*common.Stream{} } diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 83a351a..56ecd12 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -41,7 +41,7 @@ type StreamService struct { Node common.DiscoveryPeer Streams common.ProtocolStream maxNodesConn int - mu sync.Mutex + Mu sync.Mutex // Stream map[protocol.ID]map[pp.ID]*daemons.Stream } @@ -67,8 +67,8 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) { if err != nil { return } - s.mu.Lock() - defer s.mu.Unlock() + s.Mu.Lock() + defer s.Mu.Unlock() if s.Streams[ProtocolHeartbeatPartner] == nil { s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{} @@ -197,8 +197,8 @@ func (s *StreamService) StartGC(interval time.Duration) { } func (s *StreamService) gc() { - s.mu.Lock() - defer s.mu.Unlock() + s.Mu.Lock() + defer s.Mu.Unlock() now := time.Now().UTC() if s.Streams[ProtocolHeartbeatPartner] == nil {