package indexer

import (
	"context"
	"encoding/json"
	"errors"
	"math/rand"
	"sync"
	"time"

	"oc-discovery/daemons/node/common"

	oclib "cloud.o-forge.io/core/oc-lib"
	pubsub "github.com/libp2p/go-libp2p-pubsub"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
)

const (
	// IndexerTTL is 10% above the recommended 60s heartbeat interval.
	// It is the lifetime stamped on every indexer registration (now + IndexerTTL).
	IndexerTTL = 66 * time.Second
	// offloadInterval is how often the native checks if it can release responsible peers.
	offloadInterval = 30 * time.Second
	// dhtRefreshInterval is how often the background goroutine queries the DHT for
	// known-but-expired indexer entries (written by neighbouring natives).
	dhtRefreshInterval = 30 * time.Second
)

// liveIndexerEntry tracks a registered indexer in the native's in-memory cache and DHT.
// The same struct is the JSON payload stored under the indexer's DHT key.
type liveIndexerEntry struct {
	// PeerID is the string form of the indexer's peer ID; it is the cache key.
	PeerID string `json:"peer_id"`
	// Addr is the indexer's address string, parsed with pp.AddrInfoFromString
	// when the indexer is probed for liveness.
	Addr string `json:"addr"`
	// ExpiresAt is the instant after which the entry is considered stale.
	ExpiresAt time.Time `json:"expires_at"`
}

// NativeState holds runtime state specific to native indexer operation.
// Each map is guarded by the mutex declared directly after it.
type NativeState struct {
	liveIndexers     map[string]*liveIndexerEntry // keyed by PeerID, local cache with TTL
	liveIndexersMu   sync.RWMutex
	responsiblePeers map[pp.ID]struct{} // peers for which the native is fallback indexer
	responsibleMu    sync.RWMutex
	// knownPeerIDs accumulates all indexer PeerIDs ever seen (local stream or gossip).
	// Used by refreshIndexersFromDHT to re-hydrate expired entries from the shared DHT,
	// including entries written by other natives.
	knownPeerIDs map[string]struct{}
	knownMu      sync.RWMutex
}

// newNativeState returns a NativeState whose three maps are allocated and empty,
// so they can be written to without a nil-map panic.
func newNativeState() *NativeState {
	return &NativeState{
		liveIndexers:     map[string]*liveIndexerEntry{},
		responsiblePeers: map[pp.ID]struct{}{},
		knownPeerIDs:     map[string]struct{}{},
	}
}

// IndexerRecordValidator validates indexer DHT entries under the "indexer" namespace.
type IndexerRecordValidator struct{} func (v IndexerRecordValidator) Validate(_ string, value []byte) error { var e liveIndexerEntry if err := json.Unmarshal(value, &e); err != nil { return err } if e.Addr == "" { return errors.New("missing addr") } if e.ExpiresAt.Before(time.Now().UTC()) { return errors.New("expired indexer record") } return nil } func (v IndexerRecordValidator) Select(_ string, values [][]byte) (int, error) { var newest time.Time index := 0 for i, val := range values { var e liveIndexerEntry if err := json.Unmarshal(val, &e); err != nil { continue } if e.ExpiresAt.After(newest) { newest = e.ExpiresAt index = i } } return index, nil } // InitNative registers native-specific stream handlers and starts background loops. // Must be called after DHT is initialized. func (ix *IndexerService) InitNative() { ix.Native = newNativeState() ix.Host.SetStreamHandler(common.ProtocolIndexerHeartbeat, ix.HandleNodeHeartbeat) // specific heartbeat for Indexer. ix.Host.SetStreamHandler(common.ProtocolNativeSubscription, ix.handleNativeSubscription) ix.Host.SetStreamHandler(common.ProtocolNativeGetIndexers, ix.handleNativeGetIndexers) ix.Host.SetStreamHandler(common.ProtocolNativeConsensus, ix.handleNativeConsensus) ix.subscribeIndexerRegistry() // Ensure long connections to other configured natives (native-to-native mesh). common.EnsureNativePeers(ix.Host) go ix.runOffloadLoop() go ix.refreshIndexersFromDHT() } // subscribeIndexerRegistry joins the PubSub topic used by natives to gossip newly // registered indexer PeerIDs to one another, enabling cross-native DHT discovery. 
func (ix *IndexerService) subscribeIndexerRegistry() { logger := oclib.GetLogger() ix.PS.RegisterTopicValidator(common.TopicIndexerRegistry, func(_ context.Context, _ pp.ID, _ *pubsub.Message) bool { return true }) topic, err := ix.PS.Join(common.TopicIndexerRegistry) if err != nil { logger.Err(err).Msg("native: failed to join indexer registry topic") return } sub, err := topic.Subscribe() if err != nil { logger.Err(err).Msg("native: failed to subscribe to indexer registry topic") return } ix.PubsubMu.Lock() ix.LongLivedPubSubs[common.TopicIndexerRegistry] = topic ix.PubsubMu.Unlock() go func() { for { msg, err := sub.Next(context.Background()) if err != nil { return } peerID := string(msg.Data) if peerID == "" { continue } // A neighbouring native registered this PeerID; add to known set for DHT refresh. ix.Native.knownMu.Lock() ix.Native.knownPeerIDs[peerID] = struct{}{} ix.Native.knownMu.Unlock() } }() } // handleNativeSubscription stores an indexer's alive registration in the DHT cache. // The stream is temporary: indexer sends one IndexerRegistration and closes. func (ix *IndexerService) handleNativeSubscription(s network.Stream) { defer s.Close() logger := oclib.GetLogger() var reg common.IndexerRegistration if err := json.NewDecoder(s).Decode(®); err != nil { logger.Err(err).Msg("native subscription: decode") return } if reg.Addr == "" { logger.Error().Msg("native subscription: missing addr") return } if reg.PeerID == "" { ad, err := pp.AddrInfoFromString(reg.Addr) if err != nil { logger.Err(err).Msg("native subscription: invalid addr") return } reg.PeerID = ad.ID.String() } expiry := time.Now().UTC().Add(IndexerTTL) entry := &liveIndexerEntry{ PeerID: reg.PeerID, Addr: reg.Addr, ExpiresAt: expiry, } // Persist in DHT with 66s TTL. 
key := ix.genIndexerKey(reg.PeerID) if data, err := json.Marshal(entry); err == nil { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) if err := ix.DHT.PutValue(ctx, key, data); err != nil { logger.Err(err).Msg("native subscription: DHT put") } cancel() } // Update local cache and known set. ix.Native.liveIndexersMu.Lock() ix.Native.liveIndexers[reg.PeerID] = entry ix.Native.liveIndexersMu.Unlock() ix.Native.knownMu.Lock() ix.Native.knownPeerIDs[reg.PeerID] = struct{}{} ix.Native.knownMu.Unlock() // Gossip PeerID to neighbouring natives so they discover it via DHT. ix.PubsubMu.RLock() topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry] ix.PubsubMu.RUnlock() if topic != nil { if err := topic.Publish(context.Background(), []byte(reg.PeerID)); err != nil { logger.Err(err).Msg("native subscription: registry gossip publish") } } logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered") } // handleNativeGetIndexers returns this native's own list of reachable indexers. // If none are available, it self-delegates (becomes the fallback indexer for the caller). // The consensus across natives is the responsibility of the requesting node/indexer. func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) { defer s.Close() logger := oclib.GetLogger() var req common.GetIndexersRequest if err := json.NewDecoder(s).Decode(&req); err != nil { logger.Err(err).Msg("native get indexers: decode") return } if req.Count <= 0 { req.Count = 3 } reachable := ix.reachableLiveIndexers() var resp common.GetIndexersResponse if len(reachable) == 0 { // No indexers known: become temporary fallback for this caller. 
ix.selfDelegate(s.Conn().RemotePeer(), &resp) logger.Info().Str("peer", s.Conn().RemotePeer().String()).Msg("native: no indexers, acting as fallback") } else { rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] }) if req.Count > len(reachable) { req.Count = len(reachable) } resp.Indexers = reachable[:req.Count] } if err := json.NewEncoder(s).Encode(resp); err != nil { logger.Err(err).Msg("native get indexers: encode response") } } // handleNativeConsensus answers a consensus challenge from a node/indexer. // It returns: // - Trusted: which of the candidates it considers alive. // - Suggestions: extras it knows and trusts that were not in the candidate list. func (ix *IndexerService) handleNativeConsensus(s network.Stream) { defer s.Close() logger := oclib.GetLogger() var req common.ConsensusRequest if err := json.NewDecoder(s).Decode(&req); err != nil { logger.Err(err).Msg("native consensus: decode") return } myList := ix.reachableLiveIndexers() mySet := make(map[string]struct{}, len(myList)) for _, addr := range myList { mySet[addr] = struct{}{} } trusted := []string{} candidateSet := make(map[string]struct{}, len(req.Candidates)) for _, addr := range req.Candidates { candidateSet[addr] = struct{}{} if _, ok := mySet[addr]; ok { trusted = append(trusted, addr) // candidate we also confirm as reachable } } // Extras we trust but that the requester didn't include → suggestions. suggestions := []string{} for _, addr := range myList { if _, inCandidates := candidateSet[addr]; !inCandidates { suggestions = append(suggestions, addr) } } resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions} if err := json.NewEncoder(s).Encode(resp); err != nil { logger.Err(err).Msg("native consensus: encode response") } } // selfDelegate marks the caller as a responsible peer and exposes this native's own // address as its temporary indexer. 
func (ix *IndexerService) selfDelegate(remotePeer pp.ID, resp *common.GetIndexersResponse) { ix.Native.responsibleMu.Lock() ix.Native.responsiblePeers[remotePeer] = struct{}{} ix.Native.responsibleMu.Unlock() resp.IsSelfFallback = true for _, a := range ix.Host.Addrs() { resp.Indexers = []string{a.String() + "/p2p/" + ix.Host.ID().String()} break } } // reachableLiveIndexers returns the multiaddrs of non-expired, pingable indexers // from the local cache (kept fresh by refreshIndexersFromDHT in background). func (ix *IndexerService) reachableLiveIndexers() []string { ix.Native.liveIndexersMu.RLock() now := time.Now().UTC() candidates := []*liveIndexerEntry{} for _, e := range ix.Native.liveIndexers { if e.ExpiresAt.After(now) { candidates = append(candidates, e) } } ix.Native.liveIndexersMu.RUnlock() reachable := []string{} for _, e := range candidates { ad, err := pp.AddrInfoFromString(e.Addr) if err != nil { continue } if common.PeerIsAlive(ix.Host, *ad) { reachable = append(reachable, e.Addr) } } return reachable } // refreshIndexersFromDHT runs in background and queries the shared DHT for every known // indexer PeerID whose local cache entry is missing or expired. This supplements the // local cache with entries written by neighbouring natives. 
func (ix *IndexerService) refreshIndexersFromDHT() { t := time.NewTicker(dhtRefreshInterval) defer t.Stop() logger := oclib.GetLogger() for range t.C { ix.Native.knownMu.RLock() peerIDs := make([]string, 0, len(ix.Native.knownPeerIDs)) for pid := range ix.Native.knownPeerIDs { peerIDs = append(peerIDs, pid) } ix.Native.knownMu.RUnlock() now := time.Now().UTC() for _, pid := range peerIDs { ix.Native.liveIndexersMu.RLock() existing := ix.Native.liveIndexers[pid] ix.Native.liveIndexersMu.RUnlock() if existing != nil && existing.ExpiresAt.After(now) { continue // still fresh in local cache } key := ix.genIndexerKey(pid) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ch, err := ix.DHT.SearchValue(ctx, key) if err != nil { cancel() continue } var best *liveIndexerEntry for b := range ch { var e liveIndexerEntry if err := json.Unmarshal(b, &e); err != nil { continue } if e.ExpiresAt.After(time.Now().UTC()) { if best == nil || e.ExpiresAt.After(best.ExpiresAt) { best = &e } } } cancel() if best != nil { ix.Native.liveIndexersMu.Lock() ix.Native.liveIndexers[best.PeerID] = best ix.Native.liveIndexersMu.Unlock() logger.Info().Str("peer", best.PeerID).Msg("native: refreshed indexer from DHT") } } } } func (ix *IndexerService) genIndexerKey(peerID string) string { return "/indexer/" + peerID } // runOffloadLoop periodically checks if real indexers are available and releases // responsible peers so they can reconnect to actual indexers on their next attempt. 
func (ix *IndexerService) runOffloadLoop() { t := time.NewTicker(offloadInterval) defer t.Stop() logger := oclib.GetLogger() for range t.C { ix.Native.responsibleMu.RLock() count := len(ix.Native.responsiblePeers) ix.Native.responsibleMu.RUnlock() if count == 0 { continue } if len(ix.reachableLiveIndexers()) > 0 { ix.Native.responsibleMu.Lock() ix.Native.responsiblePeers = map[pp.ID]struct{}{} ix.Native.responsibleMu.Unlock() logger.Info().Int("released", count).Msg("native: offloaded responsible peers to real indexers") } } } // StartNativeRegistration starts a goroutine that periodically registers this // indexer with all configured native indexers (every RecommendedHeartbeatInterval). func StartNativeRegistration(h host.Host, nativeAddressesStr string) { go func() { common.RegisterWithNative(h, nativeAddressesStr) t := time.NewTicker(common.RecommendedHeartbeatInterval) defer t.Stop() for range t.C { common.RegisterWithNative(h, nativeAddressesStr) } }() }