package indexer import ( "context" "encoding/json" "errors" "fmt" "math/rand" "slices" "strings" "sync" "time" "oc-discovery/daemons/node/common" oclib "cloud.o-forge.io/core/oc-lib" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/network" pp "github.com/libp2p/go-libp2p/core/peer" ) const ( // IndexerTTL is the lifetime of a live-indexer cache entry. Set to 50% above // the recommended 60s heartbeat interval so a single delayed renewal does not // evict a healthy indexer from the native's cache. IndexerTTL = 90 * time.Second // offloadInterval is how often the native checks if it can release responsible peers. offloadInterval = 30 * time.Second // dhtRefreshInterval is how often the background goroutine queries the DHT for // known-but-expired indexer entries (written by neighbouring natives). dhtRefreshInterval = 30 * time.Second // maxFallbackPeers caps how many peers the native will accept in self-delegation // mode. Beyond this limit the native refuses to act as a fallback indexer so it // is not overwhelmed during prolonged indexer outages. maxFallbackPeers = 50 ) // liveIndexerEntry tracks a registered indexer in the native's in-memory cache and DHT. type liveIndexerEntry struct { PeerID string `json:"peer_id"` Addr string `json:"addr"` ExpiresAt time.Time `json:"expires_at"` } // NativeState holds runtime state specific to native indexer operation. type NativeState struct { liveIndexers map[string]*liveIndexerEntry // keyed by PeerID, local cache with TTL liveIndexersMu sync.RWMutex responsiblePeers map[pp.ID]struct{} // peers for which the native is fallback indexer responsibleMu sync.RWMutex // knownPeerIDs accumulates all indexer PeerIDs ever seen (local stream or gossip). // Used by refreshIndexersFromDHT to re-hydrate expired entries from the shared DHT, // including entries written by other natives. knownPeerIDs map[string]string knownMu sync.RWMutex } func newNativeState() *NativeState { return &NativeState{ liveIndexers: map[string]*liveIndexerEntry{}, responsiblePeers: map[pp.ID]struct{}{}, knownPeerIDs: map[string]string{}, } } // IndexerRecordValidator validates indexer DHT entries under the "indexer" namespace. type IndexerRecordValidator struct{} func (v IndexerRecordValidator) Validate(_ string, value []byte) error { var e liveIndexerEntry if err := json.Unmarshal(value, &e); err != nil { return err } if e.Addr == "" { return errors.New("missing addr") } if e.ExpiresAt.Before(time.Now().UTC()) { return errors.New("expired indexer record") } return nil } func (v IndexerRecordValidator) Select(_ string, values [][]byte) (int, error) { var newest time.Time index := 0 for i, val := range values { var e liveIndexerEntry if err := json.Unmarshal(val, &e); err != nil { continue } if e.ExpiresAt.After(newest) { newest = e.ExpiresAt index = i } } return index, nil } // InitNative registers native-specific stream handlers and starts background loops. // Must be called after DHT is initialized. func (ix *IndexerService) InitNative() { ix.Native = newNativeState() ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleHeartbeat) // specific heartbeat for Indexer. ix.Host.SetStreamHandler(common.ProtocolNativeSubscription, ix.handleNativeSubscription) ix.Host.SetStreamHandler(common.ProtocolNativeGetIndexers, ix.handleNativeGetIndexers) ix.Host.SetStreamHandler(common.ProtocolNativeConsensus, ix.handleNativeConsensus) ix.Host.SetStreamHandler(common.ProtocolNativeGetPeers, ix.handleNativeGetPeers) ix.Host.SetStreamHandler(common.ProtocolIndexerGetNatives, ix.handleGetNatives) ix.subscribeIndexerRegistry() // Ensure long connections to other configured natives (native-to-native mesh). common.EnsureNativePeers(ix.Host) go ix.runOffloadLoop() go ix.refreshIndexersFromDHT() } // subscribeIndexerRegistry joins the PubSub topic used by natives to gossip newly // registered indexer PeerIDs to one another, enabling cross-native DHT discovery. func (ix *IndexerService) subscribeIndexerRegistry() { logger := oclib.GetLogger() ix.PS.RegisterTopicValidator(common.TopicIndexerRegistry, func(_ context.Context, _ pp.ID, msg *pubsub.Message) bool { // Reject empty or syntactically invalid multiaddrs before they reach the // message loop. A compromised native could otherwise gossip arbitrary data. addr := string(msg.Data) if addr == "" { return false } _, err := pp.AddrInfoFromString(addr) return err == nil }) topic, err := ix.PS.Join(common.TopicIndexerRegistry) if err != nil { logger.Err(err).Msg("native: failed to join indexer registry topic") return } sub, err := topic.Subscribe() if err != nil { logger.Err(err).Msg("native: failed to subscribe to indexer registry topic") return } ix.PubsubMu.Lock() ix.LongLivedPubSubs[common.TopicIndexerRegistry] = topic ix.PubsubMu.Unlock() go func() { for { msg, err := sub.Next(context.Background()) if err != nil { return } addr := string(msg.Data) if addr == "" { continue } if peer, err := pp.AddrInfoFromString(addr); err == nil { ix.Native.knownMu.Lock() ix.Native.knownPeerIDs[peer.ID.String()] = addr ix.Native.knownMu.Unlock() } // A neighbouring native registered this PeerID; add to known set for DHT refresh. } }() } // handleNativeSubscription stores an indexer's alive registration in the local cache // immediately, then persists it to the DHT asynchronously. // The stream is temporary: indexer sends one IndexerRegistration and closes. func (ix *IndexerService) handleNativeSubscription(s network.Stream) { defer s.Close() logger := oclib.GetLogger() logger.Info().Msg("Subscription") var reg common.IndexerRegistration if err := json.NewDecoder(s).Decode(®); err != nil { logger.Err(err).Msg("native subscription: decode") return } logger.Info().Msg("Subscription " + reg.Addr) if reg.Addr == "" { logger.Error().Msg("native subscription: missing addr") return } if reg.PeerID == "" { ad, err := pp.AddrInfoFromString(reg.Addr) if err != nil { logger.Err(err).Msg("native subscription: invalid addr") return } reg.PeerID = ad.ID.String() } // Build entry with a fresh TTL — must happen before the cache write so the 66s // window is not consumed by DHT retries. entry := &liveIndexerEntry{ PeerID: reg.PeerID, Addr: reg.Addr, ExpiresAt: time.Now().UTC().Add(IndexerTTL), } // Update local cache and known set immediately so concurrent GetIndexers calls // can already see this indexer without waiting for the DHT write to complete. ix.Native.liveIndexersMu.Lock() _, isRenewal := ix.Native.liveIndexers[reg.PeerID] ix.Native.liveIndexers[reg.PeerID] = entry ix.Native.liveIndexersMu.Unlock() ix.Native.knownMu.Lock() ix.Native.knownPeerIDs[reg.PeerID] = reg.Addr ix.Native.knownMu.Unlock() // Gossip PeerID to neighbouring natives so they discover it via DHT. ix.PubsubMu.RLock() topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry] ix.PubsubMu.RUnlock() if topic != nil { if err := topic.Publish(context.Background(), []byte(reg.Addr)); err != nil { logger.Err(err).Msg("native subscription: registry gossip publish") } } if isRenewal { logger.Debug().Str("peer", reg.PeerID).Msg("native: indexer TTL renewed : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers))) } else { logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers))) } // Persist in DHT asynchronously — retries must not block the handler or consume // the local cache TTL. key := ix.genIndexerKey(reg.PeerID) data, err := json.Marshal(entry) if err != nil { logger.Err(err).Msg("native subscription: marshal entry") return } go func() { for { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) if err := ix.DHT.PutValue(ctx, key, data); err != nil { cancel() logger.Err(err).Msg("native subscription: DHT put " + key) if strings.Contains(err.Error(), "failed to find any peer in table") { time.Sleep(10 * time.Second) continue } return } cancel() return } }() } // handleNativeGetIndexers returns this native's own list of reachable indexers. // Self-delegation (native acting as temporary fallback indexer) is only permitted // for nodes — never for peers that are themselves registered indexers in knownPeerIDs. // The consensus across natives is the responsibility of the requesting node/indexer. func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) { defer s.Close() logger := oclib.GetLogger() var req common.GetIndexersRequest if err := json.NewDecoder(s).Decode(&req); err != nil { logger.Err(err).Msg("native get indexers: decode") return } if req.Count <= 0 { req.Count = 3 } callerPeerID := s.Conn().RemotePeer().String() reachable := ix.reachableLiveIndexers(req.Count, callerPeerID) var resp common.GetIndexersResponse if len(reachable) == 0 { // No live indexers reachable — try to self-delegate. if ix.selfDelegate(s.Conn().RemotePeer(), &resp) { logger.Info().Str("peer", callerPeerID).Msg("native: no indexers, acting as fallback for node") } else { // Fallback pool saturated: return empty so the caller retries another // native instead of piling more load onto this one. logger.Warn().Str("peer", callerPeerID).Int("pool", maxFallbackPeers).Msg( "native: fallback pool saturated, refusing self-delegation") } } else { rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] }) if req.Count > len(reachable) { req.Count = len(reachable) } resp.Indexers = reachable[:req.Count] } if err := json.NewEncoder(s).Encode(resp); err != nil { logger.Err(err).Msg("native get indexers: encode response") } } // handleNativeConsensus answers a consensus challenge from a node/indexer. // It returns: // - Trusted: which of the candidates it considers alive. // - Suggestions: extras it knows and trusts that were not in the candidate list. func (ix *IndexerService) handleNativeConsensus(s network.Stream) { defer s.Close() logger := oclib.GetLogger() var req common.ConsensusRequest if err := json.NewDecoder(s).Decode(&req); err != nil { logger.Err(err).Msg("native consensus: decode") return } myList := ix.reachableLiveIndexers(-1, s.Conn().RemotePeer().String()) mySet := make(map[string]struct{}, len(myList)) for _, addr := range myList { mySet[addr] = struct{}{} } trusted := []string{} candidateSet := make(map[string]struct{}, len(req.Candidates)) for _, addr := range req.Candidates { candidateSet[addr] = struct{}{} if _, ok := mySet[addr]; ok { trusted = append(trusted, addr) // candidate we also confirm as reachable } } // Extras we trust but that the requester didn't include → suggestions. suggestions := []string{} for _, addr := range myList { if _, inCandidates := candidateSet[addr]; !inCandidates { suggestions = append(suggestions, addr) } } resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions} if err := json.NewEncoder(s).Encode(resp); err != nil { logger.Err(err).Msg("native consensus: encode response") } } // selfDelegate marks the caller as a responsible peer and exposes this native's own // address as its temporary indexer. Returns false when the fallback pool is saturated // (maxFallbackPeers reached) — the caller must return an empty response so the node // retries later instead of pinning indefinitely to an overloaded native. func (ix *IndexerService) selfDelegate(remotePeer pp.ID, resp *common.GetIndexersResponse) bool { ix.Native.responsibleMu.Lock() defer ix.Native.responsibleMu.Unlock() if len(ix.Native.responsiblePeers) >= maxFallbackPeers { return false } ix.Native.responsiblePeers[remotePeer] = struct{}{} resp.IsSelfFallback = true resp.Indexers = []string{ix.Host.Addrs()[len(ix.Host.Addrs())-1].String() + "/p2p/" + ix.Host.ID().String()} return true } // reachableLiveIndexers returns the multiaddrs of non-expired, pingable indexers // from the local cache (kept fresh by refreshIndexersFromDHT in background). func (ix *IndexerService) reachableLiveIndexers(count int, from ...string) []string { ix.Native.liveIndexersMu.RLock() now := time.Now().UTC() candidates := []*liveIndexerEntry{} for _, e := range ix.Native.liveIndexers { fmt.Println("liveIndexers", slices.Contains(from, e.PeerID), from, e.PeerID) if e.ExpiresAt.After(now) && !slices.Contains(from, e.PeerID) { candidates = append(candidates, e) } } ix.Native.liveIndexersMu.RUnlock() fmt.Println("midway...", candidates, from, ix.Native.knownPeerIDs) if (count > 0 && len(candidates) < count) || count < 0 { ix.Native.knownMu.RLock() for k, v := range ix.Native.knownPeerIDs { // Include peers whose liveIndexers entry is absent OR expired. // A non-nil but expired entry means the peer was once known but // has since timed out — PeerIsAlive below will decide if it's back. fmt.Println("knownPeerIDs", slices.Contains(from, k), from, k) if !slices.Contains(from, k) { candidates = append(candidates, &liveIndexerEntry{ PeerID: k, Addr: v, }) } } ix.Native.knownMu.RUnlock() } fmt.Println("midway...1", candidates) reachable := []string{} for _, e := range candidates { ad, err := pp.AddrInfoFromString(e.Addr) if err != nil { continue } if common.PeerIsAlive(ix.Host, *ad) { reachable = append(reachable, e.Addr) } } return reachable } // refreshIndexersFromDHT runs in background and queries the shared DHT for every known // indexer PeerID whose local cache entry is missing or expired. This supplements the // local cache with entries written by neighbouring natives. func (ix *IndexerService) refreshIndexersFromDHT() { t := time.NewTicker(dhtRefreshInterval) defer t.Stop() logger := oclib.GetLogger() for range t.C { ix.Native.knownMu.RLock() peerIDs := make([]string, 0, len(ix.Native.knownPeerIDs)) for pid := range ix.Native.knownPeerIDs { peerIDs = append(peerIDs, pid) } ix.Native.knownMu.RUnlock() now := time.Now().UTC() for _, pid := range peerIDs { ix.Native.liveIndexersMu.RLock() existing := ix.Native.liveIndexers[pid] ix.Native.liveIndexersMu.RUnlock() if existing != nil && existing.ExpiresAt.After(now) { continue // still fresh in local cache } key := ix.genIndexerKey(pid) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ch, err := ix.DHT.SearchValue(ctx, key) if err != nil { cancel() continue } var best *liveIndexerEntry for b := range ch { var e liveIndexerEntry if err := json.Unmarshal(b, &e); err != nil { continue } if e.ExpiresAt.After(time.Now().UTC()) { if best == nil || e.ExpiresAt.After(best.ExpiresAt) { best = &e } } } cancel() if best != nil { ix.Native.liveIndexersMu.Lock() ix.Native.liveIndexers[best.PeerID] = best ix.Native.liveIndexersMu.Unlock() logger.Info().Str("peer", best.PeerID).Msg("native: refreshed indexer from DHT") } else { // DHT has no fresh entry — peer is gone, prune from known set. ix.Native.knownMu.Lock() delete(ix.Native.knownPeerIDs, pid) ix.Native.knownMu.Unlock() logger.Info().Str("peer", pid).Msg("native: pruned stale peer from knownPeerIDs") } } } } func (ix *IndexerService) genIndexerKey(peerID string) string { return "/indexer/" + peerID } // runOffloadLoop periodically checks if real indexers are available and releases // responsible peers so they can reconnect to actual indexers on their next attempt. func (ix *IndexerService) runOffloadLoop() { t := time.NewTicker(offloadInterval) defer t.Stop() logger := oclib.GetLogger() for range t.C { fmt.Println("runOffloadLoop", ix.Native.responsiblePeers) ix.Native.responsibleMu.RLock() count := len(ix.Native.responsiblePeers) ix.Native.responsibleMu.RUnlock() if count == 0 { continue } ix.Native.responsibleMu.RLock() peerIDS := []string{} for p := range ix.Native.responsiblePeers { peerIDS = append(peerIDS, p.String()) } fmt.Println("COUNT --> ", count, len(ix.reachableLiveIndexers(-1, peerIDS...))) ix.Native.responsibleMu.RUnlock() if len(ix.reachableLiveIndexers(-1, peerIDS...)) > 0 { ix.Native.responsibleMu.RLock() released := ix.Native.responsiblePeers ix.Native.responsibleMu.RUnlock() // Reset (not Close) heartbeat streams of released peers. // Close() only half-closes the native's write direction — the peer's write // direction stays open and sendHeartbeat never sees an error. // Reset() abruptly terminates both directions, making the peer's next // json.Encode return an error which triggers replenishIndexersFromNative. ix.StreamMU.Lock() if streams := ix.StreamRecords[common.ProtocolHeartbeat]; streams != nil { for pid := range released { if rec, ok := streams[pid]; ok { if rec.HeartbeatStream != nil && rec.HeartbeatStream.Stream != nil { rec.HeartbeatStream.Stream.Reset() } ix.Native.responsibleMu.Lock() delete(ix.Native.responsiblePeers, pid) ix.Native.responsibleMu.Unlock() delete(streams, pid) logger.Info().Str("peer", pid.String()).Str("proto", string(common.ProtocolHeartbeat)).Msg( "native: offload — stream reset, peer will reconnect to real indexer") } else { // No recorded heartbeat stream for this peer: either it never // passed the score check (new peer, uptime=0 → score<75) or the // stream was GC'd. We cannot send a Reset signal, so close the // whole connection instead — this makes the peer's sendHeartbeat // return an error, which triggers replenishIndexersFromNative and // migrates it to a real indexer. ix.Native.responsibleMu.Lock() delete(ix.Native.responsiblePeers, pid) ix.Native.responsibleMu.Unlock() go ix.Host.Network().ClosePeer(pid) logger.Info().Str("peer", pid.String()).Msg( "native: offload — no heartbeat stream, closing connection so peer re-requests real indexers") } } } ix.StreamMU.Unlock() logger.Info().Int("released", count).Msg("native: offloaded responsible peers to real indexers") } } } // handleNativeGetPeers returns a random selection of this native's known native // contacts, excluding any in the request's Exclude list. func (ix *IndexerService) handleNativeGetPeers(s network.Stream) { defer s.Close() logger := oclib.GetLogger() var req common.GetNativePeersRequest if err := json.NewDecoder(s).Decode(&req); err != nil { logger.Err(err).Msg("native get peers: decode") return } if req.Count <= 0 { req.Count = 1 } excludeSet := make(map[string]struct{}, len(req.Exclude)) for _, e := range req.Exclude { excludeSet[e] = struct{}{} } common.StreamNativeMu.RLock() candidates := make([]string, 0, len(common.StaticNatives)) for addr := range common.StaticNatives { if _, excluded := excludeSet[addr]; !excluded { candidates = append(candidates, addr) } } common.StreamNativeMu.RUnlock() rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) if req.Count > len(candidates) { req.Count = len(candidates) } resp := common.GetNativePeersResponse{Peers: candidates[:req.Count]} if err := json.NewEncoder(s).Encode(resp); err != nil { logger.Err(err).Msg("native get peers: encode response") } } // StartNativeRegistration starts a goroutine that periodically registers this // indexer with all configured native indexers (every RecommendedHeartbeatInterval).