From 552bb17e2b7e5a77c029aeae7d261bc5ad0a57f1 Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 5 Feb 2026 11:23:11 +0100 Subject: [PATCH] Connectivity ok --- daemons/node/common/common_pubsub.go | 11 +------- daemons/node/common/common_stream.go | 28 ++++++------------- daemons/node/indexer/handler.go | 1 + daemons/node/stream/service.go | 41 +++++++++++++++------------- docker_discovery2.json | 2 +- docker_discovery3.json | 2 +- docker_discovery4.json | 6 ++-- 7 files changed, 37 insertions(+), 54 deletions(-) diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index 1eb5dac..807c653 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -159,20 +159,16 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(co func SubscribeEvents[T interface{}](s *LongLivedPubSubService, ctx context.Context, proto string, timeout int, f func(context.Context, T, string), ) error { - s.PubsubMu.Lock() if s.LongLivedPubSubs[proto] == nil { - s.PubsubMu.Unlock() return errors.New("no protocol subscribed in pubsub") } topic := s.LongLivedPubSubs[proto] - s.PubsubMu.Unlock() - sub, err := topic.Subscribe() // then subscribe to it if err != nil { return err } // launch loop waiting for results. - go waitResults[T](s, ctx, sub, proto, timeout, f) + go waitResults(s, ctx, sub, proto, timeout, f) return nil } @@ -207,10 +203,5 @@ func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context, continue } f(ctx, evt, fmt.Sprintf("%v", proto)) - /*if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { - if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil { - logger.Err(err) - } - }*/ } } diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index fb670a1..6cc6f1e 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -235,7 +235,6 @@ var StreamIndexers ProtocolStream = ProtocolStream{} func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) { logger := oclib.GetLogger() - ctx := context.Background() addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",") if len(addresses) > maxIndexer { @@ -243,13 +242,17 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) } for _, indexerAddr := range addresses { + fmt.Println("GENERATE ADDR", indexerAddr) ad, err := pp.AddrInfoFromString(indexerAddr) if err != nil { + fmt.Println("ADDR ERR", err) logger.Err(err) continue } + force := false if h.Network().Connectedness(ad.ID) != network.Connected { - if err := h.Connect(ctx, *ad); err != nil { + force = true + if err := h.Connect(context.Background(), *ad); err != nil { fmt.Println(err) logger.Err(err) continue @@ -258,7 +261,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) StaticIndexers = append(StaticIndexers, ad) // make a privilege streams with indexer. for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} { - AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, true, nil) + AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, force, nil) } } if len(StaticIndexers) == 0 { @@ -268,7 +271,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) if len(StaticIndexers) < minIndexer { // TODO : ask for unknown indexer. } - SendHeartbeat(ctx, ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer. + SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer. } func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream { @@ -294,7 +297,7 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, if protoS[proto][id] != nil { protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute) } else { - fmt.Println("GENERATE STREAM", proto, id) + fmt.Println("NEW STREAM", proto, id) s, err := h.NewStream(*ctx, id, proto) if err != nil { panic(err.Error()) @@ -385,18 +388,3 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad pss.Expiry = time.Now().UTC().Add(2 * time.Minute) return nil } - -/* -func SearchPeer(search string) ([]*peer.Peer, error) { - ps := []*peer.Peer{} - access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - peers := access.Search(nil, search, false) - if len(peers.Data) == 0 { - return ps, errors.New("no self available") - } - for _, p := range peers.Data { - ps = append(ps, p.(*peer.Peer)) - } - return ps, nil -} -*/ diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index 77754ce..bedd573 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -128,6 +128,7 @@ type GetResponse struct { } func (ix *IndexerService) initNodeHandler() { + fmt.Println("Node activity") ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat) ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish) ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet) diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 7d22045..83a351a 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -89,6 +89,20 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) { } func (s *StreamService) connectToPartners() error { + for _, proto := range protocols { + f := func(ss network.Stream) { + if s.Streams[proto] == nil { + s.Streams[proto] = map[pp.ID]*common.Stream{} + } + s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{ + Stream: ss, + Expiry: time.Now().UTC().Add(2 * time.Minute), + } + go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()]) + } + fmt.Println("SetStreamHandler", proto) + s.Host.SetStreamHandler(proto, f) + } peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex())) if err != nil { return err @@ -105,26 +119,13 @@ func (s *StreamService) connectToPartners() error { s.ConnectToPartner(pid, ad) // heartbeat your partner. } - for _, proto := range protocols { - f := func(ss network.Stream) { - if s.Streams[proto] == nil { - s.Streams[proto] = map[pp.ID]*common.Stream{} - } - s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{ - Stream: ss, - Expiry: time.Now().UTC().Add(2 * time.Minute), - } - s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()]) - } - fmt.Println("SetStreamHandler", proto) - s.Host.SetStreamHandler(proto, f) - } // TODO if handle... from partner then HeartBeat back return nil } func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) { logger := oclib.GetLogger() + force := false for _, proto := range protocols { f := func(ss network.Stream) { if s.Streams[proto] == nil { @@ -134,15 +135,16 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) { Stream: ss, Expiry: time.Now().UTC().Add(2 * time.Minute), } - s.readLoop(s.Streams[proto][pid]) + go s.readLoop(s.Streams[proto][pid]) } if s.Host.Network().Connectedness(ad.ID) != network.Connected { + force = true if err := s.Host.Connect(context.Background(), *ad); err != nil { logger.Err(err) continue } } - s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, false, &f) + s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, force, &f) } common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name, s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second) @@ -153,14 +155,15 @@ func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) { ps := []*peer.Peer{} if conf.GetConfig().PeerIDS != "" { for _, peerID := range strings.Split(conf.GetConfig().PeerIDS, ",") { - ppID := strings.Split(peerID, ":") + ppID := strings.Split(peerID, "/") + fmt.Println(ppID, peerID) ps = append(ps, &peer.Peer{ AbstractObject: utils.AbstractObject{ UUID: uuid.New().String(), Name: ppID[1], }, - PeerID: ppID[1], - StreamAddress: "/ip4/127.0.0.1/tcp/" + ppID[0] + "/p2p/" + ppID[1], + PeerID: ppID[len(ppID)-1], + StreamAddress: peerID, State: peer.ONLINE, Relation: peer.PARTNER, }) diff --git a/docker_discovery2.json b/docker_discovery2.json index 742ff7d..0f19bfb 100644 --- a/docker_discovery2.json +++ b/docker_discovery2.json @@ -4,5 +4,5 @@ "NATS_URL": "nats://nats:4222", "NODE_MODE": "indexer", "NODE_ENDPOINT_PORT": 4002, - "INDEXER_ADDRESSES": "/ip4/oc-discovery1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu" + "INDEXER_ADDRESSES": "/ip4/172.19.0.2/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu" } \ No newline at end of file diff --git a/docker_discovery3.json b/docker_discovery3.json index 7266820..de50e4a 100644 --- a/docker_discovery3.json +++ b/docker_discovery3.json @@ -4,5 +4,5 @@ "NATS_URL": "nats://nats:4222", "NODE_MODE": "node", "NODE_ENDPOINT_PORT": 4003, - "INDEXER_ADDRESSES": "/ip4/oc-discovery2/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u" + "INDEXER_ADDRESSES": "/ip4/172.19.0.3/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u" } \ No newline at end of file diff --git a/docker_discovery4.json b/docker_discovery4.json index fe93f09..eeb4ba9 100644 --- a/docker_discovery4.json +++ b/docker_discovery4.json @@ -4,6 +4,6 @@ "NATS_URL": "nats://nats:4222", "NODE_MODE": "node", "NODE_ENDPOINT_PORT": 4004, - "INDEXER_ADDRESSES": "/ip4/oc-discovery1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu", - "PEER_IDS": "/ip4/oc-discovery3/tcp/4003/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu" -} \ No newline at end of file + "INDEXER_ADDRESSES": "/ip4/172.19.0.2/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu", + "PEER_IDS": "/ip4/172.19.0.4/tcp/4003/p2p/12D3KooWBh9kZrekBAE5G33q4jCLNRAzygem3gP1mMdK8mhoCTaw" +}