package common

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// InitialEventHops is the hop budget a SWIM membership event starts with.
// floor(log2(typical max-pool)) + 1 gives O(log n) propagation rounds.
const InitialEventHops = 4

// maxMemberEventQueue is the maximum number of events a
// MembershipEventQueue retains at once.
const maxMemberEventQueue = 50

// MembershipEventQueue stores the SWIM membership events that are piggybacked
// on outgoing heartbeats (infection-style dissemination). It never holds more
// than maxMemberEventQueue entries and keeps at most one event per PeerID.
type MembershipEventQueue struct {
	mu     sync.Mutex
	events []MemberEvent
}

// memberEventPriority ranks an event type by severity (dead > suspect >
// alive) so that, for a given PeerID, a more severe event can override a less
// severe one. Event types not listed here rank 0.
func memberEventPriority(t MemberEventType) int {
	rank := 0
	switch t {
	case MemberAlive:
		rank = 1
	case MemberSuspect:
		rank = 2
	case MemberDead:
		rank = 3
	}
	return rank
}

// Add records a membership event, keeping one entry per PeerID.
// When the queue already holds an event for the same PeerID, the stored entry
// is overwritten only if the incoming event:
//   - carries a higher Incarnation, OR
//   - carries the same Incarnation and a higher-severity type.
//
// Otherwise the incoming event is discarded. An event for an unseen PeerID is
// appended; if the queue is full, the oldest entry is evicted first.
func (q *MembershipEventQueue) Add(e MemberEvent) {
	q.mu.Lock()
	defer q.mu.Unlock()

	for idx := range q.events {
		current := &q.events[idx]
		if current.PeerID != e.PeerID {
			continue
		}
		newer := e.Incarnation > current.Incarnation
		escalates := e.Incarnation == current.Incarnation &&
			memberEventPriority(e.Type) > memberEventPriority(current.Type)
		if newer || escalates {
			*current = e
		}
		// One entry per peer: whether replaced or not, we are done.
		return
	}

	if len(q.events) >= maxMemberEventQueue {
		q.events = q.events[1:] // evict the oldest entry
	}
	q.events = append(q.events, e)
}

// Drain returns up to max events ready for transmission.
// HopsLeft is decremented on each call; events that reach 0 are removed from
// the queue (they have already propagated enough rounds).
func (q *MembershipEventQueue) Drain(max int) []MemberEvent { q.mu.Lock() defer q.mu.Unlock() if len(q.events) == 0 { return nil } out := make([]MemberEvent, 0, max) kept := q.events[:0] for _, e := range q.events { if len(out) < max { e.HopsLeft-- out = append(out, e) if e.HopsLeft > 0 { kept = append(kept, e) } // HopsLeft reached 0: event has propagated enough, drop from queue. } else { kept = append(kept, e) } } q.events = kept return out } // NodeEventQueue is the global SWIM event queue for the node side. // Events are added on suspect/dead detection and drained into outgoing heartbeats. var NodeEventQueue = &MembershipEventQueue{} const ( ProtocolPublish = "/opencloud/record/publish/1.0" ProtocolGet = "/opencloud/record/get/1.0" ProtocolDelete = "/opencloud/record/delete/1.0" // ProtocolIndirectProbe is opened by a node toward a live indexer to ask it // to actively probe a suspected indexer on the node's behalf (SWIM indirect ping). // It is the only inter-indexer protocol — indexers do not maintain persistent // connections to each other; this stream is one-shot and short-lived. ProtocolIndirectProbe = "/opencloud/indexer/probe/1.0" ) // IndirectProbeRequest is sent by a node over ProtocolIndirectProbe. // The receiving indexer must attempt to reach Target and report back. type IndirectProbeRequest struct { Target pp.AddrInfo `json:"target"` } // IndirectProbeResponse is the reply from the probing indexer. type IndirectProbeResponse struct { Reachable bool `json:"reachable"` LatencyMs int64 `json:"latency_ms,omitempty"` } const ProtocolHeartbeat = "/opencloud/heartbeat/1.0" // ProtocolWitnessQuery is opened by a node to ask a peer what it thinks of a given indexer. const ProtocolWitnessQuery = "/opencloud/witness/1.0" // ProtocolSearchPeer is opened by a node toward one of its indexers to start a // distributed peer search. The stream stays open; the indexer writes // SearchPeerResult JSON objects as results arrive from the GossipSub mesh. 
const ProtocolSearchPeer = "/opencloud/search/peer/1.0" // ProtocolSearchPeerResponse is opened by an indexer back toward the emitting // indexer to deliver search results found in its referencedNodes. const ProtocolSearchPeerResponse = "/opencloud/search/peer/response/1.0" // ProtocolBandwidthProbe is a dedicated short-lived stream used exclusively // for bandwidth/latency measurement. The handler echoes any bytes it receives. // All nodes and indexers register this handler so peers can measure them. const ProtocolBandwidthProbe = "/opencloud/probe/1.0" type Stream struct { Name string `json:"name"` DID string `json:"did"` Stream network.Stream Expiry time.Time `json:"expiry"` UptimeTracker *UptimeTracker } func (s *Stream) GetUptimeTracker() *UptimeTracker { return s.UptimeTracker } func NewStream[T interface{}](s network.Stream, did string, record T) *Stream { return &Stream{ DID: did, Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), } } type StreamRecord[T interface{}] struct { DID string HeartbeatStream *Stream Record T LastScore float64 } func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker { if s.HeartbeatStream == nil { return nil } return s.HeartbeatStream.UptimeTracker } type ProtocolInfo struct { PersistantStream bool WaitResponse bool TTL time.Duration } func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, pts map[protocol.ID]*ProtocolInfo, mu *sync.RWMutex) (ProtocolStream, error) { expiry := 2 * time.Second if pts[proto] != nil { expiry = pts[proto].TTL } ctxTTL, cancelTTL := context.WithTimeout(context.Background(), expiry) defer cancelTTL() if h.Network().Connectedness(ad.ID) != network.Connected { if err := h.Connect(ctxTTL, ad); err != nil { return streams, err } } if streams[proto] != nil && streams[proto][ad.ID] != nil { return streams, nil } else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil { mu.Lock() if streams[proto] == nil { streams[proto] = map[pp.ID]*Stream{} } 
mu.Unlock() time.AfterFunc(expiry, func() { mu.Lock() delete(streams[proto], ad.ID) mu.Unlock() }) mu.Lock() streams[proto][ad.ID] = &Stream{ DID: did, Stream: s, Expiry: time.Now().UTC().Add(expiry), } mu.Unlock() return streams, nil } else { return streams, err } } func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) (*HeartbeatResponse, time.Duration, error) { logger := oclib.GetLogger() if ps[proto] == nil { ps[proto] = map[pp.ID]*Stream{} } streams := ps[proto] pss, exists := streams[p.ID] ctxTTL, cancel := context.WithTimeout(ctx, 3*interval) defer cancel() if h.Network().Connectedness(p.ID) != network.Connected { if err := h.Connect(ctxTTL, *p); err != nil { logger.Err(err) return nil, 0, err } exists = false } if !exists || pss.Stream == nil { logger.Info().Msg("New Stream engaged as Heartbeat " + fmt.Sprintf("%v", proto) + " " + p.ID.String()) s, err := h.NewStream(ctx, p.ID, proto) if err != nil { logger.Err(err).Msg(err.Error()) return nil, 0, err } pss = &Stream{ Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), } streams[p.ID] = pss } sentAt := time.Now() if err := json.NewEncoder(pss.Stream).Encode(&hb); err != nil { pss.Stream.Close() pss.Stream = nil return nil, 0, err } pss.Expiry = time.Now().UTC().Add(2 * time.Minute) // Try to read a response (indexers that support bidirectional heartbeat respond). 
pss.Stream.SetReadDeadline(time.Now().Add(5 * time.Second)) var resp HeartbeatResponse rtt := time.Since(sentAt) if err := json.NewDecoder(pss.Stream).Decode(&resp); err == nil { rtt = time.Since(sentAt) pss.Stream.SetReadDeadline(time.Time{}) return &resp, rtt, nil } pss.Stream.SetReadDeadline(time.Time{}) return nil, rtt, nil } func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream { logger := oclib.GetLogger() if onStreamCreated == nil { f := func(s network.Stream) { protoS[proto][id] = &Stream{ Stream: s, Expiry: time.Now().UTC().Add(2 * time.Minute), } } onStreamCreated = &f } f := *onStreamCreated if mypid > id || force { if ctx == nil { c := context.Background() ctx = &c } if protoS[proto] == nil { protoS[proto] = map[pp.ID]*Stream{} } if protoS[proto][id] != nil { protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute) } else { logger.Info().Msg("NEW STREAM Generated" + fmt.Sprintf("%v", proto) + " " + id.String()) s, err := h.NewStream(*ctx, id, proto) if err != nil { panic(err.Error()) } f(s) } } return protoS }