Debug Spread Get Peer

This commit is contained in:
mr
2026-02-04 11:35:19 +01:00
parent 1ebbb54dd1
commit 3bc01c3a04
15 changed files with 256 additions and 224 deletions

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
@@ -87,9 +88,11 @@ func (event *Event) Verify(p *peer.Peer) error {
}
type TopicNodeActivityPub struct {
DID string
PeerID string
NodeActivity peer.PeerState
Disposer pp.AddrInfo `json:"disposer_address"`
Name string `json:"name"`
DID string `json:"did"` // real PEER ID
PeerID string `json:"peer_id"`
}
type LongLivedPubSubService struct {
@@ -119,7 +122,7 @@ func (s *LongLivedPubSubService) processEvent(
const TopicPubSubNodeActivity = "oc-node-activity"
const TopicPubSubSearch = "oc-node-search"
func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub) error {
func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub, f *func(context.Context, TopicNodeActivityPub, string)) error {
ps.RegisterTopicValidator(TopicPubSubNodeActivity, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool {
return true
})
@@ -130,10 +133,13 @@ func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub) erro
defer s.PubsubMu.Unlock()
s.LongLivedPubSubs[TopicPubSubNodeActivity] = topic
}
if f != nil {
return SubscribeEvents(s, context.Background(), TopicPubSubNodeActivity, -1, *f)
}
return nil
}
func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub) error {
func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(context.Context, Event, string)) error {
ps.RegisterTopicValidator(TopicPubSubSearch, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool {
return true
})
@@ -144,5 +150,67 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub) error {
defer s.PubsubMu.Unlock()
s.LongLivedPubSubs[TopicPubSubSearch] = topic
}
if f != nil {
return SubscribeEvents(s, context.Background(), TopicPubSubSearch, -1, *f)
}
return nil
}
func SubscribeEvents[T interface{}](s *LongLivedPubSubService,
ctx context.Context, proto string, timeout int, f func(context.Context, T, string),
) error {
s.PubsubMu.Lock()
if s.LongLivedPubSubs[proto] == nil {
s.PubsubMu.Unlock()
return errors.New("no protocol subscribed in pubsub")
}
topic := s.LongLivedPubSubs[proto]
s.PubsubMu.Unlock()
sub, err := topic.Subscribe() // then subscribe to it
if err != nil {
return err
}
// launch loop waiting for results.
go waitResults[T](s, ctx, sub, proto, timeout, f)
return nil
}
func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context, sub *pubsub.Subscription, proto string, timeout int, f func(context.Context, T, string)) {
defer ctx.Done()
for {
s.PubsubMu.Lock() // check safely if cache is actually notified subscribed to topic
if s.LongLivedPubSubs[proto] == nil { // if not kill the loop.
break
}
s.PubsubMu.Unlock()
// if still subscribed -> wait for new message
var cancel context.CancelFunc
if timeout != -1 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second)
defer cancel()
}
msg, err := sub.Next(ctx)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
// timeout hit, no message before deadline kill subsciption.
s.PubsubMu.Lock()
delete(s.LongLivedPubSubs, proto)
s.PubsubMu.Unlock()
return
}
continue
}
var evt T
if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event
continue
}
f(ctx, evt, fmt.Sprintf("%v", proto))
/*if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 {
if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil {
logger.Err(err)
}
}*/
}
}

View File

@@ -68,18 +68,22 @@ func (ix *LongLivedStreamRecordedService[T]) gc() {
}
ix.PubsubMu.Lock()
if ix.LongLivedPubSubs[TopicPubSubNodeActivity] != nil {
if b, err := json.Marshal(TopicNodeActivityPub{
DID: rec.HeartbeatStream.DID,
PeerID: pid.String(),
NodeActivity: peer.OFFLINE,
}); err == nil {
ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b)
ad, err := pp.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + ix.Host.ID().String())
if err == nil {
if b, err := json.Marshal(TopicNodeActivityPub{
Disposer: *ad,
Name: rec.HeartbeatStream.Name,
DID: rec.HeartbeatStream.DID,
PeerID: pid.String(),
NodeActivity: peer.OFFLINE,
}); err == nil {
ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b)
}
}
}
ix.PubsubMu.Unlock()
}
}
}
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
@@ -150,6 +154,7 @@ func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heart
}
pid, err := pp.Decode(hb.PeerID)
hb.Stream = &Stream{
Name: hb.Name,
DID: hb.DID,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
@@ -166,6 +171,7 @@ type StreamRecord[T interface{}] struct {
}
type Stream struct {
Name string `json:"name"`
DID string `json:"did"`
Stream network.Stream
Expiry time.Time `json:"expiry"`
@@ -261,7 +267,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
if len(StaticIndexers) < minIndexer {
// TODO : ask for unknown indexer.
}
SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
SendHeartbeat(ctx, ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
}
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
@@ -299,6 +305,7 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host,
}
type Heartbeat struct {
Name string `json:"name"`
Stream *Stream `json:"stream"`
DID string `json:"did"`
PeerID string `json:"peer_id"`
@@ -311,7 +318,7 @@ type HeartbeatInfo []struct {
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"
func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) {
func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) {
peerID, err := oclib.GenerateNodeID()
if err == nil {
panic("can't heartbeat daemon failed to start")
@@ -323,6 +330,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps Proto
select {
case <-t.C:
hb := Heartbeat{
Name: name,
DID: peerID,
PeerID: h.ID().String(),
Timestamp: time.Now().UTC().Unix(),

View File

@@ -7,5 +7,5 @@ import (
)
type DiscoveryPeer interface {
GetPeerRecord(ctx context.Context, key string) (*peer.Peer, error)
GetPeerRecord(ctx context.Context, key string) ([]*peer.Peer, error)
}