Files
oc-discovery/daemons/node/common/common_stream.go

305 lines
8.8 KiB
Go
Raw Normal View History

2026-01-30 16:57:36 +01:00
package common
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
2026-04-08 10:04:41 +02:00
const (
	// InitialEventHops is the hop budget a SWIM membership event starts with.
	// It is chosen as floor(log2(typical max-pool)) + 1 so that dissemination
	// takes O(log n) propagation rounds.
	InitialEventHops = 4

	// maxMemberEventQueue is the upper bound on the number of entries held
	// by a MembershipEventQueue.
	maxMemberEventQueue = 50
)
// MembershipEventQueue stores the SWIM membership events that are waiting to
// be piggybacked on outgoing heartbeats (infection-style dissemination).
// The queue never grows beyond maxMemberEventQueue entries and keeps at most
// one event per PeerID.
type MembershipEventQueue struct {
	// mu serializes every access to events.
	mu sync.Mutex
	// events holds the pending events, oldest first.
	events []MemberEvent
}
// memberEventPriority converts an event type into a severity rank so that,
// for a given PeerID, a more severe event can override a less severe one.
// MemberDead ranks 3, MemberSuspect 2, MemberAlive 1; any other type ranks 0.
func memberEventPriority(t MemberEventType) int {
	priority := 0
	switch t {
	case MemberAlive:
		priority = 1
	case MemberSuspect:
		priority = 2
	case MemberDead:
		priority = 3
	}
	return priority
}
// Add records a membership event, keeping a single entry per PeerID.
//
// When an entry for e.PeerID is already queued it is overwritten only if e
// carries a higher Incarnation, or the same Incarnation with a more severe
// event type (see memberEventPriority); otherwise e is discarded.
// When no entry exists, e is appended; if the queue is already at
// maxMemberEventQueue entries, the oldest one is dropped first.
func (q *MembershipEventQueue) Add(e MemberEvent) {
	q.mu.Lock()
	defer q.mu.Unlock()

	for idx := range q.events {
		current := q.events[idx]
		if current.PeerID != e.PeerID {
			continue
		}
		newer := e.Incarnation > current.Incarnation
		moreSevere := e.Incarnation == current.Incarnation &&
			memberEventPriority(e.Type) > memberEventPriority(current.Type)
		if newer || moreSevere {
			q.events[idx] = e
		}
		// Known peer: either replaced above or the incoming event is stale.
		return
	}

	if len(q.events) >= maxMemberEventQueue {
		// Full: make room by discarding the oldest entry.
		q.events = q.events[1:]
	}
	q.events = append(q.events, e)
}
// Drain returns up to max events ready for transmission.
//
// Every returned event has its HopsLeft decremented by one. Events whose
// HopsLeft is still positive after the decrement stay queued for later
// rounds; the others are removed (they have propagated enough rounds).
// Events beyond the first max entries are left untouched in the queue.
//
// A negative max is treated as 0 (nothing is drained) instead of panicking
// in make. It returns nil when the queue is empty.
func (q *MembershipEventQueue) Drain(max int) []MemberEvent {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.events) == 0 {
		return nil
	}
	if max < 0 {
		max = 0
	}
	// Never reserve more room than there are events to hand out.
	capacity := max
	if capacity > len(q.events) {
		capacity = len(q.events)
	}
	out := make([]MemberEvent, 0, capacity)
	// Filter in place: kept reuses the backing array of q.events. This is
	// safe because kept never grows past the index currently being read.
	kept := q.events[:0]
	for _, e := range q.events {
		if len(out) >= max {
			// Budget exhausted: leave the event queued, unchanged.
			kept = append(kept, e)
			continue
		}
		e.HopsLeft--
		out = append(out, e)
		if e.HopsLeft > 0 {
			kept = append(kept, e)
		}
		// HopsLeft reached 0: event has propagated enough, drop from queue.
	}
	q.events = kept
	return out
}
// NodeEventQueue is the process-wide SWIM event queue used on the node side.
// Suspect/dead detections are added to it, and its content is drained into
// outgoing heartbeats.
var NodeEventQueue = new(MembershipEventQueue)
2026-03-11 16:28:15 +01:00
// Protocol IDs for record operations and for the indirect probe stream.
const (
	ProtocolPublish = "/opencloud/record/publish/1.0"
	ProtocolGet     = "/opencloud/record/get/1.0"
	ProtocolDelete  = "/opencloud/record/delete/1.0"

	// ProtocolIndirectProbe is opened by a node toward a live indexer to ask it
	// to actively probe a suspected indexer on the node's behalf (SWIM indirect ping).
	// It is the only inter-indexer protocol — indexers do not maintain persistent
	// connections to each other; this stream is one-shot and short-lived.
	ProtocolIndirectProbe = "/opencloud/indexer/probe/1.0"
)
2026-03-09 14:57:41 +01:00
2026-04-08 10:04:41 +02:00
// IndirectProbeRequest is the payload a node sends over ProtocolIndirectProbe.
// The indexer that receives it must try to reach Target and report back.
type IndirectProbeRequest struct {
	// Target is the address info of the peer to probe.
	Target pp.AddrInfo `json:"target"`
}
// IndirectProbeResponse is what the probing indexer sends back in reply.
type IndirectProbeResponse struct {
	// Reachable is serialized as "reachable".
	Reachable bool `json:"reachable"`
	// LatencyMs is serialized as "latency_ms" and omitted from the JSON
	// output when zero.
	LatencyMs int64 `json:"latency_ms,omitempty"`
}
2026-03-11 16:28:15 +01:00
// Protocol IDs for the heartbeat, witness, peer search and probe streams.
const (
	ProtocolHeartbeat = "/opencloud/heartbeat/1.0"

	// ProtocolWitnessQuery is opened by a node to ask a peer what it thinks of a given indexer.
	ProtocolWitnessQuery = "/opencloud/witness/1.0"

	// ProtocolSearchPeer is opened by a node toward one of its indexers to start a
	// distributed peer search. The stream stays open; the indexer writes
	// SearchPeerResult JSON objects as results arrive from the GossipSub mesh.
	ProtocolSearchPeer = "/opencloud/search/peer/1.0"

	// ProtocolSearchPeerResponse is opened by an indexer back toward the emitting
	// indexer to deliver search results found in its referencedNodes.
	ProtocolSearchPeerResponse = "/opencloud/search/peer/response/1.0"

	// ProtocolBandwidthProbe is a dedicated short-lived stream used exclusively
	// for bandwidth/latency measurement. The handler echoes any bytes it receives.
	// All nodes and indexers register this handler so peers can measure them.
	ProtocolBandwidthProbe = "/opencloud/probe/1.0"
)
2026-01-30 16:57:36 +01:00
type Stream struct {
2026-02-17 13:11:22 +01:00
Name string `json:"name"`
DID string `json:"did"`
Stream network.Stream
Expiry time.Time `json:"expiry"`
UptimeTracker *UptimeTracker
}
func (s *Stream) GetUptimeTracker() *UptimeTracker {
return s.UptimeTracker
2026-01-30 16:57:36 +01:00
}
// NewStream builds a Stream entry around s for the given did, with an Expiry
// two minutes from now (UTC). The record argument is not used by the body.
func NewStream[T interface{}](s network.Stream, did string, record T) *Stream {
	created := new(Stream)
	created.DID = did
	created.Stream = s
	created.Expiry = time.Now().UTC().Add(2 * time.Minute)
	return created
}
2026-03-11 16:28:15 +01:00
type StreamRecord[T interface{}] struct {
DID string
HeartbeatStream *Stream
Record T
LastScore float64
2026-03-03 16:38:24 +01:00
}
2026-03-11 16:28:15 +01:00
func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker {
if s.HeartbeatStream == nil {
2026-03-03 16:38:24 +01:00
return nil
2026-02-20 12:42:18 +01:00
}
2026-03-11 16:28:15 +01:00
return s.HeartbeatStream.UptimeTracker
2026-01-30 16:57:36 +01:00
}
2026-02-24 14:31:37 +01:00
// ProtocolInfo carries per-protocol stream settings.
type ProtocolInfo struct {
	// PersistantStream presumably marks protocols whose stream is kept open
	// across exchanges — NOTE(review): confirm with the code that reads it.
	PersistantStream bool
	// WaitResponse presumably tells the sender to wait for a reply —
	// NOTE(review): confirm with the code that reads it.
	WaitResponse bool
	// TTL is the lifetime TempStream applies to streams of this protocol.
	TTL time.Duration
}
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, pts map[protocol.ID]*ProtocolInfo, mu *sync.RWMutex) (ProtocolStream, error) {
expiry := 2 * time.Second
if pts[proto] != nil {
expiry = pts[proto].TTL
}
2026-03-11 16:28:15 +01:00
ctxTTL, cancelTTL := context.WithTimeout(context.Background(), expiry)
defer cancelTTL()
2026-03-03 16:38:24 +01:00
if h.Network().Connectedness(ad.ID) != network.Connected {
if err := h.Connect(ctxTTL, ad); err != nil {
return streams, err
}
}
2026-03-05 15:22:02 +01:00
2026-03-03 16:38:24 +01:00
if streams[proto] != nil && streams[proto][ad.ID] != nil {
return streams, nil
} else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil {
mu.Lock()
if streams[proto] == nil {
streams[proto] = map[pp.ID]*Stream{}
}
2026-03-03 16:38:24 +01:00
mu.Unlock()
time.AfterFunc(expiry, func() {
mu.Lock()
2026-03-03 16:38:24 +01:00
delete(streams[proto], ad.ID)
mu.Unlock()
2026-03-03 16:38:24 +01:00
})
mu.Lock()
streams[proto][ad.ID] = &Stream{
DID: did,
Stream: s,
Expiry: time.Now().UTC().Add(expiry),
}
2026-03-03 16:38:24 +01:00
mu.Unlock()
return streams, nil
} else {
return streams, err
}
}
2026-02-03 15:25:15 +01:00
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo,
2026-03-11 16:28:15 +01:00
hb Heartbeat, ps ProtocolStream, interval time.Duration) (*HeartbeatResponse, time.Duration, error) {
2026-03-03 16:38:24 +01:00
logger := oclib.GetLogger()
if ps[proto] == nil {
ps[proto] = map[pp.ID]*Stream{}
2026-01-30 16:57:36 +01:00
}
2026-03-03 16:38:24 +01:00
streams := ps[proto]
2026-01-30 16:57:36 +01:00
pss, exists := streams[p.ID]
2026-03-03 16:38:24 +01:00
ctxTTL, cancel := context.WithTimeout(ctx, 3*interval)
defer cancel()
2026-01-30 16:57:36 +01:00
if h.Network().Connectedness(p.ID) != network.Connected {
if err := h.Connect(ctxTTL, *p); err != nil {
2026-03-03 16:38:24 +01:00
logger.Err(err)
2026-03-11 16:28:15 +01:00
return nil, 0, err
}
2026-03-11 16:28:15 +01:00
exists = false
2026-01-30 16:57:36 +01:00
}
if !exists || pss.Stream == nil {
2026-03-03 16:38:24 +01:00
logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String())
2026-01-30 16:57:36 +01:00
s, err := h.NewStream(ctx, p.ID, proto)
if err != nil {
2026-03-17 11:57:22 +01:00
logger.Err(err).Msg(err.Error())
2026-03-11 16:28:15 +01:00
return nil, 0, err
2026-01-30 16:57:36 +01:00
}
pss = &Stream{
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
streams[p.ID] = pss
}
2026-03-11 16:28:15 +01:00
sentAt := time.Now()
if err := json.NewEncoder(pss.Stream).Encode(&hb); err != nil {
2026-01-30 16:57:36 +01:00
pss.Stream.Close()
2026-03-11 16:28:15 +01:00
pss.Stream = nil
return nil, 0, err
2026-01-30 16:57:36 +01:00
}
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
2026-03-11 16:28:15 +01:00
// Try to read a response (indexers that support bidirectional heartbeat respond).
pss.Stream.SetReadDeadline(time.Now().Add(5 * time.Second))
var resp HeartbeatResponse
rtt := time.Since(sentAt)
if err := json.NewDecoder(pss.Stream).Decode(&resp); err == nil {
rtt = time.Since(sentAt)
pss.Stream.SetReadDeadline(time.Time{})
return &resp, rtt, nil
}
pss.Stream.SetReadDeadline(time.Time{})
return nil, rtt, nil
}
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
logger := oclib.GetLogger()
if onStreamCreated == nil {
f := func(s network.Stream) {
protoS[proto][id] = &Stream{
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
}
onStreamCreated = &f
}
f := *onStreamCreated
if mypid > id || force {
if ctx == nil {
c := context.Background()
ctx = &c
}
if protoS[proto] == nil {
protoS[proto] = map[pp.ID]*Stream{}
}
if protoS[proto][id] != nil {
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
} else {
logger.Info().Msg("NEW STREAM Generated" + fmt.Sprintf("%v", proto) + " " + id.String())
s, err := h.NewStream(*ctx, id, proto)
if err != nil {
panic(err.Error())
}
f(s)
}
}
return protoS
2026-01-30 16:57:36 +01:00
}