46 lines
1.4 KiB
Go
46 lines
1.4 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"oc-discovery/daemons/node/common"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
)
|
|
|
|
func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error {
|
|
if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, "", -1); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// generic function to subscribe to DHT flow of event
|
|
func (ps *PubSubService) subscribeEvents(
|
|
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string, timeout int,
|
|
) error {
|
|
logger := oclib.GetLogger()
|
|
// define a name app.action#peerID
|
|
name := action.String() + "#" + peerID
|
|
if dt != nil { // if a datatype is precised then : app.action.datatype#peerID
|
|
name = action.String() + "." + (*dt).String() + "#" + peerID
|
|
}
|
|
f := func(ctx context.Context, evt common.Event, topicName string) {
|
|
if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 {
|
|
if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil {
|
|
logger.Err(err)
|
|
}
|
|
}
|
|
}
|
|
return common.SubscribeEvents(ps.LongLivedPubSubService, ctx, name, -1, f)
|
|
}
|
|
|
|
func (ps *PubSubService) processEvent(
|
|
ctx context.Context, p *peer.Peer, event *common.Event, topicName string) error {
|
|
if err := event.Verify(p); err != nil {
|
|
return err
|
|
}
|
|
return ps.handleEvent(ctx, topicName, event)
|
|
}
|