Files
oc-discovery/daemons/node/common/common_stream.go
2026-03-09 14:57:41 +01:00

1007 lines
32 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package common
import (
"context"
cr "crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net"
"oc-discovery/conf"
"slices"
"strings"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
type LongLivedStreamRecordedService[T interface{}] struct {
*LongLivedPubSubService
StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
StreamMU sync.RWMutex
maxNodesConn int
// AfterHeartbeat is called after each successful heartbeat with the full
// decoded Heartbeat so the hook can use the fresh embedded PeerRecord.
AfterHeartbeat func(hb *Heartbeat)
// AfterDelete is called after gc() evicts an expired peer, outside the lock.
// name and did may be empty if the HeartbeatStream had no metadata.
AfterDelete func(pid pp.ID, name string, did string)
}
func (ix *LongLivedStreamRecordedService[T]) MaxNodesConn() int {
return ix.maxNodesConn
}
func NewStreamRecordedService[T interface{}](h host.Host, maxNodesConn int) *LongLivedStreamRecordedService[T] {
service := &LongLivedStreamRecordedService[T]{
LongLivedPubSubService: NewLongLivedPubSubService(h),
StreamRecords: map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
maxNodesConn: maxNodesConn,
}
go service.StartGC(30 * time.Second)
// Garbage collection is needed on every Map of Long-Lived Stream... it may be a top level redesigned
go service.Snapshot(1 * time.Hour)
return service
}
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
go func() {
t := time.NewTicker(interval)
defer t.Stop()
for range t.C {
ix.gc()
}
}()
}
func (ix *LongLivedStreamRecordedService[T]) gc() {
ix.StreamMU.Lock()
now := time.Now().UTC()
if ix.StreamRecords[ProtocolHeartbeat] == nil {
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
ix.StreamMU.Unlock()
return
}
streams := ix.StreamRecords[ProtocolHeartbeat]
type gcEntry struct {
pid pp.ID
name string
did string
}
var evicted []gcEntry
for pid, rec := range streams {
if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.HeartbeatStream.UptimeTracker.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) {
name, did := "", ""
if rec.HeartbeatStream != nil {
name = rec.HeartbeatStream.Name
did = rec.HeartbeatStream.DID
}
evicted = append(evicted, gcEntry{pid, name, did})
for _, sstreams := range ix.StreamRecords {
if sstreams[pid] != nil {
delete(sstreams, pid)
}
}
}
}
ix.StreamMU.Unlock()
if ix.AfterDelete != nil {
for _, e := range evicted {
ix.AfterDelete(e.pid, e.name, e.did)
}
}
}
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
go func() {
logger := oclib.GetLogger()
t := time.NewTicker(interval)
defer t.Stop()
for range t.C {
infos := ix.snapshot()
for _, inf := range infos {
logger.Info().Msg(" -> " + inf.DID)
}
}
}()
}
// -------- Snapshot / Query --------
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
ix.StreamMU.Lock()
defer ix.StreamMU.Unlock()
out := make([]*StreamRecord[T], 0, len(ix.StreamRecords))
for _, streams := range ix.StreamRecords {
for _, stream := range streams {
out = append(out, stream)
}
}
return out
}
func (ix *LongLivedStreamRecordedService[T]) HandleHeartbeat(s network.Stream) {
logger := oclib.GetLogger()
defer s.Close()
dec := json.NewDecoder(s)
for {
ix.StreamMU.Lock()
if ix.StreamRecords[ProtocolHeartbeat] == nil {
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
}
streams := ix.StreamRecords[ProtocolHeartbeat]
streamsAnonym := map[pp.ID]HeartBeatStreamed{}
for k, v := range streams {
streamsAnonym[k] = v
}
ix.StreamMU.Unlock()
pid, hb, err := CheckHeartbeat(ix.Host, s, dec, streamsAnonym, &ix.StreamMU, ix.maxNodesConn)
if err != nil {
// Stream-level errors (EOF, reset, closed) mean the connection is gone
// — exit so the goroutine doesn't spin forever on a dead stream.
// Metric/policy errors (score too low, too many connections) are transient
// — those are also stream-terminal since the stream carries one session.
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) ||
strings.Contains(err.Error(), "reset") ||
strings.Contains(err.Error(), "closed") ||
strings.Contains(err.Error(), "too many connections") {
logger.Info().Err(err).Msg("heartbeat stream terminated, closing handler")
return
}
logger.Warn().Err(err).Msg("heartbeat check failed, retrying on same stream")
continue
}
ix.StreamMU.Lock()
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
// Preserve the existing UptimeTracker so TotalOnline accumulates correctly.
// hb.Stream is a fresh Stream with no UptimeTracker; carry the old one over.
oldTracker := rec.GetUptimeTracker()
rec.HeartbeatStream = hb.Stream
if oldTracker != nil {
rec.HeartbeatStream.UptimeTracker = oldTracker
} else {
rec.HeartbeatStream.UptimeTracker = &UptimeTracker{FirstSeen: time.Now().UTC()}
}
rec.HeartbeatStream.UptimeTracker.RecordHeartbeat()
rec.LastScore = hb.Score
logger.Info().Msg("A new node is updated : " + pid.String())
} else {
tracker := &UptimeTracker{FirstSeen: time.Now().UTC()}
tracker.RecordHeartbeat()
hb.Stream.UptimeTracker = tracker
streams[*pid] = &StreamRecord[T]{
DID: hb.DID,
HeartbeatStream: hb.Stream,
LastScore: hb.Score,
}
logger.Info().Msg("A new node is subscribed : " + pid.String())
}
ix.StreamMU.Unlock()
// Enrich hb.DID before calling the hook: nodes never set hb.DID directly;
// extract it from the embedded signed PeerRecord if available, then fall
// back to the DID stored by handleNodePublish in the stream record.
if hb.DID == "" && len(hb.Record) > 0 {
var partial struct {
DID string `json:"did"`
}
if json.Unmarshal(hb.Record, &partial) == nil && partial.DID != "" {
hb.DID = partial.DID
}
}
if hb.DID == "" {
ix.StreamMU.RLock()
if rec, ok := streams[*pid]; ok {
hb.DID = rec.DID
}
ix.StreamMU.RUnlock()
}
if ix.AfterHeartbeat != nil && hb.DID != "" {
ix.AfterHeartbeat(hb)
}
}
}
func CheckHeartbeat(h host.Host, s network.Stream, dec *json.Decoder, streams map[pp.ID]HeartBeatStreamed, lock *sync.RWMutex, maxNodes int) (*pp.ID, *Heartbeat, error) {
if len(h.Network().Peers()) >= maxNodes {
return nil, nil, fmt.Errorf("too many connections, try another indexer")
}
var hb Heartbeat
if err := dec.Decode(&hb); err != nil {
return nil, nil, err
}
_, bpms, latencyScore, _ := getBandwidthChallengeRate(h, s.Conn().RemotePeer(), MinPayloadChallenge+int(rand.Float64()*(MaxPayloadChallenge-MinPayloadChallenge)))
{
pid, err := pp.Decode(hb.PeerID)
if err != nil {
return nil, nil, err
}
uptimeRatio := float64(0)
age := time.Duration(0)
lock.Lock()
if rec, ok := streams[pid]; ok && rec.GetUptimeTracker() != nil {
uptimeRatio = rec.GetUptimeTracker().UptimeRatio()
age = rec.GetUptimeTracker().Uptime()
}
lock.Unlock()
// E: measure the indexer's own subnet diversity, not the node's view.
diversity := getOwnDiversityRate(h)
// fillRate: fraction of indexer capacity used — higher = more peers trust this indexer.
fillRate := 0.0
if maxNodes > 0 {
fillRate = float64(len(h.Network().Peers())) / float64(maxNodes)
if fillRate > 1 {
fillRate = 1
}
}
hb.ComputeIndexerScore(uptimeRatio, bpms, diversity, latencyScore, fillRate)
// B: dynamic minScore — starts at 20% for brand-new peers, ramps to 80% at 24h.
minScore := dynamicMinScore(age)
if hb.Score < minScore {
return nil, nil, errors.New("not enough trusting value")
}
hb.Stream = &Stream{
Name: hb.Name,
DID: hb.DID,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
} // here is the long-lived bidirectional heartbeat.
return &pid, &hb, err
}
}
func getDiversityRate(h host.Host, peers []string) float64 {
peers, _ = checkPeers(h, peers)
diverse := []string{}
for _, p := range peers {
ip, err := ExtractIP(p)
if err != nil {
continue
}
div := ip.Mask(net.CIDRMask(24, 32)).String()
if !slices.Contains(diverse, div) {
diverse = append(diverse, div)
}
}
if len(diverse) == 0 || len(peers) == 0 {
return 1
}
return float64(len(diverse)) / float64(len(peers))
}
// getOwnDiversityRate measures subnet /24 diversity of the indexer's own connected peers.
// This evaluates the indexer's network position rather than the connecting node's topology.
func getOwnDiversityRate(h host.Host) float64 {
diverse := map[string]struct{}{}
total := 0
for _, pid := range h.Network().Peers() {
for _, maddr := range h.Peerstore().Addrs(pid) {
total++
ip, err := ExtractIP(maddr.String())
if err != nil {
continue
}
diverse[ip.Mask(net.CIDRMask(24, 32)).String()] = struct{}{}
}
}
if total == 0 {
return 1
}
return float64(len(diverse)) / float64(total)
}
// dynamicMinScore returns the minimum acceptable score for a peer, starting
// permissive (20%) for brand-new peers and hardening linearly to 80% over 24h.
// This prevents ejecting newcomers in fresh networks while filtering parasites.
func dynamicMinScore(age time.Duration) float64 {
hours := age.Hours()
score := 20.0 + 60.0*(hours/24.0)
if score > 80.0 {
score = 80.0
}
return score
}
func checkPeers(h host.Host, peers []string) ([]string, []string) {
concretePeer := []string{}
ips := []string{}
for _, p := range peers {
ad, err := pp.AddrInfoFromString(p)
if err != nil {
continue
}
if PeerIsAlive(h, *ad) {
concretePeer = append(concretePeer, p)
if ip, err := ExtractIP(p); err == nil {
ips = append(ips, ip.Mask(net.CIDRMask(24, 32)).String())
}
}
}
return concretePeer, ips
}
const MaxExpectedMbps = 100.0
const MinPayloadChallenge = 512
const MaxPayloadChallenge = 2048
const BaseRoundTrip = 400 * time.Millisecond
// getBandwidthChallengeRate opens a dedicated ProtocolBandwidthProbe stream to
// remotePeer, sends a random payload, reads the echo, and computes throughput
// and a latency score. Returns (ok, bpms, latencyScore, error).
// latencyScore is 1.0 when RTT is very fast and 0.0 when at or beyond maxRoundTrip.
// Using a separate stream avoids mixing binary data on the JSON heartbeat stream
// and ensures the echo handler is actually running on the remote side.
func getBandwidthChallengeRate(h host.Host, remotePeer pp.ID, payloadSize int) (bool, float64, float64, error) {
payload := make([]byte, payloadSize)
if _, err := cr.Read(payload); err != nil {
return false, 0, 0, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
s, err := h.NewStream(ctx, remotePeer, ProtocolBandwidthProbe)
if err != nil {
return false, 0, 0, err
}
defer s.Reset()
s.SetDeadline(time.Now().Add(10 * time.Second))
start := time.Now()
if _, err = s.Write(payload); err != nil {
return false, 0, 0, err
}
s.CloseWrite()
// Half-close the write side so the handler's io.Copy sees EOF and stops.
// Read the echo.
response := make([]byte, payloadSize)
if _, err = io.ReadFull(s, response); err != nil {
return false, 0, 0, err
}
duration := time.Since(start)
maxRoundTrip := BaseRoundTrip + (time.Duration(payloadSize) * (100 * time.Millisecond))
mbps := float64(payloadSize*8) / duration.Seconds() / 1e6
// latencyScore: 1.0 = instant, 0.0 = at maxRoundTrip or beyond.
latencyScore := 1.0 - float64(duration)/float64(maxRoundTrip)
if latencyScore < 0 {
latencyScore = 0
}
if latencyScore > 1 {
latencyScore = 1
}
if duration > maxRoundTrip || mbps < 5.0 {
return false, float64(mbps / MaxExpectedMbps), latencyScore, nil
}
return true, float64(mbps / MaxExpectedMbps), latencyScore, nil
}
type UptimeTracker struct {
FirstSeen time.Time
LastSeen time.Time
TotalOnline time.Duration
}
// RecordHeartbeat accumulates online time gap-aware: only counts the interval if
// the gap since the last heartbeat is within 2× the recommended interval (i.e. no
// extended outage). Call this each time a heartbeat is successfully processed.
func (u *UptimeTracker) RecordHeartbeat() {
now := time.Now().UTC()
if !u.LastSeen.IsZero() {
gap := now.Sub(u.LastSeen)
if gap <= 2*RecommendedHeartbeatInterval {
u.TotalOnline += gap
}
}
u.LastSeen = now
}
func (u *UptimeTracker) Uptime() time.Duration {
return time.Since(u.FirstSeen)
}
// UptimeRatio returns the fraction of tracked lifetime during which the peer was
// continuously online (gap ≤ 2×RecommendedHeartbeatInterval). Returns 0 before
// the first heartbeat interval has elapsed.
func (u *UptimeTracker) UptimeRatio() float64 {
total := time.Since(u.FirstSeen)
if total <= 0 {
return 0
}
ratio := float64(u.TotalOnline) / float64(total)
if ratio > 1 {
ratio = 1
}
return ratio
}
func (u *UptimeTracker) IsEligible(min time.Duration) bool {
return u.Uptime() >= min
}
type StreamRecord[T interface{}] struct {
DID string
HeartbeatStream *Stream
Record T
LastScore float64
}
func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker {
if s.HeartbeatStream == nil {
return nil
}
return s.HeartbeatStream.UptimeTracker
}
type Stream struct {
Name string `json:"name"`
DID string `json:"did"`
Stream network.Stream
Expiry time.Time `json:"expiry"`
UptimeTracker *UptimeTracker
}
func (s *Stream) GetUptimeTracker() *UptimeTracker {
return s.UptimeTracker
}
func NewStream[T interface{}](s network.Stream, did string, record T) *Stream {
return &Stream{
DID: did,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
}
type ProtocolStream map[protocol.ID]map[pp.ID]*Stream
func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream {
if ps[protocol] == nil {
ps[protocol] = map[pp.ID]*Stream{}
}
return ps[protocol]
}
func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error {
if ps[protocol] == nil {
ps[protocol] = map[pp.ID]*Stream{}
}
if peerID != nil {
if s != nil {
ps[protocol][*peerID] = s
} else {
return errors.New("unable to add stream : stream missing")
}
}
return nil
}
func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) {
if streams, ok := ps[protocol]; ok {
if peerID != nil && streams[*peerID] != nil {
streams[*peerID].Stream.Close()
delete(streams, *peerID)
} else {
for _, s := range ps {
for _, v := range s {
v.Stream.Close()
}
}
delete(ps, protocol)
}
}
}
const (
ProtocolPublish = "/opencloud/record/publish/1.0"
ProtocolGet = "/opencloud/record/get/1.0"
)
var TimeWatcher time.Time
// IndexerRecord holds admission metadata for an indexer in the pool.
// AdmittedAt is zero for seed entries (IndexerAddresses) never validated by a native.
// It is set to the admission time when a native confirms the indexer via consensus.
type IndexerRecord struct {
AdmittedAt time.Time
}
// IsStableVoter returns true when this indexer has been admitted by a native
// long enough ago to participate as a voter in Phase 2 liveness voting.
func (r *IndexerRecord) IsStableVoter() bool {
return !r.AdmittedAt.IsZero() && time.Since(r.AdmittedAt) >= MinStableAge
}
var StaticIndexers map[string]*pp.AddrInfo = map[string]*pp.AddrInfo{}
// StaticIndexerMeta mirrors StaticIndexers with admission metadata.
// Both maps are always updated together under StreamMuIndexes.
var StaticIndexerMeta map[string]*IndexerRecord = map[string]*IndexerRecord{}
var StreamMuIndexes sync.RWMutex
var StreamIndexers ProtocolStream = ProtocolStream{}
// indexerHeartbeatNudge allows replenishIndexersFromNative to trigger an immediate
// heartbeat tick after adding new entries to StaticIndexers, without waiting up
// to 20s for the regular ticker. Buffered(1) so the sender never blocks.
var indexerHeartbeatNudge = make(chan struct{}, 1)
// NudgeIndexerHeartbeat signals the indexer heartbeat goroutine to fire immediately.
func NudgeIndexerHeartbeat() {
select {
case indexerHeartbeatNudge <- struct{}{}:
default: // nudge already pending, skip
}
}
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID, recordFn ...func() json.RawMessage) error {
TimeWatcher = time.Now().UTC()
logger := oclib.GetLogger()
// If native addresses are configured, get the indexer pool from the native mesh,
// then start the long-lived heartbeat goroutine toward those indexers.
if conf.GetConfig().NativeIndexerAddresses != "" {
if err := ConnectToNatives(h, minIndexer, maxIndexer, myPID); err != nil {
return err
}
// Step 2: start the long-lived heartbeat goroutine toward the indexer pool.
// replaceStaticIndexers/replenishIndexersFromNative update the map in-place
// so this single goroutine follows all pool changes automatically.
logger.Info().Msg("[native] step 2 — starting long-lived heartbeat to indexer pool")
SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name,
h, StreamIndexers, StaticIndexers, &StreamMuIndexes, 20*time.Second, recordFn...)
return nil
}
// No native configured: bootstrap from IndexerAddresses seed set.
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
if len(addresses) > maxIndexer {
addresses = addresses[0:maxIndexer]
}
StreamMuIndexes.Lock()
for _, indexerAddr := range addresses {
indexerAddr = strings.TrimSpace(indexerAddr)
if indexerAddr == "" {
continue
}
ad, err := pp.AddrInfoFromString(indexerAddr)
if err != nil {
logger.Err(err)
continue
}
// AdmittedAt zero = seed, not yet validated by a native.
StaticIndexers[indexerAddr] = ad
StaticIndexerMeta[indexerAddr] = &IndexerRecord{}
}
indexerCount := len(StaticIndexers)
StreamMuIndexes.Unlock()
if indexerCount < minIndexer {
return errors.New("you run a node without indexers... your gonna be isolated.")
}
// Start long-lived heartbeat to seed indexers. The single goroutine follows
// all subsequent StaticIndexers changes (including after native discovery).
SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name,
h, StreamIndexers, StaticIndexers, &StreamMuIndexes, 20*time.Second, recordFn...)
// Async: ask seed indexers whether they know a native — same logic as
// replenishNativesFromPeers. Runs after a short delay to let h.Connect warm up.
go func() {
time.Sleep(2 * time.Second)
logger.Info().Msg("[startup] no native configured — asking seed indexers for native addresses")
newAddr := fetchNativeFromIndexers(h, nil)
if newAddr == "" {
logger.Info().Msg("[startup] no native found from seed indexers — pure indexer mode")
return
}
ad, err := pp.AddrInfoFromString(newAddr)
if err != nil {
return
}
logger.Info().Str("addr", newAddr).Msg("[startup] native discovered via seed indexers — bootstrapping")
StreamNativeMu.Lock()
StaticNatives[newAddr] = ad
StreamNativeMu.Unlock()
// Full native bootstrap: fetch pool, run consensus, replace StaticIndexers
// with properly admitted records (AdmittedAt set).
if err := ConnectToNatives(h, minIndexer, maxIndexer, myPID); err != nil {
logger.Warn().Err(err).Msg("[startup] native bootstrap failed after discovery")
}
}()
return nil
}
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
logger := oclib.GetLogger()
if onStreamCreated == nil {
f := func(s network.Stream) {
protoS[proto][id] = &Stream{
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
}
onStreamCreated = &f
}
f := *onStreamCreated
if mypid > id || force {
if ctx == nil {
c := context.Background()
ctx = &c
}
if protoS[proto] == nil {
protoS[proto] = map[pp.ID]*Stream{}
}
if protoS[proto][id] != nil {
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
} else {
logger.Info().Msg("NEW STREAM Generated" + fmt.Sprintf("%v", proto) + " " + id.String())
s, err := h.NewStream(*ctx, id, proto)
if err != nil {
panic(err.Error())
}
f(s)
}
}
return protoS
}
type Heartbeat struct {
Name string `json:"name"`
Stream *Stream `json:"stream"`
DID string `json:"did"`
PeerID string `json:"peer_id"`
Timestamp int64 `json:"timestamp"`
IndexersBinded []string `json:"indexers_binded"`
Score float64
// Record carries a fresh signed PeerRecord (JSON) so the receiving indexer
// can republish it to the DHT without an extra round-trip.
// Only set by nodes (not indexers heartbeating other indexers).
Record json.RawMessage `json:"record,omitempty"`
}
// ComputeIndexerScore computes a composite quality score [0, 100] for the connecting peer.
// - uptimeRatio: fraction of tracked lifetime online (gap-aware) — peer reliability
// - bpms: bandwidth normalized to MaxExpectedMbps — link capacity
// - diversity: indexer's own /24 subnet diversity — network topology quality
// - latencyScore: 1 - RTT/maxRoundTrip — link responsiveness
// - fillRate: fraction of indexer slots used (0=empty, 1=full) — collective trust signal:
// a fuller indexer has been chosen and retained by many peers, which is evidence of quality.
func (hb *Heartbeat) ComputeIndexerScore(uptimeRatio float64, bpms float64, diversity float64, latencyScore float64, fillRate float64) {
hb.Score = ((0.20 * uptimeRatio) +
(0.20 * bpms) +
(0.20 * diversity) +
(0.15 * latencyScore) +
(0.25 * fillRate)) * 100
}
type HeartbeatInfo []struct {
Info []byte `json:"info"`
}
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"
// ProtocolBandwidthProbe is a dedicated short-lived stream used exclusively
// for bandwidth/latency measurement. The handler echoes any bytes it receives.
// All nodes and indexers register this handler so peers can measure them.
const ProtocolBandwidthProbe = "/opencloud/probe/1.0"
// HandleBandwidthProbe echoes back everything written on the stream, then closes.
// It is registered by all participants so the measuring side (the heartbeat receiver)
// can open a dedicated probe stream and read the round-trip latency + throughput.
func HandleBandwidthProbe(s network.Stream) {
defer s.Close()
s.SetDeadline(time.Now().Add(10 * time.Second))
io.Copy(s, s) // echo every byte back to the sender
}
// SendHeartbeat starts a goroutine that sends periodic heartbeats to peers.
// recordFn, when provided, is called on each tick and its output is embedded in
// the heartbeat as a fresh signed PeerRecord so the receiving indexer can
// republish it to the DHT without an extra round-trip.
// Pass no recordFn (or nil) for indexer→indexer / native heartbeats.
func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers map[string]*pp.AddrInfo, mu *sync.RWMutex, interval time.Duration, recordFn ...func() json.RawMessage) {
logger := oclib.GetLogger()
// isIndexerHB is true when this goroutine drives the indexer heartbeat.
// isNativeHB is true when it drives the native heartbeat.
isIndexerHB := mu == &StreamMuIndexes
isNativeHB := mu == &StreamNativeMu
var recFn func() json.RawMessage
if len(recordFn) > 0 {
recFn = recordFn[0]
}
go func() {
logger.Info().Str("proto", string(proto)).Int("peers", len(peers)).Msg("heartbeat started")
t := time.NewTicker(interval)
defer t.Stop()
// doTick sends one round of heartbeats to the current peer snapshot.
doTick := func() {
// Build the heartbeat payload — snapshot current indexer addresses.
StreamMuIndexes.RLock()
addrs := make([]string, 0, len(StaticIndexers))
for addr := range StaticIndexers {
addrs = append(addrs, addr)
}
StreamMuIndexes.RUnlock()
hb := Heartbeat{
Name: name,
PeerID: h.ID().String(),
Timestamp: time.Now().UTC().Unix(),
IndexersBinded: addrs,
}
if recFn != nil {
hb.Record = recFn()
}
// Snapshot the peer list under a read lock so we don't hold the
// write lock during network I/O.
if mu != nil {
mu.RLock()
}
snapshot := make([]*pp.AddrInfo, 0, len(peers))
for _, ix := range peers {
snapshot = append(snapshot, ix)
}
if mu != nil {
mu.RUnlock()
}
for _, ix := range snapshot {
wasConnected := h.Network().Connectedness(ix.ID) == network.Connected
StreamNativeMu.RLock()
hasNative := len(StaticNatives) > 0
StreamNativeMu.RUnlock()
if err := sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second); err != nil {
// Step 3: heartbeat failed — remove from pool and trigger replenish.
logger.Info().Str("peer", ix.ID.String()).Str("proto", string(proto)).Msg("[native] step 3 — heartbeat failed, removing peer from pool")
// Remove the dead peer and clean up its stream.
// mu already covers ps when isIndexerHB (same mutex), so one
// lock acquisition is sufficient — no re-entrant double-lock.
if mu != nil {
mu.Lock()
}
if ps[proto] != nil {
if s, ok := ps[proto][ix.ID]; ok {
if s.Stream != nil {
s.Stream.Close()
}
delete(ps[proto], ix.ID)
}
}
lostAddr := ""
for addr, ad := range peers {
if ad.ID == ix.ID {
lostAddr = addr
delete(peers, addr)
if isIndexerHB {
delete(StaticIndexerMeta, addr)
}
break
}
}
need := conf.GetConfig().MinIndexer - len(peers)
remaining := len(peers)
if mu != nil {
mu.Unlock()
}
logger.Info().Int("remaining", remaining).Int("min", conf.GetConfig().MinIndexer).Int("need", need).Msg("[native] step 3 — pool state after removal")
// Step 4: ask the native for the missing indexer count.
// hasNative computed above (used in both err and success branches).
if isIndexerHB && hasNative {
if need < 1 {
need = 1
}
logger.Info().Int("need", need).Msg("[native] step 3→4 — triggering replenish")
go replenishIndexersFromNative(h, need)
}
// Native heartbeat failed — find a replacement native.
// Case 1: if the dead native was also serving as an indexer, evict it
// from StaticIndexers immediately without waiting for the indexer HB tick.
if isNativeHB {
logger.Info().Str("addr", lostAddr).Msg("[native] step 3 — native heartbeat failed, triggering native replenish")
if lostAddr != "" && hasNative {
StreamMuIndexes.Lock()
if _, wasIndexer := StaticIndexers[lostAddr]; wasIndexer {
delete(StaticIndexers, lostAddr)
if s := StreamIndexers[ProtocolHeartbeat]; s != nil {
if stream, ok := s[ix.ID]; ok {
if stream.Stream != nil {
stream.Stream.Close()
}
delete(s, ix.ID)
}
}
idxNeed := conf.GetConfig().MinIndexer - len(StaticIndexers)
StreamMuIndexes.Unlock()
if idxNeed < 1 {
idxNeed = 1
}
logger.Info().Str("addr", lostAddr).Msg("[native] dead native evicted from indexer pool, triggering replenish")
go replenishIndexersFromNative(h, idxNeed)
} else {
StreamMuIndexes.Unlock()
}
}
go replenishNativesFromPeers(h, lostAddr, proto)
}
} else {
// Case 2: native-as-indexer reconnected after a restart.
// If the peer was disconnected before this tick and the heartbeat just
// succeeded (transparent reconnect), the native may have restarted with
// blank state (responsiblePeers empty). Evict it from StaticIndexers and
// re-request an assignment so the native re-tracks us properly and
// runOffloadLoop can eventually migrate us to real indexers.
if !wasConnected && isIndexerHB && hasNative {
StreamNativeMu.RLock()
isNativeIndexer := false
for _, ad := range StaticNatives {
if ad.ID == ix.ID {
isNativeIndexer = true
break
}
}
StreamNativeMu.RUnlock()
if isNativeIndexer {
if mu != nil {
mu.Lock()
}
if ps[proto] != nil {
if s, ok := ps[proto][ix.ID]; ok {
if s.Stream != nil {
s.Stream.Close()
}
delete(ps[proto], ix.ID)
}
}
reconnectedAddr := ""
for addr, ad := range peers {
if ad.ID == ix.ID {
reconnectedAddr = addr
delete(peers, addr)
break
}
}
idxNeed := conf.GetConfig().MinIndexer - len(peers)
if mu != nil {
mu.Unlock()
}
if idxNeed < 1 {
idxNeed = 1
}
logger.Info().Str("addr", reconnectedAddr).Str("peer", ix.ID.String()).Msg(
"[native] native-as-indexer reconnected after restart — evicting and re-requesting assignment")
go replenishIndexersFromNative(h, idxNeed)
}
}
// logger.Debug().Str("peer", ix.ID.String()).Str("proto", string(proto)).Msg("[native] step 2 — heartbeat sent ok")
}
}
}
for {
select {
case <-t.C:
doTick()
case <-indexerHeartbeatNudge:
if isIndexerHB {
logger.Info().Msg("[native] step 2 — nudge received, heartbeating new indexers immediately")
doTick()
}
case <-nativeHeartbeatNudge:
if isNativeHB {
logger.Info().Msg("[native] native nudge received, heartbeating replacement native immediately")
doTick()
}
case <-ctx.Done():
return
}
}
}()
}
type ProtocolInfo struct {
PersistantStream bool
WaitResponse bool
TTL time.Duration
}
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, pts map[protocol.ID]*ProtocolInfo, mu *sync.RWMutex) (ProtocolStream, error) {
expiry := 2 * time.Second
if pts[proto] != nil {
expiry = pts[proto].TTL
}
ctxTTL, _ := context.WithTimeout(context.Background(), expiry)
if h.Network().Connectedness(ad.ID) != network.Connected {
if err := h.Connect(ctxTTL, ad); err != nil {
return streams, err
}
}
if streams[proto] != nil && streams[proto][ad.ID] != nil {
return streams, nil
} else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil {
mu.Lock()
if streams[proto] == nil {
streams[proto] = map[pp.ID]*Stream{}
}
mu.Unlock()
time.AfterFunc(expiry, func() {
mu.Lock()
delete(streams[proto], ad.ID)
mu.Unlock()
})
mu.Lock()
streams[proto][ad.ID] = &Stream{
DID: did,
Stream: s,
Expiry: time.Now().UTC().Add(expiry),
}
mu.Unlock()
return streams, nil
} else {
return streams, err
}
}
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo,
hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
logger := oclib.GetLogger()
if ps[proto] == nil {
ps[proto] = map[pp.ID]*Stream{}
}
streams := ps[proto]
pss, exists := streams[p.ID]
ctxTTL, cancel := context.WithTimeout(ctx, 3*interval)
defer cancel()
// Connect si nécessaire
if h.Network().Connectedness(p.ID) != network.Connected {
if err := h.Connect(ctxTTL, *p); err != nil {
logger.Err(err)
return err
}
exists = false // on devra recréer le stream
}
// Crée le stream si inexistant ou fermé
if !exists || pss.Stream == nil {
logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String())
s, err := h.NewStream(ctx, p.ID, proto)
if err != nil {
logger.Err(err)
return err
}
pss = &Stream{
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
streams[p.ID] = pss
}
// Envoie le heartbeat
ss := json.NewEncoder(pss.Stream)
err := ss.Encode(&hb)
if err != nil {
pss.Stream.Close()
pss.Stream = nil // recréera au prochain tick
return err
}
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
return nil
}