Connectivity ok
This commit is contained in:
@@ -159,20 +159,16 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(co
|
|||||||
func SubscribeEvents[T interface{}](s *LongLivedPubSubService,
|
func SubscribeEvents[T interface{}](s *LongLivedPubSubService,
|
||||||
ctx context.Context, proto string, timeout int, f func(context.Context, T, string),
|
ctx context.Context, proto string, timeout int, f func(context.Context, T, string),
|
||||||
) error {
|
) error {
|
||||||
s.PubsubMu.Lock()
|
|
||||||
if s.LongLivedPubSubs[proto] == nil {
|
if s.LongLivedPubSubs[proto] == nil {
|
||||||
s.PubsubMu.Unlock()
|
|
||||||
return errors.New("no protocol subscribed in pubsub")
|
return errors.New("no protocol subscribed in pubsub")
|
||||||
}
|
}
|
||||||
topic := s.LongLivedPubSubs[proto]
|
topic := s.LongLivedPubSubs[proto]
|
||||||
s.PubsubMu.Unlock()
|
|
||||||
|
|
||||||
sub, err := topic.Subscribe() // then subscribe to it
|
sub, err := topic.Subscribe() // then subscribe to it
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// launch loop waiting for results.
|
// launch loop waiting for results.
|
||||||
go waitResults[T](s, ctx, sub, proto, timeout, f)
|
go waitResults(s, ctx, sub, proto, timeout, f)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -207,10 +203,5 @@ func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
f(ctx, evt, fmt.Sprintf("%v", proto))
|
f(ctx, evt, fmt.Sprintf("%v", proto))
|
||||||
/*if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 {
|
|
||||||
if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil {
|
|
||||||
logger.Err(err)
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -235,7 +235,6 @@ var StreamIndexers ProtocolStream = ProtocolStream{}
|
|||||||
|
|
||||||
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
|
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
|
||||||
logger := oclib.GetLogger()
|
logger := oclib.GetLogger()
|
||||||
ctx := context.Background()
|
|
||||||
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
|
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
|
||||||
|
|
||||||
if len(addresses) > maxIndexer {
|
if len(addresses) > maxIndexer {
|
||||||
@@ -243,13 +242,17 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, indexerAddr := range addresses {
|
for _, indexerAddr := range addresses {
|
||||||
|
fmt.Println("GENERATE ADDR", indexerAddr)
|
||||||
ad, err := pp.AddrInfoFromString(indexerAddr)
|
ad, err := pp.AddrInfoFromString(indexerAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
fmt.Println("ADDR ERR", err)
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
force := false
|
||||||
if h.Network().Connectedness(ad.ID) != network.Connected {
|
if h.Network().Connectedness(ad.ID) != network.Connected {
|
||||||
if err := h.Connect(ctx, *ad); err != nil {
|
force = true
|
||||||
|
if err := h.Connect(context.Background(), *ad); err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
continue
|
continue
|
||||||
@@ -258,7 +261,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
|
|||||||
StaticIndexers = append(StaticIndexers, ad)
|
StaticIndexers = append(StaticIndexers, ad)
|
||||||
// make a privilege streams with indexer.
|
// make a privilege streams with indexer.
|
||||||
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
|
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
|
||||||
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, true, nil)
|
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, force, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(StaticIndexers) == 0 {
|
if len(StaticIndexers) == 0 {
|
||||||
@@ -268,7 +271,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
|
|||||||
if len(StaticIndexers) < minIndexer {
|
if len(StaticIndexers) < minIndexer {
|
||||||
// TODO : ask for unknown indexer.
|
// TODO : ask for unknown indexer.
|
||||||
}
|
}
|
||||||
SendHeartbeat(ctx, ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
|
SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
|
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
|
||||||
@@ -294,7 +297,7 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host,
|
|||||||
if protoS[proto][id] != nil {
|
if protoS[proto][id] != nil {
|
||||||
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
|
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("GENERATE STREAM", proto, id)
|
fmt.Println("NEW STREAM", proto, id)
|
||||||
s, err := h.NewStream(*ctx, id, proto)
|
s, err := h.NewStream(*ctx, id, proto)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err.Error())
|
panic(err.Error())
|
||||||
@@ -385,18 +388,3 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad
|
|||||||
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
|
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func SearchPeer(search string) ([]*peer.Peer, error) {
|
|
||||||
ps := []*peer.Peer{}
|
|
||||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
|
||||||
peers := access.Search(nil, search, false)
|
|
||||||
if len(peers.Data) == 0 {
|
|
||||||
return ps, errors.New("no self available")
|
|
||||||
}
|
|
||||||
for _, p := range peers.Data {
|
|
||||||
ps = append(ps, p.(*peer.Peer))
|
|
||||||
}
|
|
||||||
return ps, nil
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|||||||
@@ -128,6 +128,7 @@ type GetResponse struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ix *IndexerService) initNodeHandler() {
|
func (ix *IndexerService) initNodeHandler() {
|
||||||
|
fmt.Println("Node activity")
|
||||||
ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat)
|
ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat)
|
||||||
ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish)
|
ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish)
|
||||||
ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet)
|
ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet)
|
||||||
|
|||||||
@@ -89,6 +89,20 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) connectToPartners() error {
|
func (s *StreamService) connectToPartners() error {
|
||||||
|
for _, proto := range protocols {
|
||||||
|
f := func(ss network.Stream) {
|
||||||
|
if s.Streams[proto] == nil {
|
||||||
|
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
||||||
|
}
|
||||||
|
s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{
|
||||||
|
Stream: ss,
|
||||||
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
||||||
|
}
|
||||||
|
go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()])
|
||||||
|
}
|
||||||
|
fmt.Println("SetStreamHandler", proto)
|
||||||
|
s.Host.SetStreamHandler(proto, f)
|
||||||
|
}
|
||||||
peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
|
peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -105,26 +119,13 @@ func (s *StreamService) connectToPartners() error {
|
|||||||
s.ConnectToPartner(pid, ad)
|
s.ConnectToPartner(pid, ad)
|
||||||
// heartbeat your partner.
|
// heartbeat your partner.
|
||||||
}
|
}
|
||||||
for _, proto := range protocols {
|
|
||||||
f := func(ss network.Stream) {
|
|
||||||
if s.Streams[proto] == nil {
|
|
||||||
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
|
||||||
}
|
|
||||||
s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{
|
|
||||||
Stream: ss,
|
|
||||||
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
|
||||||
}
|
|
||||||
s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()])
|
|
||||||
}
|
|
||||||
fmt.Println("SetStreamHandler", proto)
|
|
||||||
s.Host.SetStreamHandler(proto, f)
|
|
||||||
}
|
|
||||||
// TODO if handle... from partner then HeartBeat back
|
// TODO if handle... from partner then HeartBeat back
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
||||||
logger := oclib.GetLogger()
|
logger := oclib.GetLogger()
|
||||||
|
force := false
|
||||||
for _, proto := range protocols {
|
for _, proto := range protocols {
|
||||||
f := func(ss network.Stream) {
|
f := func(ss network.Stream) {
|
||||||
if s.Streams[proto] == nil {
|
if s.Streams[proto] == nil {
|
||||||
@@ -134,15 +135,16 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
|||||||
Stream: ss,
|
Stream: ss,
|
||||||
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
||||||
}
|
}
|
||||||
s.readLoop(s.Streams[proto][pid])
|
go s.readLoop(s.Streams[proto][pid])
|
||||||
}
|
}
|
||||||
if s.Host.Network().Connectedness(ad.ID) != network.Connected {
|
if s.Host.Network().Connectedness(ad.ID) != network.Connected {
|
||||||
|
force = true
|
||||||
if err := s.Host.Connect(context.Background(), *ad); err != nil {
|
if err := s.Host.Connect(context.Background(), *ad); err != nil {
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, false, &f)
|
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, force, &f)
|
||||||
}
|
}
|
||||||
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name,
|
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name,
|
||||||
s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second)
|
s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second)
|
||||||
@@ -153,14 +155,15 @@ func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) {
|
|||||||
ps := []*peer.Peer{}
|
ps := []*peer.Peer{}
|
||||||
if conf.GetConfig().PeerIDS != "" {
|
if conf.GetConfig().PeerIDS != "" {
|
||||||
for _, peerID := range strings.Split(conf.GetConfig().PeerIDS, ",") {
|
for _, peerID := range strings.Split(conf.GetConfig().PeerIDS, ",") {
|
||||||
ppID := strings.Split(peerID, ":")
|
ppID := strings.Split(peerID, "/")
|
||||||
|
fmt.Println(ppID, peerID)
|
||||||
ps = append(ps, &peer.Peer{
|
ps = append(ps, &peer.Peer{
|
||||||
AbstractObject: utils.AbstractObject{
|
AbstractObject: utils.AbstractObject{
|
||||||
UUID: uuid.New().String(),
|
UUID: uuid.New().String(),
|
||||||
Name: ppID[1],
|
Name: ppID[1],
|
||||||
},
|
},
|
||||||
PeerID: ppID[1],
|
PeerID: ppID[len(ppID)-1],
|
||||||
StreamAddress: "/ip4/127.0.0.1/tcp/" + ppID[0] + "/p2p/" + ppID[1],
|
StreamAddress: peerID,
|
||||||
State: peer.ONLINE,
|
State: peer.ONLINE,
|
||||||
Relation: peer.PARTNER,
|
Relation: peer.PARTNER,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -4,5 +4,5 @@
|
|||||||
"NATS_URL": "nats://nats:4222",
|
"NATS_URL": "nats://nats:4222",
|
||||||
"NODE_MODE": "indexer",
|
"NODE_MODE": "indexer",
|
||||||
"NODE_ENDPOINT_PORT": 4002,
|
"NODE_ENDPOINT_PORT": 4002,
|
||||||
"INDEXER_ADDRESSES": "/ip4/oc-discovery1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
"INDEXER_ADDRESSES": "/ip4/172.19.0.2/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
||||||
}
|
}
|
||||||
@@ -4,5 +4,5 @@
|
|||||||
"NATS_URL": "nats://nats:4222",
|
"NATS_URL": "nats://nats:4222",
|
||||||
"NODE_MODE": "node",
|
"NODE_MODE": "node",
|
||||||
"NODE_ENDPOINT_PORT": 4003,
|
"NODE_ENDPOINT_PORT": 4003,
|
||||||
"INDEXER_ADDRESSES": "/ip4/oc-discovery2/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u"
|
"INDEXER_ADDRESSES": "/ip4/172.19.0.3/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u"
|
||||||
}
|
}
|
||||||
@@ -4,6 +4,6 @@
|
|||||||
"NATS_URL": "nats://nats:4222",
|
"NATS_URL": "nats://nats:4222",
|
||||||
"NODE_MODE": "node",
|
"NODE_MODE": "node",
|
||||||
"NODE_ENDPOINT_PORT": 4004,
|
"NODE_ENDPOINT_PORT": 4004,
|
||||||
"INDEXER_ADDRESSES": "/ip4/oc-discovery1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu",
|
"INDEXER_ADDRESSES": "/ip4/172.19.0.2/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu",
|
||||||
"PEER_IDS": "/ip4/oc-discovery3/tcp/4003/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
"PEER_IDS": "/ip4/172.19.0.4/tcp/4003/p2p/12D3KooWBh9kZrekBAE5G33q4jCLNRAzygem3gP1mMdK8mhoCTaw"
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user