From 3bc01c3a0458420678a5878aa6eb435469f51462 Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 4 Feb 2026 11:35:19 +0100 Subject: [PATCH] Debug Spread Get Peer --- Dockerfile | 60 +++++++++++----- daemons/node/common/common_pubsub.go | 76 ++++++++++++++++++-- daemons/node/common/common_stream.go | 26 ++++--- daemons/node/common/interface.go | 2 +- daemons/node/indexer/handler.go | 102 ++++++++++++++++----------- daemons/node/indexer/service.go | 22 +++++- daemons/node/node.go | 44 +++++++----- daemons/node/pubsub/handler.go | 7 +- daemons/node/pubsub/service.go | 9 +-- daemons/node/pubsub/subscribe.go | 67 ++---------------- daemons/node/stream/handler.go | 6 +- daemons/node/stream/publish.go | 2 +- daemons/node/stream/service.go | 2 +- main.go | 1 - peers.json | 54 -------------- 15 files changed, 256 insertions(+), 224 deletions(-) delete mode 100644 peers.json diff --git a/Dockerfile b/Dockerfile index d1c8211..1e89e79 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,45 +1,67 @@ +# ======================== +# Global build arguments +# ======================== +ARG CONF_NUM + +# ======================== +# Dependencies stage +# ======================== FROM golang:alpine AS deps +ARG CONF_NUM WORKDIR /app + COPY go.mod go.sum ./ RUN sed -i '/replace/d' go.mod RUN go mod download -#---------------------------------------------------------------------------------------------- - +# ======================== +# Builder stage +# ======================== FROM golang:alpine AS builder +ARG CONF_NUM -WORKDIR /app +# Fail fast if CONF_NUM missing +RUN test -n "$CONF_NUM" -RUN apk add git - -RUN go install github.com/beego/bee/v2@latest +RUN apk add --no-cache git WORKDIR /oc-discovery +# Reuse Go cache COPY --from=deps /go/pkg /go/pkg COPY --from=deps /app/go.mod /app/go.sum ./ -RUN export CGO_ENABLED=0 && \ - export GOOS=linux && \ - export GOARCH=amd64 && \ - export BUILD_FLAGS="-ldflags='-w -s'" - +# App sources COPY . . - +# Clean replace directives again (safety) RUN sed -i '/replace/d' go.mod + +# Build package +RUN go install github.com/beego/bee/v2@latest RUN bee pack -RUN mkdir -p /app/extracted && tar -zxvf oc-discovery.tar.gz -C /app/extracted -#---------------------------------------------------------------------------------------------- +# Extract bundle +RUN mkdir -p /app/extracted \ + && tar -zxvf oc-discovery.tar.gz -C /app/extracted +# ======================== +# Runtime stage +# ======================== FROM golang:alpine +ARG CONF_NUM WORKDIR /app -COPY --from=builder /app/extracted/oc-discovery /usr/bin/ -COPY --from=builder /app/extracted/swagger /app/swagger -COPY --from=builder /app/extracted/docker_discovery.json /etc/oc/discovery.json -EXPOSE 8080 +RUN mkdir ./pem -ENTRYPOINT ["oc-discovery"] +COPY --from=builder /app/extracted/pem/private${CONF_NUM}.pem ./pem/private.pem +COPY --from=builder /app/extracted/psk ./psk +COPY --from=builder /app/extracted/pem/public${CONF_NUM}.pem ./pem/public.pem + +COPY --from=builder /app/extracted/oc-discovery /usr/bin/oc-discovery +COPY --from=builder /app/extracted/docker_discovery${CONF_NUM}.json /etc/oc/discovery.json + +EXPOSE 400${CONF_NUM} + +ENTRYPOINT ["oc-discovery"] \ No newline at end of file diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index e060a23..1eb5dac 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "time" @@ -87,9 +88,11 @@ func (event *Event) Verify(p *peer.Peer) error { } type TopicNodeActivityPub struct { - DID string - PeerID string NodeActivity peer.PeerState + Disposer pp.AddrInfo `json:"disposer_address"` + Name string `json:"name"` + DID string `json:"did"` // real PEER ID + PeerID string `json:"peer_id"` } type LongLivedPubSubService struct { @@ -119,7 +122,7 @@ func (s *LongLivedPubSubService) processEvent( const TopicPubSubNodeActivity = "oc-node-activity" const TopicPubSubSearch = "oc-node-search" -func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub) error { +func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub, f *func(context.Context, TopicNodeActivityPub, string)) error { ps.RegisterTopicValidator(TopicPubSubNodeActivity, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { return true }) @@ -130,10 +133,13 @@ func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub) erro defer s.PubsubMu.Unlock() s.LongLivedPubSubs[TopicPubSubNodeActivity] = topic } + if f != nil { + return SubscribeEvents(s, context.Background(), TopicPubSubNodeActivity, -1, *f) + } return nil } -func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub) error { +func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(context.Context, Event, string)) error { ps.RegisterTopicValidator(TopicPubSubSearch, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { return true }) @@ -144,5 +150,67 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub) error { defer s.PubsubMu.Unlock() s.LongLivedPubSubs[TopicPubSubSearch] = topic } + if f != nil { + return SubscribeEvents(s, context.Background(), TopicPubSubSearch, -1, *f) + } return nil } + +func SubscribeEvents[T interface{}](s *LongLivedPubSubService, + ctx context.Context, proto string, timeout int, f func(context.Context, T, string), +) error { + s.PubsubMu.Lock() + if s.LongLivedPubSubs[proto] == nil { + s.PubsubMu.Unlock() + return errors.New("no protocol subscribed in pubsub") + } + topic := s.LongLivedPubSubs[proto] + s.PubsubMu.Unlock() + + sub, err := topic.Subscribe() // then subscribe to it + if err != nil { + return err + } + // launch loop waiting for results. + go waitResults[T](s, ctx, sub, proto, timeout, f) + + return nil +} + +func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context, sub *pubsub.Subscription, proto string, timeout int, f func(context.Context, T, string)) { + defer ctx.Done() + for { + s.PubsubMu.Lock() // check safely if cache is actually notified subscribed to topic + if s.LongLivedPubSubs[proto] == nil { // if not kill the loop. + break + } + s.PubsubMu.Unlock() + // if still subscribed -> wait for new message + var cancel context.CancelFunc + if timeout != -1 { + ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second) + defer cancel() + } + msg, err := sub.Next(ctx) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + // timeout hit, no message before deadline kill subsciption. + s.PubsubMu.Lock() + delete(s.LongLivedPubSubs, proto) + s.PubsubMu.Unlock() + return + } + continue + } + var evt T + if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event + continue + } + f(ctx, evt, fmt.Sprintf("%v", proto)) + /*if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { + if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil { + logger.Err(err) + } + }*/ + } +} diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index a96f1c5..807acc5 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -68,18 +68,22 @@ func (ix *LongLivedStreamRecordedService[T]) gc() { } ix.PubsubMu.Lock() if ix.LongLivedPubSubs[TopicPubSubNodeActivity] != nil { - if b, err := json.Marshal(TopicNodeActivityPub{ - DID: rec.HeartbeatStream.DID, - PeerID: pid.String(), - NodeActivity: peer.OFFLINE, - }); err == nil { - ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b) + ad, err := pp.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + ix.Host.ID().String()) + if err == nil { + if b, err := json.Marshal(TopicNodeActivityPub{ + Disposer: *ad, + Name: rec.HeartbeatStream.Name, + DID: rec.HeartbeatStream.DID, + PeerID: pid.String(), + NodeActivity: peer.OFFLINE, + }); err == nil { + ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b) + } } } ix.PubsubMu.Unlock() } } - } func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) { @@ -150,6 +154,7 @@ func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heart } pid, err := pp.Decode(hb.PeerID) hb.Stream = &Stream{ + Name: hb.Name, DID: hb.DID, Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), @@ -166,6 +171,7 @@ type StreamRecord[T interface{}] struct { } type Stream struct { + Name string `json:"name"` DID string `json:"did"` Stream network.Stream Expiry time.Time `json:"expiry"` @@ -261,7 +267,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) if len(StaticIndexers) < minIndexer { // TODO : ask for unknown indexer. } - SendHeartbeat(ctx, ProtocolHeartbeat, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer. + SendHeartbeat(ctx, ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer. } func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream { @@ -299,6 +305,7 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, } type Heartbeat struct { + Name string `json:"name"` Stream *Stream `json:"stream"` DID string `json:"did"` PeerID string `json:"peer_id"` @@ -311,7 +318,7 @@ type HeartbeatInfo []struct { const ProtocolHeartbeat = "/opencloud/heartbeat/1.0" -func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) { +func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) { peerID, err := oclib.GenerateNodeID() if err == nil { panic("can't heartbeat daemon failed to start") @@ -323,6 +330,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, h host.Host, ps Proto select { case <-t.C: hb := Heartbeat{ + Name: name, DID: peerID, PeerID: h.ID().String(), Timestamp: time.Now().UTC().Unix(), diff --git a/daemons/node/common/interface.go b/daemons/node/common/interface.go index d197b8f..020bd7f 100644 --- a/daemons/node/common/interface.go +++ b/daemons/node/common/interface.go @@ -7,5 +7,5 @@ import ( ) type DiscoveryPeer interface { - GetPeerRecord(ctx context.Context, key string) (*peer.Peer, error) + GetPeerRecord(ctx context.Context, key string) ([]*peer.Peer, error) } diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index 78490ca..77754ce 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -1,10 +1,12 @@ package indexer import ( + "context" "encoding/base64" "encoding/json" "errors" "fmt" + "oc-discovery/conf" "oc-discovery/daemons/node/common" "time" @@ -29,7 +31,8 @@ type PeerRecord struct { Signature []byte `json:"signature"` ExpiryDate time.Time `json:"expiry_date"` - TTL int `json:"ttl"` // max of hop diffusion + TTL int `json:"ttl"` // max of hop diffusion + NoPub bool `json:"no_pub"` } func (p *PeerRecord) Sign() error { @@ -120,8 +123,8 @@ type GetValue struct { } type GetResponse struct { - Found bool `json:"found"` - Record PeerRecord `json:"record,omitempty"` + Found bool `json:"found"` + Records map[string]PeerRecord `json:"records,omitempty"` } func (ix *IndexerService) initNodeHandler() { @@ -178,7 +181,23 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) { } } + if ix.LongLivedPubSubs[common.TopicPubSubNodeActivity] != nil && !rec.NoPub { + ad, err := peer.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + ix.Host.ID().String()) + if err == nil { + if b, err := json.Marshal(common.TopicNodeActivityPub{ + Disposer: *ad, + DID: rec.DID, + Name: rec.Name, + PeerID: pid.String(), + NodeActivity: pp.ONLINE, + }); err == nil { + ix.LongLivedPubSubs[common.TopicPubSubNodeActivity].Publish(context.Background(), b) + } + } + } + if rec.TTL > 0 { + rec.NoPub = true for _, ad := range common.StaticIndexers { if ad.ID == s.Conn().RemotePeer() { continue @@ -211,56 +230,59 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) { if ix.StreamRecords[common.ProtocolGet] == nil { ix.StreamRecords[common.ProtocolGet] = map[peer.ID]*common.StreamRecord[PeerRecord]{} } + resp := GetResponse{ + Found: false, + Records: map[string]PeerRecord{}, + } streams := ix.StreamRecords[common.ProtocolPublish] // simple lookup by PeerID (or DID) for _, rec := range streams { - if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // OK - resp := GetResponse{ - Found: true, - Record: rec.Record, + if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key || rec.Record.Name == req.Key { // OK + resp.Found = true + resp.Records[rec.Record.PeerID] = rec.Record + if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // there unique... no need to proceed more... + _ = json.NewEncoder(s).Encode(resp) + ix.StreamMU.Unlock() + return } - _ = json.NewEncoder(s).Encode(resp) - break + continue } } // if not found ask to my neighboor indexers - if common.StreamIndexers[common.ProtocolGet] == nil { - _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) - ix.StreamMU.Unlock() - continue - } - for _, ad := range common.StaticIndexers { - if ad.ID == s.Conn().RemotePeer() { - continue - } - if common.StreamIndexers[common.ProtocolGet][ad.ID] == nil { - continue - } - stream := common.StreamIndexers[common.ProtocolGet][ad.ID] - if err := json.NewEncoder(stream.Stream).Encode(GetValue{Key: req.Key}); err != nil { - continue - } - - var resp GetResponse - if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { - continue - } - if resp.Found { - for _, rec := range streams { - if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // OK - resp := GetResponse{ - Found: true, - Record: rec.Record, + for pid, dsp := range ix.DisposedPeers { + if _, ok := resp.Records[dsp.PeerID]; !ok && (dsp.Name == req.Key || dsp.DID == req.Key || dsp.PeerID == req.Key) { + ctxTTL, err := context.WithTimeout(context.Background(), 120*time.Second) + if err != nil { + continue + } + if ix.Host.Network().Connectedness(pid) != network.Connected { + _ = ix.Host.Connect(ctxTTL, dsp.Disposer) + str, err := ix.Host.NewStream(ctxTTL, pid, common.ProtocolGet) + if err != nil { + continue + } + for { + if ctxTTL.Err() == context.DeadlineExceeded { + break + } + var subResp GetResponse + if err := json.NewDecoder(str).Decode(&resp); err != nil { + continue + } + if subResp.Found { + for k, v := range subResp.Records { + if _, ok := resp.Records[k]; !ok { + resp.Records[k] = v + } + } + break } - _ = json.NewEncoder(s).Encode(resp) - break } } - continue } } // Not found - _ = json.NewEncoder(s).Encode(GetResponse{Found: false}) + _ = json.NewEncoder(s).Encode(resp) ix.StreamMU.Unlock() } } diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go index f2ad704..379bbf7 100644 --- a/daemons/node/indexer/service.go +++ b/daemons/node/indexer/service.go @@ -3,10 +3,13 @@ package indexer import ( "context" "oc-discovery/daemons/node/common" + "sync" oclib "cloud.o-forge.io/core/oc-lib" + pp "cloud.o-forge.io/core/oc-lib/models/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" ) // Index Record is the model for the specialized registry of node connected to Indexer @@ -14,6 +17,8 @@ type IndexerService struct { *common.LongLivedStreamRecordedService[PeerRecord] PS *pubsub.PubSub isStrictIndexer bool + mu sync.RWMutex + DisposedPeers map[peer.ID]*common.TopicNodeActivityPub } // if a pubsub is given... indexer is also an active oc-node. If not... your a strict indexer @@ -37,11 +42,22 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ logger.Info().Msg("connect to indexers as strict indexer...") common.ConnectToIndexers(h, 0, 5, ix.Host.ID()) // TODO : make var to change how many indexers are allowed. logger.Info().Msg("subscribe to node activity as strict indexer...") - ix.SubscribeToNodeActivity(ix.PS) // now we subscribe to a long run topic named node-activity, to relay message. + logger.Info().Msg("subscribe to decentralized search flow as strict indexer...") - ix.SubscribeToSearch(ix.PS) + ix.SubscribeToSearch(ix.PS, nil) } - ix.initNodeHandler() // then listen up on every protocol expected + f := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) { + ix.mu.Lock() + if evt.NodeActivity == pp.OFFLINE { + delete(ix.DisposedPeers, evt.Disposer.ID) + } + if evt.NodeActivity == pp.ONLINE { + ix.DisposedPeers[evt.Disposer.ID] = &evt + } + ix.mu.Unlock() + } + ix.SubscribeToNodeActivity(ix.PS, &f) // now we subscribe to a long run topic named node-activity, to relay message. + ix.initNodeHandler() // then listen up on every protocol expected return ix } diff --git a/daemons/node/node.go b/daemons/node/node.go index 615fc75..8fadd0e 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -79,7 +79,6 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { panic(err) } logger.Info().Msg("subscribe to decentralized search flow...") - node.SubscribeToSearch(node.PS) logger.Info().Msg("run garbage collector...") node.StartGC(30 * time.Second) @@ -90,6 +89,12 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { if node.PubSubService, err = pubsub.InitPubSub(context.Background(), node.Host, node.PS, node, node.StreamService); err != nil { panic(err) } + f := func(ctx context.Context, evt common.Event, topic string) { + if p, err := node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { + node.StreamService.SendResponse(p[0], &evt) + } + } + node.SubscribeToSearch(node.PS, &f) } if isIndexer { logger.Info().Msg("generate opencloud indexer...") @@ -102,7 +107,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { } func (d *Node) Close() { - if d.isIndexer { + if d.isIndexer && d.IndexerService != nil { d.IndexerService.Close() } d.PubSubService.Close() @@ -147,9 +152,9 @@ func (d *Node) publishPeerRecord( func (d *Node) GetPeerRecord( ctx context.Context, key string, -) (*peer.Peer, error) { +) ([]*peer.Peer, error) { var err error - var info *indexer.PeerRecord + var info map[string]indexer.PeerRecord if common.StreamIndexers[common.ProtocolPublish] == nil { return nil, errors.New("no protocol Publish is set up on the node") } @@ -162,29 +167,32 @@ func (d *Node) GetPeerRecord( return nil, err } - var resp indexer.GetResponse - if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { - return nil, err - } - if resp.Found { - info = &resp.Record - break + for { + var resp indexer.GetResponse + if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil { + return nil, err + } + if resp.Found { + info = resp.Records + break + } } } - var p *peer.Peer - if info != nil { - if pk, err := info.Verify(); err != nil { + var ps []*peer.Peer + for _, pr := range info { + if pk, err := pr.Verify(); err != nil { return nil, err - } else if ok, p, err := info.ExtractPeer(d.PeerID.String(), key, pk); err != nil { + } else if ok, p, err := pr.ExtractPeer(d.PeerID.String(), key, pk); err != nil { return nil, err } else { if ok { - d.publishPeerRecord(info) + d.publishPeerRecord(&pr) } - return p, nil + ps = append(ps, p) } } - return p, err + + return ps, err } func (d *Node) claimInfo( diff --git a/daemons/node/pubsub/handler.go b/daemons/node/pubsub/handler.go index 60a9b77..50f4bad 100644 --- a/daemons/node/pubsub/handler.go +++ b/daemons/node/pubsub/handler.go @@ -23,14 +23,13 @@ func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 c if !(action == tools.PB_SEARCH_RESPONSE || action == tools.PB_SEARCH) { return nil } - // TODO VERIFY: FROM SHOULD BE A PEER ID OR A DID - if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil { - if err := evt.Verify(p); err != nil { + if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { // peerFrom is Unique + if err := evt.Verify(p[0]); err != nil { return err } switch action { case tools.PB_SEARCH: // when someone ask for search. - if err := ps.StreamService.SendResponse(p, evt); err != nil { + if err := ps.StreamService.SendResponse(p[0], evt); err != nil { return err } diff --git a/daemons/node/pubsub/service.go b/daemons/node/pubsub/service.go index f3f8ed8..e99efdd 100644 --- a/daemons/node/pubsub/service.go +++ b/daemons/node/pubsub/service.go @@ -14,6 +14,7 @@ import ( ) type PubSubService struct { + *common.LongLivedPubSubService Node common.DiscoveryPeer Host host.Host PS *pubsub.PubSub @@ -24,10 +25,10 @@ type PubSubService struct { func InitPubSub(ctx context.Context, h host.Host, ps *pubsub.PubSub, node common.DiscoveryPeer, streamService *stream.StreamService) (*PubSubService, error) { service := &PubSubService{ - Node: node, - Host: h, - StreamService: streamService, - PS: ps, + LongLivedPubSubService: common.NewLongLivedPubSubService(h), + Node: node, + StreamService: streamService, + PS: ps, } logger := oclib.GetLogger() logger.Info().Msg("subscribe to events...") diff --git a/daemons/node/pubsub/subscribe.go b/daemons/node/pubsub/subscribe.go index 69b1557..3c8f3c5 100644 --- a/daemons/node/pubsub/subscribe.go +++ b/daemons/node/pubsub/subscribe.go @@ -2,16 +2,11 @@ package pubsub import ( "context" - "encoding/json" - "errors" "oc-discovery/daemons/node/common" - "slices" - "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" - pubsub "github.com/libp2p/go-libp2p-pubsub" ) func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error { @@ -25,72 +20,20 @@ func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error { func (ps *PubSubService) subscribeEvents( ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string, timeout int, ) error { + logger := oclib.GetLogger() // define a name app.action#peerID name := action.String() + "#" + peerID if dt != nil { // if a datatype is precised then : app.action.datatype#peerID name = action.String() + "." + (*dt).String() + "#" + peerID } - topic, err := ps.PS.Join(name) // find out the topic - if err != nil { - return err - } - - sub, err := topic.Subscribe() // then subscribe to it - if err != nil { - return err - } - ps.mutex.Lock() // add safely in cache your subscription. - ps.Subscription = append(ps.Subscription, name) - ps.mutex.Unlock() - - // launch loop waiting for results. - go ps.waitResults(ctx, sub, name, timeout) - - return nil -} - -func (ps *PubSubService) waitResults(ctx context.Context, sub *pubsub.Subscription, topicName string, timeout int) { - logger := oclib.GetLogger() - defer ctx.Done() - for { - ps.mutex.Lock() // check safely if cache is actually notified subscribed to topic - if !slices.Contains(ps.Subscription, topicName) { // if not kill the loop. - break - } - ps.mutex.Unlock() - // if still subscribed -> wait for new message - var cancel context.CancelFunc - if timeout != -1 { - ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second) - defer cancel() - } - msg, err := sub.Next(ctx) - if err != nil { - if errors.Is(err, context.DeadlineExceeded) { - // timeout hit, no message before deadline kill subsciption. - ps.mutex.Lock() - subs := []string{} - for _, ss := range ps.Subscription { - if ss != topicName { - subs = append(subs, ss) - } - } - ps.Subscription = subs - ps.mutex.Unlock() - return - } - continue - } - var evt common.Event - if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event - continue - } - if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil { - if err := ps.processEvent(ctx, p, &evt, topicName); err != nil { + f := func(ctx context.Context, evt common.Event, topicName string) { + if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { + if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil { logger.Err(err) } } } + return common.SubscribeEvents(ps.LongLivedPubSubService, ctx, name, -1, f) } func (ps *PubSubService) processEvent( diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index 852d918..80b0054 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -69,8 +69,8 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools. p := peers.Data[0].(*peer.Peer) // TODO : something if peer is missing in our side ! ps.SendResponse(p, evt) - } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil { - ps.SendResponse(p, evt) + } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID + ps.SendResponse(p[0], evt) } case tools.PB_CREATE: case tools.PB_UPDATE: @@ -115,7 +115,7 @@ func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event) error abs.PublishResources(&ndt, event.User, peerID, j) } else { abs.PublishResources(nil, event.User, peerID, j) - } // TODO : TEMP STREAM ! + } } } } diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index 0153a65..d3b88dd 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -146,7 +146,7 @@ func (s *StreamService) write( if s.Streams[proto][peerID.ID] == nil { // should create a very temp stream - ctxTTL, err := context.WithTimeout(context.Background(), 10*time.Second) + ctxTTL, err := context.WithTimeout(context.Background(), 60*time.Second) if err == nil { if isAPartner { ctxTTL = context.Background() diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 9550d4c..7d22045 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -144,7 +144,7 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) { } s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, false, &f) } - common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, + common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name, s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second) } diff --git a/main.go b/main.go index 64a3110..7a75de3 100644 --- a/main.go +++ b/main.go @@ -52,5 +52,4 @@ func main() { log.Println("shutting down") n.Close() } - } diff --git a/peers.json b/peers.json deleted file mode 100644 index 0fa5ca5..0000000 --- a/peers.json +++ /dev/null @@ -1,54 +0,0 @@ -[ - { - "peer_id": "a50d3697-7ede-4fe5-a385-e9d01ebc1002", - "name": "ASF", - "keywords": [ - "car", - "highway", - "images", - "video" - ], - "last_seen_online": "2023-03-07T11:57:13.378707853+01:00", - "api_version": "1", - "api_url": "http://127.0.0.1:49618/v1" - }, - { - "peer_id": "a50d3697-7ede-4fe5-a385-e9d01ebc1003", - "name": "IT", - "keywords": [ - "car", - "highway", - "images", - "video" - ], - "last_seen_online": "2023-03-07T11:57:13.378707853+01:00", - "api_version": "1", - "api_url": "https://it.irtse.com/oc" - }, - { - "peer_id": "a50d3697-7ede-4fe5-a385-e9d01ebc1004", - "name": "Centre de traitement des amendes", - "keywords": [ - "car", - "highway", - "images", - "video" - ], - "last_seen_online": "2023-03-07T11:57:13.378707853+01:00", - "api_version": "1", - "api_url": "https://impots.irtse.com/oc" - }, - { - "peer_id": "a50d3697-7ede-4fe5-a385-e9d01ebc1005", - "name": "Douanes", - "keywords": [ - "car", - "highway", - "images", - "video" - ], - "last_seen_online": "2023-03-07T11:57:13.378707853+01:00", - "api_version": "1", - "api_url": "https://douanes.irtse.com/oc" - } -] \ No newline at end of file