Files
oc-discovery/daemons/node/common/native_stream.go
2026-03-03 16:38:24 +01:00

778 lines
26 KiB
Go

package common
import (
"context"
"encoding/json"
"errors"
"math/rand"
"oc-discovery/conf"
"strings"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/libp2p/go-libp2p/core/host"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
// ProtocolNativeSubscription is the stream protocol on which RegisterWithNative
// sends an IndexerRegistration to a native.
ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0"
// ProtocolNativeGetIndexers is the stream protocol on which
// fetchIndexersFromNative sends a GetIndexersRequest and reads back a
// GetIndexersResponse.
ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0"
// ProtocolNativeConsensus is used by nodes/indexers to cross-validate an indexer
// pool against all configured native peers.
ProtocolNativeConsensus = "/opencloud/native/consensus/1.0"
// RecommendedHeartbeatInterval is the period of the ticker that
// StartNativeRegistration uses to repeat RegisterWithNative.
RecommendedHeartbeatInterval = 60 * time.Second
// TopicIndexerRegistry is the PubSub topic used by native indexers to gossip
// newly registered indexer PeerIDs to neighbouring natives.
TopicIndexerRegistry = "oc-indexer-registry"
// consensusQueryTimeout is the per-native timeout for a consensus query.
consensusQueryTimeout = 3 * time.Second
// consensusCollectTimeout is the total wait for all native responses.
// It is deliberately longer than consensusQueryTimeout (4s vs 3s).
consensusCollectTimeout = 4 * time.Second
)
// ConsensusRequest is sent by a node/indexer to a native to validate a candidate
// indexer list. The native replies with what it trusts and what it suggests instead.
type ConsensusRequest struct {
// Candidates holds the indexer address strings to be validated; the same
// strings are later parsed with pp.AddrInfoFromString by resolvePool.
Candidates []string `json:"candidates"`
}
// ConsensusResponse is returned by a native during a consensus challenge.
// Trusted = candidates the native considers alive.
// Suggestions = extras the native knows and trusts but that were not in the candidate list.
type ConsensusResponse struct {
// Trusted entries are counted once per responding native by clientSideConsensus.
Trusted []string `json:"trusted"`
// Suggestions are pooled across natives and may be used to fill gaps in a
// second consensus round (see resolvePool).
Suggestions []string `json:"suggestions,omitempty"`
}
// IndexerRegistration is sent by an indexer to a native to signal its alive state.
// Only Addr is required; PeerID is derived from it if omitted.
type IndexerRegistration struct {
// PeerID is the sender's peer ID in string form; RegisterWithNative always fills it.
PeerID string `json:"peer_id,omitempty"`
// Addr is the sender's address; RegisterWithNative builds it as
// "<host address>/p2p/<peer ID>".
Addr string `json:"addr"`
}
// GetIndexersRequest asks a native for a pool of live indexers.
type GetIndexersRequest struct {
// Count is the number of indexers the caller would like to receive.
Count int `json:"count"`
// From is the requester's peer ID in string form (h.ID().String()).
From string `json:"from"`
}
// GetIndexersResponse is returned by the native with live indexer multiaddrs.
type GetIndexersResponse struct {
// Indexers lists the indexer address strings proposed by the native.
Indexers []string `json:"indexers"`
// IsSelfFallback, when true, makes resolvePool accept Indexers without
// running consensus (the native itself is acting as the indexer).
IsSelfFallback bool `json:"is_self_fallback,omitempty"`
}
// StaticNatives maps each known native address string to its parsed AddrInfo.
// Every read and write in this file happens under StreamNativeMu.
var StaticNatives = map[string]*pp.AddrInfo{}
// StreamNativeMu guards StaticNatives; a pointer to it is also handed to
// SendHeartbeat together with the map.
var StreamNativeMu sync.RWMutex
// StreamNatives is the ProtocolStream passed to SendHeartbeat for the native mesh.
// NOTE(review): ProtocolStream is declared outside this file — presumably it
// tracks the open heartbeat streams per protocol/peer; confirm.
var StreamNatives ProtocolStream = ProtocolStream{}
// nativeHeartbeatOnce ensures we start exactly one long-lived heartbeat goroutine
// toward the native mesh, even when ConnectToNatives is called from recovery paths.
var nativeHeartbeatOnce sync.Once
// nativeMeshHeartbeatOnce guards the native-to-native heartbeat goroutine started
// by EnsureNativePeers so only one goroutine covers the whole StaticNatives map.
var nativeMeshHeartbeatOnce sync.Once
// ConnectToNatives bootstraps a node/indexer that runs in native mode.
//
// In order, it:
//  1. parses conf.GetConfig().NativeIndexerAddresses into StaticNatives;
//  2. starts the heartbeat toward the natives, at most once per process
//     (nativeHeartbeatOnce);
//  3. asks the natives, in configured order, for up to maxIndexer indexers;
//  4. validates the answer through resolvePool (consensus unless the native
//     answered in fallback mode);
//  5. hands the resulting pool to replaceStaticIndexers.
//
// An error is returned when StaticNatives is empty after parsing, when no
// candidates were obtained while minIndexer > 0, or when StaticIndexers ends up
// holding fewer than a positive minIndexer entries. myPID is not used by the body.
func ConnectToNatives(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error {
	lg := oclib.GetLogger()
	lg.Info().Msg("[native] step 1 — parsing native addresses")

	// Register every parseable address. Re-registering an address on a later
	// call simply overwrites the same map entry.
	parsed := []string{}
	StreamNativeMu.Lock()
	for _, raw := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") {
		entry := strings.TrimSpace(raw)
		if entry == "" {
			continue
		}
		info, err := pp.AddrInfoFromString(entry)
		if err != nil {
			lg.Err(err).Msg("[native] step 1 — invalid native addr")
			continue
		}
		StaticNatives[entry] = info
		parsed = append(parsed, entry)
		lg.Info().Str("addr", entry).Msg("[native] step 1 — native registered")
	}
	knownNatives := len(StaticNatives)
	StreamNativeMu.Unlock()
	if knownNatives == 0 {
		return errors.New("no valid native addresses configured")
	}
	lg.Info().Int("count", len(parsed)).Msg("[native] step 1 — natives parsed")

	// The heartbeat is started once, however many times this function runs.
	nativeHeartbeatOnce.Do(func() {
		lg.Info().Msg("[native] step 1 — starting long-lived heartbeat to native mesh")
		SendHeartbeat(context.Background(), ProtocolHeartbeat,
			conf.GetConfig().Name, h, StreamNatives, StaticNatives, &StreamNativeMu, 20*time.Second)
	})

	// Ask the natives parsed during this call, in configuration order.
	lg.Info().Int("want", maxIndexer).Msg("[native] step 1 — fetching indexer pool from native")
	candidates, isFallback := fetchIndexersFromNative(h, parsed, maxIndexer)
	if len(candidates) == 0 {
		lg.Warn().Msg("[native] step 1 — no candidates returned by any native")
		if minIndexer > 0 {
			return errors.New("ConnectToNatives: no indexers available from any native")
		}
		return nil
	}
	lg.Info().Int("candidates", len(candidates)).Bool("fallback", isFallback).Msg("[native] step 1 — pool received")

	// Validate the candidates and publish them into StaticIndexers.
	replaceStaticIndexers(resolvePool(h, candidates, isFallback, maxIndexer))

	StreamMuIndexes.RLock()
	poolSize := len(StaticIndexers)
	StreamMuIndexes.RUnlock()
	lg.Info().Int("pool_size", poolSize).Msg("[native] step 2 — StaticIndexers replaced")

	if minIndexer > 0 && poolSize < minIndexer {
		return errors.New("not enough majority-confirmed indexers available")
	}
	return nil
}
// replenishIndexersFromNative tops up StaticIndexers with up to `need` new
// indexers obtained from the currently known natives.
//
// The candidates returned by fetchIndexersFromNative go through resolvePool
// (consensus unless the native answered in fallback mode) and the surviving
// entries are merged into StaticIndexers; existing entries are kept. On success
// the indexer heartbeat is nudged via NudgeIndexerHeartbeat. The function is a
// no-op when need <= 0, when no native answers, or when nothing is confirmed.
func replenishIndexersFromNative(h host.Host, need int) {
	if need <= 0 {
		return
	}
	lg := oclib.GetLogger()
	lg.Info().Int("need", need).Msg("[native] step 4 — replenishing indexer pool from native")

	// Snapshot the native addresses so the network calls run without the lock.
	StreamNativeMu.RLock()
	natives := make([]string, 0, len(StaticNatives))
	for key := range StaticNatives {
		natives = append(natives, key)
	}
	StreamNativeMu.RUnlock()

	candidates, isFallback := fetchIndexersFromNative(h, natives, need)
	if len(candidates) == 0 {
		lg.Warn().Msg("[native] step 4 — no candidates returned by any native")
		return
	}
	lg.Info().Int("candidates", len(candidates)).Bool("fallback", isFallback).Msg("[native] step 4 — candidates received")

	fresh := resolvePool(h, candidates, isFallback, need)
	if len(fresh) == 0 {
		lg.Warn().Msg("[native] step 4 — consensus yielded no confirmed indexers")
		return
	}

	// Merge only: indexers already in the pool stay where they are.
	StreamMuIndexes.Lock()
	for key, info := range fresh {
		StaticIndexers[key] = info
	}
	poolSize := len(StaticIndexers)
	StreamMuIndexes.Unlock()
	lg.Info().Int("added", len(fresh)).Int("total", poolSize).Msg("[native] step 4 — pool replenished")

	// Ask the indexer heartbeat to run now rather than at its next tick.
	NudgeIndexerHeartbeat()
	lg.Info().Msg("[native] step 4 — heartbeat goroutine nudged")
}
// fetchIndexersFromNative asks the natives in nativeAddrs, one after the other,
// for `count` indexers over ProtocolNativeGetIndexers and returns the answer of
// the first native that completes the exchange.
//
// For each native the function connects, opens a stream, sends a
// GetIndexersRequest and decodes a GetIndexersResponse. Any failure (invalid
// address, connect, stream open, encode, decode, timeout) is logged and the next
// native is tried. When no native answers, (nil, false) is returned.
//
// isFallback mirrors GetIndexersResponse.IsSelfFallback.
func fetchIndexersFromNative(h host.Host, nativeAddrs []string, count int) (candidates []string, isFallback bool) {
	// exchangeTimeout bounds the request/response exchange once the stream is
	// open. The dial context below is cancelled right after NewStream, so
	// without a stream deadline a native that accepts the stream but never
	// replies would block Decode (and therefore the caller) forever.
	const exchangeTimeout = 5 * time.Second

	logger := oclib.GetLogger()
	for _, addr := range nativeAddrs {
		ad, err := pp.AddrInfoFromString(addr)
		if err != nil {
			logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — skipping invalid addr")
			continue
		}
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if err := h.Connect(ctx, *ad); err != nil {
			cancel()
			logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — connect failed")
			continue
		}
		s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers)
		cancel()
		if err != nil {
			logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — stream open failed")
			continue
		}
		// Best effort: if the deadline cannot be set, the exchange simply
		// runs unbounded, as it did before.
		_ = s.SetDeadline(time.Now().Add(exchangeTimeout))

		req := GetIndexersRequest{Count: count, From: h.ID().String()}
		if encErr := json.NewEncoder(s).Encode(req); encErr != nil {
			s.Close()
			logger.Warn().Str("addr", addr).Err(encErr).Msg("[native] fetch — encode request failed")
			continue
		}
		var resp GetIndexersResponse
		if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil {
			s.Close()
			logger.Warn().Str("addr", addr).Err(decErr).Msg("[native] fetch — decode response failed")
			continue
		}
		s.Close()
		logger.Info().Str("native", addr).Int("indexers", len(resp.Indexers)).Bool("fallback", resp.IsSelfFallback).Msg("[native] fetch — response received")
		return resp.Indexers, resp.IsSelfFallback
	}
	logger.Warn().Msg("[native] fetch — no native responded")
	return nil, false
}
// resolvePool turns a list of candidate address strings into an
// address → AddrInfo map, dropping every string that pp.AddrInfoFromString
// cannot parse.
//
// In fallback mode the candidates are accepted as they are. Otherwise they are
// submitted to clientSideConsensus; if fewer than maxIndexer are confirmed and
// the natives offered suggestions, a random selection of suggestions (just
// enough to reach maxIndexer) is appended and a second consensus round is run.
// The second round replaces the first only when it confirms at least one entry.
func resolvePool(h host.Host, candidates []string, isFallback bool, maxIndexer int) map[string]*pp.AddrInfo {
	lg := oclib.GetLogger()

	// toPool parses each address and silently skips the unparseable ones.
	toPool := func(addrs []string) map[string]*pp.AddrInfo {
		out := make(map[string]*pp.AddrInfo, len(addrs))
		for _, raw := range addrs {
			if info, err := pp.AddrInfoFromString(raw); err == nil {
				out[raw] = info
			}
		}
		return out
	}

	if isFallback {
		lg.Info().Strs("addrs", candidates).Msg("[native] resolve — fallback mode, skipping consensus")
		return toPool(candidates)
	}

	// First round: validate exactly what the native proposed.
	lg.Info().Int("candidates", len(candidates)).Msg("[native] resolve — consensus round 1")
	accepted, extras := clientSideConsensus(h, candidates)
	lg.Info().Int("confirmed", len(accepted)).Int("suggestions", len(extras)).Msg("[native] resolve — consensus round 1 done")

	// Second round: only when the target is not reached and there is
	// something to add.
	if len(accepted) < maxIndexer && len(extras) > 0 {
		rand.Shuffle(len(extras), func(a, b int) { extras[a], extras[b] = extras[b], extras[a] })
		missing := maxIndexer - len(accepted)
		if len(extras) < missing {
			missing = len(extras)
		}
		lg.Info().Int("gap", missing).Msg("[native] resolve — consensus round 2 (filling gaps)")
		second, _ := clientSideConsensus(h, append(accepted, extras[:missing]...))
		if len(second) > 0 {
			accepted = second
		}
		lg.Info().Int("confirmed", len(accepted)).Msg("[native] resolve — consensus round 2 done")
	}

	result := toPool(accepted)
	lg.Info().Int("pool_size", len(result)).Msg("[native] resolve — pool ready")
	return result
}
// replaceStaticIndexers copies every entry of next into StaticIndexers while
// holding the StreamMuIndexes write lock.
//
// NOTE(review): despite its name, this function only adds or overwrites
// entries. Indexers already present in StaticIndexers but absent from next are
// kept, and no heartbeat stream is closed here. Confirm whether stale entries
// are meant to be pruned.
func replaceStaticIndexers(next map[string]*pp.AddrInfo) {
	StreamMuIndexes.Lock()
	defer StreamMuIndexes.Unlock()

	for key, info := range next {
		StaticIndexers[key] = info
	}
}
// clientSideConsensus submits a candidate list to every native currently in
// StaticNatives, in parallel, over ProtocolNativeConsensus.
//
// Each native replies with the candidates it trusts plus extra indexers it
// recommends. An address is confirmed when strictly more than 50% of the natives
// that answered list it as trusted (each native counts at most once per
// address). Suggestions are the union of all recommended extras minus the
// confirmed addresses.
//
// Special cases:
//   - empty candidates → (nil, nil);
//   - no native known, or no native answered before consensusCollectTimeout →
//     the candidates are returned unchanged with no suggestions.
//
// The order of both returned slices is unspecified (they are built from maps).
func clientSideConsensus(h host.Host, candidates []string) (confirmed []string, suggestions []string) {
	if len(candidates) == 0 {
		return nil, nil
	}

	// Snapshot the natives so no lock is held during network I/O.
	StreamNativeMu.RLock()
	peers := make([]*pp.AddrInfo, 0, len(StaticNatives))
	for _, ad := range StaticNatives {
		peers = append(peers, ad)
	}
	StreamNativeMu.RUnlock()

	if len(peers) == 0 {
		return candidates, nil
	}

	type nativeResult struct {
		trusted     []string
		suggestions []string
		responded   bool
	}

	// Buffered to len(peers) so a worker finishing after the collector gave up
	// can still deliver its result and exit.
	ch := make(chan nativeResult, len(peers))
	for _, ad := range peers {
		go func(ad *pp.AddrInfo) {
			ctx, cancel := context.WithTimeout(context.Background(), consensusQueryTimeout)
			defer cancel()
			if err := h.Connect(ctx, *ad); err != nil {
				ch <- nativeResult{}
				return
			}
			s, err := h.NewStream(ctx, ad.ID, ProtocolNativeConsensus)
			if err != nil {
				ch <- nativeResult{}
				return
			}
			defer s.Close()
			// The context only governs Connect and NewStream. Propagate its
			// deadline to the stream so that a native which never answers
			// cannot keep this goroutine blocked in Decode forever.
			// Best effort: a failure to set it leaves the old behaviour.
			if deadline, ok := ctx.Deadline(); ok {
				_ = s.SetDeadline(deadline)
			}
			if err := json.NewEncoder(s).Encode(ConsensusRequest{Candidates: candidates}); err != nil {
				ch <- nativeResult{}
				return
			}
			var resp ConsensusResponse
			if err := json.NewDecoder(s).Decode(&resp); err != nil {
				ch <- nativeResult{}
				return
			}
			ch <- nativeResult{trusted: resp.Trusted, suggestions: resp.Suggestions, responded: true}
		}(ad)
	}

	timer := time.NewTimer(consensusCollectTimeout)
	defer timer.Stop()

	trustedCounts := map[string]int{}
	suggestionPool := map[string]struct{}{}
	total := 0     // natives that produced a valid response
	collected := 0 // natives that reported anything, success or failure

collect:
	for collected < len(peers) {
		select {
		case r := <-ch:
			collected++
			if !r.responded {
				continue
			}
			total++
			// Count each address once per native, even if it is repeated.
			seen := map[string]struct{}{}
			for _, addr := range r.trusted {
				if _, already := seen[addr]; !already {
					trustedCounts[addr]++
					seen[addr] = struct{}{}
				}
			}
			for _, addr := range r.suggestions {
				suggestionPool[addr] = struct{}{}
			}
		case <-timer.C:
			break collect
		}
	}

	// Nobody answered: there is nothing to contradict the candidates.
	if total == 0 {
		return candidates, nil
	}

	confirmedSet := map[string]struct{}{}
	for addr, count := range trustedCounts {
		if count*2 > total { // strict majority of responding natives
			confirmed = append(confirmed, addr)
			confirmedSet[addr] = struct{}{}
		}
	}
	for addr := range suggestionPool {
		if _, ok := confirmedSet[addr]; !ok {
			suggestions = append(suggestions, addr)
		}
	}
	return confirmed, suggestions
}
// RegisterWithNative sends a one-shot IndexerRegistration to each native listed
// in nativeAddressesStr (comma-separated address strings) over
// ProtocolNativeSubscription.
//
// The advertised address is the LAST entry of h.Addrs() suffixed with
// "/p2p/<peer ID>". If the host has no address yet, or that last address
// contains "127.0.0.1", the call logs a warning and does nothing.
//
// Failures for one native (invalid address, connect, stream open, encode) are
// logged and do not prevent registration with the remaining natives.
// Should be called periodically every RecommendedHeartbeatInterval.
func RegisterWithNative(h host.Host, nativeAddressesStr string) {
	logger := oclib.GetLogger()

	// Take a single snapshot of the host addresses. Indexing
	// h.Addrs()[len(h.Addrs())-1] panics when the list is empty, and the two
	// separate calls could also observe lists of different length.
	hostAddrs := h.Addrs()
	last := ""
	if len(hostAddrs) > 0 {
		last = hostAddrs[len(hostAddrs)-1].String()
	}
	if last == "" || strings.Contains(last, "127.0.0.1") {
		logger.Warn().Msg("RegisterWithNative: no routable address yet, skipping")
		return
	}

	reg := IndexerRegistration{
		PeerID: h.ID().String(),
		Addr:   last + "/p2p/" + h.ID().String(),
	}
	for _, addr := range strings.Split(nativeAddressesStr, ",") {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			continue
		}
		ad, err := pp.AddrInfoFromString(addr)
		if err != nil {
			logger.Err(err).Msg("RegisterWithNative: invalid addr")
			continue
		}
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if err := h.Connect(ctx, *ad); err != nil {
			cancel()
			// Previously swallowed silently; surface it like the other failures.
			logger.Warn().Str("addr", addr).Err(err).Msg("RegisterWithNative: connect failed")
			continue
		}
		s, err := h.NewStream(ctx, ad.ID, ProtocolNativeSubscription)
		cancel()
		if err != nil {
			logger.Err(err).Msg("RegisterWithNative: stream open failed")
			continue
		}
		if err := json.NewEncoder(s).Encode(reg); err != nil {
			logger.Err(err).Msg("RegisterWithNative: encode failed")
		}
		s.Close()
	}
}
// EnsureNativePeers loads the natives listed in
// conf.GetConfig().NativeIndexerAddresses into StaticNatives and makes sure the
// native-mesh heartbeat has been started.
//
// It returns immediately when no native address is configured. Blank or
// unparseable entries are skipped without logging. The function may be called
// repeatedly: map entries are simply overwritten and nativeMeshHeartbeatOnce
// guarantees SendHeartbeat is invoked at most once.
func EnsureNativePeers(h host.Host) {
	lg := oclib.GetLogger()

	configured := conf.GetConfig().NativeIndexerAddresses
	if configured == "" {
		return
	}

	StreamNativeMu.Lock()
	for _, raw := range strings.Split(configured, ",") {
		entry := strings.TrimSpace(raw)
		if entry == "" {
			continue
		}
		if info, err := pp.AddrInfoFromString(entry); err == nil {
			StaticNatives[entry] = info
			lg.Info().Str("addr", entry).Msg("native: registered peer in native mesh")
		}
	}
	StreamNativeMu.Unlock()

	// A single heartbeat covers the whole StaticNatives map; one per address
	// would multiply the heartbeats by the number of natives.
	nativeMeshHeartbeatOnce.Do(func() {
		lg.Info().Msg("native: starting mesh heartbeat goroutine")
		SendHeartbeat(context.Background(), ProtocolHeartbeat,
			conf.GetConfig().Name, h, StreamNatives, StaticNatives, &StreamNativeMu, 20*time.Second)
	})
}
// StartNativeRegistration launches a background goroutine that registers this
// host with the natives in nativeAddressesStr (comma-separated), first once and
// then every RecommendedHeartbeatInterval via RegisterWithNative.
//
// Before the first attempt it waits, for at most 12 polls spaced 5 seconds apart
// (~1 minute), until the last entry of h.Addrs() no longer contains
// "127.0.0.1". A host with no address at all is treated as "not routable yet".
//
// The goroutine has no stop mechanism: it runs for the lifetime of the process,
// and each call to this function starts another one.
func StartNativeRegistration(h host.Host, nativeAddressesStr string) {
	go func() {
		// libp2p may not have discovered external addresses yet at startup.
		for attempt := 0; attempt < 12; attempt++ {
			// Single snapshot: h.Addrs()[len(h.Addrs())-1] would panic on an
			// empty list and could mix two different snapshots.
			addrs := h.Addrs()
			if len(addrs) > 0 && !strings.Contains(addrs[len(addrs)-1].String(), "127.0.0.1") {
				break
			}
			time.Sleep(5 * time.Second)
		}

		RegisterWithNative(h, nativeAddressesStr)

		t := time.NewTicker(RecommendedHeartbeatInterval)
		defer t.Stop()
		for range t.C {
			RegisterWithNative(h, nativeAddressesStr)
		}
	}()
}
// ── Lost-native replacement ───────────────────────────────────────────────────
const (
// ProtocolNativeGetPeers lets a node/indexer ask a native for a random
// selection of that native's own native contacts (to replace a dead native).
// It is the protocol opened by fetchNativeFromNatives.
ProtocolNativeGetPeers = "/opencloud/native/peers/1.0"
// ProtocolIndexerGetNatives lets nodes/indexers ask a connected indexer for
// its configured native addresses (fallback when no alive native responds).
// It is the protocol opened by fetchNativeFromIndexers.
ProtocolIndexerGetNatives = "/opencloud/indexer/natives/1.0"
// retryNativeInterval is how often retryLostNative polls a dead native.
retryNativeInterval = 30 * time.Second
)
// GetNativePeersRequest is sent to a native to ask for its known native contacts.
type GetNativePeersRequest struct {
// Exclude lists native addresses the caller does not want back; callers in
// this file pass the lost native plus every native still in StaticNatives.
Exclude []string `json:"exclude"`
// Count is the number of contacts requested; fetchNativeFromNatives sends 1.
Count int `json:"count"`
}
// GetNativePeersResponse carries native addresses returned by a native's peer list.
type GetNativePeersResponse struct {
// Peers holds native address strings; fetchNativeFromNatives keeps the first
// non-empty entry that is not in its exclude list.
Peers []string `json:"peers"`
}
// GetIndexerNativesRequest is sent to an indexer to ask for its configured native addresses.
type GetIndexerNativesRequest struct {
// Exclude lists native addresses the caller does not want back (the lost
// native plus the natives it still knows).
Exclude []string `json:"exclude"`
}
// GetIndexerNativesResponse carries native addresses returned by an indexer.
type GetIndexerNativesResponse struct {
// Natives holds native address strings; fetchNativeFromIndexers keeps the
// first non-empty entry that is not in its exclude list.
Natives []string `json:"natives"`
}
// nativeHeartbeatNudge carries "run now" requests for the native heartbeat.
// Its capacity of one means at most a single request is ever pending.
// NOTE(review): the receiving side is not in this file — presumably the
// heartbeat loop selects on this channel; confirm.
var nativeHeartbeatNudge = make(chan struct{}, 1)

// NudgeNativeHeartbeat requests an immediate native heartbeat without ever
// blocking the caller: the request is queued if the channel is empty and
// dropped if one is already waiting.
func NudgeNativeHeartbeat() {
	select {
	case nativeHeartbeatNudge <- struct{}{}:
		// Request queued.
	default:
		// A request is already pending; a second one would add nothing.
	}
}
// replenishIndexersIfNeeded compares the size of StaticIndexers with the
// configured minimum (conf.GetConfig().MinIndexer, floored at 1) and, when the
// pool is short, starts replenishIndexersFromNative in a new goroutine for the
// missing number of indexers. It returns without waiting for that goroutine.
// It is called from this file after a native has been added or restored.
func replenishIndexersIfNeeded(h host.Host) {
	lg := oclib.GetLogger()

	// Always require at least one indexer, whatever the configuration says.
	floor := conf.GetConfig().MinIndexer
	if floor < 1 {
		floor = 1
	}

	StreamMuIndexes.RLock()
	have := len(StaticIndexers)
	StreamMuIndexes.RUnlock()

	if have >= floor {
		return
	}
	missing := floor - have
	lg.Info().Int("need", missing).Int("current", have).Msg("[native] native recovered — replenishing indexer pool")
	go replenishIndexersFromNative(h, missing)
}
// replenishNativesFromPeers looks for a replacement after the native at
// lostAddr has been lost. It does nothing when lostAddr is empty.
//
// Procedure:
//  1. ask the remaining natives for one of their native contacts
//     (fetchNativeFromNatives);
//  2. if that yields nothing, ask the known indexers for their natives
//     (fetchNativeFromIndexers);
//  3. if a parseable address is found, add it to StaticNatives, nudge the
//     native heartbeat, check the indexer pool and return;
//  4. otherwise, when more than one native remains the loss is ignored; with
//     one or none left, retryLostNative is started in a goroutine for lostAddr.
//
// proto is only forwarded to retryLostNative.
func replenishNativesFromPeers(h host.Host, lostAddr string, proto protocol.ID) {
	if lostAddr == "" {
		return
	}
	lg := oclib.GetLogger()
	lg.Info().Str("lost", lostAddr).Msg("[native] replenish natives — start")

	// Addresses we do not want back: the lost one and every native we still
	// hold. The original comment states lostAddr was already removed from
	// StaticNatives by doTick (not visible here).
	StreamNativeMu.RLock()
	remaining := len(StaticNatives)
	known := append(make([]string, 0, remaining+1), lostAddr)
	for key := range StaticNatives {
		known = append(known, key)
	}
	StreamNativeMu.RUnlock()

	lg.Info().Int("remaining", remaining).Msg("[native] replenish natives — step 1: ask alive natives for a peer")
	replacement := fetchNativeFromNatives(h, known)
	if replacement == "" {
		lg.Info().Msg("[native] replenish natives — step 2: ask indexers for their native addresses")
		replacement = fetchNativeFromIndexers(h, known)
	}

	if replacement != "" {
		if info, err := pp.AddrInfoFromString(replacement); err == nil {
			StreamNativeMu.Lock()
			StaticNatives[replacement] = info
			StreamNativeMu.Unlock()
			lg.Info().Str("new", replacement).Msg("[native] replenish natives — replacement added, nudging heartbeat")
			NudgeNativeHeartbeat()
			replenishIndexersIfNeeded(h)
			return
		}
		// An unparseable replacement is treated like no replacement at all.
	}

	lg.Warn().Int("remaining", remaining).Msg("[native] replenish natives — no replacement found")
	if remaining > 1 {
		lg.Info().Msg("[native] replenish natives — enough natives remain, ignoring loss")
		return
	}

	// One native or none left: keep polling the lost one in the background.
	lg.Info().Str("addr", lostAddr).Msg("[native] replenish natives — last native lost, starting periodic retry")
	go retryLostNative(h, lostAddr, proto)
}
// fetchNativeFromNatives asks the natives currently in StaticNatives, in random
// order, for one of their own native contacts over ProtocolNativeGetPeers.
//
// It returns the first non-empty address from a response that is not in
// exclude, or "" when no native provides one. Natives that cannot be reached,
// or whose exchange fails or times out, are skipped.
func fetchNativeFromNatives(h host.Host, exclude []string) string {
	// exchangeTimeout bounds the request/response exchange once the stream is
	// open. The dial context is cancelled right after NewStream, so without a
	// stream deadline a silent native would block Decode forever.
	const exchangeTimeout = 5 * time.Second

	logger := oclib.GetLogger()
	excludeSet := make(map[string]struct{}, len(exclude))
	for _, e := range exclude {
		excludeSet[e] = struct{}{}
	}

	// Snapshot under the read lock; network I/O happens without it.
	StreamNativeMu.RLock()
	natives := make([]*pp.AddrInfo, 0, len(StaticNatives))
	for _, ad := range StaticNatives {
		natives = append(natives, ad)
	}
	StreamNativeMu.RUnlock()

	// Randomise so the same native is not always asked first.
	rand.Shuffle(len(natives), func(i, j int) { natives[i], natives[j] = natives[j], natives[i] })

	for _, ad := range natives {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if err := h.Connect(ctx, *ad); err != nil {
			cancel()
			logger.Warn().Str("native", ad.ID.String()).Err(err).Msg("[native] fetch native peers — connect failed")
			continue
		}
		s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetPeers)
		cancel()
		if err != nil {
			logger.Warn().Str("native", ad.ID.String()).Err(err).Msg("[native] fetch native peers — stream failed")
			continue
		}
		// Best effort: if the deadline cannot be set, the exchange runs
		// unbounded as before.
		_ = s.SetDeadline(time.Now().Add(exchangeTimeout))

		req := GetNativePeersRequest{Exclude: exclude, Count: 1}
		if encErr := json.NewEncoder(s).Encode(req); encErr != nil {
			s.Close()
			continue
		}
		var resp GetNativePeersResponse
		if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil {
			s.Close()
			continue
		}
		s.Close()

		// Do not rely on the remote honouring Exclude; filter locally too.
		for _, peer := range resp.Peers {
			if _, excluded := excludeSet[peer]; !excluded && peer != "" {
				logger.Info().Str("from", ad.ID.String()).Str("new", peer).Msg("[native] fetch native peers — got replacement")
				return peer
			}
		}
		logger.Debug().Str("native", ad.ID.String()).Msg("[native] fetch native peers — no new native from this peer")
	}
	return ""
}
// fetchNativeFromIndexers asks the indexers currently in StaticIndexers, in
// random order, for their configured native addresses over
// ProtocolIndexerGetNatives.
//
// It returns the first non-empty address from a response that is not in
// exclude, or "" when no indexer provides one. Indexers that cannot be reached,
// or whose exchange fails or times out, are skipped.
func fetchNativeFromIndexers(h host.Host, exclude []string) string {
	// exchangeTimeout bounds the request/response exchange once the stream is
	// open. The dial context is cancelled right after NewStream, so without a
	// stream deadline a silent indexer would block Decode forever.
	const exchangeTimeout = 5 * time.Second

	logger := oclib.GetLogger()
	excludeSet := make(map[string]struct{}, len(exclude))
	for _, e := range exclude {
		excludeSet[e] = struct{}{}
	}

	// Snapshot under the read lock; network I/O happens without it.
	StreamMuIndexes.RLock()
	indexers := make([]*pp.AddrInfo, 0, len(StaticIndexers))
	for _, ad := range StaticIndexers {
		indexers = append(indexers, ad)
	}
	StreamMuIndexes.RUnlock()

	rand.Shuffle(len(indexers), func(i, j int) { indexers[i], indexers[j] = indexers[j], indexers[i] })

	for _, ad := range indexers {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		if err := h.Connect(ctx, *ad); err != nil {
			cancel()
			// Previously silent; logged for parity with fetchNativeFromNatives.
			logger.Warn().Str("indexer", ad.ID.String()).Err(err).Msg("[native] fetch indexer natives — connect failed")
			continue
		}
		s, err := h.NewStream(ctx, ad.ID, ProtocolIndexerGetNatives)
		cancel()
		if err != nil {
			logger.Warn().Str("indexer", ad.ID.String()).Err(err).Msg("[native] fetch indexer natives — stream failed")
			continue
		}
		// Best effort: if the deadline cannot be set, the exchange runs
		// unbounded as before.
		_ = s.SetDeadline(time.Now().Add(exchangeTimeout))

		req := GetIndexerNativesRequest{Exclude: exclude}
		if encErr := json.NewEncoder(s).Encode(req); encErr != nil {
			s.Close()
			continue
		}
		var resp GetIndexerNativesResponse
		if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil {
			s.Close()
			continue
		}
		s.Close()

		// Do not rely on the remote honouring Exclude; filter locally too.
		for _, nativeAddr := range resp.Natives {
			if _, excluded := excludeSet[nativeAddr]; !excluded && nativeAddr != "" {
				logger.Info().Str("indexer", ad.ID.String()).Str("native", nativeAddr).Msg("[native] fetch indexer natives — got native")
				return nativeAddr
			}
		}
	}
	logger.Warn().Msg("[native] fetch indexer natives — no native found from indexers")
	return ""
}
// retryLostNative polls a lost native every retryNativeInterval until one of
// the following happens:
//   - addr is found in StaticNatives again (restored by another path);
//   - addr cannot be parsed as an AddrInfo;
//   - a connection to addr succeeds.
//
// On a successful connection the native is put back into StaticNatives, the
// native heartbeat is nudged, the indexer pool is checked and, when nativeProto
// equals ProtocolNativeGetIndexers, StartNativeRegistration is called for addr.
// The first attempt happens one full interval after the call.
func retryLostNative(h host.Host, addr string, nativeProto protocol.ID) {
	lg := oclib.GetLogger()
	lg.Info().Str("addr", addr).Msg("[native] retry — periodic retry for lost native started")

	ticker := time.NewTicker(retryNativeInterval)
	defer ticker.Stop()

	for {
		<-ticker.C

		// Someone else may have re-added the native in the meantime.
		StreamNativeMu.RLock()
		_, present := StaticNatives[addr]
		StreamNativeMu.RUnlock()
		if present {
			lg.Info().Str("addr", addr).Msg("[native] retry — native already restored, stopping retry")
			return
		}

		info, parseErr := pp.AddrInfoFromString(addr)
		if parseErr != nil {
			lg.Warn().Str("addr", addr).Msg("[native] retry — invalid addr, stopping retry")
			return
		}

		dialCtx, stop := context.WithTimeout(context.Background(), 5*time.Second)
		dialErr := h.Connect(dialCtx, *info)
		stop()
		if dialErr != nil {
			lg.Warn().Str("addr", addr).Msg("[native] retry — still unreachable")
			continue
		}

		// The native answers again: restore it and wake up the dependants.
		StreamNativeMu.Lock()
		StaticNatives[addr] = info
		StreamNativeMu.Unlock()
		lg.Info().Str("addr", addr).Msg("[native] retry — native reconnected and added back to pool")
		NudgeNativeHeartbeat()
		replenishIndexersIfNeeded(h)
		if nativeProto == ProtocolNativeGetIndexers {
			StartNativeRegistration(h, addr) // register back
		}
		return
	}
}