package stream

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sync"
	"time"

	"oc-discovery/daemons/node/common"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
)

// ProtocolObserve is the libp2p protocol for peer connectivity observation.
// The requesting oc-discovery opens a stream to the remote oc-discovery and
// sends an ObserveRequest. The remote side keeps the stream open and writes
// ObserveHeartbeat events back every observeHBInterval.
const ProtocolObserve = "/opencloud/peer/observe/1.0"

// observeHBEventType is used as the common.Event.Type for heartbeat responses.
const observeHBEventType = "/opencloud/peer/observe/heartbeat"

// observeHBInterval is the period of the heartbeat ticker on the observed
// side. The observing side also divides elapsed time by it to estimate how
// many heartbeats it should have received.
const observeHBInterval = 10 * time.Second

// observeDrainDuration is how long after CloseAllObserves incoming observe
// requests are rejected ("drain mode", checked in handleIncomingObserve).
const observeDrainDuration = 30 * time.Second

// observeBatchWindow is the accumulation window before a heartbeat batch is
// flushed to NATS. All peer heartbeats received within this window are grouped
// into a single PEER_OBSERVE_RESPONSE_EVENT, reducing NATS traffic.
const observeBatchWindow = 2 * time.Second

// ObserveRequest is the first message sent by the observing side when opening
// a ProtocolObserve stream. A second ObserveRequest with Close=true is written
// on the same stream when the observing side tears it down.
type ObserveRequest struct {
	// Close, when true, asks the remote side to stop the heartbeat goroutine
	// and remove the observer from its cache. Used for graceful teardown.
	Close bool `json:"close,omitempty"`
}

// ObserveHeartbeat is sent by the observed side every observeHBInterval.
type ObserveHeartbeat struct { State string `json:"state"` // always "online" when actively emitted SentAt time.Time `json:"sent_at,omitempty"` // timestamp set by sender; lets receiver compute one-way latency } const ( maxLatencyMs = 2000.0 // ms above which latency score → 0 latencySamples = 5 // sliding window size for latency averaging fastThresholdMs = 200.0 // below = "fast", above = "slow" reliableThreshold = 0.95 // miss_rate below 5% = "reliable" ) // PeerObserveMetrics accumulates connection-quality data for one observed peer. // Updated on every incoming heartbeat (observing side). type PeerObserveMetrics struct { mu sync.Mutex firstObservedAt time.Time lastHeartbeatAt time.Time received uint64 latencies [latencySamples]time.Duration latIdx int latCount int } func (m *PeerObserveMetrics) record(latency time.Duration) { m.mu.Lock() defer m.mu.Unlock() m.received++ m.lastHeartbeatAt = time.Now().UTC() m.latencies[m.latIdx%latencySamples] = latency m.latIdx++ if m.latCount < latencySamples { m.latCount++ } } func (m *PeerObserveMetrics) snapshot() PeerObserveSnapshot { m.mu.Lock() defer m.mu.Unlock() var total time.Duration for i := 0; i < m.latCount; i++ { total += m.latencies[i] } var avgMs float64 if m.latCount > 0 { avgMs = float64(total.Milliseconds()) / float64(m.latCount) } expected := int64(time.Duration(m.lastHeartbeatAt.Second()-m.firstObservedAt.Second()) / observeHBInterval) fmt.Println("EXPECTED", expected, m.received) var missRate float64 if expected > 0 { recv := int64(m.received) if recv > expected { recv = expected } missRate = 1.0 - float64(recv)/float64(expected) } latScore := 1.0 - avgMs/maxLatencyMs if latScore < 0 { latScore = 0 } relScore := 1.0 - missRate trust := (0.35*latScore + 0.65*relScore) * 100 speed := "fast" if avgMs >= fastThresholdMs { speed = "slow" } reliability := "reliable" if relScore < reliableThreshold { reliability = "watch" } return PeerObserveSnapshot{ LatencyMs: avgMs, Speed: speed, Reliability: reliability, 
TrustScore: trust, LastSeenAt: m.lastHeartbeatAt, MissRate: missRate, } } // PeerObserveSnapshot is the point-in-time quality summary sent to oc-peer via NATS. type PeerObserveSnapshot struct { LatencyMs float64 `json:"latency_ms"` Speed string `json:"speed"` // "fast" | "slow" Reliability string `json:"reliability"` // "reliable" | "watch" TrustScore float64 `json:"trust_score"` LastSeenAt time.Time `json:"last_seen_at"` MissRate float64 `json:"miss_rate"` } // ShallowPeer is the minimal peer representation sent by oc-peer in a // PEER_OBSERVE_EVENT. StreamAddress lets oc-discovery connect without a DB // lookup; Address carries the NATSAddress (unused here, forwarded as-is). type ShallowPeer struct { ID string `json:"id"` PeerID string `json:"peer_id"` Address string `json:"address"` StreamAddress string `json:"stream_address"` } // ObserveCommand is the payload carried by a PEER_OBSERVE_EVENT NATS message // (from oc-peer). // // Observe → User + Peers populated // Close → User + PeerIDs + Close=true // CloseAll → CloseAll=true (User optional) type ObserveCommand struct { User string `json:"user"` Peers []ShallowPeer `json:"peers,omitempty"` PeerIDs []string `json:"peer_ids,omitempty"` Close bool `json:"close,omitempty"` CloseAll bool `json:"close_all,omitempty"` } // ── observe cache (observed side) ──────────────────────────────────────────── // observeCache tracks running heartbeat goroutines keyed by the observing // peer's libp2p PeerID string. It is used exclusively on the OBSERVED side. 
type observeCache struct { mu sync.Mutex cancels map[string]context.CancelFunc } func newObserveCache() *observeCache { return &observeCache{cancels: map[string]context.CancelFunc{}} } func (c *observeCache) set(pid string, cancel context.CancelFunc) { c.mu.Lock() defer c.mu.Unlock() if old, ok := c.cancels[pid]; ok { old() // cancel previous goroutine if any } c.cancels[pid] = cancel } func (c *observeCache) cancel(pid string) { c.mu.Lock() defer c.mu.Unlock() if fn, ok := c.cancels[pid]; ok { fn() delete(c.cancels, pid) } } func (c *observeCache) cancelAll() { c.mu.Lock() defer c.mu.Unlock() for _, fn := range c.cancels { fn() } c.cancels = map[string]context.CancelFunc{} } func (c *observeCache) delete(pid string) { c.mu.Lock() defer c.mu.Unlock() delete(c.cancels, pid) } // ── heartbeat batcher (observing side) ─────────────────────────────────────── // heartbeatBatcher accumulates peer_ids from incoming heartbeats over // observeBatchWindow, then flushes them in a single NATS call. // Using a map as the backing store deduplicates multiple heartbeats from the // same peer within the same window (should not happen, but is harmless). type heartbeatBatcher struct { mu sync.Mutex ids map[string]struct{} timer *time.Timer flush func(peerIDs []string) } func newHeartbeatBatcher(flush func([]string)) *heartbeatBatcher { return &heartbeatBatcher{ ids: make(map[string]struct{}), flush: flush, } } // add records peerID in the current batch and arms the flush timer if needed. func (b *heartbeatBatcher) add(peerID string) { b.mu.Lock() defer b.mu.Unlock() b.ids[peerID] = struct{}{} if b.timer == nil { b.timer = time.AfterFunc(observeBatchWindow, b.fire) } } // fire is called by the timer; it drains the batch and invokes flush. 
func (b *heartbeatBatcher) fire() { b.mu.Lock() ids := make([]string, 0, len(b.ids)) for id := range b.ids { ids = append(ids, id) } b.ids = make(map[string]struct{}) b.timer = nil b.mu.Unlock() if len(ids) > 0 { b.flush(ids) } } // flushObserveBatch is the flush function wired into the heartbeatBatcher. // It emits two NATS messages: // - PEER_OBSERVE_RESPONSE_EVENT → consumed by oc-peer (direct channel) // - PROPALGATION_EVENT / PB_PROPAGATE → consumed by other oc-discovery nodes func flushObserveBatch(peerIDs []string) { payload, err := json.Marshal(map[string]interface{}{ "peer_ids": peerIDs, "state": "online", }) if err != nil { return } // Direct notification to oc-peer. tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_RESPONSE_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.PEER, Method: int(tools.PEER_OBSERVE_RESPONSE_EVENT), Payload: payload, }) // Broadcast to other oc-discovery nodes so they can forward to their // local oc-peer if needed. propPayload, err := json.Marshal(tools.PropalgationMessage{ DataType: int(tools.PEER), Action: tools.PB_PROPAGATE, Payload: payload, }) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.PEER, Method: int(tools.PROPALGATION_EVENT), Payload: propPayload, }) } // ── incoming observe handler (observed side) ────────────────────────────────── // handleIncomingObserve is called when a remote peer opens an observe stream // to us (observed side). It starts a heartbeat goroutine that writes back on // the same bidirectional rawStream — no separate reverse stream is opened. // The goroutine stops via context cancellation (triggered by a close event // read from rawStream) or when rawStream becomes unwritable. 
func (s *StreamService) handleIncomingObserve(rawStream network.Stream) error { remotePeerID := rawStream.Conn().RemotePeer().String() log := oclib.GetLogger() // Drain mode: reject any new observations for 30 s after a close-all. s.drainMu.RLock() draining := !s.drainUntil.IsZero() && time.Now().Before(s.drainUntil) s.drainMu.RUnlock() if draining { fmt.Println("Draining") return errors.New("draining") } // Guard: the requesting peer must not be blacklisted. access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) res := access.Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "peer_id": {{Operator: dbs.EQUAL.String(), Value: remotePeerID}}, }, }, "", false, 0, 1) if len(res.Data) > 0 { p := res.Data[0].(*peer.Peer) if p.Relation == peer.BLACKLIST { fmt.Println("CLOSE blacklist or self") return errors.New("can't observe blacklisted peer") } } // Replace any existing heartbeat goroutine for this observer. ctx, cancel := context.WithCancel(context.Background()) s.observeCache.set(remotePeerID, cancel) fmt.Println("LOOP OBSERVE") go func() { // Do NOT close rawStream here: the persistent readLoop (HandleResponse) // owns rawStream's lifecycle. We only stop writing. defer cancel() defer s.observeCache.delete(remotePeerID) ticker := time.NewTicker(observeHBInterval) defer ticker.Stop() buildHBEvent := func() *common.Event { p, _ := json.Marshal(ObserveHeartbeat{State: "online", SentAt: time.Now().UTC()}) return common.NewEvent(observeHBEventType, s.Host.ID().String(), nil, "", p) } for { select { case <-ctx.Done(): return case <-ticker.C: rawStream.SetWriteDeadline(time.Now().Add(5 * time.Second)) evt := buildHBEvent() if err := json.NewEncoder(rawStream).Encode(evt); err != nil { log.Info(). Str("observer", remotePeerID). Err(err). 
Msg("[observe] heartbeat write failed — stream closed, stopping goroutine") return } rawStream.SetWriteDeadline(time.Time{}) } } }() return nil } // ── heartbeat receiver (observing side) ─────────────────────────────────────── // handleObserveHeartbeat is called by readLoop when a heartbeat event arrives // on an outgoing ProtocolObserve stream. It updates per-peer metrics and flushes // a quality snapshot to NATS. func (ps *StreamService) handleObserveHeartbeat(evt *common.Event) error { var hb ObserveHeartbeat if err := json.Unmarshal(evt.Payload, &hb); err == nil && !hb.SentAt.IsZero() { latency := time.Since(hb.SentAt) raw, _ := ps.observeMetrics.LoadOrStore(evt.From, &PeerObserveMetrics{ firstObservedAt: time.Now().UTC(), }) raw.(*PeerObserveMetrics).record(latency) fmt.Println("METRICS", raw) ps.observeMetrics.Store(evt.From, raw) } ps.flushObserveForPeer(evt.From, evt.User) return nil } // flushObserveForPeer sends a PEER_OBSERVE_RESPONSE_EVENT to NATS with a quality // snapshot for peerID. Replaces the old flushObserveBatch (single-peer variant). 
func (ps *StreamService) flushObserveForPeer(peerID string, user string) { var snap *PeerObserveSnapshot if raw, ok := ps.observeMetrics.Load(peerID); ok { fmt.Println("RETRIEVED METRICS", raw) s := raw.(*PeerObserveMetrics).snapshot() snap = &s } fmt.Println("RETRIEVED METRICS 2", snap) payload, err := json.Marshal(map[string]interface{}{ "peer_ids": []string{peerID}, "state": "online", "metrics": map[string]*PeerObserveSnapshot{peerID: snap}, }) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_RESPONSE_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.PEER, User: user, Method: int(tools.PEER_OBSERVE_RESPONSE_EVENT), Payload: payload, }) propPayload, err := json.Marshal(tools.PropalgationMessage{ DataType: int(tools.PEER), Action: tools.PB_PROPAGATE, Payload: payload, }) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.PEER, User: user, Method: int(tools.PROPALGATION_EVENT), Payload: propPayload, }) } // ── user→peer index (ref-counted observe management) ───────────────────────── // userPeerIndex tracks which users are observing which peers. // A libp2p observe stream is kept open as long as at least one user watches // the peer; it is closed only when the last user stops. type userPeerIndex struct { mu sync.Mutex index map[string]map[string]struct{} // user → set of peer_id strings } func newUserPeerIndex() *userPeerIndex { return &userPeerIndex{index: map[string]map[string]struct{}{}} } // add registers user as an observer of peerID. // Returns true if peerID was not yet observed by any user (first observer). func (u *userPeerIndex) add(user, peerID string) (isFirst bool) { u.mu.Lock() defer u.mu.Unlock() // Count total observers for peerID across all users before adding. 
total := 0 for _, peers := range u.index { if _, ok := peers[peerID]; ok { total++ } } if u.index[user] == nil { u.index[user] = map[string]struct{}{} } u.index[user][peerID] = struct{}{} return total == 0 } // remove unregisters user from peerID. // Returns true if no user is observing peerID anymore (last observer removed). func (u *userPeerIndex) remove(user, peerID string) (isLast bool) { u.mu.Lock() defer u.mu.Unlock() delete(u.index[user], peerID) if len(u.index[user]) == 0 { delete(u.index, user) } for _, peers := range u.index { if _, ok := peers[peerID]; ok { return false } } return true } // removeUser removes all entries for user and returns the peer_ids that now // have no remaining observers (i.e., those whose streams should be closed). func (u *userPeerIndex) removeUser(user string) []string { u.mu.Lock() defer u.mu.Unlock() watched := u.index[user] delete(u.index, user) var orphans []string for peerID := range watched { found := false for _, peers := range u.index { if _, ok := peers[peerID]; ok { found = true break } } if !found { orphans = append(orphans, peerID) } } return orphans } // ── NATS command handler (observing side) ───────────────────────────────────── // HandleObserveNATSCommand processes a PEER_OBSERVE_EVENT received from oc-peer. func (ps *StreamService) HandleObserveNATSCommand(resp tools.NATSResponse) { log := oclib.GetLogger() var cmd ObserveCommand if err := json.Unmarshal(resp.Payload, &cmd); err != nil { log.Warn().Err(err).Msg("[observe] failed to unmarshal ObserveCommand") return } if cmd.CloseAll { log.Info().Msg("[observe] close-all received via NATS") ps.CloseAllObserves() return } if cmd.Close { for _, peerID := range cmd.PeerIDs { if isLast := ps.observeUsers.remove(cmd.User, peerID); isLast { if err := ps.closeObserveStream(peerID); err != nil { log.Warn().Str("peer", peerID).Err(err).Msg("[observe] closeObserveStream failed") } } } return } // Observe: open streams for any new peer, using the address from the payload. 
for _, p := range cmd.Peers { if isFirst := ps.observeUsers.add(cmd.User, p.PeerID); isFirst { if err := ps.openObserveStream(p); err != nil { // Roll back the index entry so the next NATS command can retry. ps.observeUsers.remove(cmd.User, p.PeerID) log.Warn().Str("peer", p.PeerID).Err(err).Msg("[observe] openObserveStream failed") } } } } // ── outgoing observe management (observing side) ────────────────────────────── // OpenObserveStream is the exported variant for inter-discovery propagation // (no user context available). It bypasses the user index and opens the stream // directly if not already open. func (ps *StreamService) OpenObserveStream(p ShallowPeer) error { return ps.openObserveStream(p) } // CloseObserveStream is the exported variant for inter-discovery propagation. func (ps *StreamService) CloseObserveStream(toPeerID string) error { return ps.closeObserveStream(toPeerID) } // openObserveStream opens a ProtocolObserve stream to p. // Uses p.StreamAddress directly; falls back to DB then DHT lookup if empty. func (ps *StreamService) openObserveStream(p ShallowPeer) error { streamAddr := p.StreamAddress fmt.Println("STREAM OBS", streamAddr) access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) res := access.Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}}, }, }, "", false, 0, 1) if streamAddr == "" { // Fallback: DB then DHT. 
if len(res.Data) > 0 { streamAddr = res.Data[0].(*peer.Peer).StreamAddress } else if peers, err := ps.Node.GetPeerRecord(context.Background(), p.PeerID); err == nil && len(peers) > 0 { streamAddr = peers[0].StreamAddress } } if len(res.Data) > 0 && res.Data[0].(*peer.Peer).Relation == peer.SELF { return errors.New("Can't send to self") } fmt.Println("STREAM OBS SSS", streamAddr) if streamAddr == "" { return nil // can't resolve address — silently skip } decodedID, err := pp.Decode(p.PeerID) if err != nil { return err } // If a stream already exists, reuse it. ps.Mu.RLock() _, alreadyOpen := ps.Streams[ProtocolObserve][decodedID] ps.Mu.RUnlock() if alreadyOpen { return nil } ad, err := pp.AddrInfoFromString(streamAddr) if err != nil { return err } fmt.Println("TempStream OBSERVE", ad) if ps.Streams, err = common.TempStream(ps.Host, *ad, ProtocolObserve, p.ID, ps.Streams, protocols, &ps.Mu); err == nil { rawStream := ps.Streams[ProtocolObserve][ad.ID] if hbPayload, err := json.Marshal(ObserveRequest{Close: false}); err == nil { if err := json.NewEncoder(rawStream.Stream).Encode(common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", hbPayload)); err != nil { fmt.Println("ERR") rawStream.Stream.Close() return err } s := &common.Stream{ Stream: rawStream.Stream, Expiry: time.Now().Add(365 * 24 * time.Hour), } ps.Mu.Lock() if ps.Streams[ProtocolObserve] == nil { ps.Streams[ProtocolObserve] = map[pp.ID]*common.Stream{} } ps.Streams[ProtocolObserve][ad.ID] = s ps.Mu.Unlock() go ps.readLoop(s, ad.ID, ProtocolObserve, &common.ProtocolInfo{PersistantStream: true}) } } else { return err } return nil } // closeObserveStream closes the ProtocolObserve stream to toPeerID and notifies // the remote side. The close event is wrapped in a common.Event so the remote's // persistent readLoop can decode and handle it (cancel the heartbeat goroutine). 
func (ps *StreamService) closeObserveStream(toPeerID string) error { decodedID, err := pp.Decode(toPeerID) if err != nil { return err } ps.Mu.Lock() if ps.Streams[ProtocolObserve] != nil { if s, ok := ps.Streams[ProtocolObserve][decodedID]; ok { closePayload, _ := json.Marshal(ObserveRequest{Close: true}) closeEvt := common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", closePayload) _ = json.NewEncoder(s.Stream).Encode(closeEvt) s.Stream.Close() delete(ps.Streams[ProtocolObserve], decodedID) } } ps.Mu.Unlock() ps.observeMetrics.Delete(toPeerID) return nil } // CloseAllObserves closes every outgoing ProtocolObserve stream, clears the // user index, and enters drain mode for observeDrainDuration. func (ps *StreamService) CloseAllObserves() { ps.Mu.Lock() for _, s := range ps.Streams[ProtocolObserve] { closePayload, _ := json.Marshal(ObserveRequest{Close: true}) closeEvt := common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", closePayload) _ = json.NewEncoder(s.Stream).Encode(closeEvt) s.Stream.Close() } delete(ps.Streams, ProtocolObserve) ps.Mu.Unlock() // Reset user index so stale ref-counts don't block future opens. ps.observeUsers = newUserPeerIndex() ps.observeMetrics.Range(func(k, _ any) bool { ps.observeMetrics.Delete(k) return true }) ps.drainMu.Lock() ps.drainUntil = time.Now().Add(observeDrainDuration) ps.drainMu.Unlock() }