diff --git a/conf/config.go b/conf/config.go index 55f813f..554c097 100644 --- a/conf/config.go +++ b/conf/config.go @@ -18,6 +18,11 @@ type Config struct { MinIndexer int MaxIndexer int + + // ConsensusQuorum is the minimum fraction of natives that must agree for a + // candidate indexer to be considered confirmed. Range (0, 1]. Default 0.5 + // (strict majority). Raise to 0.67 for stronger Byzantine resistance. + ConsensusQuorum float64 } var instance *Config diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index 11f5941..15faba6 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -112,6 +112,12 @@ func NewLongLivedPubSubService(h host.Host) *LongLivedPubSubService { } } +func (s *LongLivedPubSubService) GetPubSub(topicName string) *pubsub.Topic { + s.PubsubMu.Lock() + defer s.PubsubMu.Unlock() + return s.LongLivedPubSubs[topicName] +} + func (s *LongLivedPubSubService) processEvent( ctx context.Context, p *peer.Peer, @@ -123,26 +129,8 @@ func (s *LongLivedPubSubService) processEvent( return handler(ctx, topicName, event) } -const TopicPubSubNodeActivity = "oc-node-activity" const TopicPubSubSearch = "oc-node-search" -func (s *LongLivedPubSubService) SubscribeToNodeActivity(ps *pubsub.PubSub, f *func(context.Context, TopicNodeActivityPub, string)) error { - ps.RegisterTopicValidator(TopicPubSubNodeActivity, func(ctx context.Context, p pp.ID, m *pubsub.Message) bool { - return true - }) - if topic, err := ps.Join(TopicPubSubNodeActivity); err != nil { - return err - } else { - s.PubsubMu.Lock() - defer s.PubsubMu.Unlock() - s.LongLivedPubSubs[TopicPubSubNodeActivity] = topic - } - if f != nil { - return SubscribeEvents(s, context.Background(), TopicPubSubNodeActivity, -1, *f) - } - return nil -} - func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(context.Context, Event, string)) error { ps.RegisterTopicValidator(TopicPubSubSearch, func(ctx 
context.Context, p pp.ID, m *pubsub.Message) bool { return true @@ -151,8 +139,8 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(co return err } else { s.PubsubMu.Lock() - defer s.PubsubMu.Unlock() s.LongLivedPubSubs[TopicPubSubSearch] = topic + s.PubsubMu.Unlock() } if f != nil { return SubscribeEvents(s, context.Background(), TopicPubSubSearch, -1, *f) @@ -178,10 +166,13 @@ func SubscribeEvents[T interface{}](s *LongLivedPubSubService, } func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context, sub *pubsub.Subscription, proto string, timeout int, f func(context.Context, T, string)) { + fmt.Println("waitResults", proto) + defer ctx.Done() for { s.PubsubMu.Lock() // check safely if cache is actually notified subscribed to topic if s.LongLivedPubSubs[proto] == nil { // if not kill the loop. + s.PubsubMu.Unlock() break } s.PubsubMu.Unlock() diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index e7d3e53..cf2944c 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -27,9 +27,9 @@ type LongLivedStreamRecordedService[T interface{}] struct { StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T] StreamMU sync.RWMutex maxNodesConn int - // AfterHeartbeat is an optional hook called after each successful heartbeat update. - // The indexer sets it to republish the embedded signed record to the DHT. - AfterHeartbeat func(pid pp.ID) + // AfterHeartbeat is called after each successful heartbeat with the full + // decoded Heartbeat so the hook can use the fresh embedded PeerRecord. + AfterHeartbeat func(hb *Heartbeat) // AfterDelete is called after gc() evicts an expired peer, outside the lock. // name and did may be empty if the HeartbeatStream had no metadata. 
AfterDelete func(pid pp.ID, name string, did string) @@ -66,7 +66,6 @@ func (ix *LongLivedStreamRecordedService[T]) gc() { return } streams := ix.StreamRecords[ProtocolHeartbeat] - fmt.Println(StaticNatives, StaticIndexers, streams) type gcEntry struct { pid pp.ID @@ -184,9 +183,26 @@ func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) { logger.Info().Msg("A new node is subscribed : " + pid.String()) } ix.StreamMU.Unlock() - // Let the indexer republish the embedded signed record to the DHT. - if ix.AfterHeartbeat != nil { - ix.AfterHeartbeat(*pid) + // Enrich hb.DID before calling the hook: nodes never set hb.DID directly; + // extract it from the embedded signed PeerRecord if available, then fall + // back to the DID stored by handleNodePublish in the stream record. + if hb.DID == "" && len(hb.Record) > 0 { + var partial struct { + DID string `json:"did"` + } + if json.Unmarshal(hb.Record, &partial) == nil && partial.DID != "" { + hb.DID = partial.DID + } + } + if hb.DID == "" { + ix.StreamMU.RLock() + if rec, ok := streams[*pid]; ok { + hb.DID = rec.DID + } + ix.StreamMU.RUnlock() + } + if ix.AfterHeartbeat != nil && hb.DID != "" { + ix.AfterHeartbeat(hb) } } } @@ -214,17 +230,15 @@ func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams ma } lock.Unlock() diversity := getDiversityRate(h, hb.IndexersBinded) - fmt.Println(upTime, bpms, diversity) hb.ComputeIndexerScore(upTime, bpms, diversity) // First heartbeat: uptime is always 0 so the score ceiling is 60, below the // steady-state threshold of 75. Use a lower admission threshold so new peers // can enter and start accumulating uptime. Subsequent heartbeats must meet // the full threshold once uptime is tracked. 
- minScore := float64(50) + minScore := float64(40) if isFirstHeartbeat { minScore = 40 } - fmt.Println(hb.Score, minScore) if hb.Score < minScore { return nil, nil, errors.New("not enough trusting value") } @@ -239,13 +253,11 @@ func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams ma } func getDiversityRate(h host.Host, peers []string) float64 { - peers, _ = checkPeers(h, peers) diverse := []string{} for _, p := range peers { ip, err := ExtractIP(p) if err != nil { - fmt.Println("NO IP", p, err) continue } div := ip.Mask(net.CIDRMask(24, 32)).String() @@ -725,7 +737,7 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H go replenishIndexersFromNative(h, idxNeed) } } - logger.Debug().Str("peer", ix.ID.String()).Str("proto", string(proto)).Msg("[native] step 2 — heartbeat sent ok") + // logger.Debug().Str("peer", ix.ID.String()).Str("proto", string(proto)).Msg("[native] step 2 — heartbeat sent ok") } } } @@ -768,6 +780,7 @@ func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, stre return streams, err } } + if streams[proto] != nil && streams[proto][ad.ID] != nil { return streams, nil } else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil { diff --git a/daemons/node/common/interface.go b/daemons/node/common/interface.go index 04d605c..cedebbc 100644 --- a/daemons/node/common/interface.go +++ b/daemons/node/common/interface.go @@ -4,6 +4,7 @@ import ( "context" "cloud.o-forge.io/core/oc-lib/models/peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" ) type HeartBeatStreamed interface { @@ -11,5 +12,6 @@ type HeartBeatStreamed interface { } type DiscoveryPeer interface { - GetPeerRecord(ctx context.Context, key string) ([]*peer.Peer, error) + GetPeerRecord(ctx context.Context, key string, search bool) ([]*peer.Peer, error) + GetPubSub(topicName string) *pubsub.Topic } diff --git a/daemons/node/common/native_stream.go b/daemons/node/common/native_stream.go index a76c291..8ac0d78 
100644 --- a/daemons/node/common/native_stream.go +++ b/daemons/node/common/native_stream.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "math/rand" "oc-discovery/conf" "strings" @@ -11,14 +12,16 @@ import ( "time" oclib "cloud.o-forge.io/core/oc-lib" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" pp "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" ) const ( - ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0" - ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0" + ProtocolNativeSubscription = "/opencloud/native/subscribe/1.0" + ProtocolNativeUnsubscribe = "/opencloud/native/unsubscribe/1.0" + ProtocolNativeGetIndexers = "/opencloud/native/indexers/1.0" // ProtocolNativeConsensus is used by nodes/indexers to cross-validate an indexer // pool against all configured native peers. ProtocolNativeConsensus = "/opencloud/native/consensus/1.0" @@ -50,9 +53,50 @@ type ConsensusResponse struct { // IndexerRegistration is sent by an indexer to a native to signal its alive state. // Only Addr is required; PeerID is derived from it if omitted. +// Timestamp + PubKey + Signature allow the native and DHT to verify that the +// registration was produced by the peer that owns the declared PeerID. type IndexerRegistration struct { - PeerID string `json:"peer_id,omitempty"` - Addr string `json:"addr"` + PeerID string `json:"peer_id,omitempty"` + Addr string `json:"addr"` + Timestamp int64 `json:"ts,omitempty"` // Unix nanoseconds (anti-replay) + PubKey []byte `json:"pub_key,omitempty"` // marshaled libp2p public key + Signature []byte `json:"sig,omitempty"` // Sign(signaturePayload()) +} + +// SignaturePayload returns the canonical byte slice that is signed/verified. 
+// Format: "PeerID|Addr|Timestamp" +func (r *IndexerRegistration) SignaturePayload() []byte { + return []byte(fmt.Sprintf("%s|%s|%d", r.PeerID, r.Addr, r.Timestamp)) +} + +// Sign fills PubKey and Signature using the host's own private key. +func (r *IndexerRegistration) Sign(h host.Host) { + priv := h.Peerstore().PrivKey(h.ID()) + if priv == nil { + return + } + if pub, err := crypto.MarshalPublicKey(priv.GetPublic()); err == nil { + r.PubKey = pub + } + r.Signature, _ = priv.Sign(r.SignaturePayload()) +} + +// Verify returns true when the registration carries a valid self-signature. +// Returns true (not an error) when no signature is present, to remain backward- +// compatible with older nodes that do not sign their registrations. +func (r *IndexerRegistration) Verify() (bool, error) { + if len(r.Signature) == 0 || len(r.PubKey) == 0 { + return true, nil // unsigned — accepted but untrusted + } + pub, err := crypto.UnmarshalPublicKey(r.PubKey) + if err != nil { + return false, fmt.Errorf("unmarshal pub key: %w", err) + } + ok, err := pub.Verify(r.SignaturePayload(), r.Signature) + if err != nil { + return false, err + } + return ok, nil } // GetIndexersRequest asks a native for a pool of live indexers. @@ -194,46 +238,100 @@ func replenishIndexersFromNative(h host.Host, need int) { logger.Info().Msg("[native] step 4 — heartbeat goroutine nudged") } -// fetchIndexersFromNative opens a ProtocolNativeGetIndexers stream to the first -// responsive native and returns the candidate list and fallback flag. +// fetchIndexersFromNative queries ALL configured natives in parallel and merges +// their indexer lists. Non-fallback responses are preferred; if only fallbacks +// respond the fallback list is returned. Results are deduplicated and capped at count.
func fetchIndexersFromNative(h host.Host, nativeAddrs []string, count int) (candidates []string, isFallback bool) { logger := oclib.GetLogger() - for _, addr := range nativeAddrs { - ad, err := pp.AddrInfoFromString(addr) - if err != nil { - logger.Warn().Str("addr", addr).Msg("[native] fetch — skipping invalid addr") - continue - } - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - if err := h.Connect(ctx, *ad); err != nil { - cancel() - logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — connect failed") - continue - } - s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers) - cancel() - if err != nil { - logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — stream open failed") - continue - } - req := GetIndexersRequest{Count: count, From: h.ID().String()} - if encErr := json.NewEncoder(s).Encode(req); encErr != nil { - s.Close() - logger.Warn().Str("addr", addr).Err(encErr).Msg("[native] fetch — encode request failed") - continue - } - var resp GetIndexersResponse - if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil { - s.Close() - logger.Warn().Str("addr", addr).Err(decErr).Msg("[native] fetch — decode response failed") - continue - } - s.Close() - logger.Info().Str("native", addr).Int("indexers", len(resp.Indexers)).Bool("fallback", resp.IsSelfFallback).Msg("[native] fetch — response received") - return resp.Indexers, resp.IsSelfFallback + if len(nativeAddrs) == 0 { + return nil, false } - logger.Warn().Msg("[native] fetch — no native responded") - return nil, false + + type fetchResult struct { + indexers []string + isFallback bool + } + ch := make(chan fetchResult, len(nativeAddrs)) + + for _, addr := range nativeAddrs { + go func(addr string) { + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + ch <- fetchResult{} + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := h.Connect(ctx, *ad); err != nil { + 
logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — connect failed") + ch <- fetchResult{} + return + } + s, err := h.NewStream(ctx, ad.ID, ProtocolNativeGetIndexers) + if err != nil { + logger.Warn().Str("addr", addr).Err(err).Msg("[native] fetch — stream open failed") + ch <- fetchResult{} + return + } + s.SetDeadline(time.Now().Add(4 * time.Second)) + defer s.Close() + req := GetIndexersRequest{Count: count, From: h.ID().String()} + if encErr := json.NewEncoder(s).Encode(req); encErr != nil { + logger.Warn().Str("addr", addr).Err(encErr).Msg("[native] fetch — encode failed") + ch <- fetchResult{} + return + } + var resp GetIndexersResponse + if decErr := json.NewDecoder(s).Decode(&resp); decErr != nil { + logger.Warn().Str("addr", addr).Err(decErr).Msg("[native] fetch — decode failed") + ch <- fetchResult{} + return + } + logger.Info().Str("native", addr).Int("indexers", len(resp.Indexers)).Bool("fallback", resp.IsSelfFallback).Msg("[native] fetch — response received") + ch <- fetchResult{indexers: resp.Indexers, isFallback: resp.IsSelfFallback} + }(addr) + } + + timer := time.NewTimer(6 * time.Second) + defer timer.Stop() + + seen := map[string]struct{}{} + var realList, fallbackList []string + collected := 0 + +collect: + for collected < len(nativeAddrs) { + select { + case r := <-ch: + collected++ + for _, ix := range r.indexers { + if _, ok := seen[ix]; ok { + continue + } + seen[ix] = struct{}{} + if r.isFallback { + fallbackList = append(fallbackList, ix) + } else { + realList = append(realList, ix) + } + } + case <-timer.C: + break collect + } + } + + if len(realList) > 0 { + if len(realList) > count { + realList = realList[:count] + } + logger.Info().Int("count", len(realList)).Msg("[native] fetch — merged real indexers from all natives") + return realList, false + } + if len(fallbackList) > count { + fallbackList = fallbackList[:count] + } + logger.Info().Int("count", len(fallbackList)).Bool("fallback", true).Msg("[native] fetch — using 
fallback indexers") + return fallbackList, len(fallbackList) > 0 } // resolvePool converts a candidate list to a validated addr→AddrInfo map. @@ -337,6 +435,9 @@ func clientSideConsensus(h host.Host, candidates []string) (confirmed []string, ch <- nativeResult{} return } + // Set an absolute deadline on the stream so Encode/Decode cannot + // block past the per-query budget, even if the remote native stalls. + s.SetDeadline(time.Now().Add(consensusQueryTimeout)) defer s.Close() if err := json.NewEncoder(s).Encode(ConsensusRequest{Candidates: candidates}); err != nil { ch <- nativeResult{} @@ -387,9 +488,13 @@ collect: return candidates, nil } + quorum := conf.GetConfig().ConsensusQuorum + if quorum <= 0 { + quorum = 0.5 + } confirmedSet := map[string]struct{}{} for addr, count := range trustedCounts { - if count*2 > total { + if float64(count) > float64(total)*quorum { confirmed = append(confirmed, addr) confirmedSet[addr] = struct{}{} } @@ -415,9 +520,11 @@ func RegisterWithNative(h host.Host, nativeAddressesStr string) { return } reg := IndexerRegistration{ - PeerID: h.ID().String(), - Addr: myAddr, + PeerID: h.ID().String(), + Addr: myAddr, + Timestamp: time.Now().UnixNano(), } + reg.Sign(h) for _, addr := range strings.Split(nativeAddressesStr, ",") { addr = strings.TrimSpace(addr) if addr == "" { @@ -446,6 +553,40 @@ func RegisterWithNative(h host.Host, nativeAddressesStr string) { } } +// UnregisterFromNative sends an explicit deregistration message to each configured +// native so it can evict this indexer immediately without waiting for TTL expiry. +// Should be called during graceful shutdown. 
+func UnregisterFromNative(h host.Host, nativeAddressesStr string) { + logger := oclib.GetLogger() + reg := IndexerRegistration{PeerID: h.ID().String()} + for _, addr := range strings.Split(nativeAddressesStr, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + ad, err := pp.AddrInfoFromString(addr) + if err != nil { + continue + } + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + if err := h.Connect(ctx, *ad); err != nil { + cancel() + logger.Warn().Str("addr", addr).Msg("UnregisterFromNative: connect failed") + continue + } + s, err := h.NewStream(ctx, ad.ID, ProtocolNativeUnsubscribe) + cancel() + if err != nil { + logger.Warn().Str("addr", addr).Msg("UnregisterFromNative: stream open failed") + continue + } + if err := json.NewEncoder(s).Encode(reg); err != nil { + logger.Warn().Str("addr", addr).Msg("UnregisterFromNative: encode failed") + } + s.Close() + } +} + // EnsureNativePeers populates StaticNatives from config and starts a single // heartbeat goroutine toward the native mesh. Safe to call multiple times; // the heartbeat goroutine is started at most once (nativeMeshHeartbeatOnce). @@ -626,7 +767,7 @@ func replenishNativesFromPeers(h host.Host, lostAddr string, proto protocol.ID) } // Last (or only) native — retry periodically. 
logger.Info().Str("addr", lostAddr).Msg("[native] replenish natives — last native lost, starting periodic retry") - go retryLostNative(h, lostAddr, proto) + go retryLostNative(context.Background(), h, lostAddr, proto) } // fetchNativeFromNatives asks each alive native for one of its own native contacts @@ -677,7 +818,7 @@ func fetchNativeFromNatives(h host.Host, exclude []string) string { return peer } } - logger.Debug().Str("native", ad.ID.String()).Msg("[native] fetch native peers — no new native from this peer") + // logger.Debug().Str("native", ad.ID.String()).Msg("[native] fetch native peers — no new native from this peer") } return "" } @@ -735,13 +876,19 @@ func fetchNativeFromIndexers(h host.Host, exclude []string) string { } // retryLostNative periodically retries connecting to a lost native address until -// it becomes reachable again or was already restored by another path. -func retryLostNative(h host.Host, addr string, nativeProto protocol.ID) { +// it becomes reachable again, was already restored by another path, or ctx is cancelled. 
+func retryLostNative(ctx context.Context, h host.Host, addr string, nativeProto protocol.ID) { logger := oclib.GetLogger() logger.Info().Str("addr", addr).Msg("[native] retry — periodic retry for lost native started") t := time.NewTicker(retryNativeInterval) defer t.Stop() - for range t.C { + for { + select { + case <-ctx.Done(): + logger.Info().Str("addr", addr).Msg("[native] retry — context cancelled, stopping retry") + return + case <-t.C: + } StreamNativeMu.RLock() _, alreadyRestored := StaticNatives[addr] StreamNativeMu.RUnlock() diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index 81c00fa..ec178d2 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -5,6 +5,8 @@ import ( "encoding/base64" "encoding/json" "errors" + "fmt" + "io" "oc-discovery/conf" "oc-discovery/daemons/node/common" "strings" @@ -83,30 +85,17 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe NATSAddress: pr.NATSAddress, WalletAddress: pr.WalletAddress, } - b, err := json.Marshal(p) - if err != nil { - return pp.SELF == p.Relation, nil, err - } - if time.Now().UTC().After(pr.ExpiryDate) { return pp.SELF == p.Relation, nil, errors.New("peer " + key + " is offline") } - go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ - FromApp: "oc-discovery", - Datatype: tools.PEER, - Method: int(tools.CREATE_RESOURCE), - SearchAttr: "peer_id", - Payload: b, - }) - return pp.SELF == p.Relation, p, nil } type GetValue struct { - Key string `json:"key"` - PeerID peer.ID `json:"peer_id"` - Name string `json:"name,omitempty"` - Search bool `json:"search,omitempty"` + Key string `json:"key"` + PeerID string `json:"peer_id,omitempty"` + Name string `json:"name,omitempty"` + Search bool `json:"search,omitempty"` } type GetResponse struct { @@ -132,29 +121,33 @@ func (ix *IndexerService) initNodeHandler() { // Each heartbeat from a node carries a freshly signed PeerRecord. 
// Republish it to the DHT so the record never expires as long as the node // is alive — no separate publish stream needed from the node side. - ix.AfterHeartbeat = func(pid peer.ID) { - ctx1, cancel1 := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel1() - res, err := ix.DHT.GetValue(ctx1, ix.genPIDKey(pid.String())) - if err != nil { - logger.Warn().Err(err) - return - } - did := string(res) - ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel2() - res, err = ix.DHT.GetValue(ctx2, ix.genKey(did)) - if err != nil { - logger.Warn().Err(err) - return - } + ix.AfterHeartbeat = func(hb *common.Heartbeat) { + // Priority 1: use the fresh signed PeerRecord embedded in the heartbeat. + // Each heartbeat tick, the node re-signs with ExpiryDate = now+2min, so + // this record is always fresh. Fetching from DHT would give a stale expiry. var rec PeerRecord - if err := json.Unmarshal(res, &rec); err != nil { - logger.Warn().Err(err).Str("peer", pid.String()).Msg("indexer: heartbeat record unmarshal failed") - return + if len(hb.Record) > 0 { + if err := json.Unmarshal(hb.Record, &rec); err != nil { + logger.Warn().Err(err).Msg("indexer: heartbeat embedded record unmarshal failed") + return + } + } else { + // Fallback: node didn't embed a record yet (first heartbeat before claimInfo). + // Fetch from DHT using the DID resolved by HandleHeartbeat. 
+ ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + res, err := ix.DHT.GetValue(ctx2, ix.genKey(hb.DID)) + cancel2() + if err != nil { + logger.Warn().Err(err).Str("did", hb.DID).Msg("indexer: DHT fetch for refresh failed") + return + } + if err := json.Unmarshal(res, &rec); err != nil { + logger.Warn().Err(err).Str("did", hb.DID).Msg("indexer: heartbeat record unmarshal failed") + return + } } if _, err := rec.Verify(); err != nil { - logger.Warn().Err(err).Str("peer", pid.String()).Msg("indexer: heartbeat record signature invalid") + logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: heartbeat record signature invalid") return } data, err := json.Marshal(rec) @@ -162,12 +155,14 @@ func (ix *IndexerService) initNodeHandler() { return } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() logger.Info().Msg("REFRESH PutValue " + ix.genKey(rec.DID)) if err := ix.DHT.PutValue(ctx, ix.genKey(rec.DID), data); err != nil { logger.Warn().Err(err).Str("did", rec.DID).Msg("indexer: DHT refresh failed") + cancel() return } + cancel() + ix.publishNameEvent(NameIndexAdd, rec.Name, rec.PeerID, rec.DID) if rec.Name != "" { ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)) @@ -188,132 +183,151 @@ func (ix *IndexerService) initNodeHandler() { func (ix *IndexerService) handleNodePublish(s network.Stream) { defer s.Close() logger := oclib.GetLogger() + for { + var rec PeerRecord + if err := json.NewDecoder(s).Decode(&rec); err != nil { + logger.Err(err) + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } + continue + } + if _, err := rec.Verify(); err != nil { + logger.Err(err) + return + } + if rec.PeerID == "" || 
rec.ExpiryDate.Before(time.Now().UTC()) { + logger.Err(errors.New(rec.PeerID + " is expired.")) + return + } + pid, err := peer.Decode(rec.PeerID) + if err != nil { + return + } - var rec PeerRecord - if err := json.NewDecoder(s).Decode(&rec); err != nil { - logger.Err(err) - return - } - if _, err := rec.Verify(); err != nil { - logger.Err(err) - return - } - if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now().UTC()) { - logger.Err(errors.New(rec.PeerID + " is expired.")) - return - } - pid, err := peer.Decode(rec.PeerID) - if err != nil { - return - } + ix.StreamMU.Lock() + defer ix.StreamMU.Unlock() + if ix.StreamRecords[common.ProtocolHeartbeat] == nil { + ix.StreamRecords[common.ProtocolHeartbeat] = map[peer.ID]*common.StreamRecord[PeerRecord]{} + } + streams := ix.StreamRecords[common.ProtocolHeartbeat] + if srec, ok := streams[pid]; ok { + srec.DID = rec.DID + srec.Record = rec + srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC() + } - ix.StreamMU.Lock() - defer ix.StreamMU.Unlock() - if ix.StreamRecords[common.ProtocolHeartbeat] == nil { - ix.StreamRecords[common.ProtocolHeartbeat] = map[peer.ID]*common.StreamRecord[PeerRecord]{} - } - streams := ix.StreamRecords[common.ProtocolHeartbeat] - if srec, ok := streams[pid]; ok { - srec.DID = rec.DID - srec.Record = rec - srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC() - } - - key := ix.genKey(rec.DID) - data, err := json.Marshal(rec) - if err != nil { - logger.Err(err) - return - } - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if err := ix.DHT.PutValue(ctx, key, data); err != nil { - logger.Err(err) + key := ix.genKey(rec.DID) + data, err := json.Marshal(rec) + if err != nil { + logger.Err(err) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + if err := ix.DHT.PutValue(ctx, key, data); err != nil { + logger.Err(err) + cancel() + return + } cancel() - return - } - cancel() - // Secondary index: /name/ → DID, so 
peers can resolve by human-readable name. - if rec.Name != "" { - ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) - if err := ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)); err != nil { - logger.Err(err).Str("name", rec.Name).Msg("indexer: failed to write name index") + fmt.Println("publishNameEvent") + ix.publishNameEvent(NameIndexAdd, rec.Name, rec.PeerID, rec.DID) + + // Secondary index: /name/ → DID, so peers can resolve by human-readable name. + if rec.Name != "" { + ctx2, cancel2 := context.WithTimeout(context.Background(), 10*time.Second) + if err := ix.DHT.PutValue(ctx2, ix.genNameKey(rec.Name), []byte(rec.DID)); err != nil { + logger.Err(err).Str("name", rec.Name).Msg("indexer: failed to write name index") + } + cancel2() } - cancel2() - } - // Secondary index: /pid/ → DID, so peers can resolve by libp2p PeerID. - if rec.PeerID != "" { - ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second) - if err := ix.DHT.PutValue(ctx3, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil { - logger.Err(err).Str("pid", rec.PeerID).Msg("indexer: failed to write pid index") + // Secondary index: /pid/ → DID, so peers can resolve by libp2p PeerID. + if rec.PeerID != "" { + ctx3, cancel3 := context.WithTimeout(context.Background(), 10*time.Second) + if err := ix.DHT.PutValue(ctx3, ix.genPIDKey(rec.PeerID), []byte(rec.DID)); err != nil { + logger.Err(err).Str("pid", rec.PeerID).Msg("indexer: failed to write pid index") + } + cancel3() } - cancel3() + return } } func (ix *IndexerService) handleNodeGet(s network.Stream) { - defer s.Close() logger := oclib.GetLogger() - - var req GetValue - if err := json.NewDecoder(s).Decode(&req); err != nil { - logger.Err(err) - return - } - - resp := GetResponse{Found: false, Records: map[string]PeerRecord{}} - - keys := []string{} - // Name substring search — scan in-memory connected nodes first, then DHT exact match. 
- if req.Name != "" { - if req.Search { - for _, did := range ix.LookupNameIndex(strings.ToLower(req.Name)) { - keys = append(keys, did) + for { + var req GetValue + if err := json.NewDecoder(s).Decode(&req); err != nil { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return } - } else { - // 2. DHT exact-name lookup: covers nodes that published but aren't currently connected. - nameCtx, nameCancel := context.WithTimeout(context.Background(), 5*time.Second) - if ch, err := ix.DHT.SearchValue(nameCtx, ix.genNameKey(req.Name)); err == nil { - for did := range ch { - keys = append(keys, string(did)) - break + logger.Err(err) + continue + } + + resp := GetResponse{Found: false, Records: map[string]PeerRecord{}} + + fmt.Println("handleNodeGet", req.Search, req.Name) + keys := []string{} + // Name substring search — scan in-memory connected nodes first, then DHT exact match. + if req.Name != "" { + if req.Search { + for _, did := range ix.LookupNameIndex(strings.ToLower(req.Name)) { + keys = append(keys, did) } - } - nameCancel() - } - } else if req.PeerID != "" { - pidCtx, pidCancel := context.WithTimeout(context.Background(), 5*time.Second) - if did, err := ix.DHT.GetValue(pidCtx, ix.genPIDKey(req.PeerID.String())); err == nil { - keys = append(keys, string(did)) - } - pidCancel() - } else { - keys = append(keys, req.Key) - } - - // DHT record fetch by DID key (covers exact-name and PeerID paths). - if len(keys) > 0 { - for _, k := range keys { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - c, err := ix.DHT.GetValue(ctx, ix.genKey(k)) - cancel() - if err == nil { - var rec PeerRecord - if json.Unmarshal(c, &rec) == nil { - // Filter by PeerID only when one was explicitly specified. 
- if req.PeerID == "" || rec.PeerID == req.PeerID.String() { - resp.Records[rec.PeerID] = rec + } else { + // 2. DHT exact-name lookup: covers nodes that published but aren't currently connected. + nameCtx, nameCancel := context.WithTimeout(context.Background(), 5*time.Second) + if ch, err := ix.DHT.SearchValue(nameCtx, ix.genNameKey(req.Name)); err == nil { + for did := range ch { + keys = append(keys, string(did)) + break } } - } else if req.Name == "" && req.PeerID == "" { - logger.Err(err).Msg("Failed to fetch PeerRecord from DHT " + req.Key) + nameCancel() + } + } else if req.PeerID != "" { + pidCtx, pidCancel := context.WithTimeout(context.Background(), 5*time.Second) + if did, err := ix.DHT.GetValue(pidCtx, ix.genPIDKey(req.PeerID)); err == nil { + keys = append(keys, string(did)) + } + pidCancel() + } else { + keys = append(keys, req.Key) + } + + // DHT record fetch by DID key (covers exact-name and PeerID paths). + if len(keys) > 0 { + for _, k := range keys { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + fmt.Println("TRY TO CATCH DID", ix.genKey(k)) + c, err := ix.DHT.GetValue(ctx, ix.genKey(k)) + cancel() + fmt.Println("TRY TO CATCH DID ERR", ix.genKey(k), c, err) + if err == nil { + var rec PeerRecord + if json.Unmarshal(c, &rec) == nil { + fmt.Println("CATCH DID ERR", ix.genKey(k), rec) + resp.Records[rec.PeerID] = rec + } + } else if req.Name == "" && req.PeerID == "" { + logger.Err(err).Msg("Failed to fetch PeerRecord from DHT " + req.Key) + } } } - } - resp.Found = len(resp.Records) > 0 - _ = json.NewEncoder(s).Encode(resp) + resp.Found = len(resp.Records) > 0 + _ = json.NewEncoder(s).Encode(resp) + break + } } // handleGetNatives returns this indexer's configured native addresses, @@ -321,30 +335,38 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) { func (ix *IndexerService) handleGetNatives(s network.Stream) { defer s.Close() logger := oclib.GetLogger() - - var req common.GetIndexerNativesRequest - if err 
:= json.NewDecoder(s).Decode(&req); err != nil { - logger.Err(err).Msg("indexer get natives: decode") - return - } - - excludeSet := make(map[string]struct{}, len(req.Exclude)) - for _, e := range req.Exclude { - excludeSet[e] = struct{}{} - } - - resp := common.GetIndexerNativesResponse{} - for _, addr := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") { - addr = strings.TrimSpace(addr) - if addr == "" { + for { + var req common.GetIndexerNativesRequest + if err := json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err).Msg("indexer get natives: decode") + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } continue } - if _, excluded := excludeSet[addr]; !excluded { - resp.Natives = append(resp.Natives, addr) - } - } - if err := json.NewEncoder(s).Encode(resp); err != nil { - logger.Err(err).Msg("indexer get natives: encode response") + excludeSet := make(map[string]struct{}, len(req.Exclude)) + for _, e := range req.Exclude { + excludeSet[e] = struct{}{} + } + + resp := common.GetIndexerNativesResponse{} + for _, addr := range strings.Split(conf.GetConfig().NativeIndexerAddresses, ",") { + addr = strings.TrimSpace(addr) + if addr == "" { + continue + } + if _, excluded := excludeSet[addr]; !excluded { + resp.Natives = append(resp.Natives, addr) + } + } + + if err := json.NewEncoder(s).Encode(resp); err != nil { + logger.Err(err).Msg("indexer get natives: encode response") + } + break } } diff --git a/daemons/node/indexer/nameindex.go b/daemons/node/indexer/nameindex.go index 19bed90..374d93d 100644 --- a/daemons/node/indexer/nameindex.go +++ b/daemons/node/indexer/nameindex.go @@ -3,6 +3,7 @@ package indexer import ( "context" "encoding/json" + "fmt" "strings" "sync" "time" @@ -158,11 +159,13 @@ func (ix *IndexerService) LookupNameIndex(needle string) 
map[string]string { ix.nameIndex.indexMu.RLock() defer ix.nameIndex.indexMu.RUnlock() for name, peers := range ix.nameIndex.index { + fmt.Println(strings.Contains(strings.ToLower(name), needleLow), needleLow, strings.ToLower(name)) if strings.Contains(strings.ToLower(name), needleLow) { for peerID, did := range peers { result[peerID] = did } } } + fmt.Println("RESULT", result) return result } diff --git a/daemons/node/indexer/native.go b/daemons/node/indexer/native.go index 880088c..2d11e8a 100644 --- a/daemons/node/indexer/native.go +++ b/daemons/node/indexer/native.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "math/rand" "slices" "strings" @@ -15,6 +16,7 @@ import ( oclib "cloud.o-forge.io/core/oc-lib" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/network" pp "github.com/libp2p/go-libp2p/core/peer" ) @@ -36,10 +38,15 @@ const ( ) // liveIndexerEntry tracks a registered indexer in the native's in-memory cache and DHT. +// PubKey and Signature are forwarded from the IndexerRegistration so the DHT validator +// can verify that the entry was produced by the peer owning the declared PeerID. type liveIndexerEntry struct { - PeerID string `json:"peer_id"` - Addr string `json:"addr"` - ExpiresAt time.Time `json:"expires_at"` + PeerID string `json:"peer_id"` + Addr string `json:"addr"` + ExpiresAt time.Time `json:"expires_at"` + RegTimestamp int64 `json:"reg_ts,omitempty"` // Timestamp from the original IndexerRegistration + PubKey []byte `json:"pub_key,omitempty"` + Signature []byte `json:"sig,omitempty"` } // NativeState holds runtime state specific to native indexer operation. @@ -53,13 +60,18 @@ type NativeState struct { // including entries written by other natives. knownPeerIDs map[string]string knownMu sync.RWMutex + + // cancel stops background goroutines (runOffloadLoop, refreshIndexersFromDHT) + // when the native shuts down. 
+ cancel context.CancelFunc } -func newNativeState() *NativeState { +func newNativeState(cancel context.CancelFunc) *NativeState { return &NativeState{ liveIndexers: map[string]*liveIndexerEntry{}, responsiblePeers: map[pp.ID]struct{}{}, knownPeerIDs: map[string]string{}, + cancel: cancel, } } @@ -77,6 +89,18 @@ func (v IndexerRecordValidator) Validate(_ string, value []byte) error { if e.ExpiresAt.Before(time.Now().UTC()) { return errors.New("expired indexer record") } + // Verify self-signature when present — rejects entries forged by a + // compromised native that does not control the declared PeerID. + if len(e.Signature) > 0 && len(e.PubKey) > 0 { + pub, err := crypto.UnmarshalPublicKey(e.PubKey) + if err != nil { + return fmt.Errorf("indexer entry: invalid public key: %w", err) + } + payload := []byte(fmt.Sprintf("%s|%s|%d", e.PeerID, e.Addr, e.RegTimestamp)) + if ok, err := pub.Verify(payload, e.Signature); err != nil || !ok { + return errors.New("indexer entry: invalid signature") + } + } return nil } @@ -99,9 +123,11 @@ func (v IndexerRecordValidator) Select(_ string, values [][]byte) (int, error) { // InitNative registers native-specific stream handlers and starts background loops. // Must be called after DHT is initialized. func (ix *IndexerService) InitNative() { - ix.Native = newNativeState() + ctx, cancel := context.WithCancel(context.Background()) + ix.Native = newNativeState(cancel) ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleHeartbeat) // specific heartbeat for Indexer. 
ix.Host.SetStreamHandler(common.ProtocolNativeSubscription, ix.handleNativeSubscription) + ix.Host.SetStreamHandler(common.ProtocolNativeUnsubscribe, ix.handleNativeUnsubscribe) ix.Host.SetStreamHandler(common.ProtocolNativeGetIndexers, ix.handleNativeGetIndexers) ix.Host.SetStreamHandler(common.ProtocolNativeConsensus, ix.handleNativeConsensus) ix.Host.SetStreamHandler(common.ProtocolNativeGetPeers, ix.handleNativeGetPeers) @@ -109,8 +135,8 @@ func (ix *IndexerService) InitNative() { ix.subscribeIndexerRegistry() // Ensure long connections to other configured natives (native-to-native mesh). common.EnsureNativePeers(ix.Host) - go ix.runOffloadLoop() - go ix.refreshIndexersFromDHT() + go ix.runOffloadLoop(ctx) + go ix.refreshIndexersFromDHT(ctx) } // subscribeIndexerRegistry joins the PubSub topic used by natives to gossip newly @@ -118,14 +144,40 @@ func (ix *IndexerService) InitNative() { func (ix *IndexerService) subscribeIndexerRegistry() { logger := oclib.GetLogger() ix.PS.RegisterTopicValidator(common.TopicIndexerRegistry, func(_ context.Context, _ pp.ID, msg *pubsub.Message) bool { - // Reject empty or syntactically invalid multiaddrs before they reach the - // message loop. A compromised native could otherwise gossip arbitrary data. - addr := string(msg.Data) - if addr == "" { + // Parse as a signed IndexerRegistration. + var reg common.IndexerRegistration + if err := json.Unmarshal(msg.Data, ®); err != nil { return false } - _, err := pp.AddrInfoFromString(addr) - return err == nil + if reg.Addr == "" { + return false + } + if _, err := pp.AddrInfoFromString(reg.Addr); err != nil { + return false + } + // Verify the self-signature when present (rejects forged gossip from a + // compromised native that does not control the announced PeerID). + if ok, _ := reg.Verify(); !ok { + return false + } + // Accept only messages from known native peers or from this host itself. + // This prevents external PSK participants from injecting registry entries. 
+ from := msg.GetFrom() + if from == ix.Host.ID() { + return true + } + common.StreamNativeMu.RLock() + _, knownNative := common.StaticNatives[from.String()] + if !knownNative { + for _, ad := range common.StaticNatives { + if ad.ID == from { + knownNative = true + break + } + } + } + common.StreamNativeMu.RUnlock() + return knownNative }) topic, err := ix.PS.Join(common.TopicIndexerRegistry) if err != nil { @@ -147,18 +199,18 @@ func (ix *IndexerService) subscribeIndexerRegistry() { if err != nil { return } - addr := string(msg.Data) - if addr == "" { + // The gossip payload is a JSON-encoded IndexerRegistration (signed). + var gossipReg common.IndexerRegistration + if jsonErr := json.Unmarshal(msg.Data, &gossipReg); jsonErr != nil { continue } - if peer, err := pp.AddrInfoFromString(addr); err == nil { - ix.Native.knownMu.Lock() - ix.Native.knownPeerIDs[peer.ID.String()] = addr - ix.Native.knownMu.Unlock() - + if gossipReg.Addr == "" || gossipReg.PeerID == "" { + continue } // A neighbouring native registered this PeerID; add to known set for DHT refresh. 
- + ix.Native.knownMu.Lock() + ix.Native.knownPeerIDs[gossipReg.PeerID] = gossipReg.Addr + ix.Native.knownMu.Unlock() } }() } @@ -171,86 +223,172 @@ func (ix *IndexerService) handleNativeSubscription(s network.Stream) { logger := oclib.GetLogger() logger.Info().Msg("Subscription") + for { + var reg common.IndexerRegistration + if err := json.NewDecoder(s).Decode(®); err != nil { + logger.Err(err).Msg("native subscription: decode") + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } + continue + } + logger.Info().Msg("Subscription " + reg.Addr) + if reg.Addr == "" { + logger.Error().Msg("native subscription: missing addr") + return + } + if reg.PeerID == "" { + ad, err := pp.AddrInfoFromString(reg.Addr) + if err != nil { + logger.Err(err).Msg("native subscription: invalid addr") + return + } + reg.PeerID = ad.ID.String() + } + + // Reject registrations with an invalid self-signature. + if ok, err := reg.Verify(); !ok { + logger.Warn().Str("peer", reg.PeerID).Err(err).Msg("native subscription: invalid signature, rejecting") + return + } + + // Build entry with a fresh TTL — must happen before the cache write so the + // TTL window is not consumed by DHT retries. + entry := &liveIndexerEntry{ + PeerID: reg.PeerID, + Addr: reg.Addr, + ExpiresAt: time.Now().UTC().Add(IndexerTTL), + RegTimestamp: reg.Timestamp, + PubKey: reg.PubKey, + Signature: reg.Signature, + } + + // Verify that the declared address is actually reachable before admitting + // the registration. This async dial runs in the background; the indexer is + // tentatively admitted immediately (so heartbeats don't get stuck) but is + // evicted from the cache if the dial fails within 5 s. 
+ go func(e *liveIndexerEntry) { + ad, err := pp.AddrInfoFromString(e.Addr) + if err != nil { + logger.Warn().Str("addr", e.Addr).Msg("native subscription: invalid addr during validation, rejecting") + ix.Native.liveIndexersMu.Lock() + if cur := ix.Native.liveIndexers[e.PeerID]; cur == e { + delete(ix.Native.liveIndexers, e.PeerID) + } + ix.Native.liveIndexersMu.Unlock() + return + } + dialCtx, dialCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer dialCancel() + if err := ix.Host.Connect(dialCtx, *ad); err != nil { + logger.Warn().Str("addr", e.Addr).Err(err).Msg("native subscription: declared address unreachable, rejecting") + ix.Native.liveIndexersMu.Lock() + if cur := ix.Native.liveIndexers[e.PeerID]; cur == e { + delete(ix.Native.liveIndexers, e.PeerID) + } + ix.Native.liveIndexersMu.Unlock() + } + }(entry) + + // Update local cache and known set immediately so concurrent GetIndexers calls + // can already see this indexer without waiting for the DHT write to complete. + ix.Native.liveIndexersMu.Lock() + _, isRenewal := ix.Native.liveIndexers[reg.PeerID] + ix.Native.liveIndexers[reg.PeerID] = entry + ix.Native.liveIndexersMu.Unlock() + + ix.Native.knownMu.Lock() + ix.Native.knownPeerIDs[reg.PeerID] = reg.Addr + ix.Native.knownMu.Unlock() + + // Gossip the signed registration to neighbouring natives. + // The payload is JSON-encoded so the receiver can verify the self-signature. 
+ ix.PubsubMu.RLock() + topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry] + ix.PubsubMu.RUnlock() + if topic != nil { + if gossipData, marshalErr := json.Marshal(reg); marshalErr == nil { + if err := topic.Publish(context.Background(), gossipData); err != nil { + logger.Err(err).Msg("native subscription: registry gossip publish") + } + } + } + + if isRenewal { + // logger.Debug().Str("peer", reg.PeerID).Msg("native: indexer TTL renewed : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers))) + } else { + logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers))) + } + + // Persist in DHT asynchronously with bounded retry. + // Max retry window = IndexerTTL (90 s) — retrying past entry expiry is pointless. + // Backoff: 10 s → 20 s → 40 s, then repeats at 40 s until deadline. + key := ix.genIndexerKey(reg.PeerID) + data, err := json.Marshal(entry) + if err != nil { + logger.Err(err).Msg("native subscription: marshal entry") + return + } + go func() { + deadline := time.Now().Add(IndexerTTL) + backoff := 10 * time.Second + for { + if time.Now().After(deadline) { + logger.Warn().Str("key", key).Msg("native subscription: DHT put abandoned, entry TTL exceeded") + return + } + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err := ix.DHT.PutValue(ctx, key, data) + cancel() + if err == nil { + return + } + logger.Err(err).Msg("native subscription: DHT put " + key) + if !strings.Contains(err.Error(), "failed to find any peer in table") { + return // non-retryable error + } + remaining := time.Until(deadline) + if backoff > remaining { + backoff = remaining + } + if backoff <= 0 { + return + } + time.Sleep(backoff) + if backoff < 40*time.Second { + backoff *= 2 + } + } + }() + break + } +} + +// handleNativeUnsubscribe removes a departing indexer from the local cache and +// known set immediately, without waiting for TTL expiry. 
+func (ix *IndexerService) handleNativeUnsubscribe(s network.Stream) { + defer s.Close() + logger := oclib.GetLogger() var reg common.IndexerRegistration if err := json.NewDecoder(s).Decode(®); err != nil { - logger.Err(err).Msg("native subscription: decode") - return - } - logger.Info().Msg("Subscription " + reg.Addr) - - if reg.Addr == "" { - logger.Error().Msg("native subscription: missing addr") + logger.Err(err).Msg("native unsubscribe: decode") return } if reg.PeerID == "" { - ad, err := pp.AddrInfoFromString(reg.Addr) - if err != nil { - logger.Err(err).Msg("native subscription: invalid addr") - return - } - reg.PeerID = ad.ID.String() - } - - // Build entry with a fresh TTL — must happen before the cache write so the 66s - // window is not consumed by DHT retries. - entry := &liveIndexerEntry{ - PeerID: reg.PeerID, - Addr: reg.Addr, - ExpiresAt: time.Now().UTC().Add(IndexerTTL), - } - - // Update local cache and known set immediately so concurrent GetIndexers calls - // can already see this indexer without waiting for the DHT write to complete. - ix.Native.liveIndexersMu.Lock() - _, isRenewal := ix.Native.liveIndexers[reg.PeerID] - ix.Native.liveIndexers[reg.PeerID] = entry - ix.Native.liveIndexersMu.Unlock() - - ix.Native.knownMu.Lock() - ix.Native.knownPeerIDs[reg.PeerID] = reg.Addr - ix.Native.knownMu.Unlock() - - // Gossip PeerID to neighbouring natives so they discover it via DHT. 
- ix.PubsubMu.RLock() - topic := ix.LongLivedPubSubs[common.TopicIndexerRegistry] - ix.PubsubMu.RUnlock() - if topic != nil { - if err := topic.Publish(context.Background(), []byte(reg.Addr)); err != nil { - logger.Err(err).Msg("native subscription: registry gossip publish") - } - } - - if isRenewal { - logger.Debug().Str("peer", reg.PeerID).Msg("native: indexer TTL renewed : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers))) - } else { - logger.Info().Str("peer", reg.PeerID).Msg("native: indexer registered : " + fmt.Sprintf("%v", len(ix.Native.liveIndexers))) - } - - // Persist in DHT asynchronously — retries must not block the handler or consume - // the local cache TTL. - key := ix.genIndexerKey(reg.PeerID) - data, err := json.Marshal(entry) - if err != nil { - logger.Err(err).Msg("native subscription: marshal entry") + logger.Warn().Msg("native unsubscribe: missing peer_id") return } - go func() { - for { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - if err := ix.DHT.PutValue(ctx, key, data); err != nil { - cancel() - logger.Err(err).Msg("native subscription: DHT put " + key) - if strings.Contains(err.Error(), "failed to find any peer in table") { - time.Sleep(10 * time.Second) - continue - } - return - } - cancel() - return - } - }() + ix.Native.liveIndexersMu.Lock() + delete(ix.Native.liveIndexers, reg.PeerID) + ix.Native.liveIndexersMu.Unlock() + ix.Native.knownMu.Lock() + delete(ix.Native.knownPeerIDs, reg.PeerID) + ix.Native.knownMu.Unlock() + logger.Info().Str("peer", reg.PeerID).Msg("native: indexer explicitly unregistered") } // handleNativeGetIndexers returns this native's own list of reachable indexers. 
@@ -260,39 +398,47 @@ func (ix *IndexerService) handleNativeSubscription(s network.Stream) { func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) { defer s.Close() logger := oclib.GetLogger() + for { + var req common.GetIndexersRequest + if err := json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err).Msg("native get indexers: decode") + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } + continue + } + if req.Count <= 0 { + req.Count = 3 + } + callerPeerID := s.Conn().RemotePeer().String() + reachable := ix.reachableLiveIndexers(req.Count, callerPeerID) + var resp common.GetIndexersResponse - var req common.GetIndexersRequest - if err := json.NewDecoder(s).Decode(&req); err != nil { - logger.Err(err).Msg("native get indexers: decode") - return - } - if req.Count <= 0 { - req.Count = 3 - } - callerPeerID := s.Conn().RemotePeer().String() - reachable := ix.reachableLiveIndexers(req.Count, callerPeerID) - var resp common.GetIndexersResponse - - if len(reachable) == 0 { - // No live indexers reachable — try to self-delegate. - if ix.selfDelegate(s.Conn().RemotePeer(), &resp) { - logger.Info().Str("peer", callerPeerID).Msg("native: no indexers, acting as fallback for node") + if len(reachable) == 0 { + // No live indexers reachable — try to self-delegate. + if ix.selfDelegate(s.Conn().RemotePeer(), &resp) { + logger.Info().Str("peer", callerPeerID).Msg("native: no indexers, acting as fallback for node") + } else { + // Fallback pool saturated: return empty so the caller retries another + // native instead of piling more load onto this one. 
+ logger.Warn().Str("peer", callerPeerID).Int("pool", maxFallbackPeers).Msg( + "native: fallback pool saturated, refusing self-delegation") + } } else { - // Fallback pool saturated: return empty so the caller retries another - // native instead of piling more load onto this one. - logger.Warn().Str("peer", callerPeerID).Int("pool", maxFallbackPeers).Msg( - "native: fallback pool saturated, refusing self-delegation") + rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] }) + if req.Count > len(reachable) { + req.Count = len(reachable) + } + resp.Indexers = reachable[:req.Count] } - } else { - rand.Shuffle(len(reachable), func(i, j int) { reachable[i], reachable[j] = reachable[j], reachable[i] }) - if req.Count > len(reachable) { - req.Count = len(reachable) - } - resp.Indexers = reachable[:req.Count] - } - if err := json.NewEncoder(s).Encode(resp); err != nil { - logger.Err(err).Msg("native get indexers: encode response") + if err := json.NewEncoder(s).Encode(resp); err != nil { + logger.Err(err).Msg("native get indexers: encode response") + } + break } } @@ -303,39 +449,47 @@ func (ix *IndexerService) handleNativeGetIndexers(s network.Stream) { func (ix *IndexerService) handleNativeConsensus(s network.Stream) { defer s.Close() logger := oclib.GetLogger() - - var req common.ConsensusRequest - if err := json.NewDecoder(s).Decode(&req); err != nil { - logger.Err(err).Msg("native consensus: decode") - return - } - - myList := ix.reachableLiveIndexers(-1, s.Conn().RemotePeer().String()) - mySet := make(map[string]struct{}, len(myList)) - for _, addr := range myList { - mySet[addr] = struct{}{} - } - - trusted := []string{} - candidateSet := make(map[string]struct{}, len(req.Candidates)) - for _, addr := range req.Candidates { - candidateSet[addr] = struct{}{} - if _, ok := mySet[addr]; ok { - trusted = append(trusted, addr) // candidate we also confirm as reachable + for { + var req common.ConsensusRequest + if err := 
json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err).Msg("native consensus: decode") + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } + continue } - } - // Extras we trust but that the requester didn't include → suggestions. - suggestions := []string{} - for _, addr := range myList { - if _, inCandidates := candidateSet[addr]; !inCandidates { - suggestions = append(suggestions, addr) + myList := ix.reachableLiveIndexers(-1, s.Conn().RemotePeer().String()) + mySet := make(map[string]struct{}, len(myList)) + for _, addr := range myList { + mySet[addr] = struct{}{} } - } - resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions} - if err := json.NewEncoder(s).Encode(resp); err != nil { - logger.Err(err).Msg("native consensus: encode response") + trusted := []string{} + candidateSet := make(map[string]struct{}, len(req.Candidates)) + for _, addr := range req.Candidates { + candidateSet[addr] = struct{}{} + if _, ok := mySet[addr]; ok { + trusted = append(trusted, addr) // candidate we also confirm as reachable + } + } + + // Extras we trust but that the requester didn't include → suggestions. + suggestions := []string{} + for _, addr := range myList { + if _, inCandidates := candidateSet[addr]; !inCandidates { + suggestions = append(suggestions, addr) + } + } + + resp := common.ConsensusResponse{Trusted: trusted, Suggestions: suggestions} + if err := json.NewEncoder(s).Encode(resp); err != nil { + logger.Err(err).Msg("native consensus: encode response") + } + break } } @@ -406,11 +560,16 @@ func (ix *IndexerService) reachableLiveIndexers(count int, from ...string) []str // refreshIndexersFromDHT runs in background and queries the shared DHT for every known // indexer PeerID whose local cache entry is missing or expired. 
This supplements the // local cache with entries written by neighbouring natives. -func (ix *IndexerService) refreshIndexersFromDHT() { +func (ix *IndexerService) refreshIndexersFromDHT(ctx context.Context) { t := time.NewTicker(dhtRefreshInterval) defer t.Stop() logger := oclib.GetLogger() - for range t.C { + for { + select { + case <-ctx.Done(): + return + case <-t.C: + } ix.Native.knownMu.RLock() peerIDs := make([]string, 0, len(ix.Native.knownPeerIDs)) for pid := range ix.Native.knownPeerIDs { @@ -427,10 +586,10 @@ func (ix *IndexerService) refreshIndexersFromDHT() { continue // still fresh in local cache } key := ix.genIndexerKey(pid) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - ch, err := ix.DHT.SearchValue(ctx, key) + dhtCtx, dhtCancel := context.WithTimeout(context.Background(), 5*time.Second) + ch, err := ix.DHT.SearchValue(dhtCtx, key) if err != nil { - cancel() + dhtCancel() continue } var best *liveIndexerEntry @@ -445,7 +604,7 @@ func (ix *IndexerService) refreshIndexersFromDHT() { } } } - cancel() + dhtCancel() if best != nil { ix.Native.liveIndexersMu.Lock() ix.Native.liveIndexers[best.PeerID] = best @@ -468,11 +627,16 @@ func (ix *IndexerService) genIndexerKey(peerID string) string { // runOffloadLoop periodically checks if real indexers are available and releases // responsible peers so they can reconnect to actual indexers on their next attempt. 
-func (ix *IndexerService) runOffloadLoop() { +func (ix *IndexerService) runOffloadLoop(ctx context.Context) { t := time.NewTicker(offloadInterval) defer t.Stop() logger := oclib.GetLogger() - for range t.C { + for { + select { + case <-ctx.Done(): + return + case <-t.C: + } fmt.Println("runOffloadLoop", ix.Native.responsiblePeers) ix.Native.responsibleMu.RLock() count := len(ix.Native.responsiblePeers) @@ -540,38 +704,46 @@ func (ix *IndexerService) runOffloadLoop() { func (ix *IndexerService) handleNativeGetPeers(s network.Stream) { defer s.Close() logger := oclib.GetLogger() - - var req common.GetNativePeersRequest - if err := json.NewDecoder(s).Decode(&req); err != nil { - logger.Err(err).Msg("native get peers: decode") - return - } - if req.Count <= 0 { - req.Count = 1 - } - - excludeSet := make(map[string]struct{}, len(req.Exclude)) - for _, e := range req.Exclude { - excludeSet[e] = struct{}{} - } - - common.StreamNativeMu.RLock() - candidates := make([]string, 0, len(common.StaticNatives)) - for addr := range common.StaticNatives { - if _, excluded := excludeSet[addr]; !excluded { - candidates = append(candidates, addr) + for { + var req common.GetNativePeersRequest + if err := json.NewDecoder(s).Decode(&req); err != nil { + logger.Err(err).Msg("native get peers: decode") + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } + continue + } + if req.Count <= 0 { + req.Count = 1 } - } - common.StreamNativeMu.RUnlock() - rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) - if req.Count > len(candidates) { - req.Count = len(candidates) - } + excludeSet := make(map[string]struct{}, len(req.Exclude)) + for _, e := range req.Exclude { + excludeSet[e] = struct{}{} + } - resp := common.GetNativePeersResponse{Peers: 
candidates[:req.Count]} - if err := json.NewEncoder(s).Encode(resp); err != nil { - logger.Err(err).Msg("native get peers: encode response") + common.StreamNativeMu.RLock() + candidates := make([]string, 0, len(common.StaticNatives)) + for addr := range common.StaticNatives { + if _, excluded := excludeSet[addr]; !excluded { + candidates = append(candidates, addr) + } + } + common.StreamNativeMu.RUnlock() + + rand.Shuffle(len(candidates), func(i, j int) { candidates[i], candidates[j] = candidates[j], candidates[i] }) + if req.Count > len(candidates) { + req.Count = len(candidates) + } + + resp := common.GetNativePeersResponse{Peers: candidates[:req.Count]} + if err := json.NewEncoder(s).Encode(resp); err != nil { + logger.Err(err).Msg("native get peers: encode response") + } + break } } diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go index 708ff49..d238fd6 100644 --- a/daemons/node/indexer/service.go +++ b/daemons/node/indexer/service.go @@ -4,6 +4,7 @@ import ( "context" "oc-discovery/conf" "oc-discovery/daemons/node/common" + "strings" "sync" oclib "cloud.o-forge.io/core/oc-lib" @@ -60,9 +61,19 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative boo } } - if ix.DHT, err = dht.New( - context.Background(), - ix.Host, + // Parse bootstrap peers from configured native/indexer addresses so that the + // DHT can find its routing table entries even in a fresh deployment. 
+ var bootstrapPeers []pp.AddrInfo + for _, addrStr := range strings.Split(conf.GetConfig().NativeIndexerAddresses+","+conf.GetConfig().IndexerAddresses, ",") { + addrStr = strings.TrimSpace(addrStr) + if addrStr == "" { + continue + } + if ad, err := pp.AddrInfoFromString(addrStr); err == nil { + bootstrapPeers = append(bootstrapPeers, *ad) + } + } + dhtOpts := []dht.Option{ dht.Mode(dht.ModeServer), dht.ProtocolPrefix("oc"), // 🔥 réseau privé dht.Validator(record.NamespacedValidator{ @@ -71,7 +82,11 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative boo "name": DefaultValidator{}, "pid": DefaultValidator{}, }), - ); err != nil { + } + if len(bootstrapPeers) > 0 { + dhtOpts = append(dhtOpts, dht.BootstrapPeers(bootstrapPeers...)) + } + if ix.DHT, err = dht.New(context.Background(), ix.Host, dhtOpts...); err != nil { logger.Info().Msg(err.Error()) return nil } @@ -90,6 +105,16 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int, isNative boo } func (ix *IndexerService) Close() { + if ix.Native != nil && ix.Native.cancel != nil { + ix.Native.cancel() + } + // Explicitly deregister from natives on clean shutdown so they evict this + // indexer immediately rather than waiting for TTL expiry (~90 s). 
+ if !ix.IsNative { + if nativeAddrs := conf.GetConfig().NativeIndexerAddresses; nativeAddrs != "" { + common.UnregisterFromNative(ix.Host, nativeAddrs) + } + } ix.DHT.Close() ix.PS.UnregisterTopicValidator(common.TopicPubSubSearch) if ix.nameIndex != nil { diff --git a/daemons/node/nats.go b/daemons/node/nats.go index 4222acc..e7f7eb7 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -117,7 +117,7 @@ func ListenNATS(n *Node) { proto = stream.ProtocolMinioConfigResource } if err := json.Unmarshal(resp.Payload, &m); err == nil { - peers, _ := n.GetPeerRecord(context.Background(), m.PeerID) + peers, _ := n.GetPeerRecord(context.Background(), m.PeerID, false) for _, p := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, p.PeerID, proto, resp.Payload) @@ -137,7 +137,7 @@ func ListenNATS(n *Node) { var m executionConsidersPayload if err := json.Unmarshal(resp.Payload, &m); err == nil { for _, p := range m.PeerIDs { - peers, _ := n.GetPeerRecord(context.Background(), p) + peers, _ := n.GetPeerRecord(context.Background(), p, false) for _, pp := range peers { n.StreamService.PublishCommon(&resp.Datatype, resp.User, pp.PeerID, stream.ProtocolConsidersResource, resp.Payload) @@ -150,7 +150,7 @@ func ListenNATS(n *Node) { OriginID string `json:"origin_id"` } if err := json.Unmarshal(propalgation.Payload, &m); err == nil && m.OriginID != "" { - peers, _ := n.GetPeerRecord(context.Background(), m.OriginID) + peers, _ := n.GetPeerRecord(context.Background(), m.OriginID, false) for _, p := range peers { n.StreamService.PublishCommon(nil, resp.User, p.PeerID, stream.ProtocolConsidersResource, propalgation.Payload) @@ -192,7 +192,7 @@ func ListenNATS(n *Node) { if propalgation.DataType == int(tools.PEER) { m := map[string]interface{}{} if err := json.Unmarshal(propalgation.Payload, &m); err == nil { - if peers, err := n.GetPeerRecord(context.Background(), fmt.Sprintf("%v", m["search"])); err == nil { + if peers, err := 
n.GetPeerRecord(context.Background(), fmt.Sprintf("%v", m["search"]), true); err == nil { for _, p := range peers { if b, err := json.Marshal(p); err == nil { go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{ diff --git a/daemons/node/node.go b/daemons/node/node.go index 10d2de7..08930da 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "maps" "oc-discovery/conf" "oc-discovery/daemons/node/common" "oc-discovery/daemons/node/indexer" @@ -110,7 +109,6 @@ func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error) if _, err := node.claimInfo(conf.GetConfig().Name, conf.GetConfig().Hostname); err != nil { panic(err) } - logger.Info().Msg("subscribe to decentralized search flow...") logger.Info().Msg("run garbage collector...") node.StartGC(30 * time.Second) @@ -122,10 +120,17 @@ func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error) panic(err) } f := func(ctx context.Context, evt common.Event, topic string) { - if p, err := node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { - node.StreamService.SendResponse(p[0], &evt) + m := map[string]interface{}{} + err := json.Unmarshal(evt.Payload, &m) + if err != nil || evt.From == node.PeerID.String() { + fmt.Println(evt.From, node.PeerID.String(), err) + return + } + if p, err := node.GetPeerRecord(ctx, evt.From, false); err == nil && len(p) > 0 && m["search"] != nil { + node.StreamService.SendResponse(p[0], &evt, fmt.Sprintf("%v", m["search"])) } } + logger.Info().Msg("subscribe to decentralized search flow...") node.SubscribeToSearch(node.PS, &f) logger.Info().Msg("connect to NATS") go ListenNATS(node) @@ -187,6 +192,7 @@ func (d *Node) publishPeerRecord( func (d *Node) GetPeerRecord( ctx context.Context, pidOrdid string, + search bool, ) ([]*peer.Peer, error) { var err error var info map[string]indexer.PeerRecord @@ -200,16 +206,14 @@ func (d *Node) GetPeerRecord( // Build the 
GetValue request: if pidOrdid is neither a UUID DID nor a libp2p // PeerID, treat it as a human-readable name and let the indexer resolve it. getReq := indexer.GetValue{Key: pidOrdid} - isNameSearch := false if pidR, pidErr := pp.Decode(pidOrdid); pidErr == nil { - getReq.PeerID = pidR + getReq.PeerID = pidR.String() } else if _, uuidErr := uuid.Parse(pidOrdid); uuidErr != nil { // Not a UUID DID → treat pidOrdid as a name substring search. getReq.Name = pidOrdid getReq.Key = "" - isNameSearch = true } - + getReq.Search = search for _, ad := range indexerSnapshot2 { if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolGet, "", common.StreamIndexers, map[protocol.ID]*common.ProtocolInfo{}, &common.StreamMuIndexes); err != nil { @@ -224,28 +228,17 @@ func (d *Node) GetPeerRecord( continue } if resp.Found { - if info == nil { - info = resp.Records - } else { - // Aggregate results from all indexers for name searches. - maps.Copy(info, resp.Records) - } - // For exact lookups (PeerID / DID) stop at the first hit. 
- if !isNameSearch { - break - } + info = resp.Records } + break } var ps []*peer.Peer for _, pr := range info { if pk, err := pr.Verify(); err != nil { return nil, err - } else if ok, p, err := pr.ExtractPeer(d.PeerID.String(), pr.PeerID, pk); err != nil { + } else if _, p, err := pr.ExtractPeer(d.PeerID.String(), pr.PeerID, pk); err != nil { return nil, err } else { - if ok { - d.publishPeerRecord(&pr) - } ps = append(ps, p) } } @@ -316,6 +309,17 @@ func (d *Node) claimInfo( return nil, err } else { _, p, err := rec.ExtractPeer(did, did, pub) + b, err := json.Marshal(p) + if err != nil { + return p, err + } + go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.PEER, + Method: int(tools.CREATE_RESOURCE), + SearchAttr: "peer_id", + Payload: b, + }) return p, err } } diff --git a/daemons/node/pubsub/handler.go b/daemons/node/pubsub/handler.go deleted file mode 100644 index 0f2b42a..0000000 --- a/daemons/node/pubsub/handler.go +++ /dev/null @@ -1,40 +0,0 @@ -package pubsub - -import ( - "context" - "oc-discovery/daemons/node/common" - - "cloud.o-forge.io/core/oc-lib/tools" -) - -func (ps *PubSubService) handleEvent(ctx context.Context, topicName string, evt *common.Event) error { - action := ps.getTopicName(topicName) - if err := ps.handleEventSearch(ctx, evt, action); err != nil { - return err - } - return nil -} - -func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 canals for every partner. - ctx context.Context, - evt *common.Event, - action tools.PubSubAction, -) error { - if !(action == tools.PB_SEARCH) { - return nil - } - if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { // peerFrom is Unique - if err := evt.Verify(p[0]); err != nil { - return err - } - switch action { - case tools.PB_SEARCH: // when someone ask for search. 
- if err := ps.StreamService.SendResponse(p[0], evt); err != nil { - return err - } - default: - return nil - } - } - return nil -} diff --git a/daemons/node/pubsub/publish.go b/daemons/node/pubsub/publish.go index d91d961..296ce18 100644 --- a/daemons/node/pubsub/publish.go +++ b/daemons/node/pubsub/publish.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "oc-discovery/daemons/node/common" "oc-discovery/daemons/node/stream" "oc-discovery/models" @@ -20,15 +21,7 @@ func (ps *PubSubService) SearchPublishEvent( } switch typ { case "known": // define Search Strategy - return ps.StreamService.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided - And: map[string][]dbs.Filter{ - "": {{Operator: dbs.NOT.String(), Value: dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided - And: map[string][]dbs.Filter{ - "relation": {{Operator: dbs.EQUAL.String(), Value: peer.BLACKLIST}}, - }, - }}}, - }, - }, b, stream.ProtocolSearchResource) //if partners focus only them*/ + return ps.StreamService.PublishesCommon(dt, user, nil, b, stream.ProtocolSearchResource) //if partners focus only them*/ case "partner": // define Search Strategy return ps.StreamService.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, description, owner, url if no filters are provided And: map[string][]dbs.Filter{ @@ -40,23 +33,26 @@ func (ps *PubSubService) SearchPublishEvent( if err != nil { return err } - return ps.publishEvent(ctx, dt, tools.PB_SEARCH, user, b) + return ps.publishEvent(ctx, dt, tools.PB_SEARCH, common.TopicPubSubSearch, user, b) default: return errors.New("no type of research found") } } func (ps *PubSubService) publishEvent( - ctx context.Context, dt *tools.DataType, action tools.PubSubAction, user string, payload []byte, + ctx context.Context, dt *tools.DataType, action tools.PubSubAction, topicName string, 
user string, payload []byte, ) error { priv, err := tools.LoadKeyFromFilePrivate() if err != nil { return err } msg, _ := json.Marshal(models.NewEvent(action.String(), ps.Host.ID().String(), dt, user, payload, priv)) - topic, err := ps.PS.Join(action.String()) - if err != nil { - return err + topic := ps.Node.GetPubSub(topicName) + if topic == nil { + topic, err = ps.PS.Join(topicName) + if err != nil { + return err + } } return topic.Publish(ctx, msg) } diff --git a/daemons/node/pubsub/service.go b/daemons/node/pubsub/service.go index e99efdd..7438ffb 100644 --- a/daemons/node/pubsub/service.go +++ b/daemons/node/pubsub/service.go @@ -4,17 +4,13 @@ import ( "context" "oc-discovery/daemons/node/common" "oc-discovery/daemons/node/stream" - "strings" "sync" - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/tools" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" ) type PubSubService struct { - *common.LongLivedPubSubService Node common.DiscoveryPeer Host host.Host PS *pubsub.PubSub @@ -24,24 +20,12 @@ type PubSubService struct { } func InitPubSub(ctx context.Context, h host.Host, ps *pubsub.PubSub, node common.DiscoveryPeer, streamService *stream.StreamService) (*PubSubService, error) { - service := &PubSubService{ - LongLivedPubSubService: common.NewLongLivedPubSubService(h), - Node: node, - StreamService: streamService, - PS: ps, - } - logger := oclib.GetLogger() - logger.Info().Msg("subscribe to events...") - service.initSubscribeEvents(ctx) - return service, nil -} - -func (ps *PubSubService) getTopicName(topicName string) tools.PubSubAction { - ns := strings.Split(topicName, ".") - if len(ns) > 0 { - return tools.GetActionString(ns[0]) - } - return tools.NONE + return &PubSubService{ + Host: h, + Node: node, + StreamService: streamService, + PS: ps, + }, nil } func (ix *PubSubService) Close() { diff --git a/daemons/node/pubsub/subscribe.go b/daemons/node/pubsub/subscribe.go deleted file mode 100644 index 
19fbfb2..0000000 --- a/daemons/node/pubsub/subscribe.go +++ /dev/null @@ -1,45 +0,0 @@ -package pubsub - -import ( - "context" - "oc-discovery/daemons/node/common" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/tools" -) - -func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error { - if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, ""); err != nil { - return err - } - return nil -} - -// generic function to subscribe to DHT flow of event -func (ps *PubSubService) subscribeEvents( - ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string, -) error { - logger := oclib.GetLogger() - // define a name app.action#peerID - name := action.String() + "#" + peerID - if dt != nil { // if a datatype is precised then : app.action.datatype#peerID - name = action.String() + "." + (*dt).String() + "#" + peerID - } - f := func(ctx context.Context, evt common.Event, topicName string) { - if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { - if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil { - logger.Err(err) - } - } - } - return common.SubscribeEvents(ps.LongLivedPubSubService, ctx, name, -1, f) -} - -func (ps *PubSubService) processEvent( - ctx context.Context, p *peer.Peer, event *common.Event, topicName string) error { - if err := event.Verify(p); err != nil { - return err - } - return ps.handleEvent(ctx, topicName, event) -} diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index 356b3c3..b1f21a5 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -9,6 +9,7 @@ import ( "oc-discovery/daemons/node/common" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/booking/planner" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" @@ -152,16 +153,34 @@ func (abs 
*StreamService) pass(event *common.Event, action tools.PubSubAction) e func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol string) error { switch protocol { case ProtocolSearchResource: - if evt.DataType < 0 { + m := map[string]interface{}{} + err := json.Unmarshal(evt.Payload, &m) + if err != nil { + return err + } + if search, ok := m["search"]; ok { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - peers := access.Search(nil, evt.From, false) + peers := access.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "peer_id": {{Operator: dbs.EQUAL.String(), Value: evt.From}}, + }, + }, evt.From, false) if len(peers.Data) > 0 { p := peers.Data[0].(*peer.Peer) - // TODO : something if peer is missing in our side ! - ps.SendResponse(p, evt) - } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID - ps.SendResponse(p[0], evt) + fmt.Println(evt.From, p.GetID(), peers.Data) + + ps.SendResponse(p, evt, fmt.Sprintf("%v", search)) + } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From, false); err == nil && len(p) > 0 { // peer from is peerID + ps.SendResponse(p[0], evt, fmt.Sprintf("%v", search)) } + } else { + fmt.Println("SEND SEARCH_EVENT SetNATSPub", m) + go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.DataType(evt.DataType), + Method: int(tools.SEARCH_EVENT), + Payload: evt.Payload, + }) } case ProtocolCreateResource, ProtocolUpdateResource: fmt.Println("RECEIVED Protocol.Update") @@ -184,32 +203,29 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol stri return nil } -func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event) error { - dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)} +func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event, search string) error { + dts := 
[]tools.DataType{tools.DataType(event.DataType)} if event.DataType == -1 { // expect all resources - dts = []oclib.LibDataEnum{ - oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), - oclib.LibDataEnum(oclib.STORAGE_RESOURCE), - oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), - oclib.LibDataEnum(oclib.DATA_RESOURCE), - oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)} + dts = []tools.DataType{ + tools.COMPUTE_RESOURCE, + tools.STORAGE_RESOURCE, + tools.PROCESSING_RESOURCE, + tools.DATA_RESOURCE, + tools.WORKFLOW_RESOURCE, + } } - var m map[string]string - err := json.Unmarshal(event.Payload, &m) - if err != nil { + if self, err := oclib.GetMySelf(); err != nil { return err - } - for _, dt := range dts { - access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil) - peerID := p.GetID() - searched := access.Search(abs.FilterPeer(peerID, m["search"]), "", false) - for _, ss := range searched.Data { - if j, err := json.Marshal(ss); err == nil { - if event.DataType != -1 { - ndt := tools.DataType(dt.EnumIndex()) - abs.PublishCommon(&ndt, event.User, peerID, ProtocolSearchResource, j) - } else { - abs.PublishCommon(nil, event.User, peerID, ProtocolSearchResource, j) + } else { + for _, dt := range dts { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(dt), nil) + peerID := p.GetID() + searched := access.Search(abs.FilterPeer(self.GetID(), search), "", false) + fmt.Println("SEND SEARCH_EVENT", self.GetID(), dt, len(searched.Data), peerID) + for _, ss := range searched.Data { + if j, err := json.Marshal(ss); err == nil { + _, err := abs.PublishCommon(&dt, event.User, p.PeerID, ProtocolSearchResource, j) + fmt.Println("Publish ERR", err) } } } diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index 887aa3e..9a65a64 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -17,7 +17,12 @@ import ( func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter *dbs.Filters, resource []byte, protos 
...protocol.ID) error { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) - p := access.Search(filter, "", false) + var p oclib.LibDataShallow + if filter == nil { + p = access.LoadAll(false) + } else { + p = access.Search(filter, "", false) + } for _, pes := range p.Data { for _, proto := range protos { if _, err := ps.PublishCommon(dt, user, pes.(*peer.Peer).PeerID, proto, resource); err != nil { @@ -31,6 +36,7 @@ func (ps *StreamService) PublishesCommon(dt *tools.DataType, user string, filter func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID string, proto protocol.ID, resource []byte) (*common.Stream, error) { fmt.Println("PublishCommon") if toPeerID == ps.Key.String() { + fmt.Println("Can't send to ourself !") return nil, errors.New("Can't send to ourself !") } @@ -43,14 +49,15 @@ func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID var pe *peer.Peer if len(p.Data) > 0 && p.Data[0].(*peer.Peer).Relation != peer.BLACKLIST { pe = p.Data[0].(*peer.Peer) - } else if pps, err := ps.Node.GetPeerRecord(context.Background(), toPeerID); err == nil && len(pps) > 0 { + } else if pps, err := ps.Node.GetPeerRecord(context.Background(), toPeerID, false); err == nil && len(pps) > 0 { pe = pps[0] } if pe != nil { - ad, err := pp.AddrInfoFromString(p.Data[0].(*peer.Peer).StreamAddress) + ad, err := pp.AddrInfoFromString(pe.StreamAddress) if err != nil { return nil, err } + fmt.Println("WRITE") return ps.write(toPeerID, ad, dt, user, resource, proto) } return nil, errors.New("peer unvalid " + toPeerID) @@ -100,11 +107,19 @@ func (ps *StreamService) ToPartnerPublishEvent( for k := range protocolsPartners { ks = append(ks, k) } + var proto protocol.ID + proto = ProtocolCreateResource + switch action { + case tools.PB_DELETE: + proto = ProtocolDeleteResource + case tools.PB_UPDATE: + proto = ProtocolUpdateResource + } ps.PublishesCommon(dt, user, &dbs.Filters{ // filter by like name, short_description, 
description, owner, url if no filters are provided And: map[string][]dbs.Filter{ "relation": {{Operator: dbs.EQUAL.String(), Value: peer.PARTNER}}, }, - }, payload, ks...) + }, payload, proto) return nil } @@ -129,13 +144,21 @@ func (s *StreamService) write( return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String()) } - stream := s.Streams[proto][peerID.ID] - evt := common.NewEvent(string(proto), peerID.ID.String(), dt, user, payload) - fmt.Println("SEND EVENT ", evt.From, evt.DataType, evt.Timestamp) - if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil { - stream.Stream.Close() - logger.Err(err) + if self, err := oclib.GetMySelf(); err != nil { + return nil, err + } else { + stream := s.Streams[proto][peerID.ID] + evt := common.NewEvent(string(proto), self.PeerID, dt, user, payload) + fmt.Println("SEND EVENT ", peerID, proto, evt.From, evt.DataType, evt.Timestamp) + if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil { + stream.Stream.Close() + logger.Err(err) + return nil, err + } + if protocolInfo, ok := protocols[proto]; ok && protocolInfo.WaitResponse { + go s.readLoop(stream, peerID.ID, proto, &common.ProtocolInfo{PersistantStream: true}) + } return stream, nil } - return stream, nil + } diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 18bd5d0..dffa003 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -3,7 +3,9 @@ package stream import ( "context" "encoding/json" + "errors" "fmt" + "io" "oc-discovery/conf" "oc-discovery/daemons/node/common" "strings" @@ -82,6 +84,7 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c func (s *StreamService) HandleResponse(stream network.Stream) { s.Mu.Lock() + defer s.Mu.Unlock() stream.Protocol() if s.Streams[stream.Protocol()] == nil { s.Streams[stream.Protocol()] = map[pp.ID]*common.Stream{} @@ -98,8 +101,6 @@ func (s *StreamService) 
HandleResponse(stream network.Stream) { Stream: stream, Expiry: time.Now().UTC().Add(expiry + 1*time.Minute), } - s.Mu.Unlock() - go s.readLoop(s.Streams[stream.Protocol()][stream.Conn().RemotePeer()], stream.Conn().RemotePeer(), stream.Protocol(), protocols[stream.Protocol()]) @@ -256,7 +257,13 @@ func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID, if err := json.NewDecoder(s.Stream).Decode(&evt); err != nil { // Any decode error (EOF, reset, malformed JSON) terminates the loop; // continuing on a dead/closed stream creates an infinite spin. - return + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) || + strings.Contains(err.Error(), "reset") || + strings.Contains(err.Error(), "closed") || + strings.Contains(err.Error(), "too many connections") { + return + } + continue } ps.handleEvent(evt.Type, &evt) if protocolInfo.WaitResponse && !protocolInfo.PersistantStream { @@ -266,15 +273,15 @@ func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID, } func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters { - id, err := oclib.GetMySelf() + p, err := oclib.GetMySelf() if err != nil { return nil } filter := map[string][]dbs.Filter{ - "creator_id": {{Operator: dbs.EQUAL.String(), Value: id}}, // is my resource... + "abstractinstanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: p.GetID()}}, // is my resource... 
"": {{Operator: dbs.OR.String(), Value: &dbs.Filters{ Or: map[string][]dbs.Filter{ - "abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public + "abstractinstanciatedresource.abstractresource.abstractobject.access_mode": {{Operator: dbs.EQUAL.String(), Value: 1}}, // if public "abstractinstanciatedresource.instances": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ // or got a partners instances And: map[string][]dbs.Filter{ "resourceinstance.partnerships": {{Operator: dbs.ELEMMATCH.String(), Value: &dbs.Filters{ @@ -287,15 +294,15 @@ func (abs *StreamService) FilterPeer(peerID string, search string) *dbs.Filters }, }}}, } + if search != "" { filter[" "] = []dbs.Filter{{Operator: dbs.OR.String(), Value: &dbs.Filters{ Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided - "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, - "abstractintanciatedresource.abstractresource.abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: search}}, + "abstractinstanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractinstanciatedresource.abstractresource.type": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractinstanciatedresource.abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, + "abstractinstanciatedresource.abstractresource.description": {{Operator: 
dbs.LIKE.String(), Value: search}}, + "abstractinstanciatedresource.abstractresource.owners.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, }}} } diff --git a/go.mod b/go.mod index dc59d05..ee81795 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-discovery go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5 + cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa github.com/libp2p/go-libp2p v0.47.0 github.com/libp2p/go-libp2p-record v0.3.1 github.com/multiformats/go-multiaddr v0.16.1 diff --git a/go.sum b/go.sum index 703622a..212ec90 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260226091217-cb3771c17a31 h1:hvkvJibS9NmIm cloud.o-forge.io/core/oc-lib v0.0.0-20260226091217-cb3771c17a31/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5 h1:h+Fkyj6cfwAirc0QGCBEkZSSrgcyThXswg7ytOLm948= cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260304143917-340f2a6301b7 h1:RZGV3ttkfoKIigUb7T+M5Kq+YtqW/td45EmNYeW5u8k= +cloud.o-forge.io/core/oc-lib v0.0.0-20260304143917-340f2a6301b7/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa h1:1wCpI4dwN1pj6MlpJ7/WifhHVHmCE4RU+9klwqgo/bk= +cloud.o-forge.io/core/oc-lib v0.0.0-20260304145747-e03a0d3dd0aa/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=