Full Flow : Catalog + Peer

This commit is contained in:
mr
2026-03-05 15:22:02 +01:00
parent ef3d998ead
commit 3751ec554d
20 changed files with 970 additions and 641 deletions

View File

@@ -18,6 +18,11 @@ type Config struct {
MinIndexer int
MaxIndexer int
// ConsensusQuorum is the minimum fraction of natives that must agree for a
// candidate indexer to be considered confirmed. Range (0, 1]. Default 0.5
// (strict majority). Raise to 0.67 for stronger Byzantine resistance.
ConsensusQuorum float64
}
var instance *Config

View File

@@ -112,6 +112,12 @@ func NewLongLivedPubSubService(h host.Host) *LongLivedPubSubService {
}
}
func (s *LongLivedPubSubService) GetPubSub(topicName string) *pubsub.Topic {
s.PubsubMu.Lock()
defer s.PubsubMu.Unlock()
return s.LongLivedPubSubs[topicName]
}
func (s *LongLivedPubSubService) processEvent(
ctx context.Context,
p *peer.Peer,
@@ -123,26 +129,8 @@ func (s *LongLivedPubSubService) processEvent(
return handler(ctx, topicName, event)
}
const TopicPubSubNodeActivity = "oc-node-activity"
const TopicPubSubSearch = "oc-node-search"
func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub, f *func(context.Context, TopicNodeActivityPub, string)) error {
ps.RegisterTopicValidator(TopicPubSubNodeActivity, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool {
return true
})
if topic, err := ps.Join(TopicPubSubNodeActivity); err != nil {
return err
} else {
s.PubsubMu.Lock()
defer s.PubsubMu.Unlock()
s.LongLivedPubSubs[TopicPubSubNodeActivity] = topic
}
if f != nil {
return SubscribeEvents(s, context.Background(), TopicPubSubNodeActivity, -1, *f)
}
return nil
}
func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(context.Context, Event, string)) error {
ps.RegisterTopicValidator(TopicPubSubSearch, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool {
return true
@@ -151,8 +139,8 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(co
return err
} else {
s.PubsubMu.Lock()
defer s.PubsubMu.Unlock()
s.LongLivedPubSubs[TopicPubSubSearch] = topic
s.PubsubMu.Unlock()
}
if f != nil {
return SubscribeEvents(s, context.Background(), TopicPubSubSearch, -1, *f)
@@ -178,10 +166,13 @@ func SubscribeEvents[T interface{}](s *LongLivedPubSubService,
}
func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context, sub *pubsub.Subscription, proto string, timeout int, f func(context.Context, T, string)) {
fmt.Println("waitResults", proto)
defer ctx.Done()
for {
s.PubsubMu.Lock() // check safely if cache is actually notified subscribed to topic
if s.LongLivedPubSubs[proto] == nil { // if not kill the loop.
s.PubsubMu.Unlock()
break
}
s.PubsubMu.Unlock()

View File

@@ -27,9 +27,9 @@ type LongLivedStreamRecordedService[T interface{}] struct {
StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
StreamMU sync.RWMutex
maxNodesConn int
// AfterHeartbeat is an optional hook called after each successful heartbeat update.
// The indexer sets it to republish the embedded signed record to the DHT.
AfterHeartbeat func(pid pp.ID)
// AfterHeartbeat is called after each successful heartbeat with the full
// decoded Heartbeat so the hook can use the fresh embedded PeerRecord.
AfterHeartbeat func(hb *Heartbeat)
// AfterDelete is called after gc() evicts an expired peer, outside the lock.
// name and did may be empty if the HeartbeatStream had no metadata.
AfterDelete func(pid pp.ID, name string, did string)
@@ -66,7 +66,6 @@ func (ix *LongLivedStreamRecordedService[T]) gc() {
return
}
streams := ix.StreamRecords[ProtocolHeartbeat]
fmt.Println(StaticNatives, StaticIndexers, streams)
type gcEntry struct {
pid pp.ID
@@ -184,9 +183,26 @@ func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) {
logger.Info().Msg("A new node is subscribed : " + pid.String())
}
ix.StreamMU.Unlock()
// Let the indexer republish the embedded signed record to the DHT.
if ix.AfterHeartbeat != nil {
ix.AfterHeartbeat(*pid)
// Enrich hb.DID before calling the hook: nodes never set hb.DID directly;
// extract it from the embedded signed PeerRecord if available, then fall
// back to the DID stored by handleNodePublish in the stream record.
if hb.DID == "" && len(hb.Record) > 0 {
var partial struct {
DID string `json:"did"`
}
if json.Unmarshal(hb.Record, &partial) == nil && partial.DID != "" {
hb.DID = partial.DID
}
}
if hb.DID == "" {
ix.StreamMU.RLock()
if rec, ok := streams[*pid]; ok {
hb.DID = rec.DID
}
ix.StreamMU.RUnlock()
}
if ix.AfterHeartbeat != nil && hb.DID != "" {
ix.AfterHeartbeat(hb)
}
}
}
@@ -214,17 +230,15 @@ func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams ma
}
lock.Unlock()
diversity := getDiversityRate(h, hb.IndexersBinded)
fmt.Println(upTime, bpms, diversity)
hb.ComputeIndexerScore(upTime, bpms, diversity)
// First heartbeat: uptime is always 0 so the score ceiling is 60, below the
// steady-state threshold of 75. Use a lower admission threshold so new peers
// can enter and start accumulating uptime. Subsequent heartbeats must meet
// the full threshold once uptime is tracked.
minScore := float64(50)
minScore := float64(40)
if isFirstHeartbeat {
minScore = 40
}
fmt.Println(hb.Score, minScore)
if hb.Score < minScore {
return nil, nil, errors.New("not enough trusting value")
}
@@ -239,13 +253,11 @@ func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams ma
}
func getDiversityRate(h host.Host, peers []string) float64 {
peers, _ = checkPeers(h, peers)
diverse := []string{}
for _, p := range peers {
ip, err := ExtractIP(p)
if err != nil {
fmt.Println("NO IP", p, err)
continue
}
div := ip.Mask(net.CIDRMask(24, 32)).String()
@@ -725,7 +737,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H
go replenishIndexersFromNative(h, idxNeed)
}
}
logger.Debug().Str("peer", ix.ID.String()).Str("proto", string(proto)).Msg("[native] step 2 — heartbeat sent ok")
// logger.Debug().Str("peer", ix.ID.String()).Str("proto", string(proto)).Msg("[native] step 2 — heartbeat sent ok")
}
}
}
@@ -768,6 +780,7 @@ func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, stre
return streams, err
}
}
if streams[proto] != nil && streams[proto][ad.ID] != nil {
return streams, nil
} else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil {

View File

@@ -4,6 +4,7 @@ import (
"context"
"cloud.o-forge.io/core/oc-lib/models/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
)
type HeartBeatStreamed interface {
@@ -11,5 +12,6 @@ type HeartBeatStreamed interface {
}
type DiscoveryPeer interface {
GetPeerRecord(ctx context.Context, key string) ([]*peer.Peer, error)
GetPeerRecord(ctx context.Context, key string, search bool) ([]*peer.Peer, error)
GetPubSub(topicName string) *pubsub.Topic
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math/rand"
"oc-discovery/conf"
"strings"
@@ -11,14 +12,16 @@ import (
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
const (
ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0"
ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0"
ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0"
ProtocolNativeUnsubscribe = "/opencloud/native/unsubscribe/1.0"
ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0"
// ProtocolNativeConsensus is used by nodes/indexers to cross-validate an indexer
// pool against all configured native peers.
ProtocolNativeConsensus = "/opencloud/native/consensus/1.0"
@@ -50,9 +53,50 @@ type ConsensusResponse struct {
// IndexerRegistration is sent by an indexer to a native to signal its alive state.
// Only Addr is required; PeerID is derived from it if omitted.
// Timestamp + PubKey + Signature allow the native and DHT to verify that the
// registration was produced by the peer that owns the declared PeerID.
type IndexerRegistration struct {
PeerID string `json:"peer_id,omitempty"`
Addr string `json:"addr"`
PeerID string `json:"peer_id,omitempty"`
Addr string `json:"addr"`
Timestamp int64 `json:"ts,omitempty"` // Unix nanoseconds (anti-replay)
PubKey []byte `json:"pub_key,omitempty"` // marshaled libp2p public key
Signature []byte `json:"sig,omitempty"` // Sign(signaturePayload())
}
// SignaturePayload returns the canonical byte slice that is signed/verified.
// Format: "<PeerID>|<Addr>|<Timestamp>"
func (r *IndexerRegistration) SignaturePayload() []byte {
return []byte(fmt.Sprintf("%s|%s|%d", r.PeerID, r.Addr, r.Timestamp))
}
// Sign fills PubKey and Signature using the host's own private key.
func (r *IndexerRegistration) Sign(h host.Host) {
priv := h.Peerstore().PrivKey(h.ID())
if priv == nil {
return
}
if pub, err := crypto.MarshalPublicKey(priv.GetPublic()); err == nil {
r.PubKey = pub
}
r.Signature, _ = priv.Sign(r.SignaturePayload())
}
// Verify returns true when the registration carries a valid self-signature.
// Returns true (not an error) when no signature is present, to remain backward-
// compatible with older nodes that do not sign their registrations.
func (r *IndexerRegistration) Verify() (bool, error) {
if len(r.Signature) == 0 || len(r.PubKey) == 0 {
return true, nil // unsigned — accepted but untrusted
}
pub, err := crypto.UnmarshalPublicKey(r.PubKey)
if err != nil {
return false, fmt.Errorf("unmarshal pub key: %w", err)
}
ok, err := pub.Verify(r.SignaturePayload(), r.Signature)
if err != nil {
return false, err
}
return ok, nil
}
// GetIndexersRequest asks a native for a pool of live indexers.
@@ -194,46 +238,100 @@ func replenishIndexersFromNative(h host.Host, need int) {
logger.Info().Msg("[native] step 4 — heartbeat goroutine nudged")
}
// fetchIndexersFromNative opens a ProtocolNativeGetIndexers stream to the first
// responsive native and returns the candidate list and fallback flag.
// fetchIndexersFromNative queries ALL configured natives in parallel and merges
// their indexer lists. Non-fallback responses are preferred; if only fallbacks
// respond the fallback list is returned. Results are deduplicated and capped at count.
func fetchIndexersFromNative(h host.Host, nativeAddrs []string, count int) (candidates []string, isFallback bool) {
logger := oclib.GetLogger()
for _, addr := range nativeAddrs {
ad, err := pp.AddrInfoFromString(addr)
if err != nil {
logger.Warn().Str("addr", addr).Msg("[native] fetch — skipping invalid addr")
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := h.Connect(ctx, *ad); err != nil {
cancel()
logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — connect failed")
continue
}
s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers)
cancel()
if err != nil {
logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — stream open failed")
continue
}
req := GetIndexersRequest{Count: count, From: h.ID().String()}
if encErr := json.NewEncoder(s).Encode(req); encErr != nil {
s.Close()
logger.Warn().Str("addr", addr).Err(encErr).Msg("[native] fetch — encode request failed")
continue
}
var resp GetIndexersResponse
if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil {
s.Close()
logger.Warn().Str("addr", addr).Err(decErr).Msg("[native] fetch — decode response failed")
continue
}
s.Close()
logger.Info().Str("native", addr).Int("indexers", len(resp.Indexers)).Bool("fallback", resp.IsSelfFallback).Msg("[native] fetch — response received")
return resp.Indexers, resp.IsSelfFallback
if len(nativeAddrs) == 0 {
return nil, false
}
logger.Warn().Msg("[native] fetch — no native responded")
return nil, false
type fetchResult struct {
indexers []string
isFallback bool
}
ch := make(chan fetchResult, len(nativeAddrs))
for _, addr := range nativeAddrs {
go func(addr string) {
ad, err := pp.AddrInfoFromString(addr)
if err != nil {
ch <- fetchResult{}
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := h.Connect(ctx, *ad); err != nil {
logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — connect failed")
ch <- fetchResult{}
return
}
s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers)
if err != nil {
logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — stream open failed")
ch <- fetchResult{}
return
}
s.SetDeadline(time.Now().Add(4 * time.Second))
defer s.Close()
req := GetIndexersRequest{Count: count, From: h.ID().String()}
if encErr := json.NewEncoder(s).Encode(req); encErr != nil {
logger.Warn().Str("addr", addr).Err(encErr).Msg("[native] fetch — encode failed")
ch <- fetchResult{}
return
}
var resp GetIndexersResponse
if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil {
logger.Warn().Str("addr", addr).Err(decErr).Msg("[native] fetch — decode failed")
ch <- fetchResult{}
return
}
logger.Info().Str("native", addr).Int("indexers", len(resp.Indexers)).Bool("fallback", resp.IsSelfFallback).Msg("[native] fetch — response received")
ch <- fetchResult{indexers: resp.Indexers, isFallback: resp.IsSelfFallback}
}(addr)
}
timer := time.NewTimer(6 * time.Second)
defer timer.Stop()
seen := map[string]struct{}{}
var realList, fallbackList []string
collected := 0
collect:
for collected < len(nativeAddrs) {
select {
case r := <-ch:
collected++
for _, ix := range r.indexers {
if _, ok := seen[ix]; ok {
continue
}
seen[ix] = struct{}{}
if r.isFallback {
fallbackList = append(fallbackList, ix)
} else {
realList = append(realList, ix)
}
}
case <-timer.C:
break collect
}
}
if len(realList) > 0 {
if len(realList) > count {
realList = realList[:count]
}
logger.Info().Int("count", len(realList)).Msg("[native] fetch — merged real indexers from all natives")
return realList, false
}
if len(fallbackList) > count {
fallbackList = fallbackList[:count]
}
logger.Info().Int("count", len(fallbackList)).Bool("fallback", true).Msg("[native] fetch — using fallback indexers")
return fallbackList, len(fallbackList) > 0
}
// resolvePool converts a candidate list to a validated addr→AddrInfo map.
@@ -337,6 +435,9 @@ func clientSideConsensus(h host.Host, candidates []string) (confirmed []string,
ch <- nativeResult{}
return
}
// Set an absolute deadline on the stream so Encode/Decode cannot
// block past the per-query budget, even if the remote native stalls.
s.SetDeadline(time.Now().Add(consensusQueryTimeout))
defer s.Close()
if err := json.NewEncoder(s).Encode(ConsensusRequest{Candidates: candidates}); err != nil {
ch <- nativeResult{}
@@ -387,9 +488,13 @@ collect:
return candidates, nil
}
quorum := conf.GetConfig().ConsensusQuorum
if quorum <= 0 {
quorum = 0.5
}
confirmedSet := map[string]struct{}{}
for addr, count := range trustedCounts {
if count*2 > total {
if float64(count) > float64(total)*quorum {
confirmed = append(confirmed, addr)
confirmedSet[addr] = struct{}{}
}
@@ -415,9 +520,11 @@ func RegisterWithNative(h host.Host, nativeAddressesStr string) {
return
}
reg := IndexerRegistration{
PeerID: h.ID().String(),
Addr: myAddr,
PeerID: h.ID().String(),
Addr: myAddr,
Timestamp: time.Now().UnixNano(),
}
reg.Sign(h)
for _, addr := range strings.Split(nativeAddressesStr, ",") {
addr = strings.TrimSpace(addr)
if addr == "" {
@@ -446,6 +553,40 @@ func RegisterWithNative(h host.Host, nativeAddressesStr string) {
}
}
// UnregisterFromNative sends an explicit deregistration message to each configured
// native so it can evict this indexer immediately without waiting for TTL expiry.
// Should be called during graceful shutdown.
func UnregisterFromNative(h host.Host, nativeAddressesStr string) {
logger := oclib.GetLogger()
reg := IndexerRegistration{PeerID: h.ID().String()}
for _, addr := range strings.Split(nativeAddressesStr, ",") {
addr = strings.TrimSpace(addr)
if addr == "" {
continue
}
ad, err := pp.AddrInfoFromString(addr)
if err != nil {
continue
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
if err := h.Connect(ctx, *ad); err != nil {
cancel()
logger.Warn().Str("addr", addr).Msg("UnregisterFromNative: connect failed")
continue
}
s, err := h.NewStream(ctx, ad.ID, ProtocolNativeUnsubscribe)
cancel()
if err != nil {
logger.Warn().Str("addr", addr).Msg("UnregisterFromNative: stream open failed")
continue
}
if err := json.NewEncoder(s).Encode(reg); err != nil {
logger.Warn().Str("addr", addr).Msg("UnregisterFromNative: encode failed")
}
s.Close()
}
}
// EnsureNativePeers populates StaticNatives from config and starts a single
// heartbeat goroutine toward the native mesh. Safe to call multiple times;
// the heartbeat goroutine is started at most once (nativeMeshHeartbeatOnce).
@@ -626,7 +767,7 @@ func replenishNativesFromPeers(h host.Host, lostAddr string, proto protocol.ID)
}
// Last (or only) native — retry periodically.
logger.Info().Str("addr", lostAddr).Msg("[native] replenish natives — last native lost, starting periodic retry")
go retryLostNative(h, lostAddr, proto)
go retryLostNative(context.Background(), h, lostAddr, proto)
}
// fetchNativeFromNatives asks each alive native for one of its own native contacts
@@ -677,7 +818,7 @@ func fetchNativeFromNatives(h host.Host, exclude []string) string {
return peer
}
}
logger.Debug().Str("native", ad.ID.String()).Msg("[native] fetch native peers — no new native from this peer")
// logger.Debug().Str("native", ad.ID.String()).Msg("[native] fetch native peers — no new native from this peer")
}
return ""
}
@@ -735,13 +876,19 @@ func fetchNativeFromIndexers(h host.Host, exclude []string) string {
}
// retryLostNative periodically retries connecting to a lost native address until
// it becomes reachable again or was already restored by another path.
func retryLostNative(h host.Host, addr string, nativeProto protocol.ID) {
// it becomes reachable again, was already restored by another path, or ctx is cancelled.
func retryLostNative(ctx context.Context, h host.Host, addr string, nativeProto protocol.ID) {
logger := oclib.GetLogger()
logger.Info().Str("addr", addr).Msg("[native] retry — periodic retry for lost native started")
t := time.NewTicker(retryNativeInterval)
defer t.Stop()
for range t.C {
for {
select {
case <-ctx.Done():
logger.Info().Str("addr", addr).Msg("[native] retry — context cancelled, stopping retry")
return
case <-t.C:
}
StreamNativeMu.RLock()
_, alreadyRestored := StaticNatives[addr]
StreamNativeMu.RUnlock()

View File

@@ -5,6 +5,8 @@ import (
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"oc-discovery/conf"
"oc-discovery/daemons/node/common"
"strings"
@@ -83,30 +85,17 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe
NATSAddress: pr.NATSAddress,
WalletAddress: pr.WalletAddress,
}
b, err := json.Marshal(p)
if err != nil {
return pp.SELF == p.Relation, nil, err
}
if time.Now().UTC().After(pr.ExpiryDate) {
return pp.SELF == p.Relation, nil, errors.New("peer " + key + " is offline")
}
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.PEER,
Method: int(tools.CREATE_RESOURCE),
SearchAttr: "peer_id",
Payload: b,
})
return pp.SELF == p.Relation, p, nil
}
type GetValue struct {
Key string `json:"key"`
PeerID peer.ID `json:"peer_id"`
Name string `json:"name,omitempty"`
Search bool `json:"search,omitempty"`
Key string `json:"key"`
PeerID string `json:"peer_id,omitempty"`
Name string `json:"name,omitempty"`
Search bool `json:"search,omitempty"`
}
type GetResponse struct {
@@ -132,29 +121,33 @@ func (ix *IndexerService) initNodeHandler() {
// Each heartbeat from a node carries a freshly signed PeerRecord.
// Republish it to the DHT so the record never expires as long as the node
// is alive — no separate publish stream needed from the node side.
ix.AfterHeartbeat = func(pid peer.ID) {
ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel1()
res, err := ix.DHT.GetValue(ctx1, ix.genPIDKey(pid.String()))
if err != nil {
logger.Warn().Err(err)
return
}
did := string(res)
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel2()
res, err = ix.DHT.GetValue(ctx2, ix.genKey(did))
if err != nil {
logger.Warn().Err(err)
return
}
ix.AfterHeartbeat = func(hb *common.Heartbeat) {
// Priority 1: use the fresh signed PeerRecord embedded in the heartbeat.
// Each heartbeat tick, the node re-signs with ExpiryDate = now+2min, so
// this record is always fresh. Fetching from DHT would give a stale expiry.
var rec PeerRecord
if err := json.Unmarshal(res, &rec); err != nil {
logger.Warn().Err(err).Str("peer", pid.String()).Msg("indexer: heartbeat record unmarshal failed")
return
if len(hb.Record) > 0 {
if err := json.Unmarshal(hb.Record, &rec); err != nil {
logger.Warn().Err(err).Msg("indexer: heartbeat embedded record unmarshal failed")
return
}
} else {
// Fallback: node didn't embed a record yet (first heartbeat before claimInfo).
// Fetch from DHT using the DID resolved by HandleHeartbeat.
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
res, err := ix.DHT.GetValue(ctx2, ix.genKey(hb.DID))
cancel2()
if err != nil {
logger.Warn().Err(err).Str("did", hb.DID).Msg("indexer: DHT fetch for refresh failed")
return
}
if err := json.Unmarshal(res, &rec); err != nil {
logger.Warn().Err(err).Str("did", hb.DID).Msg("indexer: heartbeat record unmarshal failed")
return
}
}
if _, err := rec.Verify(); err != nil {
logger.Warn().Err(err).Str("peer", pid.String()).Msg("indexer: heartbeat record signature invalid")
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: heartbeat record signature invalid")
return
}
data, err := json.Marshal(rec)
@@ -162,12 +155,14 @@ func (ix *IndexerService) initNodeHandler() {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
logger.Info().Msg("REFRESH PutValue " + ix.genKey(rec.DID))
if err := ix.DHT.PutValue(ctx, ix.genKey(rec.DID), data); err != nil {
logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh failed")
cancel()
return
}
cancel()
ix.publishNameEvent(NameIndexAdd, rec.Name, rec.PeerID, rec.DID)
if rec.Name != "" {
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID))
@@ -188,132 +183,151 @@ func (ix *IndexerService) initNodeHandler() {
func (ix *IndexerService) handleNodePublish(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
for {
var rec PeerRecord
if err := json.NewDecoder(s).Decode(&rec); err != nil {
logger.Err(err)
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
if _, err := rec.Verify(); err != nil {
logger.Err(err)
return
}
if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now().UTC()) {
logger.Err(errors.New(rec.PeerID + " is expired."))
return
}
pid, err := peer.Decode(rec.PeerID)
if err != nil {
return
}
var rec PeerRecord
if err := json.NewDecoder(s).Decode(&rec); err != nil {
logger.Err(err)
return
}
if _, err := rec.Verify(); err != nil {
logger.Err(err)
return
}
if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now().UTC()) {
logger.Err(errors.New(rec.PeerID + " is expired."))
return
}
pid, err := peer.Decode(rec.PeerID)
if err != nil {
return
}
ix.StreamMU.Lock()
defer ix.StreamMU.Unlock()
if ix.StreamRecords[common.ProtocolHeartbeat] == nil {
ix.StreamRecords[common.ProtocolHeartbeat] = map[peer.ID]*common.StreamRecord[PeerRecord]{}
}
streams := ix.StreamRecords[common.ProtocolHeartbeat]
if srec, ok := streams[pid]; ok {
srec.DID = rec.DID
srec.Record = rec
srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
}
ix.StreamMU.Lock()
defer ix.StreamMU.Unlock()
if ix.StreamRecords[common.ProtocolHeartbeat] == nil {
ix.StreamRecords[common.ProtocolHeartbeat] = map[peer.ID]*common.StreamRecord[PeerRecord]{}
}
streams := ix.StreamRecords[common.ProtocolHeartbeat]
if srec, ok := streams[pid]; ok {
srec.DID = rec.DID
srec.Record = rec
srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
}
key := ix.genKey(rec.DID)
data, err := json.Marshal(rec)
if err != nil {
logger.Err(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx, key, data); err != nil {
logger.Err(err)
key := ix.genKey(rec.DID)
data, err := json.Marshal(rec)
if err != nil {
logger.Err(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx, key, data); err != nil {
logger.Err(err)
cancel()
return
}
cancel()
return
}
cancel()
// Secondary index: /name/<name> → DID, so peers can resolve by human-readable name.
if rec.Name != "" {
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)); err != nil {
logger.Err(err).Str("name", rec.Name).Msg("indexer: failed to write name index")
fmt.Println("publishNameEvent")
ix.publishNameEvent(NameIndexAdd, rec.Name, rec.PeerID, rec.DID)
// Secondary index: /name/<name> → DID, so peers can resolve by human-readable name.
if rec.Name != "" {
ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)); err != nil {
logger.Err(err).Str("name", rec.Name).Msg("indexer: failed to write name index")
}
cancel2()
}
cancel2()
}
// Secondary index: /pid/<peerID> → DID, so peers can resolve by libp2p PeerID.
if rec.PeerID != "" {
ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx3, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil {
logger.Err(err).Str("pid", rec.PeerID).Msg("indexer: failed to write pid index")
// Secondary index: /pid/<peerID> → DID, so peers can resolve by libp2p PeerID.
if rec.PeerID != "" {
ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx3, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil {
logger.Err(err).Str("pid", rec.PeerID).Msg("indexer: failed to write pid index")
}
cancel3()
}
cancel3()
return
}
}
func (ix *IndexerService) handleNodeGet(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
var req GetValue
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err)
return
}
resp := GetResponse{Found: false, Records: map[string]PeerRecord{}}
keys := []string{}
// Name substring search — scan in-memory connected nodes first, then DHT exact match.
if req.Name != "" {
if req.Search {
for _, did := range ix.LookupNameIndex(strings.ToLower(req.Name)) {
keys = append(keys, did)
for {
var req GetValue
if err := json.NewDecoder(s).Decode(&req); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
} else {
// 2. DHT exact-name lookup: covers nodes that published but aren't currently connected.
nameCtx, nameCancel := context.WithTimeout(context.Background(), 5*time.Second)
if ch, err := ix.DHT.SearchValue(nameCtx, ix.genNameKey(req.Name)); err == nil {
for did := range ch {
keys = append(keys, string(did))
break
logger.Err(err)
continue
}
resp := GetResponse{Found: false, Records: map[string]PeerRecord{}}
fmt.Println("handleNodeGet", req.Search, req.Name)
keys := []string{}
// Name substring search — scan in-memory connected nodes first, then DHT exact match.
if req.Name != "" {
if req.Search {
for _, did := range ix.LookupNameIndex(strings.ToLower(req.Name)) {
keys = append(keys, did)
}
}
nameCancel()
}
} else if req.PeerID != "" {
pidCtx, pidCancel := context.WithTimeout(context.Background(), 5*time.Second)
if did, err := ix.DHT.GetValue(pidCtx, ix.genPIDKey(req.PeerID.String())); err == nil {
keys = append(keys, string(did))
}
pidCancel()
} else {
keys = append(keys, req.Key)
}
// DHT record fetch by DID key (covers exact-name and PeerID paths).
if len(keys) > 0 {
for _, k := range keys {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
c, err := ix.DHT.GetValue(ctx, ix.genKey(k))
cancel()
if err == nil {
var rec PeerRecord
if json.Unmarshal(c, &rec) == nil {
// Filter by PeerID only when one was explicitly specified.
if req.PeerID == "" || rec.PeerID == req.PeerID.String() {
resp.Records[rec.PeerID] = rec
} else {
// 2. DHT exact-name lookup: covers nodes that published but aren't currently connected.
nameCtx, nameCancel := context.WithTimeout(context.Background(), 5*time.Second)
if ch, err := ix.DHT.SearchValue(nameCtx, ix.genNameKey(req.Name)); err == nil {
for did := range ch {
keys = append(keys, string(did))
break
}
}
} else if req.Name == "" && req.PeerID == "" {
logger.Err(err).Msg("Failed to fetch PeerRecord from DHT " + req.Key)
nameCancel()
}
} else if req.PeerID != "" {
pidCtx, pidCancel := context.WithTimeout(context.Background(), 5*time.Second)
if did, err := ix.DHT.GetValue(pidCtx, ix.genPIDKey(req.PeerID)); err == nil {
keys = append(keys, string(did))
}
pidCancel()
} else {
keys = append(keys, req.Key)
}
// DHT record fetch by DID key (covers exact-name and PeerID paths).
if len(keys) > 0 {
for _, k := range keys {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
fmt.Println("TRY TO CATCH DID", ix.genKey(k))
c, err := ix.DHT.GetValue(ctx, ix.genKey(k))
cancel()
fmt.Println("TRY TO CATCH DID ERR", ix.genKey(k), c, err)
if err == nil {
var rec PeerRecord
if json.Unmarshal(c, &rec) == nil {
fmt.Println("CATCH DID ERR", ix.genKey(k), rec)
resp.Records[rec.PeerID] = rec
}
} else if req.Name == "" && req.PeerID == "" {
logger.Err(err).Msg("Failed to fetch PeerRecord from DHT " + req.Key)
}
}
}
}
resp.Found = len(resp.Records) > 0
_ = json.NewEncoder(s).Encode(resp)
resp.Found = len(resp.Records) > 0
_ = json.NewEncoder(s).Encode(resp)
break
}
}
// handleGetNatives returns this indexer's configured native addresses,
@@ -321,30 +335,38 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) {
func (ix *IndexerService) handleGetNatives(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
var req common.GetIndexerNativesRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("indexer get natives: decode")
return
}
excludeSet := make(map[string]struct{}, len(req.Exclude))
for _, e := range req.Exclude {
excludeSet[e] = struct{}{}
}
resp := common.GetIndexerNativesResponse{}
for _, addr := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") {
addr = strings.TrimSpace(addr)
if addr == "" {
for {
var req common.GetIndexerNativesRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("indexer get natives: decode")
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
if _, excluded := excludeSet[addr]; !excluded {
resp.Natives = append(resp.Natives, addr)
}
}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("indexer get natives: encode response")
excludeSet := make(map[string]struct{}, len(req.Exclude))
for _, e := range req.Exclude {
excludeSet[e] = struct{}{}
}
resp := common.GetIndexerNativesResponse{}
for _, addr := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") {
addr = strings.TrimSpace(addr)
if addr == "" {
continue
}
if _, excluded := excludeSet[addr]; !excluded {
resp.Natives = append(resp.Natives, addr)
}
}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("indexer get natives: encode response")
}
break
}
}

View File

@@ -3,6 +3,7 @@ package indexer
import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"
"time"
@@ -158,11 +159,13 @@ func (ix *IndexerService) LookupNameIndex(needle string) map[string]string {
ix.nameIndex.indexMu.RLock()
defer ix.nameIndex.indexMu.RUnlock()
for name, peers := range ix.nameIndex.index {
fmt.Println(strings.Contains(strings.ToLower(name), needleLow), needleLow, strings.ToLower(name))
if strings.Contains(strings.ToLower(name), needleLow) {
for peerID, did := range peers {
result[peerID] = did
}
}
}
fmt.Println("RESULT", result)
return result
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"slices"
"strings"
@@ -15,6 +16,7 @@ import (
oclib "cloud.o-forge.io/core/oc-lib"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
)
@@ -36,10 +38,15 @@ const (
)
// liveIndexerEntry tracks a registered indexer in the native's in-memory cache and DHT.
// PubKey and Signature are forwarded from the IndexerRegistration so the DHT validator
// can verify that the entry was produced by the peer owning the declared PeerID.
type liveIndexerEntry struct {
PeerID string `json:"peer_id"`
Addr string `json:"addr"`
ExpiresAt time.Time `json:"expires_at"`
PeerID string `json:"peer_id"`
Addr string `json:"addr"`
ExpiresAt time.Time `json:"expires_at"`
RegTimestamp int64 `json:"reg_ts,omitempty"` // Timestamp from the original IndexerRegistration
PubKey []byte `json:"pub_key,omitempty"`
Signature []byte `json:"sig,omitempty"`
}
// NativeState holds runtime state specific to native indexer operation.
@@ -53,13 +60,18 @@ type NativeState struct {
// including entries written by other natives.
knownPeerIDs map[string]string
knownMu sync.RWMutex
// cancel stops background goroutines (runOffloadLoop, refreshIndexersFromDHT)
// when the native shuts down.
cancel context.CancelFunc
}
func newNativeState() *NativeState {
func newNativeState(cancel context.CancelFunc) *NativeState {
return &NativeState{
liveIndexers: map[string]*liveIndexerEntry{},
responsiblePeers: map[pp.ID]struct{}{},
knownPeerIDs: map[string]string{},
cancel: cancel,
}
}
@@ -77,6 +89,18 @@ func (v IndexerRecordValidator) Validate(_ string, value []byte) error {
if e.ExpiresAt.Before(time.Now().UTC()) {
return errors.New("expired indexer record")
}
// Verify self-signature when present — rejects entries forged by a
// compromised native that does not control the declared PeerID.
if len(e.Signature) > 0 && len(e.PubKey) > 0 {
pub, err := crypto.UnmarshalPublicKey(e.PubKey)
if err != nil {
return fmt.Errorf("indexer entry: invalid public key: %w", err)
}
payload := []byte(fmt.Sprintf("%s|%s|%d", e.PeerID, e.Addr, e.RegTimestamp))
if ok, err := pub.Verify(payload, e.Signature); err != nil || !ok {
return errors.New("indexer entry: invalid signature")
}
}
return nil
}
@@ -99,9 +123,11 @@ func (v IndexerRecordValidator) Select(_ string, values [][]byte) (int, error) {
// InitNative registers native-specific stream handlers and starts background loops.
// Must be called after DHT is initialized.
func (ix *IndexerService) InitNative() {
ix.Native = newNativeState()
ctx, cancel := context.WithCancel(context.Background())
ix.Native = newNativeState(cancel)
ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleHeartbeat) // specific heartbeat for Indexer.
ix.Host.SetStreamHandler(common.ProtocolNativeSubscription, ix.handleNativeSubscription)
ix.Host.SetStreamHandler(common.ProtocolNativeUnsubscribe, ix.handleNativeUnsubscribe)
ix.Host.SetStreamHandler(common.ProtocolNativeGetIndexers, ix.handleNativeGetIndexers)
ix.Host.SetStreamHandler(common.ProtocolNativeConsensus, ix.handleNativeConsensus)
ix.Host.SetStreamHandler(common.ProtocolNativeGetPeers, ix.handleNativeGetPeers)
@@ -109,8 +135,8 @@ func (ix *IndexerService) InitNative() {
ix.subscribeIndexerRegistry()
// Ensure long connections to other configured natives (native-to-native mesh).
common.EnsureNativePeers(ix.Host)
go ix.runOffloadLoop()
go ix.refreshIndexersFromDHT()
go ix.runOffloadLoop(ctx)
go ix.refreshIndexersFromDHT(ctx)
}
// subscribeIndexerRegistry joins the PubSub topic used by natives to gossip newly
@@ -118,14 +144,40 @@ func (ix *IndexerService) InitNative() {
func (ix *IndexerService) subscribeIndexerRegistry() {
logger := oclib.GetLogger()
ix.PS.RegisterTopicValidator(common.TopicIndexerRegistry, func(_ context.Context, _ pp.ID, msg *pubsub.Message) bool {
// Reject empty or syntactically invalid multiaddrs before they reach the
// message loop. A compromised native could otherwise gossip arbitrary data.
addr := string(msg.Data)
if addr == "" {
// Parse as a signed IndexerRegistration.
var reg common.IndexerRegistration
if err := json.Unmarshal(msg.Data, &reg); err != nil {
return false
}
_, err := pp.AddrInfoFromString(addr)
return err == nil
if reg.Addr == "" {
return false
}
if _, err := pp.AddrInfoFromString(reg.Addr); err != nil {
return false
}
// Verify the self-signature when present (rejects forged gossip from a
// compromised native that does not control the announced PeerID).
if ok, _ := reg.Verify(); !ok {
return false
}
// Accept only messages from known native peers or from this host itself.
// This prevents external PSK participants from injecting registry entries.
from := msg.GetFrom()
if from == ix.Host.ID() {
return true
}
common.StreamNativeMu.RLock()
_, knownNative := common.StaticNatives[from.String()]
if !knownNative {
for _, ad := range common.StaticNatives {
if ad.ID == from {
knownNative = true
break
}
}
}
common.StreamNativeMu.RUnlock()
return knownNative
})
topic, err := ix.PS.Join(common.TopicIndexerRegistry)
if err != nil {
@@ -147,18 +199,18 @@ func (ix *IndexerService) subscribeIndexerRegistry() {
if err != nil {
return
}
addr := string(msg.Data)
if addr == "" {
// The gossip payload is a JSON-encoded IndexerRegistration (signed).
var gossipReg common.IndexerRegistration
if jsonErr := json.Unmarshal(msg.Data, &gossipReg); jsonErr != nil {
continue
}
if peer, err := pp.AddrInfoFromString(addr); err == nil {
ix.Native.knownMu.Lock()
ix.Native.knownPeerIDs[peer.ID.String()] = addr
ix.Native.knownMu.Unlock()
if gossipReg.Addr == "" || gossipReg.PeerID == "" {
continue
}
// A neighbouring native registered this PeerID; add to known set for DHT refresh.
ix.Native.knownMu.Lock()
ix.Native.knownPeerIDs[gossipReg.PeerID] = gossipReg.Addr
ix.Native.knownMu.Unlock()
}
}()
}
@@ -171,86 +223,172 @@ func (ix *IndexerService) handleNativeSubscription(s network.Stream) {
logger := oclib.GetLogger()
logger.Info().Msg("Subscription")
for {
var reg common.IndexerRegistration
if err := json.NewDecoder(s).Decode(&reg); err != nil {
logger.Err(err).Msg("native subscription: decode")
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
logger.Info().Msg("Subscription " + reg.Addr)
if reg.Addr == "" {
logger.Error().Msg("native subscription: missing addr")
return
}
if reg.PeerID == "" {
ad, err := pp.AddrInfoFromString(reg.Addr)
if err != nil {
logger.Err(err).Msg("native subscription: invalid addr")
return
}
reg.PeerID = ad.ID.String()
}
// Reject registrations with an invalid self-signature.
if ok, err := reg.Verify(); !ok {
logger.Warn().Str("peer", reg.PeerID).Err(err).Msg("native subscription: invalid signature, rejecting")
return
}
// Build entry with a fresh TTL — must happen before the cache write so the
// TTL window is not consumed by DHT retries.
entry := &liveIndexerEntry{
PeerID: reg.PeerID,
Addr: reg.Addr,
ExpiresAt: time.Now().UTC().Add(IndexerTTL),
RegTimestamp: reg.Timestamp,
PubKey: reg.PubKey,
Signature: reg.Signature,
}
// Verify that the declared address is actually reachable before admitting
// the registration. This async dial runs in the background; the indexer is
// tentatively admitted immediately (so heartbeats don't get stuck) but is
// evicted from the cache if the dial fails within 5 s.
go func(e *liveIndexerEntry) {
ad, err := pp.AddrInfoFromString(e.Addr)
if err != nil {
logger.Warn().Str("addr", e.Addr).Msg("native subscription: invalid addr during validation, rejecting")
ix.Native.liveIndexersMu.Lock()
if cur := ix.Native.liveIndexers[e.PeerID]; cur == e {
delete(ix.Native.liveIndexers, e.PeerID)
}
ix.Native.liveIndexersMu.Unlock()
return
}
dialCtx, dialCancel := context.WithTimeout(context.Background(), 5*time.Second)
defer dialCancel()
if err := ix.Host.Connect(dialCtx, *ad); err != nil {
logger.Warn().Str("addr", e.Addr).Err(err).Msg("native subscription: declared address unreachable, rejecting")
ix.Native.liveIndexersMu.Lock()
if cur := ix.Native.liveIndexers[e.PeerID]; cur == e {
delete(ix.Native.liveIndexers, e.PeerID)
}
ix.Native.liveIndexersMu.Unlock()
}
}(entry)
// Update local cache and known set immediately so concurrent GetIndexers calls
// can already see this indexer without waiting for the DHT write to complete.
ix.Native.liveIndexersMu.Lock()
_, isRenewal := ix.Native.liveIndexers[reg.PeerID]
ix.Native.liveIndexers[reg.PeerID] = entry
ix.Native.liveIndexersMu.Unlock()
ix.Native.knownMu.Lock()
ix.Native.knownPeerIDs[reg.PeerID] = reg.Addr
ix.Native.knownMu.Unlock()
// Gossip the signed registration to neighbouring natives.
// The payload is JSON-encoded so the receiver can verify the self-signature.
ix.PubsubMu.RLock()
topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry]
ix.PubsubMu.RUnlock()
if topic != nil {
if gossipData, marshalErr := json.Marshal(reg); marshalErr == nil {
if err := topic.Publish(context.Background(), gossipData); err != nil {
logger.Err(err).Msg("native subscription: registry gossip publish")
}
}
}
if isRenewal {
// logger.Debug().Str("peer", reg.PeerID).Msg("native: indexer TTL renewed : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers)))
} else {
logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers)))
}
// Persist in DHT asynchronously with bounded retry.
// Max retry window = IndexerTTL (90 s) — retrying past entry expiry is pointless.
// Backoff: 10 s → 20 s → 40 s, then repeats at 40 s until deadline.
key := ix.genIndexerKey(reg.PeerID)
data, err := json.Marshal(entry)
if err != nil {
logger.Err(err).Msg("native subscription: marshal entry")
return
}
go func() {
deadline := time.Now().Add(IndexerTTL)
backoff := 10 * time.Second
for {
if time.Now().After(deadline) {
logger.Warn().Str("key", key).Msg("native subscription: DHT put abandoned, entry TTL exceeded")
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err := ix.DHT.PutValue(ctx, key, data)
cancel()
if err == nil {
return
}
logger.Err(err).Msg("native subscription: DHT put " + key)
if !strings.Contains(err.Error(), "failed to find any peer in table") {
return // non-retryable error
}
remaining := time.Until(deadline)
if backoff > remaining {
backoff = remaining
}
if backoff <= 0 {
return
}
time.Sleep(backoff)
if backoff < 40*time.Second {
backoff *= 2
}
}
}()
break
}
}
// handleNativeUnsubscribe removes a departing indexer from the local cache and
// known set immediately, without waiting for TTL expiry.
func (ix *IndexerService) handleNativeUnsubscribe(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
var reg common.IndexerRegistration
if err := json.NewDecoder(s).Decode(&reg); err != nil {
logger.Err(err).Msg("native subscription: decode")
return
}
logger.Info().Msg("Subscription " + reg.Addr)
if reg.Addr == "" {
logger.Error().Msg("native subscription: missing addr")
logger.Err(err).Msg("native unsubscribe: decode")
return
}
if reg.PeerID == "" {
ad, err := pp.AddrInfoFromString(reg.Addr)
if err != nil {
logger.Err(err).Msg("native subscription: invalid addr")
return
}
reg.PeerID = ad.ID.String()
}
// Build entry with a fresh TTL — must happen before the cache write so the 66s
// window is not consumed by DHT retries.
entry := &liveIndexerEntry{
PeerID: reg.PeerID,
Addr: reg.Addr,
ExpiresAt: time.Now().UTC().Add(IndexerTTL),
}
// Update local cache and known set immediately so concurrent GetIndexers calls
// can already see this indexer without waiting for the DHT write to complete.
ix.Native.liveIndexersMu.Lock()
_, isRenewal := ix.Native.liveIndexers[reg.PeerID]
ix.Native.liveIndexers[reg.PeerID] = entry
ix.Native.liveIndexersMu.Unlock()
ix.Native.knownMu.Lock()
ix.Native.knownPeerIDs[reg.PeerID] = reg.Addr
ix.Native.knownMu.Unlock()
// Gossip PeerID to neighbouring natives so they discover it via DHT.
ix.PubsubMu.RLock()
topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry]
ix.PubsubMu.RUnlock()
if topic != nil {
if err := topic.Publish(context.Background(), []byte(reg.Addr)); err != nil {
logger.Err(err).Msg("native subscription: registry gossip publish")
}
}
if isRenewal {
logger.Debug().Str("peer", reg.PeerID).Msg("native: indexer TTL renewed : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers)))
} else {
logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers)))
}
// Persist in DHT asynchronously — retries must not block the handler or consume
// the local cache TTL.
key := ix.genIndexerKey(reg.PeerID)
data, err := json.Marshal(entry)
if err != nil {
logger.Err(err).Msg("native subscription: marshal entry")
logger.Warn().Msg("native unsubscribe: missing peer_id")
return
}
go func() {
for {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx, key, data); err != nil {
cancel()
logger.Err(err).Msg("native subscription: DHT put " + key)
if strings.Contains(err.Error(), "failed to find any peer in table") {
time.Sleep(10 * time.Second)
continue
}
return
}
cancel()
return
}
}()
ix.Native.liveIndexersMu.Lock()
delete(ix.Native.liveIndexers, reg.PeerID)
ix.Native.liveIndexersMu.Unlock()
ix.Native.knownMu.Lock()
delete(ix.Native.knownPeerIDs, reg.PeerID)
ix.Native.knownMu.Unlock()
logger.Info().Str("peer", reg.PeerID).Msg("native: indexer explicitly unregistered")
}
// handleNativeGetIndexers returns this native's own list of reachable indexers.
@@ -260,39 +398,47 @@ func (ix *IndexerService) handleNativeSubscription(s network.Stream) {
func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
for {
var req common.GetIndexersRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("native get indexers: decode")
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
if req.Count <= 0 {
req.Count = 3
}
callerPeerID := s.Conn().RemotePeer().String()
reachable := ix.reachableLiveIndexers(req.Count, callerPeerID)
var resp common.GetIndexersResponse
var req common.GetIndexersRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("native get indexers: decode")
return
}
if req.Count <= 0 {
req.Count = 3
}
callerPeerID := s.Conn().RemotePeer().String()
reachable := ix.reachableLiveIndexers(req.Count, callerPeerID)
var resp common.GetIndexersResponse
if len(reachable) == 0 {
// No live indexers reachable — try to self-delegate.
if ix.selfDelegate(s.Conn().RemotePeer(), &resp) {
logger.Info().Str("peer", callerPeerID).Msg("native: no indexers, acting as fallback for node")
if len(reachable) == 0 {
// No live indexers reachable — try to self-delegate.
if ix.selfDelegate(s.Conn().RemotePeer(), &resp) {
logger.Info().Str("peer", callerPeerID).Msg("native: no indexers, acting as fallback for node")
} else {
// Fallback pool saturated: return empty so the caller retries another
// native instead of piling more load onto this one.
logger.Warn().Str("peer", callerPeerID).Int("pool", maxFallbackPeers).Msg(
"native: fallback pool saturated, refusing self-delegation")
}
} else {
// Fallback pool saturated: return empty so the caller retries another
// native instead of piling more load onto this one.
logger.Warn().Str("peer", callerPeerID).Int("pool", maxFallbackPeers).Msg(
"native: fallback pool saturated, refusing self-delegation")
rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] })
if req.Count > len(reachable) {
req.Count = len(reachable)
}
resp.Indexers = reachable[:req.Count]
}
} else {
rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] })
if req.Count > len(reachable) {
req.Count = len(reachable)
}
resp.Indexers = reachable[:req.Count]
}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("native get indexers: encode response")
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("native get indexers: encode response")
}
break
}
}
@@ -303,39 +449,47 @@ func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) {
func (ix *IndexerService) handleNativeConsensus(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
var req common.ConsensusRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("native consensus: decode")
return
}
myList := ix.reachableLiveIndexers(-1, s.Conn().RemotePeer().String())
mySet := make(map[string]struct{}, len(myList))
for _, addr := range myList {
mySet[addr] = struct{}{}
}
trusted := []string{}
candidateSet := make(map[string]struct{}, len(req.Candidates))
for _, addr := range req.Candidates {
candidateSet[addr] = struct{}{}
if _, ok := mySet[addr]; ok {
trusted = append(trusted, addr) // candidate we also confirm as reachable
for {
var req common.ConsensusRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("native consensus: decode")
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
}
// Extras we trust but that the requester didn't include → suggestions.
suggestions := []string{}
for _, addr := range myList {
if _, inCandidates := candidateSet[addr]; !inCandidates {
suggestions = append(suggestions, addr)
myList := ix.reachableLiveIndexers(-1, s.Conn().RemotePeer().String())
mySet := make(map[string]struct{}, len(myList))
for _, addr := range myList {
mySet[addr] = struct{}{}
}
}
resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("native consensus: encode response")
trusted := []string{}
candidateSet := make(map[string]struct{}, len(req.Candidates))
for _, addr := range req.Candidates {
candidateSet[addr] = struct{}{}
if _, ok := mySet[addr]; ok {
trusted = append(trusted, addr) // candidate we also confirm as reachable
}
}
// Extras we trust but that the requester didn't include → suggestions.
suggestions := []string{}
for _, addr := range myList {
if _, inCandidates := candidateSet[addr]; !inCandidates {
suggestions = append(suggestions, addr)
}
}
resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("native consensus: encode response")
}
break
}
}
@@ -406,11 +560,16 @@ func (ix *IndexerService) reachableLiveIndexers(count int, from ...string) []str
// refreshIndexersFromDHT runs in background and queries the shared DHT for every known
// indexer PeerID whose local cache entry is missing or expired. This supplements the
// local cache with entries written by neighbouring natives.
func (ix *IndexerService) refreshIndexersFromDHT() {
func (ix *IndexerService) refreshIndexersFromDHT(ctx context.Context) {
t := time.NewTicker(dhtRefreshInterval)
defer t.Stop()
logger := oclib.GetLogger()
for range t.C {
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
ix.Native.knownMu.RLock()
peerIDs := make([]string, 0, len(ix.Native.knownPeerIDs))
for pid := range ix.Native.knownPeerIDs {
@@ -427,10 +586,10 @@ func (ix *IndexerService) refreshIndexersFromDHT() {
continue // still fresh in local cache
}
key := ix.genIndexerKey(pid)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ch, err := ix.DHT.SearchValue(ctx, key)
dhtCtx, dhtCancel := context.WithTimeout(context.Background(), 5*time.Second)
ch, err := ix.DHT.SearchValue(dhtCtx, key)
if err != nil {
cancel()
dhtCancel()
continue
}
var best *liveIndexerEntry
@@ -445,7 +604,7 @@ func (ix *IndexerService) refreshIndexersFromDHT() {
}
}
}
cancel()
dhtCancel()
if best != nil {
ix.Native.liveIndexersMu.Lock()
ix.Native.liveIndexers[best.PeerID] = best
@@ -468,11 +627,16 @@ func (ix *IndexerService) genIndexerKey(peerID string) string {
// runOffloadLoop periodically checks if real indexers are available and releases
// responsible peers so they can reconnect to actual indexers on their next attempt.
func (ix *IndexerService) runOffloadLoop() {
func (ix *IndexerService) runOffloadLoop(ctx context.Context) {
t := time.NewTicker(offloadInterval)
defer t.Stop()
logger := oclib.GetLogger()
for range t.C {
for {
select {
case <-ctx.Done():
return
case <-t.C:
}
fmt.Println("runOffloadLoop", ix.Native.responsiblePeers)
ix.Native.responsibleMu.RLock()
count := len(ix.Native.responsiblePeers)
@@ -540,38 +704,46 @@ func (ix *IndexerService) runOffloadLoop() {
func (ix *IndexerService) handleNativeGetPeers(s network.Stream) {
defer s.Close()
logger := oclib.GetLogger()
var req common.GetNativePeersRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("native get peers: decode")
return
}
if req.Count <= 0 {
req.Count = 1
}
excludeSet := make(map[string]struct{}, len(req.Exclude))
for _, e := range req.Exclude {
excludeSet[e] = struct{}{}
}
common.StreamNativeMu.RLock()
candidates := make([]string, 0, len(common.StaticNatives))
for addr := range common.StaticNatives {
if _, excluded := excludeSet[addr]; !excluded {
candidates = append(candidates, addr)
for {
var req common.GetNativePeersRequest
if err := json.NewDecoder(s).Decode(&req); err != nil {
logger.Err(err).Msg("native get peers: decode")
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
if req.Count <= 0 {
req.Count = 1
}
}
common.StreamNativeMu.RUnlock()
rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] })
if req.Count > len(candidates) {
req.Count = len(candidates)
}
excludeSet := make(map[string]struct{}, len(req.Exclude))
for _, e := range req.Exclude {
excludeSet[e] = struct{}{}
}
resp := common.GetNativePeersResponse{Peers: candidates[:req.Count]}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("native get peers: encode response")
common.StreamNativeMu.RLock()
candidates := make([]string, 0, len(common.StaticNatives))
for addr := range common.StaticNatives {
if _, excluded := excludeSet[addr]; !excluded {
candidates = append(candidates, addr)
}
}
common.StreamNativeMu.RUnlock()
rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] })
if req.Count > len(candidates) {
req.Count = len(candidates)
}
resp := common.GetNativePeersResponse{Peers: candidates[:req.Count]}
if err := json.NewEncoder(s).Encode(resp); err != nil {
logger.Err(err).Msg("native get peers: encode response")
}
break
}
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"oc-discovery/conf"
"oc-discovery/daemons/node/common"
"strings"
"sync"
oclib "cloud.o-forge.io/core/oc-lib"
@@ -60,9 +61,19 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative boo
}
}
if ix.DHT, err = dht.New(
context.Background(),
ix.Host,
// Parse bootstrap peers from configured native/indexer addresses so that the
// DHT can find its routing table entries even in a fresh deployment.
var bootstrapPeers []pp.AddrInfo
for _, addrStr := range strings.Split(conf.GetConfig().NativeIndexerAddresses+","+conf.GetConfig().IndexerAddresses, ",") {
addrStr = strings.TrimSpace(addrStr)
if addrStr == "" {
continue
}
if ad, err := pp.AddrInfoFromString(addrStr); err == nil {
bootstrapPeers = append(bootstrapPeers, *ad)
}
}
dhtOpts := []dht.Option{
dht.Mode(dht.ModeServer),
dht.ProtocolPrefix("oc"), // 🔥 réseau privé
dht.Validator(record.NamespacedValidator{
@@ -71,7 +82,11 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative boo
"name": DefaultValidator{},
"pid": DefaultValidator{},
}),
); err != nil {
}
if len(bootstrapPeers) > 0 {
dhtOpts = append(dhtOpts, dht.BootstrapPeers(bootstrapPeers...))
}
if ix.DHT, err = dht.New(context.Background(), ix.Host, dhtOpts...); err != nil {
logger.Info().Msg(err.Error())
return nil
}
@@ -90,6 +105,16 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative boo
}
func (ix *IndexerService) Close() {
if ix.Native != nil && ix.Native.cancel != nil {
ix.Native.cancel()
}
// Explicitly deregister from natives on clean shutdown so they evict this
// indexer immediately rather than waiting for TTL expiry (~90 s).
if !ix.IsNative {
if nativeAddrs := conf.GetConfig().NativeIndexerAddresses; nativeAddrs != "" {
common.UnregisterFromNative(ix.Host, nativeAddrs)
}
}
ix.DHT.Close()
ix.PS.UnregisterTopicValidator(common.TopicPubSubSearch)
if ix.nameIndex != nil {

View File

@@ -117,7 +117,7 @@ func ListenNATS(n *Node) {
proto = stream.ProtocolMinioConfigResource
}
if err := json.Unmarshal(resp.Payload, &m); err == nil {
peers, _ := n.GetPeerRecord(context.Background(), m.PeerID)
peers, _ := n.GetPeerRecord(context.Background(), m.PeerID, false)
for _, p := range peers {
n.StreamService.PublishCommon(&resp.Datatype, resp.User,
p.PeerID, proto, resp.Payload)
@@ -137,7 +137,7 @@ func ListenNATS(n *Node) {
var m executionConsidersPayload
if err := json.Unmarshal(resp.Payload, &m); err == nil {
for _, p := range m.PeerIDs {
peers, _ := n.GetPeerRecord(context.Background(), p)
peers, _ := n.GetPeerRecord(context.Background(), p, false)
for _, pp := range peers {
n.StreamService.PublishCommon(&resp.Datatype, resp.User,
pp.PeerID, stream.ProtocolConsidersResource, resp.Payload)
@@ -150,7 +150,7 @@ func ListenNATS(n *Node) {
OriginID string `json:"origin_id"`
}
if err := json.Unmarshal(propalgation.Payload, &m); err == nil && m.OriginID != "" {
peers, _ := n.GetPeerRecord(context.Background(), m.OriginID)
peers, _ := n.GetPeerRecord(context.Background(), m.OriginID, false)
for _, p := range peers {
n.StreamService.PublishCommon(nil, resp.User,
p.PeerID, stream.ProtocolConsidersResource, propalgation.Payload)
@@ -192,7 +192,7 @@ func ListenNATS(n *Node) {
if propalgation.DataType == int(tools.PEER) {
m := map[string]interface{}{}
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
if peers, err := n.GetPeerRecord(context.Background(), fmt.Sprintf("%v", m["search"])); err == nil {
if peers, err := n.GetPeerRecord(context.Background(), fmt.Sprintf("%v", m["search"]), true); err == nil {
for _, p := range peers {
if b, err := json.Marshal(p); err == nil {
go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{

View File

@@ -5,7 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"oc-discovery/conf"
"oc-discovery/daemons/node/common"
"oc-discovery/daemons/node/indexer"
@@ -110,7 +109,6 @@ func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error)
if _, err := node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname); err != nil {
panic(err)
}
logger.Info().Msg("subscribe to decentralized search flow...")
logger.Info().Msg("run garbage collector...")
node.StartGC(30 * time.Second)
@@ -122,10 +120,17 @@ func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error)
panic(err)
}
f := func(ctx context.Context, evt common.Event, topic string) {
if p, err := node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 {
node.StreamService.SendResponse(p[0], &evt)
m := map[string]interface{}{}
err := json.Unmarshal(evt.Payload, &m)
if err != nil || evt.From == node.PeerID.String() {
fmt.Println(evt.From, node.PeerID.String(), err)
return
}
if p, err := node.GetPeerRecord(ctx, evt.From, false); err == nil && len(p) > 0 && m["search"] != nil {
node.StreamService.SendResponse(p[0], &evt, fmt.Sprintf("%v", m["search"]))
}
}
logger.Info().Msg("subscribe to decentralized search flow...")
node.SubscribeToSearch(node.PS, &f)
logger.Info().Msg("connect to NATS")
go ListenNATS(node)
@@ -187,6 +192,7 @@ func (d *Node) publishPeerRecord(
func (d *Node) GetPeerRecord(
ctx context.Context,
pidOrdid string,
search bool,
) ([]*peer.Peer, error) {
var err error
var info map[string]indexer.PeerRecord
@@ -200,16 +206,14 @@ func (d *Node) GetPeerRecord(
// Build the GetValue request: if pidOrdid is neither a UUID DID nor a libp2p
// PeerID, treat it as a human-readable name and let the indexer resolve it.
getReq := indexer.GetValue{Key: pidOrdid}
isNameSearch := false
if pidR, pidErr := pp.Decode(pidOrdid); pidErr == nil {
getReq.PeerID = pidR
getReq.PeerID = pidR.String()
} else if _, uuidErr := uuid.Parse(pidOrdid); uuidErr != nil {
// Not a UUID DID → treat pidOrdid as a name substring search.
getReq.Name = pidOrdid
getReq.Key = ""
isNameSearch = true
}
getReq.Search = search
for _, ad := range indexerSnapshot2 {
if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolGet, "",
common.StreamIndexers, map[protocol.ID]*common.ProtocolInfo{}, &common.StreamMuIndexes); err != nil {
@@ -224,28 +228,17 @@ func (d *Node) GetPeerRecord(
continue
}
if resp.Found {
if info == nil {
info = resp.Records
} else {
// Aggregate results from all indexers for name searches.
maps.Copy(info, resp.Records)
}
// For exact lookups (PeerID / DID) stop at the first hit.
if !isNameSearch {
break
}
info = resp.Records
}
break
}
var ps []*peer.Peer
for _, pr := range info {
if pk, err := pr.Verify(); err != nil {
return nil, err
} else if ok, p, err := pr.ExtractPeer(d.PeerID.String(), pr.PeerID, pk); err != nil {
} else if _, p, err := pr.ExtractPeer(d.PeerID.String(), pr.PeerID, pk); err != nil {
return nil, err
} else {
if ok {
d.publishPeerRecord(&pr)
}
ps = append(ps, p)
}
}
@@ -316,6 +309,17 @@ func (d *Node) claimInfo(
return nil, err
} else {
_, p, err := rec.ExtractPeer(did, did, pub)
b, err := json.Marshal(p)
if err != nil {
return p, err
}
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.PEER,
Method: int(tools.CREATE_RESOURCE),
SearchAttr: "peer_id",
Payload: b,
})
return p, err
}
}

View File

@@ -1,40 +0,0 @@
package pubsub
import (
"context"
"oc-discovery/daemons/node/common"
"cloud.o-forge.io/core/oc-lib/tools"
)
func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt *common.Event) error {
action := ps.getTopicName(topicName)
if err := ps.handleEventSearch(ctx, evt, action); err != nil {
return err
}
return nil
}
func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 canals for every partner.
ctx context.Context,
evt *common.Event,
action tools.PubSubAction,
) error {
if !(action == tools.PB_SEARCH) {
return nil
}
if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { // peerFrom is Unique
if err := evt.Verify(p[0]); err != nil {
return err
}
switch action {
case tools.PB_SEARCH: // when someone ask for search.
if err := ps.StreamService.SendResponse(p[0], evt); err != nil {
return err
}
default:
return nil
}
}
return nil
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"oc-discovery/daemons/node/common"
"oc-discovery/daemons/node/stream"
"oc-discovery/models"
@@ -20,15 +21,7 @@ func (ps *PubSubService) SearchPublishEvent(
}
switch typ {
case "known": // define Search Strategy
return ps.StreamService.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided
And: map[string][]dbs.Filter{
"": {{Operator: dbs.NOT.String(), Value: dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided
And: map[string][]dbs.Filter{
"relation": {{Operator: dbs.EQUAL.String(), Value: peer.BLACKLIST}},
},
}}},
},
}, b, stream.ProtocolSearchResource) //if partners focus only them*/
return ps.StreamService.PublishesCommon(dt, user, nil, b, stream.ProtocolSearchResource) //if partners focus only them*/
case "partner": // define Search Strategy
return ps.StreamService.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided
And: map[string][]dbs.Filter{
@@ -40,23 +33,26 @@ func (ps *PubSubService) SearchPublishEvent(
if err != nil {
return err
}
return ps.publishEvent(ctx, dt, tools.PB_SEARCH, user, b)
return ps.publishEvent(ctx, dt, tools.PB_SEARCH, common.TopicPubSubSearch, user, b)
default:
return errors.New("no type of research found")
}
}
func (ps *PubSubService) publishEvent(
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, user string, payload []byte,
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, topicName string, user string, payload []byte,
) error {
priv, err := tools.LoadKeyFromFilePrivate()
if err != nil {
return err
}
msg, _ := json.Marshal(models.NewEvent(action.String(), ps.Host.ID().String(), dt, user, payload, priv))
topic, err := ps.PS.Join(action.String())
if err != nil {
return err
topic := ps.Node.GetPubSub(topicName)
if topic == nil {
topic, err = ps.PS.Join(topicName)
if err != nil {
return err
}
}
return topic.Publish(ctx, msg)
}

View File

@@ -4,17 +4,13 @@ import (
"context"
"oc-discovery/daemons/node/common"
"oc-discovery/daemons/node/stream"
"strings"
"sync"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
type PubSubService struct {
*common.LongLivedPubSubService
Node common.DiscoveryPeer
Host host.Host
PS *pubsub.PubSub
@@ -24,24 +20,12 @@ type PubSubService struct {
}
func InitPubSub(ctx context.Context, h host.Host, ps *pubsub.PubSub, node common.DiscoveryPeer, streamService *stream.StreamService) (*PubSubService, error) {
service := &PubSubService{
LongLivedPubSubService: common.NewLongLivedPubSubService(h),
Node: node,
StreamService: streamService,
PS: ps,
}
logger := oclib.GetLogger()
logger.Info().Msg("subscribe to events...")
service.initSubscribeEvents(ctx)
return service, nil
}
func (ps *PubSubService) getTopicName(topicName string) tools.PubSubAction {
ns := strings.Split(topicName, ".")
if len(ns) > 0 {
return tools.GetActionString(ns[0])
}
return tools.NONE
return &PubSubService{
Host: h,
Node: node,
StreamService: streamService,
PS: ps,
}, nil
}
func (ix *PubSubService) Close() {

View File

@@ -1,45 +0,0 @@
package pubsub
import (
"context"
"oc-discovery/daemons/node/common"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
)
func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error {
if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, ""); err != nil {
return err
}
return nil
}
// generic function to subscribe to DHT flow of event
func (ps *PubSubService) subscribeEvents(
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string,
) error {
logger := oclib.GetLogger()
// define a name app.action#peerID
name := action.String() + "#" + peerID
if dt != nil { // if a datatype is precised then : app.action.datatype#peerID
name = action.String() + "." + (*dt).String() + "#" + peerID
}
f := func(ctx context.Context, evt common.Event, topicName string) {
if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 {
if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil {
logger.Err(err)
}
}
}
return common.SubscribeEvents(ps.LongLivedPubSubService, ctx, name, -1, f)
}
func (ps *PubSubService) processEvent(
ctx context.Context, p *peer.Peer, event *common.Event, topicName string) error {
if err := event.Verify(p); err != nil {
return err
}
return ps.handleEvent(ctx, topicName, event)
}

View File

@@ -9,6 +9,7 @@ import (
"oc-discovery/daemons/node/common"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
@@ -152,16 +153,34 @@ func (abs *StreamService) pass(event *common.Event, action tools.PubSubAction) e
func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol string) error {
switch protocol {
case ProtocolSearchResource:
if evt.DataType < 0 {
m := map[string]interface{}{}
err := json.Unmarshal(evt.Payload, &m)
if err != nil {
return err
}
if search, ok := m["search"]; ok {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
peers := access.Search(nil, evt.From, false)
peers := access.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: evt.From}},
},
}, evt.From, false)
if len(peers.Data) > 0 {
p := peers.Data[0].(*peer.Peer)
// TODO : something if peer is missing in our side !
ps.SendResponse(p, evt)
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID
ps.SendResponse(p[0], evt)
fmt.Println(evt.From, p.GetID(), peers.Data)
ps.SendResponse(p, evt, fmt.Sprintf("%v", search))
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From, false); err == nil && len(p) > 0 { // peer from is peerID
ps.SendResponse(p[0], evt, fmt.Sprintf("%v", search))
}
} else {
fmt.Println("SEND SEARCH_EVENT SetNATSPub", m)
go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.DataType(evt.DataType),
Method: int(tools.SEARCH_EVENT),
Payload: evt.Payload,
})
}
case ProtocolCreateResource, ProtocolUpdateResource:
fmt.Println("RECEIVED Protocol.Update")
@@ -184,32 +203,29 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri
return nil
}
func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event) error {
dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)}
func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event, search string) error {
dts := []tools.DataType{tools.DataType(event.DataType)}
if event.DataType == -1 { // expect all resources
dts = []oclib.LibDataEnum{
oclib.LibDataEnum(oclib.COMPUTE_RESOURCE),
oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
oclib.LibDataEnum(oclib.PROCESSING_RESOURCE),
oclib.LibDataEnum(oclib.DATA_RESOURCE),
oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)}
dts = []tools.DataType{
tools.COMPUTE_RESOURCE,
tools.STORAGE_RESOURCE,
tools.PROCESSING_RESOURCE,
tools.DATA_RESOURCE,
tools.WORKFLOW_RESOURCE,
}
}
var m map[string]string
err := json.Unmarshal(event.Payload, &m)
if err != nil {
if self, err := oclib.GetMySelf(); err != nil {
return err
}
for _, dt := range dts {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil)
peerID := p.GetID()
searched := access.Search(abs.FilterPeer(peerID, m["search"]), "", false)
for _, ss := range searched.Data {
if j, err := json.Marshal(ss); err == nil {
if event.DataType != -1 {
ndt := tools.DataType(dt.EnumIndex())
abs.PublishCommon(&ndt, event.User, peerID, ProtocolSearchResource, j)
} else {
abs.PublishCommon(nil, event.User, peerID, ProtocolSearchResource, j)
} else {
for _, dt := range dts {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil)
peerID := p.GetID()
searched := access.Search(abs.FilterPeer(self.GetID(), search), "", false)
fmt.Println("SEND SEARCH_EVENT", self.GetID(), dt, len(searched.Data), peerID)
for _, ss := range searched.Data {
if j, err := json.Marshal(ss); err == nil {
_, err := abs.PublishCommon(&dt, event.User, p.PeerID, ProtocolSearchResource, j)
fmt.Println("Publish ERR", err)
}
}
}

View File

@@ -17,7 +17,12 @@ import (
func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter *dbs.Filters, resource []byte, protos ...protocol.ID) error {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
p := access.Search(filter, "", false)
var p oclib.LibDataShallow
if filter == nil {
p = access.LoadAll(false)
} else {
p = access.Search(filter, "", false)
}
for _, pes := range p.Data {
for _, proto := range protos {
if _, err := ps.PublishCommon(dt, user, pes.(*peer.Peer).PeerID, proto, resource); err != nil {
@@ -31,6 +36,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter
func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID string, proto protocol.ID, resource []byte) (*common.Stream, error) {
fmt.Println("PublishCommon")
if toPeerID == ps.Key.String() {
fmt.Println("Can't send to ourself !")
return nil, errors.New("Can't send to ourself !")
}
@@ -43,14 +49,15 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID
var pe *peer.Peer
if len(p.Data) > 0 && p.Data[0].(*peer.Peer).Relation != peer.BLACKLIST {
pe = p.Data[0].(*peer.Peer)
} else if pps, err := ps.Node.GetPeerRecord(context.Background(), toPeerID); err == nil && len(pps) > 0 {
} else if pps, err := ps.Node.GetPeerRecord(context.Background(), toPeerID, false); err == nil && len(pps) > 0 {
pe = pps[0]
}
if pe != nil {
ad, err := pp.AddrInfoFromString(p.Data[0].(*peer.Peer).StreamAddress)
ad, err := pp.AddrInfoFromString(pe.StreamAddress)
if err != nil {
return nil, err
}
fmt.Println("WRITE")
return ps.write(toPeerID, ad, dt, user, resource, proto)
}
return nil, errors.New("peer unvalid " + toPeerID)
@@ -100,11 +107,19 @@ func (ps *StreamService) ToPartnerPublishEvent(
for k := range protocolsPartners {
ks = append(ks, k)
}
var proto protocol.ID
proto = ProtocolCreateResource
switch action {
case tools.PB_DELETE:
proto = ProtocolDeleteResource
case tools.PB_UPDATE:
proto = ProtocolUpdateResource
}
ps.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided
And: map[string][]dbs.Filter{
"relation": {{Operator: dbs.EQUAL.String(), Value: peer.PARTNER}},
},
}, payload, ks...)
}, payload, proto)
return nil
}
@@ -129,13 +144,21 @@ func (s *StreamService) write(
return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
}
stream := s.Streams[proto][peerID.ID]
evt := common.NewEvent(string(proto), peerID.ID.String(), dt, user, payload)
fmt.Println("SEND EVENT ", evt.From, evt.DataType, evt.Timestamp)
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
stream.Stream.Close()
logger.Err(err)
if self, err := oclib.GetMySelf(); err != nil {
return nil, err
} else {
stream := s.Streams[proto][peerID.ID]
evt := common.NewEvent(string(proto), self.PeerID, dt, user, payload)
fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp)
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
stream.Stream.Close()
logger.Err(err)
return nil, err
}
if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse {
go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true})
}
return stream, nil
}
return stream, nil
}

View File

@@ -3,7 +3,9 @@ package stream
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"oc-discovery/conf"
"oc-discovery/daemons/node/common"
"strings"
@@ -82,6 +84,7 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
func (s *StreamService) HandleResponse(stream network.Stream) {
s.Mu.Lock()
defer s.Mu.Unlock()
stream.Protocol()
if s.Streams[stream.Protocol()] == nil {
s.Streams[stream.Protocol()] = map[pp.ID]*common.Stream{}
@@ -98,8 +101,6 @@ func (s *StreamService) HandleResponse(stream network.Stream) {
Stream: stream,
Expiry: time.Now().UTC().Add(expiry + 1*time.Minute),
}
s.Mu.Unlock()
go s.readLoop(s.Streams[stream.Protocol()][stream.Conn().RemotePeer()],
stream.Conn().RemotePeer(),
stream.Protocol(), protocols[stream.Protocol()])
@@ -256,7 +257,13 @@ func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID,
if err := json.NewDecoder(s.Stream).Decode(&evt); err != nil {
// Any decode error (EOF, reset, malformed JSON) terminates the loop;
// continuing on a dead/closed stream creates an infinite spin.
return
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
return
}
continue
}
ps.handleEvent(evt.Type, &evt)
if protocolInfo.WaitResponse && !protocolInfo.PersistantStream {
@@ -266,15 +273,15 @@ func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID,
}
func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters {
id, err := oclib.GetMySelf()
p, err := oclib.GetMySelf()
if err != nil {
return nil
}
filter := map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource...
"abstractinstanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: p.GetID()}}, // is my resource...
"": {{Operator: dbs.OR.String(), Value: &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
"abstractinstanciatedresource.abstractresource.abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public
"abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances
And: map[string][]dbs.Filter{
"resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{
@@ -287,15 +294,15 @@ func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters
},
}}},
}
if search != "" {
filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}},
"abstractinstanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}}}
}

2
go.mod
View File

@@ -3,7 +3,7 @@ module oc-discovery
go 1.25.0
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5
cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa
github.com/libp2p/go-libp2p v0.47.0
github.com/libp2p/go-libp2p-record v0.3.1
github.com/multiformats/go-multiaddr v0.16.1

4
go.sum
View File

@@ -8,6 +8,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260226091217-cb3771c17a31 h1:hvkvJibS9NmIm
cloud.o-forge.io/core/oc-lib v0.0.0-20260226091217-cb3771c17a31/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5 h1:h+Fkyj6cfwAirc0QGCBEkZSSrgcyThXswg7ytOLm948=
cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260304143917-340f2a6301b7 h1:RZGV3ttkfoKIigUb7T+M5Kq+YtqW/td45EmNYeW5u8k=
cloud.o-forge.io/core/oc-lib v0.0.0-20260304143917-340f2a6301b7/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa h1:1wCpI4dwN1pj6MlpJ7/WifhHVHmCE4RU+9klwqgo/bk=
cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=