Files
oc-discovery/daemons/node/stream/observe.go
T
2026-05-27 16:17:00 +02:00

673 lines
21 KiB
Go

package stream
import (
"context"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"oc-discovery/daemons/node/common"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
)
// ProtocolObserve is the libp2p protocol for peer connectivity observation.
// The requesting oc-discovery opens a stream to the remote oc-discovery and
// sends an ObserveRequest. The remote side keeps the stream open and writes
// ObserveHeartbeat events back every observeHBInterval.
const ProtocolObserve = "/opencloud/peer/observe/1.0"

const (
	// observeHBEventType is used as the common.Event.Type for heartbeat responses.
	observeHBEventType = "/opencloud/peer/observe/heartbeat"
	// observeHBInterval is the period at which the observed side emits heartbeats.
	observeHBInterval = 10 * time.Second
	// observeDrainDuration is how long new incoming observations are refused
	// after a close-all.
	observeDrainDuration = 30 * time.Second
	// observeBatchWindow is the accumulation window before a heartbeat batch is
	// flushed to NATS. All peer heartbeats received within this window are
	// grouped into a single PEER_OBSERVE_RESPONSE_EVENT, reducing NATS traffic.
	observeBatchWindow = 2 * time.Second
)

// ObserveRequest is the first (and only) message sent by the observing side
// when opening a ProtocolObserve stream.
type ObserveRequest struct {
	// Close, when true, asks the remote side to stop the heartbeat goroutine
	// and remove the observer from its cache. Used for graceful teardown.
	Close bool `json:"close,omitempty"`
}

// ObserveHeartbeat is sent by the observed side every observeHBInterval.
type ObserveHeartbeat struct {
	State  string    `json:"state"`             // always "online" when actively emitted
	SentAt time.Time `json:"sent_at,omitempty"` // timestamp set by sender; lets receiver compute one-way latency
}

const (
	maxLatencyMs      = 2000.0 // average latency (ms) at or above which the latency score is 0
	latencySamples    = 5      // sliding window size for latency averaging
	fastThresholdMs   = 200.0  // average latency below = "fast", at or above = "slow"
	reliableThreshold = 0.95   // 1 - miss_rate below this = "watch", otherwise "reliable"
)

// PeerObserveMetrics accumulates connection-quality data for one observed peer.
// It is updated on every incoming heartbeat (observing side). All fields are
// guarded by mu.
type PeerObserveMetrics struct {
	mu              sync.Mutex
	firstObservedAt time.Time                     // arrival time of the first heartbeat
	lastHeartbeatAt time.Time                     // arrival time of the latest heartbeat
	received        uint64                        // heartbeats received since firstObservedAt
	latencies       [latencySamples]time.Duration // ring buffer of the latest one-way latencies
	latIdx          int                           // next ring-buffer slot to overwrite
	latCount        int                           // number of valid samples in latencies (≤ latencySamples)
}

// record registers one received heartbeat whose one-way latency was latency.
//
// A negative latency — possible when the sender's clock runs ahead of ours —
// is stored as 0 so it cannot push the latency score above 1. When
// firstObservedAt has not been set by the caller, the first call anchors it.
func (m *PeerObserveMetrics) record(latency time.Duration) {
	if latency < 0 {
		latency = 0
	}
	now := time.Now().UTC()

	m.mu.Lock()
	defer m.mu.Unlock()
	if m.firstObservedAt.IsZero() {
		m.firstObservedAt = now
	}
	m.received++
	m.lastHeartbeatAt = now
	m.latencies[m.latIdx] = latency
	m.latIdx = (m.latIdx + 1) % latencySamples
	if m.latCount < latencySamples {
		m.latCount++
	}
}

// snapshot returns the current quality summary.
//
// LatencyMs is the mean of the buffered latency samples.
//
// MissRate compares the heartbeats received with the number expected between
// firstObservedAt and lastHeartbeatAt, both ends included, one per
// observeHBInterval. Missed heartbeats therefore only become visible once a
// later heartbeat arrives.
//
// TrustScore (0-100) weights the latency score at 35% and the reliability
// score (1 - MissRate) at 65%.
func (m *PeerObserveMetrics) snapshot() PeerObserveSnapshot {
	m.mu.Lock()
	defer m.mu.Unlock()

	var total time.Duration
	for _, l := range m.latencies[:m.latCount] {
		total += l
	}
	var avgMs float64
	if m.latCount > 0 {
		// Convert the full-precision total rather than total.Milliseconds(),
		// which would truncate sub-millisecond latencies.
		avgMs = float64(total) / float64(time.Millisecond) / float64(m.latCount)
	}

	// Elapsed wall time between first and latest heartbeat, in whole
	// intervals, plus one for the first heartbeat itself (fencepost).
	var expected int64
	if m.received > 0 && !m.lastHeartbeatAt.Before(m.firstObservedAt) {
		expected = int64(m.lastHeartbeatAt.Sub(m.firstObservedAt)/observeHBInterval) + 1
	}
	var missRate float64
	if expected > 0 {
		recv := int64(m.received)
		if recv > expected {
			recv = expected // network jitter can deliver slightly more than elapsed time predicts
		}
		missRate = 1.0 - float64(recv)/float64(expected)
	}

	latScore := 1.0 - avgMs/maxLatencyMs
	if latScore < 0 {
		latScore = 0
	}
	relScore := 1.0 - missRate
	trust := (0.35*latScore + 0.65*relScore) * 100

	speed := "fast"
	if avgMs >= fastThresholdMs {
		speed = "slow"
	}
	reliability := "reliable"
	if relScore < reliableThreshold {
		reliability = "watch"
	}
	return PeerObserveSnapshot{
		LatencyMs:   avgMs,
		Speed:       speed,
		Reliability: reliability,
		TrustScore:  trust,
		LastSeenAt:  m.lastHeartbeatAt,
		MissRate:    missRate,
	}
}

// PeerObserveSnapshot is the point-in-time quality summary sent to oc-peer via NATS.
type PeerObserveSnapshot struct {
	LatencyMs   float64   `json:"latency_ms"`
	Speed       string    `json:"speed"`       // "fast" | "slow"
	Reliability string    `json:"reliability"` // "reliable" | "watch"
	TrustScore  float64   `json:"trust_score"`
	LastSeenAt  time.Time `json:"last_seen_at"`
	MissRate    float64   `json:"miss_rate"`
}
type (
	// ShallowPeer is the minimal peer description oc-peer sends inside a
	// PEER_OBSERVE_EVENT.
	//
	// StreamAddress lets oc-discovery dial the peer without a DB lookup.
	// Address carries the peer's NATSAddress; nothing here reads it, it is
	// only forwarded as-is.
	ShallowPeer struct {
		ID            string `json:"id"`
		PeerID        string `json:"peer_id"`
		Address       string `json:"address"`
		StreamAddress string `json:"stream_address"`
	}

	// ObserveCommand is the payload of a PEER_OBSERVE_EVENT NATS message sent
	// by oc-peer. Which fields are meaningful depends on the command:
	//
	//	Observe  → User + Peers
	//	Close    → User + PeerIDs + Close=true
	//	CloseAll → CloseAll=true (User optional)
	ObserveCommand struct {
		User     string        `json:"user"`
		Peers    []ShallowPeer `json:"peers,omitempty"`
		PeerIDs  []string      `json:"peer_ids,omitempty"`
		Close    bool          `json:"close,omitempty"`
		CloseAll bool          `json:"close_all,omitempty"`
	}
)
// ── observe cache (observed side) ────────────────────────────────────────────

// observeCache maps an observing peer's libp2p PeerID string to the cancel
// function of the heartbeat goroutine serving it. It is used exclusively on
// the OBSERVED side. Every method takes mu, so the cache may be shared
// between goroutines.
type observeCache struct {
	mu      sync.Mutex
	cancels map[string]context.CancelFunc
}

// newObserveCache returns an empty, ready-to-use cache.
func newObserveCache() *observeCache {
	c := new(observeCache)
	c.cancels = make(map[string]context.CancelFunc)
	return c
}

// set registers cancel under pid. A cancel function previously registered
// under the same pid is invoked first, stopping the goroutine it controls.
func (c *observeCache) set(pid string, cancel context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if previous, found := c.cancels[pid]; found {
		previous()
	}
	c.cancels[pid] = cancel
}

// cancel invokes and unregisters the cancel function stored under pid, if any.
func (c *observeCache) cancel(pid string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	stop, found := c.cancels[pid]
	if !found {
		return
	}
	stop()
	delete(c.cancels, pid)
}

// cancelAll invokes every registered cancel function and empties the cache.
func (c *observeCache) cancelAll() {
	c.mu.Lock()
	defer c.mu.Unlock()
	registered := c.cancels
	c.cancels = make(map[string]context.CancelFunc)
	for _, stop := range registered {
		stop()
	}
}

// delete unregisters pid WITHOUT invoking its cancel function.
func (c *observeCache) delete(pid string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.cancels, pid)
}
// ── heartbeat batcher (observing side) ───────────────────────────────────────

// heartbeatBatcher collects peer IDs from incoming heartbeats during
// observeBatchWindow and hands them to flush in a single call. The set-based
// storage deduplicates repeated heartbeats from the same peer within one
// window (not expected, but harmless).
type heartbeatBatcher struct {
	mu    sync.Mutex
	ids   map[string]struct{}
	timer *time.Timer
	flush func(peerIDs []string)
}

// newHeartbeatBatcher returns an idle batcher that delivers batches to flush.
func newHeartbeatBatcher(flush func([]string)) *heartbeatBatcher {
	b := new(heartbeatBatcher)
	b.ids = map[string]struct{}{}
	b.flush = flush
	return b
}

// add puts peerID into the pending batch. The first add of a window schedules
// fire to run after observeBatchWindow.
func (b *heartbeatBatcher) add(peerID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.ids[peerID] = struct{}{}
	if b.timer != nil {
		return // a flush is already scheduled for this window
	}
	b.timer = time.AfterFunc(observeBatchWindow, b.fire)
}

// fire runs when the window timer expires. It takes the pending set under
// the lock, resets the batcher for the next window, and — outside the lock —
// calls flush with the collected IDs when there are any.
func (b *heartbeatBatcher) fire() {
	b.mu.Lock()
	pending := b.ids
	b.ids = map[string]struct{}{}
	b.timer = nil
	b.mu.Unlock()

	if len(pending) == 0 {
		return
	}
	batch := make([]string, 0, len(pending))
	for id := range pending {
		batch = append(batch, id)
	}
	b.flush(batch)
}
// flushObserveBatch is the flush function wired into the heartbeatBatcher.
// It publishes the batch twice on NATS:
//   - PEER_OBSERVE_RESPONSE_EVENT, consumed by oc-peer (direct channel);
//   - PROPALGATION_EVENT carrying a PB_PROPAGATE PropalgationMessage,
//     consumed by other oc-discovery nodes.
//
// Both carry {"peer_ids": peerIDs, "state": "online"}. A marshalling failure
// aborts silently; one on the second message leaves only the direct
// notification published.
func flushObserveBatch(peerIDs []string) {
	body := map[string]interface{}{
		"state":    "online",
		"peer_ids": peerIDs,
	}
	payload, err := json.Marshal(body)
	if err != nil {
		return
	}

	// Direct notification to oc-peer.
	direct := tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		Method:   int(tools.PEER_OBSERVE_RESPONSE_EVENT),
		Payload:  payload,
	}
	tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_RESPONSE_EVENT, direct)

	// Broadcast to other oc-discovery nodes so they can forward to their
	// local oc-peer if needed.
	envelope := tools.PropalgationMessage{
		DataType: int(tools.PEER),
		Action:   tools.PB_PROPAGATE,
		Payload:  payload,
	}
	wrapped, err := json.Marshal(envelope)
	if err != nil {
		return
	}
	broadcast := tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  wrapped,
	}
	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, broadcast)
}
// ── incoming observe handler (observed side) ──────────────────────────────────

// handleIncomingObserve is called when a remote peer opens an observe stream
// to us (observed side). It starts a goroutine that writes an
// ObserveHeartbeat event back on the same bidirectional rawStream every
// observeHBInterval — no separate reverse stream is opened.
//
// The request is refused with a non-nil error while drain mode is active, or
// when the local peer DB records the remote peer with Relation BLACKLIST.
// Otherwise any heartbeat goroutine already registered for the same observer
// is cancelled and replaced, and nil is returned.
//
// The goroutine stops when its context is cancelled through s.observeCache,
// or when a heartbeat write fails. It never closes rawStream: the persistent
// readLoop (HandleResponse) owns the stream's lifecycle.
func (s *StreamService) handleIncomingObserve(rawStream network.Stream) error {
	remotePeerID := rawStream.Conn().RemotePeer().String()
	log := oclib.GetLogger()

	// Drain mode: refuse new observations until drainUntil, which
	// CloseAllObserves sets observeDrainDuration after a close-all.
	s.drainMu.RLock()
	draining := !s.drainUntil.IsZero() && time.Now().Before(s.drainUntil)
	s.drainMu.RUnlock()
	if draining {
		log.Debug().Str("observer", remotePeerID).Msg("[observe] draining, observe request refused")
		return errors.New("draining")
	}

	// Guard: the requesting peer must not be blacklisted.
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	res := access.Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"peer_id": {{Operator: dbs.EQUAL.String(), Value: remotePeerID}},
		},
	}, "", false, 0, 1)
	if len(res.Data) > 0 {
		// comma-ok: an unexpected record type must not panic the handler.
		if p, ok := res.Data[0].(*peer.Peer); ok && p != nil && p.Relation == peer.BLACKLIST {
			log.Debug().Str("observer", remotePeerID).Msg("[observe] blacklisted observer refused")
			return errors.New("can't observe blacklisted peer")
		}
	}

	// Replace any existing heartbeat goroutine for this observer.
	ctx, cancel := context.WithCancel(context.Background())
	s.observeCache.set(remotePeerID, cancel)
	log.Debug().Str("observer", remotePeerID).Msg("[observe] starting heartbeat loop")

	go func() {
		defer cancel()
		ticker := time.NewTicker(observeHBInterval)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				// Cancelled through observeCache (replacement, cancel or
				// cancelAll). Whoever cancelled us already owns the cache
				// entry. After a replacement it belongs to the NEW goroutine,
				// so it must not be deleted here.
				return
			case <-ticker.C:
				rawStream.SetWriteDeadline(time.Now().Add(5 * time.Second))
				payload, _ := json.Marshal(ObserveHeartbeat{State: "online", SentAt: time.Now().UTC()})
				evt := common.NewEvent(observeHBEventType, s.Host.ID().String(), nil, "", payload)
				if err := json.NewEncoder(rawStream).Encode(evt); err != nil {
					log.Info().
						Str("observer", remotePeerID).
						Err(err).
						Msg("[observe] heartbeat write failed — stream closed, stopping goroutine")
					// Drop our own entry unless we were cancelled meanwhile;
					// in that case the entry may already belong to a
					// replacement.
					// NOTE(review): a replacement landing between this check
					// and the delete can still be dropped. Closing that window
					// needs identity-aware removal in observeCache.
					if ctx.Err() == nil {
						s.observeCache.delete(remotePeerID)
					}
					return
				}
				rawStream.SetWriteDeadline(time.Time{})
			}
		}
	}()
	return nil
}
// ── heartbeat receiver (observing side) ───────────────────────────────────────

// handleObserveHeartbeat is called by readLoop when a heartbeat event arrives
// on an outgoing ProtocolObserve stream.
//
// When the payload decodes to an ObserveHeartbeat carrying a SentAt
// timestamp, the one-way latency (local clock minus SentAt) is recorded in
// the metrics kept for evt.From. In every case the peer's current quality
// snapshot is then published through flushObserveForPeer. It always
// returns nil.
func (ps *StreamService) handleObserveHeartbeat(evt *common.Event) error {
	var hb ObserveHeartbeat
	if err := json.Unmarshal(evt.Payload, &hb); err == nil && !hb.SentAt.IsZero() {
		latency := time.Since(hb.SentAt)

		// Load first so the common path does not allocate a throw-away struct.
		raw, ok := ps.observeMetrics.Load(evt.From)
		if !ok {
			raw, _ = ps.observeMetrics.LoadOrStore(evt.From, &PeerObserveMetrics{
				firstObservedAt: time.Now().UTC(),
			})
		}
		// Deliberately no Store afterwards: the map already holds this
		// pointer, and re-storing could resurrect metrics deleted concurrently
		// by closeObserveStream. Likewise, raw is not printed: formatting it
		// would read its fields without holding its mutex.
		if m, ok := raw.(*PeerObserveMetrics); ok {
			m.record(latency)
		}
	}
	ps.flushObserveForPeer(evt.From, evt.User)
	return nil
}
// flushObserveForPeer publishes the current connection-quality snapshot of
// peerID on NATS, on behalf of user. It is the single-peer successor of
// flushObserveBatch and emits the same two messages:
//   - PEER_OBSERVE_RESPONSE_EVENT (direct channel to oc-peer);
//   - PROPALGATION_EVENT carrying a PB_PROPAGATE PropalgationMessage.
//
// The payload is {"peer_ids": [peerID], "state": "online",
// "metrics": {peerID: snapshot}}. The snapshot is null when no metrics have
// been recorded for peerID.
func (ps *StreamService) flushObserveForPeer(peerID string, user string) {
	var snap *PeerObserveSnapshot
	if raw, ok := ps.observeMetrics.Load(peerID); ok {
		// snapshot() takes the metrics mutex; the struct itself must never be
		// formatted or read directly from here.
		if m, ok := raw.(*PeerObserveMetrics); ok {
			s := m.snapshot()
			snap = &s
		}
	}

	payload, err := json.Marshal(map[string]interface{}{
		"peer_ids": []string{peerID},
		"state":    "online",
		"metrics":  map[string]*PeerObserveSnapshot{peerID: snap},
	})
	if err != nil {
		oclib.GetLogger().Warn().Str("peer", peerID).Err(err).Msg("[observe] failed to marshal observe response")
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_RESPONSE_EVENT, tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		User:     user,
		Method:   int(tools.PEER_OBSERVE_RESPONSE_EVENT),
		Payload:  payload,
	})

	propPayload, err := json.Marshal(tools.PropalgationMessage{
		DataType: int(tools.PEER),
		Action:   tools.PB_PROPAGATE,
		Payload:  payload,
	})
	if err != nil {
		oclib.GetLogger().Warn().Str("peer", peerID).Err(err).Msg("[observe] failed to marshal propagation message")
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-discovery",
		Datatype: tools.PEER,
		User:     user,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  propPayload,
	})
}
// ── user→peer index (ref-counted observe management) ─────────────────────────

// userPeerIndex records, per user, the set of peer IDs that user observes.
// An observe stream to a peer is kept open while at least one user watches
// it, and closed only when the last one stops. All methods take mu.
type userPeerIndex struct {
	mu    sync.Mutex
	index map[string]map[string]struct{} // user → set of peer_id strings
}

// newUserPeerIndex returns an empty index.
func newUserPeerIndex() *userPeerIndex {
	idx := &userPeerIndex{}
	idx.index = make(map[string]map[string]struct{})
	return idx
}

// observedByAnyLocked reports whether at least one user currently watches
// peerID. The caller must hold u.mu.
func (u *userPeerIndex) observedByAnyLocked(peerID string) bool {
	for _, watched := range u.index {
		if _, hit := watched[peerID]; hit {
			return true
		}
	}
	return false
}

// add registers user as an observer of peerID. It reports whether nobody was
// observing peerID before this call (i.e. user is the first observer).
func (u *userPeerIndex) add(user, peerID string) (isFirst bool) {
	u.mu.Lock()
	defer u.mu.Unlock()
	isFirst = !u.observedByAnyLocked(peerID)
	watched := u.index[user]
	if watched == nil {
		watched = make(map[string]struct{})
		u.index[user] = watched
	}
	watched[peerID] = struct{}{}
	return isFirst
}

// remove unregisters user from peerID, dropping the user entirely once it
// watches nothing. It reports whether no user observes peerID anymore.
func (u *userPeerIndex) remove(user, peerID string) (isLast bool) {
	u.mu.Lock()
	defer u.mu.Unlock()
	watched := u.index[user]
	delete(watched, peerID)
	if len(watched) == 0 {
		delete(u.index, user)
	}
	return !u.observedByAnyLocked(peerID)
}

// removeUser drops every entry of user and returns the peer IDs left without
// any observer (those whose streams should be closed). The result is nil
// when there are none.
func (u *userPeerIndex) removeUser(user string) []string {
	u.mu.Lock()
	defer u.mu.Unlock()
	watched := u.index[user]
	delete(u.index, user)
	var orphans []string
	for peerID := range watched {
		if u.observedByAnyLocked(peerID) {
			continue
		}
		orphans = append(orphans, peerID)
	}
	return orphans
}
// ── NATS command handler (observing side) ─────────────────────────────────────

// HandleObserveNATSCommand processes a PEER_OBSERVE_EVENT received from
// oc-peer. The first matching case below is applied:
//   - CloseAll: close every observe stream (CloseAllObserves);
//   - Close: unregister cmd.User from each of cmd.PeerIDs and close the
//     stream of every peer left without an observer;
//   - otherwise (Observe): register cmd.User for each of cmd.Peers and open a
//     stream for each peer that had no observer yet. A failed open rolls the
//     registration back so a later command can retry.
//
// Failures are logged, never returned.
func (ps *StreamService) HandleObserveNATSCommand(resp tools.NATSResponse) {
	logger := oclib.GetLogger()
	cmd := ObserveCommand{}
	if err := json.Unmarshal(resp.Payload, &cmd); err != nil {
		logger.Warn().Err(err).Msg("[observe] failed to unmarshal ObserveCommand")
		return
	}

	switch {
	case cmd.CloseAll:
		logger.Info().Msg("[observe] close-all received via NATS")
		ps.CloseAllObserves()

	case cmd.Close:
		for _, id := range cmd.PeerIDs {
			if !ps.observeUsers.remove(cmd.User, id) {
				continue // someone else still observes this peer
			}
			if err := ps.closeObserveStream(id); err != nil {
				logger.Warn().Str("peer", id).Err(err).Msg("[observe] closeObserveStream failed")
			}
		}

	default:
		// Observe: open streams for any new peer, using the address from the payload.
		for _, target := range cmd.Peers {
			if !ps.observeUsers.add(cmd.User, target.PeerID) {
				continue // stream already managed on behalf of another user
			}
			err := ps.openObserveStream(target)
			if err == nil {
				continue
			}
			// Roll back the index entry so the next NATS command can retry.
			ps.observeUsers.remove(cmd.User, target.PeerID)
			logger.Warn().Str("peer", target.PeerID).Err(err).Msg("[observe] openObserveStream failed")
		}
	}
}
// ── outgoing observe management (observing side) ──────────────────────────────

// OpenObserveStream is the exported entry point used for inter-discovery
// propagation, where no user context is available. It bypasses the user→peer
// index entirely and delegates to openObserveStream, which does nothing when
// a stream to p.PeerID is already open.
func (ps *StreamService) OpenObserveStream(p ShallowPeer) error {
	err := ps.openObserveStream(p)
	return err
}

// CloseObserveStream is the exported entry point used for inter-discovery
// propagation. Like OpenObserveStream, it bypasses the user→peer index and
// delegates to closeObserveStream.
func (ps *StreamService) CloseObserveStream(toPeerID string) error {
	if err := ps.closeObserveStream(toPeerID); err != nil {
		return err
	}
	return nil
}
// openObserveStream opens a ProtocolObserve stream to p, sends the initial
// ObserveRequest, and starts a persistent readLoop on the stream so that the
// remote side's heartbeats are received.
//
// The stream address is p.StreamAddress when set. Otherwise it falls back to
// the local peer DB record, then to a DHT peer-record lookup. The call is a
// no-op (nil) when a ProtocolObserve stream to p.PeerID is already
// registered.
//
// It returns an error when:
//   - p is recorded as the local (SELF) peer;
//   - no stream address can be resolved;
//   - p.PeerID or the address cannot be parsed;
//   - the address targets another peer than p.PeerID;
//   - opening the stream or writing the request fails.
//
// Reporting an unresolvable address as an error (rather than nil) lets
// HandleObserveNATSCommand roll back its index entry so a later command
// retries.
func (ps *StreamService) openObserveStream(p ShallowPeer) error {
	log := oclib.GetLogger()

	// One DB lookup serves both the SELF guard and the first address fallback.
	var known *peer.Peer
	access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
	res := access.Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}},
		},
	}, "", false, 0, 1)
	if len(res.Data) > 0 {
		known, _ = res.Data[0].(*peer.Peer) // comma-ok: never panic on an unexpected type
	}
	if known != nil && known.Relation == peer.SELF {
		return errors.New("Can't send to self")
	}

	streamAddr := p.StreamAddress
	if streamAddr == "" && known != nil {
		streamAddr = known.StreamAddress
	}
	if streamAddr == "" {
		if records, err := ps.Node.GetPeerRecord(context.Background(), p.PeerID); err == nil && len(records) > 0 {
			streamAddr = records[0].StreamAddress
		}
	}
	if streamAddr == "" {
		return fmt.Errorf("observe: no stream address for peer %s", p.PeerID)
	}

	decodedID, err := pp.Decode(p.PeerID)
	if err != nil {
		return err
	}

	// If a stream already exists, reuse it.
	ps.Mu.RLock()
	_, alreadyOpen := ps.Streams[ProtocolObserve][decodedID]
	ps.Mu.RUnlock()
	if alreadyOpen {
		return nil
	}

	ad, err := pp.AddrInfoFromString(streamAddr)
	if err != nil {
		return err
	}
	// The registry is written under ad.ID below but looked up by the decoded
	// peer ID above (and in closeObserveStream). They must agree, or the
	// stream could never be found, reused or closed.
	if ad.ID != decodedID {
		return fmt.Errorf("observe: stream address %s does not belong to peer %s", streamAddr, p.PeerID)
	}

	log.Debug().Str("peer", p.PeerID).Str("addr", streamAddr).Msg("[observe] opening observe stream")
	if ps.Streams, err = common.TempStream(ps.Host, *ad, ProtocolObserve, p.ID, ps.Streams, protocols, &ps.Mu); err != nil {
		return err
	}

	// TempStream is expected to have registered the new stream under
	// ps.Streams[ProtocolObserve][ad.ID]; read it back under the lock.
	ps.Mu.RLock()
	temp := ps.Streams[ProtocolObserve][ad.ID]
	ps.Mu.RUnlock()
	if temp == nil {
		return fmt.Errorf("observe: no stream registered for peer %s after opening", p.PeerID)
	}

	reqPayload, err := json.Marshal(ObserveRequest{Close: false})
	if err != nil {
		return err
	}
	if err := json.NewEncoder(temp.Stream).Encode(common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", reqPayload)); err != nil {
		temp.Stream.Close()
		// Unregister the dead stream; otherwise the alreadyOpen check above
		// would short-circuit every future attempt to observe this peer.
		ps.Mu.Lock()
		if ps.Streams[ProtocolObserve][ad.ID] == temp {
			delete(ps.Streams[ProtocolObserve], ad.ID)
		}
		ps.Mu.Unlock()
		return err
	}

	// Re-register the stream as long-lived and hand it to a persistent readLoop.
	s := &common.Stream{
		Stream: temp.Stream,
		Expiry: time.Now().Add(365 * 24 * time.Hour),
	}
	ps.Mu.Lock()
	if ps.Streams[ProtocolObserve] == nil {
		ps.Streams[ProtocolObserve] = map[pp.ID]*common.Stream{}
	}
	ps.Streams[ProtocolObserve][ad.ID] = s
	ps.Mu.Unlock()
	go ps.readLoop(s, ad.ID, ProtocolObserve, &common.ProtocolInfo{PersistantStream: true})
	return nil
}
// closeObserveStream closes the outgoing ProtocolObserve stream to toPeerID,
// if one is registered, and discards the quality metrics collected for it.
//
// Before closing, it writes an ObserveRequest{Close: true} wrapped in a
// common.Event, so that the remote's persistent readLoop can decode it and
// cancel its heartbeat goroutine. That write is best-effort: its error is
// ignored because the stream is closed right after either way.
//
// It returns an error only when toPeerID is not a valid libp2p peer ID.
func (ps *StreamService) closeObserveStream(toPeerID string) error {
	decodedID, err := pp.Decode(toPeerID)
	if err != nil {
		return err
	}

	// Unregister under the lock, but perform the network I/O outside it, so a
	// slow or stalled remote cannot block every other user of ps.Mu.
	ps.Mu.Lock()
	s, ok := ps.Streams[ProtocolObserve][decodedID]
	if ok {
		delete(ps.Streams[ProtocolObserve], decodedID)
	}
	ps.Mu.Unlock()

	if ok && s != nil {
		closePayload, _ := json.Marshal(ObserveRequest{Close: true}) // cannot fail for this struct
		closeEvt := common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", closePayload)
		_ = json.NewEncoder(s.Stream).Encode(closeEvt) // best-effort notification
		s.Stream.Close()
	}
	ps.observeMetrics.Delete(toPeerID)
	return nil
}
// CloseAllObserves closes every outgoing ProtocolObserve stream, clears the
// user→peer index and all collected metrics, and enters drain mode for
// observeDrainDuration.
//
// Each remote is first sent a best-effort ObserveRequest{Close: true}
// wrapped in a common.Event, so it can stop its heartbeat goroutine.
func (ps *StreamService) CloseAllObserves() {
	// Detach all observe streams under the lock; the network I/O happens
	// afterwards, so a stalled remote cannot block other users of ps.Mu.
	ps.Mu.Lock()
	streams := ps.Streams[ProtocolObserve]
	delete(ps.Streams, ProtocolObserve)
	ps.Mu.Unlock()

	closePayload, _ := json.Marshal(ObserveRequest{Close: true}) // cannot fail for this struct
	for _, s := range streams {
		if s == nil {
			continue
		}
		closeEvt := common.NewEvent(ProtocolObserve, ps.Host.ID().String(), nil, "", closePayload)
		_ = json.NewEncoder(s.Stream).Encode(closeEvt) // best-effort notification
		s.Stream.Close()
	}

	// Reset the user index so stale ref-counts don't block future opens. It
	// is cleared in place under its own mutex rather than by swapping the
	// ps.observeUsers pointer, which HandleObserveNATSCommand reads without
	// synchronization.
	if ps.observeUsers == nil {
		ps.observeUsers = newUserPeerIndex()
	} else {
		ps.observeUsers.mu.Lock()
		ps.observeUsers.index = map[string]map[string]struct{}{}
		ps.observeUsers.mu.Unlock()
	}

	ps.observeMetrics.Range(func(k, _ any) bool {
		ps.observeMetrics.Delete(k)
		return true
	})

	ps.drainMu.Lock()
	ps.drainUntil = time.Now().Add(observeDrainDuration)
	ps.drainMu.Unlock()
}