package common import ( "context" "encoding/json" "errors" "fmt" "sync" "time" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" pp "github.com/libp2p/go-libp2p/core/peer" ) type Event struct { Type string `json:"type"` From string `json:"from"` // peerID User string DataType int64 `json:"datatype"` Timestamp int64 `json:"ts"` Payload []byte `json:"payload"` Signature []byte `json:"sig"` } func NewEvent(name string, from string, dt *tools.DataType, user string, payload []byte) *Event { priv, err := LoadKeyFromFilePrivate() // your node private key if err != nil { return nil } evt := &Event{ Type: name, From: from, User: user, Timestamp: time.Now().UTC().Unix(), Payload: payload, } if dt != nil { evt.DataType = int64(dt.EnumIndex()) } else { evt.DataType = -1 } body, _ := json.Marshal(evt) sig, _ := priv.Sign(body) evt.Signature = sig return evt } func (e *Event) RawEvent() *Event { return &Event{ Type: e.Type, From: e.From, User: e.User, DataType: e.DataType, Timestamp: e.Timestamp, Payload: e.Payload, } } func (e *Event) toRawByte() ([]byte, error) { return json.Marshal(e.RawEvent()) } func (event *Event) Verify(p *peer.Peer) error { if p == nil { return errors.New("no peer found") } if p.Relation == peer.BLACKLIST { // if peer is blacklisted... quit... return errors.New("peer is blacklisted") } pubKey, err := PubKeyFromString(p.PublicKey) // extract pubkey from pubkey str if err != nil { return errors.New("pubkey is malformed") } data, err := event.toRawByte() if err != nil { return err } // extract byte from raw event excluding signature. if ok, _ := pubKey.Verify(data, event.Signature); !ok { // then verify if pubkey sign this message... return errors.New("check signature failed") } return nil } type TopicNodeActivityPub struct { NodeActivity peer.PeerState Disposer pp.AddrInfo `json:"disposer_address"` Name string `json:"name"` DID string `json:"did"` // real PEER ID PeerID string `json:"peer_id"` } type LongLivedPubSubService struct { Host host.Host LongLivedPubSubs map[string]*pubsub.Topic PubsubMu sync.RWMutex } func NewLongLivedPubSubService(h host.Host) *LongLivedPubSubService { return &LongLivedPubSubService{ Host: h, LongLivedPubSubs: map[string]*pubsub.Topic{}, } } func (s *LongLivedPubSubService) processEvent( ctx context.Context, p *peer.Peer, event *Event, topicName string, handler func(context.Context, string, *Event) error) error { if err := event.Verify(p); err != nil { return err } return handler(ctx, topicName, event) } const TopicPubSubNodeActivity = "oc-node-activity" const TopicPubSubSearch = "oc-node-search" func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub, f *func(context.Context, TopicNodeActivityPub, string)) error { ps.RegisterTopicValidator(TopicPubSubNodeActivity, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { return true }) if topic, err := ps.Join(TopicPubSubNodeActivity); err != nil { return err } else { s.PubsubMu.Lock() defer s.PubsubMu.Unlock() s.LongLivedPubSubs[TopicPubSubNodeActivity] = topic } if f != nil { return SubscribeEvents(s, context.Background(), TopicPubSubNodeActivity, -1, *f) } return nil } func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(context.Context, Event, string)) error { ps.RegisterTopicValidator(TopicPubSubSearch, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { return true }) if topic, err := ps.Join(TopicPubSubSearch); err != nil { return err } else { s.PubsubMu.Lock() defer s.PubsubMu.Unlock() s.LongLivedPubSubs[TopicPubSubSearch] = topic } if f != nil { return SubscribeEvents(s, context.Background(), TopicPubSubSearch, -1, *f) } return nil } func SubscribeEvents[T interface{}](s *LongLivedPubSubService, ctx context.Context, proto string, timeout int, f func(context.Context, T, string), ) error { s.PubsubMu.Lock() if s.LongLivedPubSubs[proto] == nil { s.PubsubMu.Unlock() return errors.New("no protocol subscribed in pubsub") } topic := s.LongLivedPubSubs[proto] s.PubsubMu.Unlock() sub, err := topic.Subscribe() // then subscribe to it if err != nil { return err } // launch loop waiting for results. go waitResults[T](s, ctx, sub, proto, timeout, f) return nil } func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context, sub *pubsub.Subscription, proto string, timeout int, f func(context.Context, T, string)) { defer ctx.Done() for { s.PubsubMu.Lock() // check safely if cache is actually notified subscribed to topic if s.LongLivedPubSubs[proto] == nil { // if not kill the loop. break } s.PubsubMu.Unlock() // if still subscribed -> wait for new message var cancel context.CancelFunc if timeout != -1 { ctx, cancel = context.WithTimeout(ctx, time.Duration(timeout)*time.Second) defer cancel() } msg, err := sub.Next(ctx) if err != nil { if errors.Is(err, context.DeadlineExceeded) { // timeout hit, no message before deadline kill subsciption. s.PubsubMu.Lock() delete(s.LongLivedPubSubs, proto) s.PubsubMu.Unlock() return } continue } var evt T if err := json.Unmarshal(msg.Data, &evt); err != nil { // map to event continue } f(ctx, evt, fmt.Sprintf("%v", proto)) /*if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil { logger.Err(err) } }*/ } }