From 3eae5791a14975aba7c5321baf5eecca67c4cb98 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 20 Feb 2026 12:42:18 +0100 Subject: [PATCH] Native Indexer Mode --- conf/config.go | 3 +- daemons/node/common/common_pubsub.go | 6 +- daemons/node/common/common_stream.go | 16 +- daemons/node/common/native_stream.go | 364 +++++++++++++++++++++++ daemons/node/indexer/native.go | 416 +++++++++++++++++++++++++++ daemons/node/indexer/service.go | 45 +-- daemons/node/node.go | 4 +- 7 files changed, 827 insertions(+), 27 deletions(-) create mode 100644 daemons/node/common/native_stream.go create mode 100644 daemons/node/indexer/native.go diff --git a/conf/config.go b/conf/config.go index 875f1da..490d66d 100644 --- a/conf/config.go +++ b/conf/config.go @@ -9,7 +9,8 @@ type Config struct { PublicKeyPath string PrivateKeyPath string NodeEndpointPort int64 - IndexerAddresses string + IndexerAddresses string + NativeIndexerAddresses string // multiaddrs of native indexers, comma-separated; bypasses IndexerAddresses when set PeerIDS string // TO REMOVE diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index eecfcff..11f5941 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -73,7 +73,11 @@ func (event *Event) Verify(p *peer.Peer) error { if p.Relation == peer.BLACKLIST { // if peer is blacklisted... quit... return errors.New("peer is blacklisted") } - pubKey, err := PubKeyFromString(p.PublicKey) // extract pubkey from pubkey str + return event.VerifySignature(p.PublicKey) +} + +func (event *Event) VerifySignature(pk string) error { + pubKey, err := PubKeyFromString(pk) // extract pubkey from pubkey str if err != nil { return errors.New("pubkey is malformed") } diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index 6d6e8a0..42b4f64 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -25,18 +25,16 @@ import ( type LongLivedStreamRecordedService[T interface{}] struct { *LongLivedPubSubService - StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T] - StreamMU sync.RWMutex - maxNodesConn int - isBidirectionnal bool + StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T] + StreamMU sync.RWMutex + maxNodesConn int } -func NewStreamRecordedService[T interface{}](h host.Host, maxNodesConn int, isBidirectionnal bool) *LongLivedStreamRecordedService[T] { +func NewStreamRecordedService[T interface{}](h host.Host, maxNodesConn int) *LongLivedStreamRecordedService[T] { service := &LongLivedStreamRecordedService[T]{ LongLivedPubSubService: NewLongLivedPubSubService(h), StreamRecords: map[protocol.ID]map[pp.ID]*StreamRecord[T]{}, maxNodesConn: maxNodesConn, - isBidirectionnal: isBidirectionnal, } go service.StartGC(30 * time.Second) // Garbage collection is needed on every Map of Long-Lived Stream... it may be a top level redesigned @@ -350,6 +348,12 @@ var StreamIndexers ProtocolStream = ProtocolStream{} func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error { TimeWatcher = time.Now().UTC() logger := oclib.GetLogger() + + // If native addresses are configured, bypass static indexer addresses + if conf.GetConfig().NativeIndexerAddresses != "" { + return ConnectToNatives(h, minIndexer, maxIndexer, myPID) + } + addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",") if len(addresses) > maxIndexer { diff --git a/daemons/node/common/native_stream.go b/daemons/node/common/native_stream.go new file mode 100644 index 0000000..be8c94f --- /dev/null +++ b/daemons/node/common/native_stream.go @@ -0,0 +1,364 @@ +package common + +import ( + "context" + "encoding/json" + "errors" + "math/rand" + "oc-discovery/conf" + "strings" + "sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "github.com/libp2p/go-libp2p/core/host" + pp "github.com/libp2p/go-libp2p/core/peer" +) + +const ( + ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0" + ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0" + // ProtocolNativeConsensus is used by nodes/indexers to cross-validate an indexer + // pool against all configured native peers. + ProtocolNativeConsensus = "/opencloud/native/consensus/1.0" + RecommendedHeartbeatInterval = 60 * time.Second + + // TopicIndexerRegistry is the PubSub topic used by native indexers to gossip + // newly registered indexer PeerIDs to neighbouring natives. + TopicIndexerRegistry = "oc-indexer-registry" + + // consensusQueryTimeout is the per-native timeout for a consensus query. + consensusQueryTimeout = 3 * time.Second + // consensusCollectTimeout is the total wait for all native responses. + consensusCollectTimeout = 4 * time.Second +) + +// ConsensusRequest is sent by a node/indexer to a native to validate a candidate +// indexer list. The native replies with what it trusts and what it suggests instead. +type ConsensusRequest struct { + Candidates []string `json:"candidates"` +} + +// ConsensusResponse is returned by a native during a consensus challenge. +// Trusted = candidates the native considers alive. +// Suggestions = extras the native knows and trusts but that were not in the candidate list. +type ConsensusResponse struct { + Trusted []string `json:"trusted"` + Suggestions []string `json:"suggestions,omitempty"` +} + +// IndexerRegistration is sent by an indexer to a native to signal its alive state. +// Only Addr is required; PeerID is derived from it if omitted. +type IndexerRegistration struct { + PeerID string `json:"peer_id,omitempty"` + Addr string `json:"addr"` +} + +// GetIndexersRequest asks a native for a pool of live indexers. +type GetIndexersRequest struct { + Count int `json:"count"` +} + +// GetIndexersResponse is returned by the native with live indexer multiaddrs. +type GetIndexersResponse struct { + Indexers []string `json:"indexers"` + IsSelfFallback bool `json:"is_self_fallback,omitempty"` +} + +var StaticNatives = map[string]*pp.AddrInfo{} +var StreamNativeMu sync.RWMutex +var StreamNatives ProtocolStream = ProtocolStream{} + +// ConnectToNatives is the client-side entry point for nodes/indexers that have +// NativeIndexerAddresses configured. It: +// 1. Connects (long-lived heartbeat) to all configured natives. +// 2. Fetches an initial indexer pool from the FIRST responsive native. +// 3. Challenges that pool to ALL natives (consensus round 1). +// 4. If the confirmed list is short, samples native suggestions and re-challenges (round 2). +// 5. Populates StaticIndexers with majority-confirmed indexers. +func ConnectToNatives(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error { + logger := oclib.GetLogger() + + // Parse in config order: the first entry is the primary pool source. + orderedAddrs := []string{} + for _, addr := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + logger.Err(err).Msg("ConnectToNatives: invalid addr") + continue + } + StaticNatives[addr] = ad + orderedAddrs = append(orderedAddrs, addr) + } + if len(StaticNatives) == 0 { + return errors.New("no valid native addresses configured") + } + + // Long-lived heartbeat connections to keep the native mesh active. + SendHeartbeat(context.Background(), ProtocolHeartbeat, + conf.GetConfig().Name, h, StreamNatives, StaticNatives, 20*time.Second) + + // Step 1: get an initial pool from the FIRST responsive native (in config order). + var candidates []string + var isFallback bool + for _, addr := range orderedAddrs { + ad := StaticNatives[addr] + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := h.Connect(ctx, *ad); err != nil { + cancel() + continue + } + s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers) + cancel() + if err != nil { + continue + } + req := GetIndexersRequest{Count: maxIndexer} + if encErr := json.NewEncoder(s).Encode(req); encErr != nil { + s.Close() + continue + } + var resp GetIndexersResponse + if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil { + s.Close() + continue + } + s.Close() + candidates = resp.Indexers + isFallback = resp.IsSelfFallback + break // first responsive native only + } + + if len(candidates) == 0 { + if minIndexer > 0 { + return errors.New("ConnectToNatives: no indexers available from any native") + } + return nil + } + + // If the native is already the fallback indexer, use it directly — no consensus needed. + if isFallback { + for _, addr := range candidates { + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + continue + } + StaticIndexers[addr] = ad + } + return nil + } + + // Step 2: challenge the pool to ALL configured natives and score by majority vote. + confirmed, suggestions := clientSideConsensus(h, candidates) + + // Step 3: if we still have gaps, sample from suggestions and re-challenge. + if len(confirmed) < maxIndexer && len(suggestions) > 0 { + rand.Shuffle(len(suggestions), func(i, j int) { suggestions[i], suggestions[j] = suggestions[j], suggestions[i] }) + gap := maxIndexer - len(confirmed) + if gap > len(suggestions) { + gap = len(suggestions) + } + confirmed2, _ := clientSideConsensus(h, append(confirmed, suggestions[:gap]...)) + if len(confirmed2) > 0 { + confirmed = confirmed2 + } + } + + // Step 4: populate StaticIndexers with confirmed addresses. + for _, addr := range confirmed { + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + continue + } + StaticIndexers[addr] = ad + } + + if minIndexer > 0 && len(StaticIndexers) < minIndexer { + return errors.New("not enough majority-confirmed indexers available") + } + return nil +} + +// clientSideConsensus challenges a candidate list to ALL configured native peers +// in parallel. Each native replies with the candidates it trusts plus extras it +// recommends. An indexer is confirmed when strictly more than 50% of responding +// natives trust it. The remaining addresses from native suggestions are returned +// as suggestions for a possible second round. +func clientSideConsensus(h host.Host, candidates []string) (confirmed []string, suggestions []string) { + if len(candidates) == 0 { + return nil, nil + } + + StreamNativeMu.RLock() + peers := make([]*pp.AddrInfo, 0, len(StaticNatives)) + for _, ad := range StaticNatives { + peers = append(peers, ad) + } + StreamNativeMu.RUnlock() + + if len(peers) == 0 { + // No natives to challenge: trust candidates as-is. + return candidates, nil + } + + type nativeResult struct { + trusted []string + suggestions []string + responded bool + } + ch := make(chan nativeResult, len(peers)) + + for _, ad := range peers { + go func(ad *pp.AddrInfo) { + ctx, cancel := context.WithTimeout(context.Background(), consensusQueryTimeout) + defer cancel() + if err := h.Connect(ctx, *ad); err != nil { + ch <- nativeResult{} + return + } + s, err := h.NewStream(ctx, ad.ID, ProtocolNativeConsensus) + if err != nil { + ch <- nativeResult{} + return + } + defer s.Close() + if err := json.NewEncoder(s).Encode(ConsensusRequest{Candidates: candidates}); err != nil { + ch <- nativeResult{} + return + } + var resp ConsensusResponse + if err := json.NewDecoder(s).Decode(&resp); err != nil { + ch <- nativeResult{} + return + } + ch <- nativeResult{trusted: resp.Trusted, suggestions: resp.Suggestions, responded: true} + }(ad) + } + + // Collect responses up to consensusCollectTimeout. + timer := time.NewTimer(consensusCollectTimeout) + defer timer.Stop() + + trustedCounts := map[string]int{} + suggestionPool := map[string]struct{}{} + total := 0 // counts only natives that actually responded + collected := 0 + +collect: + for collected < len(peers) { + select { + case r := <-ch: + collected++ + if !r.responded { + continue // timeout / error: skip, do not count as vote + } + total++ + seen := map[string]struct{}{} + for _, addr := range r.trusted { + if _, already := seen[addr]; !already { + trustedCounts[addr]++ + seen[addr] = struct{}{} + } + } + for _, addr := range r.suggestions { + suggestionPool[addr] = struct{}{} + } + case <-timer.C: + break collect + } + } + + if total == 0 { + // No native responded: fall back to trusting the candidates as-is. + return candidates, nil + } + + confirmedSet := map[string]struct{}{} + for addr, count := range trustedCounts { + if count*2 > total { // strictly >50% + confirmed = append(confirmed, addr) + confirmedSet[addr] = struct{}{} + } + } + for addr := range suggestionPool { + if _, ok := confirmedSet[addr]; !ok { + suggestions = append(suggestions, addr) + } + } + return +} + +const ProtocolIndexerHeartbeat = "/opencloud/heartbeat/indexer/1.0" + +// RegisterWithNative sends a one-shot registration to each configured native indexer. +// Should be called periodically every RecommendedHeartbeatInterval. +func RegisterWithNative(h host.Host, nativeAddressesStr string) { + logger := oclib.GetLogger() + myAddr := "" + if len(h.Addrs()) > 0 { + myAddr = h.Addrs()[0].String() + "/p2p/" + h.ID().String() + } + reg := IndexerRegistration{ + PeerID: h.ID().String(), + Addr: myAddr, + } + for _, addr := range strings.Split(nativeAddressesStr, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + logger.Err(err).Msg("RegisterWithNative: invalid addr") + continue + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + if err := h.Connect(ctx, *ad); err != nil { + cancel() + continue + } + s, err := h.NewStream(ctx, ad.ID, ProtocolNativeSubscription) + cancel() + if err != nil { + logger.Err(err).Msg("RegisterWithNative: stream open failed") + continue + } + if err := json.NewEncoder(s).Encode(reg); err != nil { + logger.Err(err).Msg("RegisterWithNative: encode failed") + } + s.Close() + } +} + +// EnsureNativePeers populates StaticNatives from config and starts heartbeat +// connections to other natives. Safe to call multiple times; heartbeat is only +// started once (when StaticNatives transitions from empty to non-empty). +func EnsureNativePeers(h host.Host) { + nativeAddrs := conf.GetConfig().NativeIndexerAddresses + if nativeAddrs == "" { + return + } + StreamNativeMu.Lock() + wasEmpty := len(StaticNatives) == 0 + for _, addr := range strings.Split(nativeAddrs, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + continue + } + StaticNatives[addr] = ad + } + StreamNativeMu.Unlock() + + if wasEmpty && len(StaticNatives) > 0 { + SendHeartbeat(context.Background(), ProtocolIndexerHeartbeat, + conf.GetConfig().Name, h, StreamNatives, StaticNatives, 20*time.Second) + } +} diff --git a/daemons/node/indexer/native.go b/daemons/node/indexer/native.go new file mode 100644 index 0000000..8dabb0c --- /dev/null +++ b/daemons/node/indexer/native.go @@ -0,0 +1,416 @@ +package indexer + +import ( + "context" + "encoding/json" + "errors" + "math/rand" + "sync" + "time" + + "oc-discovery/daemons/node/common" + + oclib "cloud.o-forge.io/core/oc-lib" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + pp "github.com/libp2p/go-libp2p/core/peer" +) + +const ( + // IndexerTTL is 10% above the recommended 60s heartbeat interval. + IndexerTTL = 66 * time.Second + // offloadInterval is how often the native checks if it can release responsible peers. + offloadInterval = 30 * time.Second + // dhtRefreshInterval is how often the background goroutine queries the DHT for + // known-but-expired indexer entries (written by neighbouring natives). + dhtRefreshInterval = 30 * time.Second +) + +// liveIndexerEntry tracks a registered indexer in the native's in-memory cache and DHT. +type liveIndexerEntry struct { + PeerID string `json:"peer_id"` + Addr string `json:"addr"` + ExpiresAt time.Time `json:"expires_at"` +} + +// NativeState holds runtime state specific to native indexer operation. +type NativeState struct { + liveIndexers map[string]*liveIndexerEntry // keyed by PeerID, local cache with TTL + liveIndexersMu sync.RWMutex + responsiblePeers map[pp.ID]struct{} // peers for which the native is fallback indexer + responsibleMu sync.RWMutex + // knownPeerIDs accumulates all indexer PeerIDs ever seen (local stream or gossip). + // Used by refreshIndexersFromDHT to re-hydrate expired entries from the shared DHT, + // including entries written by other natives. + knownPeerIDs map[string]struct{} + knownMu sync.RWMutex +} + +func newNativeState() *NativeState { + return &NativeState{ + liveIndexers: map[string]*liveIndexerEntry{}, + responsiblePeers: map[pp.ID]struct{}{}, + knownPeerIDs: map[string]struct{}{}, + } +} + +// IndexerRecordValidator validates indexer DHT entries under the "indexer" namespace. +type IndexerRecordValidator struct{} + +func (v IndexerRecordValidator) Validate(_ string, value []byte) error { + var e liveIndexerEntry + if err := json.Unmarshal(value, &e); err != nil { + return err + } + if e.Addr == "" { + return errors.New("missing addr") + } + if e.ExpiresAt.Before(time.Now().UTC()) { + return errors.New("expired indexer record") + } + return nil +} + +func (v IndexerRecordValidator) Select(_ string, values [][]byte) (int, error) { + var newest time.Time + index := 0 + for i, val := range values { + var e liveIndexerEntry + if err := json.Unmarshal(val, &e); err != nil { + continue + } + if e.ExpiresAt.After(newest) { + newest = e.ExpiresAt + index = i + } + } + return index, nil +} + +// InitNative registers native-specific stream handlers and starts background loops. +// Must be called after DHT is initialized. +func (ix *IndexerService) InitNative() { + ix.Native = newNativeState() + ix.Host.SetStreamHandler(common.ProtocolIndexerHeartbeat, ix.HandleNodeHeartbeat) // specific heartbeat for Indexer. + ix.Host.SetStreamHandler(common.ProtocolNativeSubscription, ix.handleNativeSubscription) + ix.Host.SetStreamHandler(common.ProtocolNativeGetIndexers, ix.handleNativeGetIndexers) + ix.Host.SetStreamHandler(common.ProtocolNativeConsensus, ix.handleNativeConsensus) + ix.subscribeIndexerRegistry() + // Ensure long connections to other configured natives (native-to-native mesh). + common.EnsureNativePeers(ix.Host) + go ix.runOffloadLoop() + go ix.refreshIndexersFromDHT() +} + +// subscribeIndexerRegistry joins the PubSub topic used by natives to gossip newly +// registered indexer PeerIDs to one another, enabling cross-native DHT discovery. +func (ix *IndexerService) subscribeIndexerRegistry() { + logger := oclib.GetLogger() + ix.PS.RegisterTopicValidator(common.TopicIndexerRegistry, func(_ context.Context, _ pp.ID, _ *pubsub.Message) bool { + return true + }) + topic, err := ix.PS.Join(common.TopicIndexerRegistry) + if err != nil { + logger.Err(err).Msg("native: failed to join indexer registry topic") + return + } + sub, err := topic.Subscribe() + if err != nil { + logger.Err(err).Msg("native: failed to subscribe to indexer registry topic") + return + } + ix.PubsubMu.Lock() + ix.LongLivedPubSubs[common.TopicIndexerRegistry] = topic + ix.PubsubMu.Unlock() + + go func() { + for { + msg, err := sub.Next(context.Background()) + if err != nil { + return + } + peerID := string(msg.Data) + if peerID == "" { + continue + } + // A neighbouring native registered this PeerID; add to known set for DHT refresh. + ix.Native.knownMu.Lock() + ix.Native.knownPeerIDs[peerID] = struct{}{} + ix.Native.knownMu.Unlock() + } + }() +} + +// handleNativeSubscription stores an indexer's alive registration in the DHT cache. +// The stream is temporary: indexer sends one IndexerRegistration and closes. +func (ix *IndexerService) handleNativeSubscription(s network.Stream) { + defer s.Close() + logger := oclib.GetLogger() + + var reg common.IndexerRegistration + if err := json.NewDecoder(s).Decode(®); err != nil { + logger.Err(err).Msg("native subscription: decode") + return + } + if reg.Addr == "" { + logger.Error().Msg("native subscription: missing addr") + return + } + if reg.PeerID == "" { + ad, err := pp.AddrInfoFromString(reg.Addr) + if err != nil { + logger.Err(err).Msg("native subscription: invalid addr") + return + } + reg.PeerID = ad.ID.String() + } + + expiry := time.Now().UTC().Add(IndexerTTL) + entry := &liveIndexerEntry{ + PeerID: reg.PeerID, + Addr: reg.Addr, + ExpiresAt: expiry, + } + + // Persist in DHT with 66s TTL. + key := ix.genIndexerKey(reg.PeerID) + if data, err := json.Marshal(entry); err == nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := ix.DHT.PutValue(ctx, key, data); err != nil { + logger.Err(err).Msg("native subscription: DHT put") + } + cancel() + } + + // Update local cache and known set. + ix.Native.liveIndexersMu.Lock() + ix.Native.liveIndexers[reg.PeerID] = entry + ix.Native.liveIndexersMu.Unlock() + + ix.Native.knownMu.Lock() + ix.Native.knownPeerIDs[reg.PeerID] = struct{}{} + ix.Native.knownMu.Unlock() + + // Gossip PeerID to neighbouring natives so they discover it via DHT. + ix.PubsubMu.RLock() + topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry] + ix.PubsubMu.RUnlock() + if topic != nil { + if err := topic.Publish(context.Background(), []byte(reg.PeerID)); err != nil { + logger.Err(err).Msg("native subscription: registry gossip publish") + } + } + + logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered") +} + +// handleNativeGetIndexers returns this native's own list of reachable indexers. +// If none are available, it self-delegates (becomes the fallback indexer for the caller). +// The consensus across natives is the responsibility of the requesting node/indexer. +func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) { + defer s.Close() + logger := oclib.GetLogger() + + var req common.GetIndexersRequest + if err := json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err).Msg("native get indexers: decode") + return + } + if req.Count <= 0 { + req.Count = 3 + } + + reachable := ix.reachableLiveIndexers() + var resp common.GetIndexersResponse + + if len(reachable) == 0 { + // No indexers known: become temporary fallback for this caller. + ix.selfDelegate(s.Conn().RemotePeer(), &resp) + logger.Info().Str("peer", s.Conn().RemotePeer().String()).Msg("native: no indexers, acting as fallback") + } else { + rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] }) + if req.Count > len(reachable) { + req.Count = len(reachable) + } + resp.Indexers = reachable[:req.Count] + } + + if err := json.NewEncoder(s).Encode(resp); err != nil { + logger.Err(err).Msg("native get indexers: encode response") + } +} + +// handleNativeConsensus answers a consensus challenge from a node/indexer. +// It returns: +// - Trusted: which of the candidates it considers alive. +// - Suggestions: extras it knows and trusts that were not in the candidate list. +func (ix *IndexerService) handleNativeConsensus(s network.Stream) { + defer s.Close() + logger := oclib.GetLogger() + + var req common.ConsensusRequest + if err := json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err).Msg("native consensus: decode") + return + } + + myList := ix.reachableLiveIndexers() + mySet := make(map[string]struct{}, len(myList)) + for _, addr := range myList { + mySet[addr] = struct{}{} + } + + trusted := []string{} + candidateSet := make(map[string]struct{}, len(req.Candidates)) + for _, addr := range req.Candidates { + candidateSet[addr] = struct{}{} + if _, ok := mySet[addr]; ok { + trusted = append(trusted, addr) // candidate we also confirm as reachable + } + } + + // Extras we trust but that the requester didn't include → suggestions. + suggestions := []string{} + for _, addr := range myList { + if _, inCandidates := candidateSet[addr]; !inCandidates { + suggestions = append(suggestions, addr) + } + } + + resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions} + if err := json.NewEncoder(s).Encode(resp); err != nil { + logger.Err(err).Msg("native consensus: encode response") + } +} + +// selfDelegate marks the caller as a responsible peer and exposes this native's own +// address as its temporary indexer. +func (ix *IndexerService) selfDelegate(remotePeer pp.ID, resp *common.GetIndexersResponse) { + ix.Native.responsibleMu.Lock() + ix.Native.responsiblePeers[remotePeer] = struct{}{} + ix.Native.responsibleMu.Unlock() + resp.IsSelfFallback = true + for _, a := range ix.Host.Addrs() { + resp.Indexers = []string{a.String() + "/p2p/" + ix.Host.ID().String()} + break + } +} + +// reachableLiveIndexers returns the multiaddrs of non-expired, pingable indexers +// from the local cache (kept fresh by refreshIndexersFromDHT in background). +func (ix *IndexerService) reachableLiveIndexers() []string { + ix.Native.liveIndexersMu.RLock() + now := time.Now().UTC() + candidates := []*liveIndexerEntry{} + for _, e := range ix.Native.liveIndexers { + if e.ExpiresAt.After(now) { + candidates = append(candidates, e) + } + } + ix.Native.liveIndexersMu.RUnlock() + + reachable := []string{} + for _, e := range candidates { + ad, err := pp.AddrInfoFromString(e.Addr) + if err != nil { + continue + } + if common.PeerIsAlive(ix.Host, *ad) { + reachable = append(reachable, e.Addr) + } + } + return reachable +} + +// refreshIndexersFromDHT runs in background and queries the shared DHT for every known +// indexer PeerID whose local cache entry is missing or expired. This supplements the +// local cache with entries written by neighbouring natives. +func (ix *IndexerService) refreshIndexersFromDHT() { + t := time.NewTicker(dhtRefreshInterval) + defer t.Stop() + logger := oclib.GetLogger() + for range t.C { + ix.Native.knownMu.RLock() + peerIDs := make([]string, 0, len(ix.Native.knownPeerIDs)) + for pid := range ix.Native.knownPeerIDs { + peerIDs = append(peerIDs, pid) + } + ix.Native.knownMu.RUnlock() + + now := time.Now().UTC() + for _, pid := range peerIDs { + ix.Native.liveIndexersMu.RLock() + existing := ix.Native.liveIndexers[pid] + ix.Native.liveIndexersMu.RUnlock() + if existing != nil && existing.ExpiresAt.After(now) { + continue // still fresh in local cache + } + key := ix.genIndexerKey(pid) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ch, err := ix.DHT.SearchValue(ctx, key) + if err != nil { + cancel() + continue + } + var best *liveIndexerEntry + for b := range ch { + var e liveIndexerEntry + if err := json.Unmarshal(b, &e); err != nil { + continue + } + if e.ExpiresAt.After(time.Now().UTC()) { + if best == nil || e.ExpiresAt.After(best.ExpiresAt) { + best = &e + } + } + } + cancel() + if best != nil { + ix.Native.liveIndexersMu.Lock() + ix.Native.liveIndexers[best.PeerID] = best + ix.Native.liveIndexersMu.Unlock() + logger.Info().Str("peer", best.PeerID).Msg("native: refreshed indexer from DHT") + } + } + } +} + +func (ix *IndexerService) genIndexerKey(peerID string) string { + return "/indexer/" + peerID +} + +// runOffloadLoop periodically checks if real indexers are available and releases +// responsible peers so they can reconnect to actual indexers on their next attempt. +func (ix *IndexerService) runOffloadLoop() { + t := time.NewTicker(offloadInterval) + defer t.Stop() + logger := oclib.GetLogger() + for range t.C { + ix.Native.responsibleMu.RLock() + count := len(ix.Native.responsiblePeers) + ix.Native.responsibleMu.RUnlock() + if count == 0 { + continue + } + if len(ix.reachableLiveIndexers()) > 0 { + ix.Native.responsibleMu.Lock() + ix.Native.responsiblePeers = map[pp.ID]struct{}{} + ix.Native.responsibleMu.Unlock() + logger.Info().Int("released", count).Msg("native: offloaded responsible peers to real indexers") + } + } +} + +// StartNativeRegistration starts a goroutine that periodically registers this +// indexer with all configured native indexers (every RecommendedHeartbeatInterval). +func StartNativeRegistration(h host.Host, nativeAddressesStr string) { + go func() { + common.RegisterWithNative(h, nativeAddressesStr) + t := time.NewTicker(common.RecommendedHeartbeatInterval) + defer t.Stop() + for range t.C { + common.RegisterWithNative(h, nativeAddressesStr) + } + }() +} diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go index 253b353..79d6f9f 100644 --- a/daemons/node/indexer/service.go +++ b/daemons/node/indexer/service.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "oc-discovery/conf" "oc-discovery/daemons/node/common" "sync" @@ -10,58 +11,68 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" record "github.com/libp2p/go-libp2p-record" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" ) -// Index Record is the model for the specialized registry of node connected to Indexer +// IndexerService manages the indexer node's state: stream records, DHT, pubsub. type IndexerService struct { *common.LongLivedStreamRecordedService[PeerRecord] PS *pubsub.PubSub DHT *dht.IpfsDHT isStrictIndexer bool mu sync.RWMutex - DisposedPeers map[peer.ID]*common.TopicNodeActivityPub - - SeenQueries map[string]bool - SeenMU sync.Mutex + IsNative bool + Native *NativeState // non-nil when IsNative == true } -// if a pubsub is given... indexer is also an active oc-node. If not... your a strict indexer -func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerService { +// NewIndexerService creates an IndexerService. +// If ps is nil, this is a strict indexer (no pre-existing gossip sub from a node). +func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative bool) *IndexerService { logger := oclib.GetLogger() logger.Info().Msg("open indexer mode...") var err error ix := &IndexerService{ - LongLivedStreamRecordedService: common.NewStreamRecordedService[PeerRecord](h, maxNode, false), + LongLivedStreamRecordedService: common.NewStreamRecordedService[PeerRecord](h, maxNode), isStrictIndexer: ps == nil, + IsNative: isNative, } - if ps == nil { // generate your fresh gossip for the flow of killed node... EVERYBODY should know ! + if ps == nil { ps, err = pubsub.NewGossipSub(context.Background(), ix.Host) if err != nil { - panic(err) // can't run your indexer without a propalgation pubsub, of state of node. + panic(err) // can't run your indexer without a propagation pubsub } } ix.PS = ps - // later TODO : all indexer laucnh a private replica of them self. DEV OPS + if ix.isStrictIndexer { logger.Info().Msg("connect to indexers as strict indexer...") - common.ConnectToIndexers(h, 0, 5, ix.Host.ID()) // TODO : make var to change how many indexers are allowed. - logger.Info().Msg("subscribe to node activity as strict indexer...") - + common.ConnectToIndexers(h, 0, 5, ix.Host.ID()) logger.Info().Msg("subscribe to decentralized search flow as strict indexer...") ix.SubscribeToSearch(ix.PS, nil) } + if ix.DHT, err = dht.New( context.Background(), ix.Host, dht.Mode(dht.ModeServer), dht.Validator(record.NamespacedValidator{ - "node": PeerRecordValidator{}, + "node": PeerRecordValidator{}, + "indexer": IndexerRecordValidator{}, // for native indexer registry }), ); err != nil { return nil } - ix.initNodeHandler() // then listen up on every protocol expected + + // InitNative must happen after DHT is ready + if isNative { + ix.InitNative() + } else { + ix.initNodeHandler() + } + + // Register with configured natives so this indexer appears in their cache + if nativeAddrs := conf.GetConfig().NativeIndexerAddresses; nativeAddrs != "" { + StartNativeRegistration(ix.Host, nativeAddrs) + } return ix } diff --git a/daemons/node/node.go b/daemons/node/node.go index d7e90a6..87ecb9b 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -66,7 +66,7 @@ func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error) node := &Node{ PeerID: h.ID(), isIndexer: isIndexer, - LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000, false), + LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000), } var ps *pubsubs.PubSub if isNode { @@ -102,7 +102,7 @@ func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error) } if isIndexer { logger.Info().Msg("generate opencloud indexer...") - node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5) + node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5, isNativeIndexer) } logger.Info().Msg("connect to NATS") ListenNATS(node)