365 lines
11 KiB
Go
365 lines
11 KiB
Go
package common
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"math/rand"
|
|
"oc-discovery/conf"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
)
|
|
|
|
const (
	// ProtocolNativeSubscription receives IndexerRegistration messages
	// (see RegisterWithNative, which dials this protocol).
	ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0"
	// ProtocolNativeGetIndexers serves GetIndexersRequest/GetIndexersResponse
	// exchanges (dialed by ConnectToNatives to fetch an initial pool).
	ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0"
	// ProtocolNativeConsensus is used by nodes/indexers to cross-validate an indexer
	// pool against all configured native peers.
	ProtocolNativeConsensus = "/opencloud/native/consensus/1.0"

	// RecommendedHeartbeatInterval is the suggested period between
	// RegisterWithNative calls (see that function's doc comment).
	RecommendedHeartbeatInterval = 60 * time.Second

	// TopicIndexerRegistry is the PubSub topic used by native indexers to gossip
	// newly registered indexer PeerIDs to neighbouring natives.
	TopicIndexerRegistry = "oc-indexer-registry"

	// consensusQueryTimeout is the per-native timeout for a consensus query.
	consensusQueryTimeout = 3 * time.Second
	// consensusCollectTimeout is the total wait for all native responses.
	consensusCollectTimeout = 4 * time.Second
)
|
|
|
|
// ConsensusRequest is sent by a node/indexer to a native to validate a candidate
// indexer list. The native replies with what it trusts and what it suggests instead.
type ConsensusRequest struct {
	// Candidates are the indexer multiaddrs the requester wants vetted.
	Candidates []string `json:"candidates"`
}
|
|
|
|
// ConsensusResponse is returned by a native during a consensus challenge.
// Trusted = candidates the native considers alive.
// Suggestions = extras the native knows and trusts but that were not in the candidate list.
type ConsensusResponse struct {
	// Trusted is the subset of the request's candidates this native vouches for.
	Trusted []string `json:"trusted"`
	// Suggestions are additional addresses the native recommends instead.
	Suggestions []string `json:"suggestions,omitempty"`
}
|
|
|
|
// IndexerRegistration is sent by an indexer to a native to signal its alive state.
// Only Addr is required; PeerID is derived from it if omitted.
type IndexerRegistration struct {
	// PeerID is the indexer's libp2p peer ID; optional (derivable from Addr).
	PeerID string `json:"peer_id,omitempty"`
	// Addr is the indexer's full multiaddr including the /p2p/<id> suffix.
	Addr string `json:"addr"`
}
|
|
|
|
// GetIndexersRequest asks a native for a pool of live indexers.
type GetIndexersRequest struct {
	// Count is the maximum number of indexer addresses the caller wants back.
	Count int `json:"count"`
}
|
|
|
|
// GetIndexersResponse is returned by the native with live indexer multiaddrs.
type GetIndexersResponse struct {
	// Indexers are the multiaddrs of indexers the native believes are live.
	Indexers []string `json:"indexers"`
	// IsSelfFallback is true when the native is serving as its own fallback
	// indexer; callers may then skip the consensus round (see ConnectToNatives).
	IsSelfFallback bool `json:"is_self_fallback,omitempty"`
}
|
|
|
|
// StaticNatives maps a configured native multiaddr string to its parsed AddrInfo.
// NOTE(review): EnsureNativePeers and clientSideConsensus access it under
// StreamNativeMu, but ConnectToNatives does not — all access should be guarded
// by StreamNativeMu; confirm there is no other synchronization elsewhere.
var StaticNatives = map[string]*pp.AddrInfo{}

// StreamNativeMu guards StaticNatives (and, presumably, StreamNatives — verify).
var StreamNativeMu sync.RWMutex

// StreamNatives is the registry of long-lived native streams handed to
// SendHeartbeat. ProtocolStream is declared elsewhere in this package.
var StreamNatives ProtocolStream = ProtocolStream{}
|
|
|
|
// ConnectToNatives is the client-side entry point for nodes/indexers that have
|
|
// NativeIndexerAddresses configured. It:
|
|
// 1. Connects (long-lived heartbeat) to all configured natives.
|
|
// 2. Fetches an initial indexer pool from the FIRST responsive native.
|
|
// 3. Challenges that pool to ALL natives (consensus round 1).
|
|
// 4. If the confirmed list is short, samples native suggestions and re-challenges (round 2).
|
|
// 5. Populates StaticIndexers with majority-confirmed indexers.
|
|
func ConnectToNatives(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error {
|
|
logger := oclib.GetLogger()
|
|
|
|
// Parse in config order: the first entry is the primary pool source.
|
|
orderedAddrs := []string{}
|
|
for _, addr := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") {
|
|
addr = strings.TrimSpace(addr)
|
|
if addr == "" {
|
|
continue
|
|
}
|
|
ad, err := pp.AddrInfoFromString(addr)
|
|
if err != nil {
|
|
logger.Err(err).Msg("ConnectToNatives: invalid addr")
|
|
continue
|
|
}
|
|
StaticNatives[addr] = ad
|
|
orderedAddrs = append(orderedAddrs, addr)
|
|
}
|
|
if len(StaticNatives) == 0 {
|
|
return errors.New("no valid native addresses configured")
|
|
}
|
|
|
|
// Long-lived heartbeat connections to keep the native mesh active.
|
|
SendHeartbeat(context.Background(), ProtocolHeartbeat,
|
|
conf.GetConfig().Name, h, StreamNatives, StaticNatives, 20*time.Second)
|
|
|
|
// Step 1: get an initial pool from the FIRST responsive native (in config order).
|
|
var candidates []string
|
|
var isFallback bool
|
|
for _, addr := range orderedAddrs {
|
|
ad := StaticNatives[addr]
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
if err := h.Connect(ctx, *ad); err != nil {
|
|
cancel()
|
|
continue
|
|
}
|
|
s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers)
|
|
cancel()
|
|
if err != nil {
|
|
continue
|
|
}
|
|
req := GetIndexersRequest{Count: maxIndexer}
|
|
if encErr := json.NewEncoder(s).Encode(req); encErr != nil {
|
|
s.Close()
|
|
continue
|
|
}
|
|
var resp GetIndexersResponse
|
|
if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil {
|
|
s.Close()
|
|
continue
|
|
}
|
|
s.Close()
|
|
candidates = resp.Indexers
|
|
isFallback = resp.IsSelfFallback
|
|
break // first responsive native only
|
|
}
|
|
|
|
if len(candidates) == 0 {
|
|
if minIndexer > 0 {
|
|
return errors.New("ConnectToNatives: no indexers available from any native")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// If the native is already the fallback indexer, use it directly — no consensus needed.
|
|
if isFallback {
|
|
for _, addr := range candidates {
|
|
ad, err := pp.AddrInfoFromString(addr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
StaticIndexers[addr] = ad
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Step 2: challenge the pool to ALL configured natives and score by majority vote.
|
|
confirmed, suggestions := clientSideConsensus(h, candidates)
|
|
|
|
// Step 3: if we still have gaps, sample from suggestions and re-challenge.
|
|
if len(confirmed) < maxIndexer && len(suggestions) > 0 {
|
|
rand.Shuffle(len(suggestions), func(i, j int) { suggestions[i], suggestions[j] = suggestions[j], suggestions[i] })
|
|
gap := maxIndexer - len(confirmed)
|
|
if gap > len(suggestions) {
|
|
gap = len(suggestions)
|
|
}
|
|
confirmed2, _ := clientSideConsensus(h, append(confirmed, suggestions[:gap]...))
|
|
if len(confirmed2) > 0 {
|
|
confirmed = confirmed2
|
|
}
|
|
}
|
|
|
|
// Step 4: populate StaticIndexers with confirmed addresses.
|
|
for _, addr := range confirmed {
|
|
ad, err := pp.AddrInfoFromString(addr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
StaticIndexers[addr] = ad
|
|
}
|
|
|
|
if minIndexer > 0 && len(StaticIndexers) < minIndexer {
|
|
return errors.New("not enough majority-confirmed indexers available")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// clientSideConsensus challenges a candidate list to ALL configured native peers
// in parallel. Each native replies with the candidates it trusts plus extras it
// recommends. An indexer is confirmed when strictly more than 50% of responding
// natives trust it. The remaining addresses from native suggestions are returned
// as suggestions for a possible second round.
func clientSideConsensus(h host.Host, candidates []string) (confirmed []string, suggestions []string) {
	if len(candidates) == 0 {
		return nil, nil
	}

	// Snapshot the native set under the read lock so the goroutines below
	// never touch the shared map directly.
	StreamNativeMu.RLock()
	peers := make([]*pp.AddrInfo, 0, len(StaticNatives))
	for _, ad := range StaticNatives {
		peers = append(peers, ad)
	}
	StreamNativeMu.RUnlock()

	if len(peers) == 0 {
		// No natives to challenge: trust candidates as-is.
		return candidates, nil
	}

	// nativeResult carries one native's vote; responded == false means the
	// query failed and must NOT count toward the majority denominator.
	type nativeResult struct {
		trusted     []string
		suggestions []string
		responded   bool
	}
	// Buffered to len(peers): every goroutine can always complete its send,
	// even after the collect loop below has timed out and stopped reading,
	// so no goroutine leaks.
	ch := make(chan nativeResult, len(peers))

	// Fan out one query goroutine per native, each with its own timeout.
	for _, ad := range peers {
		go func(ad *pp.AddrInfo) {
			ctx, cancel := context.WithTimeout(context.Background(), consensusQueryTimeout)
			defer cancel()
			if err := h.Connect(ctx, *ad); err != nil {
				ch <- nativeResult{} // zero value => responded == false
				return
			}
			s, err := h.NewStream(ctx, ad.ID, ProtocolNativeConsensus)
			if err != nil {
				ch <- nativeResult{}
				return
			}
			defer s.Close()
			if err := json.NewEncoder(s).Encode(ConsensusRequest{Candidates: candidates}); err != nil {
				ch <- nativeResult{}
				return
			}
			var resp ConsensusResponse
			if err := json.NewDecoder(s).Decode(&resp); err != nil {
				ch <- nativeResult{}
				return
			}
			ch <- nativeResult{trusted: resp.Trusted, suggestions: resp.Suggestions, responded: true}
		}(ad)
	}

	// Collect responses up to consensusCollectTimeout.
	timer := time.NewTimer(consensusCollectTimeout)
	defer timer.Stop()

	trustedCounts := map[string]int{}    // addr -> number of natives trusting it
	suggestionPool := map[string]struct{}{} // union of all native suggestions
	total := 0 // counts only natives that actually responded
	collected := 0

collect:
	for collected < len(peers) {
		select {
		case r := <-ch:
			collected++
			if !r.responded {
				continue // timeout / error: skip, do not count as vote
			}
			total++
			// Deduplicate within a single reply so one native can never
			// cast two votes for the same address.
			seen := map[string]struct{}{}
			for _, addr := range r.trusted {
				if _, already := seen[addr]; !already {
					trustedCounts[addr]++
					seen[addr] = struct{}{}
				}
			}
			for _, addr := range r.suggestions {
				suggestionPool[addr] = struct{}{}
			}
		case <-timer.C:
			break collect // overall deadline hit: score what we have
		}
	}

	if total == 0 {
		// No native responded: fall back to trusting the candidates as-is.
		return candidates, nil
	}

	// Majority vote: confirm an address iff strictly more than half of the
	// RESPONDING natives trust it (integer arithmetic, no float rounding).
	confirmedSet := map[string]struct{}{}
	for addr, count := range trustedCounts {
		if count*2 > total { // strictly >50%
			confirmed = append(confirmed, addr)
			confirmedSet[addr] = struct{}{}
		}
	}
	// Suggestions are the pooled native extras minus what was just confirmed.
	for addr := range suggestionPool {
		if _, ok := confirmedSet[addr]; !ok {
			suggestions = append(suggestions, addr)
		}
	}
	return
}
|
|
|
|
const ProtocolIndexerHeartbeat = "/opencloud/heartbeat/indexer/1.0"
|
|
|
|
// RegisterWithNative sends a one-shot registration to each configured native indexer.
|
|
// Should be called periodically every RecommendedHeartbeatInterval.
|
|
func RegisterWithNative(h host.Host, nativeAddressesStr string) {
|
|
logger := oclib.GetLogger()
|
|
myAddr := ""
|
|
if len(h.Addrs()) > 0 {
|
|
myAddr = h.Addrs()[0].String() + "/p2p/" + h.ID().String()
|
|
}
|
|
reg := IndexerRegistration{
|
|
PeerID: h.ID().String(),
|
|
Addr: myAddr,
|
|
}
|
|
for _, addr := range strings.Split(nativeAddressesStr, ",") {
|
|
addr = strings.TrimSpace(addr)
|
|
if addr == "" {
|
|
continue
|
|
}
|
|
ad, err := pp.AddrInfoFromString(addr)
|
|
if err != nil {
|
|
logger.Err(err).Msg("RegisterWithNative: invalid addr")
|
|
continue
|
|
}
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
if err := h.Connect(ctx, *ad); err != nil {
|
|
cancel()
|
|
continue
|
|
}
|
|
s, err := h.NewStream(ctx, ad.ID, ProtocolNativeSubscription)
|
|
cancel()
|
|
if err != nil {
|
|
logger.Err(err).Msg("RegisterWithNative: stream open failed")
|
|
continue
|
|
}
|
|
if err := json.NewEncoder(s).Encode(reg); err != nil {
|
|
logger.Err(err).Msg("RegisterWithNative: encode failed")
|
|
}
|
|
s.Close()
|
|
}
|
|
}
|
|
|
|
// EnsureNativePeers populates StaticNatives from config and starts heartbeat
|
|
// connections to other natives. Safe to call multiple times; heartbeat is only
|
|
// started once (when StaticNatives transitions from empty to non-empty).
|
|
func EnsureNativePeers(h host.Host) {
|
|
nativeAddrs := conf.GetConfig().NativeIndexerAddresses
|
|
if nativeAddrs == "" {
|
|
return
|
|
}
|
|
StreamNativeMu.Lock()
|
|
wasEmpty := len(StaticNatives) == 0
|
|
for _, addr := range strings.Split(nativeAddrs, ",") {
|
|
addr = strings.TrimSpace(addr)
|
|
if addr == "" {
|
|
continue
|
|
}
|
|
ad, err := pp.AddrInfoFromString(addr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
StaticNatives[addr] = ad
|
|
}
|
|
StreamNativeMu.Unlock()
|
|
|
|
if wasEmpty && len(StaticNatives) > 0 {
|
|
SendHeartbeat(context.Background(), ProtocolIndexerHeartbeat,
|
|
conf.GetConfig().Name, h, StreamNatives, StaticNatives, 20*time.Second)
|
|
}
|
|
}
|