Closure On change of state
This commit is contained in:
@@ -4,12 +4,51 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"oc-discovery/daemons/node/common"
|
||||||
|
|
||||||
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/config"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ListenNATS(n Node) {
|
func ListenNATS(n Node) {
|
||||||
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
||||||
|
tools.CREATE_PEER: func(resp tools.NATSResponse) {
|
||||||
|
if resp.FromApp == config.GetAppName() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger := oclib.GetLogger()
|
||||||
|
m := map[string]interface{}{}
|
||||||
|
err := json.Unmarshal(resp.Payload, &m)
|
||||||
|
if err != nil {
|
||||||
|
logger.Err(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p := &peer.Peer{}
|
||||||
|
p = p.Deserialize(m, p).(*peer.Peer)
|
||||||
|
|
||||||
|
ad, err := pp.AddrInfoFromString(p.PeerID)
|
||||||
|
n.StreamService.Mu.Lock()
|
||||||
|
defer n.StreamService.Mu.Unlock()
|
||||||
|
|
||||||
|
if p.Relation == peer.PARTNER {
|
||||||
|
n.StreamService.ConnectToPartner(ad.ID, ad)
|
||||||
|
} else {
|
||||||
|
ps := common.ProtocolStream{}
|
||||||
|
for p, s := range n.StreamService.Streams {
|
||||||
|
m := map[pp.ID]*common.Stream{}
|
||||||
|
for k := range s {
|
||||||
|
if ad.ID != k {
|
||||||
|
m[k] = s[k]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ps[p] = m
|
||||||
|
}
|
||||||
|
n.StreamService.Streams = ps
|
||||||
|
}
|
||||||
|
},
|
||||||
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
||||||
var propalgation tools.PropalgationMessage
|
var propalgation tools.PropalgationMessage
|
||||||
err := json.Unmarshal(resp.Payload, &propalgation)
|
err := json.Unmarshal(resp.Payload, &propalgation)
|
||||||
|
|||||||
@@ -91,8 +91,8 @@ func (ps *StreamService) ToPartnerPublishEvent(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ps.mu.Lock()
|
ps.Mu.Lock()
|
||||||
defer ps.mu.Unlock()
|
defer ps.Mu.Unlock()
|
||||||
if p.Relation == peer.PARTNER {
|
if p.Relation == peer.PARTNER {
|
||||||
if ps.Streams[ProtocolHeartbeatPartner] == nil {
|
if ps.Streams[ProtocolHeartbeatPartner] == nil {
|
||||||
ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
||||||
@@ -138,8 +138,8 @@ func (s *StreamService) write(
|
|||||||
if dt != nil {
|
if dt != nil {
|
||||||
name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String()
|
name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String()
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.Mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.Mu.Unlock()
|
||||||
if s.Streams[proto] == nil {
|
if s.Streams[proto] == nil {
|
||||||
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ type StreamService struct {
|
|||||||
Node common.DiscoveryPeer
|
Node common.DiscoveryPeer
|
||||||
Streams common.ProtocolStream
|
Streams common.ProtocolStream
|
||||||
maxNodesConn int
|
maxNodesConn int
|
||||||
mu sync.Mutex
|
Mu sync.Mutex
|
||||||
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
|
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,8 +67,8 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.Mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.Mu.Unlock()
|
||||||
|
|
||||||
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
||||||
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
||||||
@@ -197,8 +197,8 @@ func (s *StreamService) StartGC(interval time.Duration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) gc() {
|
func (s *StreamService) gc() {
|
||||||
s.mu.Lock()
|
s.Mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.Mu.Unlock()
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user