169 lines
5.2 KiB
Go
169 lines
5.2 KiB
Go
package indexer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"oc-discovery/daemons/node/common"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
// Package-level settings of the distributed name index.
const (
	// TopicNameIndex names the GossipSub topic over which regular indexers
	// exchange add/delete events for the distributed name→peerID mapping.
	TopicNameIndex = "oc-name-index"

	// nameIndexDedupWindow is how long a given (action, name, peerID) tuple is
	// kept from being re-emitted after it was last published. It cuts down on
	// duplicate events when one node is registered with several indexers at
	// the same time.
	nameIndexDedupWindow = 30 * time.Second
)
|
|
|
|
// NameIndexAction is the verb carried by a name index event: it tells the
// receiver whether a name mapping must be added or removed.
type NameIndexAction string

// Supported NameIndexAction values. The string forms are what travels on the
// wire, so they must stay stable across indexers.
const (
	// NameIndexAdd inserts (or refreshes) a name mapping.
	NameIndexAdd NameIndexAction = "add"
	// NameIndexDelete removes a name mapping.
	NameIndexDelete NameIndexAction = "delete"
)
|
|
|
|
// NameIndexEvent is published on TopicNameIndex by each indexer when a node
|
|
// registers (add) or is evicted by the GC (delete).
|
|
type NameIndexEvent struct {
|
|
Action NameIndexAction `json:"action"`
|
|
Name string `json:"name"`
|
|
PeerID string `json:"peer_id"`
|
|
DID string `json:"did"`
|
|
}
|
|
|
|
// nameIndexState bundles the two pieces of in-memory state of the name index:
// the index itself and the sender-side deduplication tracker. Each map has its
// own mutex, declared immediately above the map it protects.
type nameIndexState struct {
	// indexMu guards index.
	indexMu sync.RWMutex
	// index maps name → peerID → DID and is populated from the events
	// received from all indexers.
	index map[string]map[string]string

	// emittedMu guards emitted.
	emittedMu sync.Mutex
	// emitted records, per (action, name, peerID) key, when the tuple was last
	// emitted, so repeats inside nameIndexDedupWindow can be suppressed.
	emitted map[string]time.Time
}
|
|
|
|
// shouldEmit returns true if the (action, name, peerID) tuple has not been
|
|
// emitted within nameIndexDedupWindow, updating the tracker if so.
|
|
func (s *nameIndexState) shouldEmit(action NameIndexAction, name, peerID string) bool {
|
|
key := string(action) + ":" + name + ":" + peerID
|
|
s.emittedMu.Lock()
|
|
defer s.emittedMu.Unlock()
|
|
if t, ok := s.emitted[key]; ok && time.Since(t) < nameIndexDedupWindow {
|
|
return false
|
|
}
|
|
s.emitted[key] = time.Now()
|
|
return true
|
|
}
|
|
|
|
// onEvent applies a received NameIndexEvent to the local index.
|
|
// "add" inserts/updates the mapping; "delete" removes it.
|
|
// Operations are idempotent — duplicate events from multiple indexers are harmless.
|
|
func (s *nameIndexState) onEvent(evt NameIndexEvent) {
|
|
if evt.Name == "" || evt.PeerID == "" {
|
|
return
|
|
}
|
|
s.indexMu.Lock()
|
|
defer s.indexMu.Unlock()
|
|
switch evt.Action {
|
|
case NameIndexAdd:
|
|
if s.index[evt.Name] == nil {
|
|
s.index[evt.Name] = map[string]string{}
|
|
}
|
|
s.index[evt.Name][evt.PeerID] = evt.DID
|
|
case NameIndexDelete:
|
|
if s.index[evt.Name] != nil {
|
|
delete(s.index[evt.Name], evt.PeerID)
|
|
if len(s.index[evt.Name]) == 0 {
|
|
delete(s.index, evt.Name)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// initNameIndex joins TopicNameIndex and starts consuming events.
|
|
// Must be called after ix.PS is ready.
|
|
func (ix *IndexerService) initNameIndex(ps *pubsub.PubSub) {
|
|
logger := oclib.GetLogger()
|
|
ix.nameIndex = &nameIndexState{
|
|
index: map[string]map[string]string{},
|
|
emitted: map[string]time.Time{},
|
|
}
|
|
|
|
ps.RegisterTopicValidator(TopicNameIndex, func(_ context.Context, _ pp.ID, _ *pubsub.Message) bool {
|
|
return true
|
|
})
|
|
topic, err := ps.Join(TopicNameIndex)
|
|
if err != nil {
|
|
logger.Err(err).Msg("name index: failed to join topic")
|
|
return
|
|
}
|
|
ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Lock()
|
|
ix.LongLivedStreamRecordedService.LongLivedPubSubService.LongLivedPubSubs[TopicNameIndex] = topic
|
|
ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.Unlock()
|
|
|
|
common.SubscribeEvents(
|
|
ix.LongLivedStreamRecordedService.LongLivedPubSubService,
|
|
context.Background(),
|
|
TopicNameIndex,
|
|
-1,
|
|
func(_ context.Context, evt NameIndexEvent, _ string) {
|
|
ix.nameIndex.onEvent(evt)
|
|
},
|
|
)
|
|
}
|
|
|
|
// publishNameEvent emits a NameIndexEvent on TopicNameIndex, subject to the
|
|
// sender-side deduplication window.
|
|
func (ix *IndexerService) publishNameEvent(action NameIndexAction, name, peerID, did string) {
|
|
if ix.nameIndex == nil || name == "" || peerID == "" {
|
|
return
|
|
}
|
|
if !ix.nameIndex.shouldEmit(action, name, peerID) {
|
|
return
|
|
}
|
|
ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.RLock()
|
|
topic := ix.LongLivedStreamRecordedService.LongLivedPubSubService.LongLivedPubSubs[TopicNameIndex]
|
|
ix.LongLivedStreamRecordedService.LongLivedPubSubService.PubsubMu.RUnlock()
|
|
if topic == nil {
|
|
return
|
|
}
|
|
evt := NameIndexEvent{Action: action, Name: name, PeerID: peerID, DID: did}
|
|
b, err := json.Marshal(evt)
|
|
if err != nil {
|
|
return
|
|
}
|
|
_ = topic.Publish(context.Background(), b)
|
|
}
|
|
|
|
// LookupNameIndex searches the distributed name index for peers whose name
|
|
// contains needle (case-insensitive). Returns peerID → DID for matched peers.
|
|
// Returns nil if the name index is not initialised (e.g. native indexers).
|
|
func (ix *IndexerService) LookupNameIndex(needle string) map[string]string {
|
|
if ix.nameIndex == nil {
|
|
return nil
|
|
}
|
|
result := map[string]string{}
|
|
needleLow := strings.ToLower(needle)
|
|
ix.nameIndex.indexMu.RLock()
|
|
defer ix.nameIndex.indexMu.RUnlock()
|
|
for name, peers := range ix.nameIndex.index {
|
|
if strings.Contains(strings.ToLower(name), needleLow) {
|
|
for peerID, did := range peers {
|
|
result[peerID] = did
|
|
}
|
|
}
|
|
}
|
|
return result
|
|
}
|