package pubsub import ( "context" "oc-discovery/daemons/node/common" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" ) func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error { if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, "", -1); err != nil { return err } return nil } // generic function to subscribe to DHT flow of event func (ps *PubSubService) subscribeEvents( ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string, timeout int, ) error { logger := oclib.GetLogger() // define a name app.action#peerID name := action.String() + "#" + peerID if dt != nil { // if a datatype is precised then : app.action.datatype#peerID name = action.String() + "." + (*dt).String() + "#" + peerID } f := func(ctx context.Context, evt common.Event, topicName string) { if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil { logger.Err(err) } } } return common.SubscribeEvents(ps.LongLivedPubSubService, ctx, name, -1, f) } func (ps *PubSubService) processEvent( ctx context.Context, p *peer.Peer, event *common.Event, topicName string) error { if err := event.Verify(p); err != nil { return err } return ps.handleEvent(ctx, topicName, event) }