2026-01-30 16:57:36 +01:00
package common
import (
"context"
"encoding/json"
"errors"
"fmt"
"oc-discovery/conf"
"strings"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
peer "cloud.o-forge.io/core/oc-lib/models/peer"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
// LongLivedStreamRecordedService layers a registry of per-peer stream records
// on top of the embedded LongLivedPubSubService. T is the type of the payload
// carried by each StreamRecord.
type LongLivedStreamRecordedService[T any] struct {
	*LongLivedPubSubService

	// StreamRecords indexes the records by protocol ID, then by peer ID.
	StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
	// StreamMU guards StreamRecords.
	StreamMU sync.RWMutex

	// maxNodesConn is the peer-count limit handed to CheckHeartbeat.
	maxNodesConn int
	// isBidirectionnal is stored by the constructor; nothing in this part of
	// the file reads it.
	isBidirectionnal bool
}
// NewStreamRecordedService builds a LongLivedStreamRecordedService bound to
// host h and starts its two background loops: a stale-record sweep every
// 30 seconds and a log dump of the known records every hour.
//
// maxNodesConn and isBidirectionnal are stored on the service as given.
func NewStreamRecordedService[T any](h host.Host, maxNodesConn int, isBidirectionnal bool) *LongLivedStreamRecordedService[T] {
	service := &LongLivedStreamRecordedService[T]{
		LongLivedPubSubService: NewLongLivedPubSubService(h),
		StreamRecords:          map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
		maxNodesConn:           maxNodesConn,
		isBidirectionnal:       isBidirectionnal,
	}
	// StartGC and Snapshot each spawn their own goroutine and return right
	// away, so wrapping them in an extra `go` statement is unnecessary.
	// Every map of long-lived streams needs this kind of garbage collection;
	// it may deserve a top-level redesign.
	service.StartGC(30 * time.Second)
	service.Snapshot(1 * time.Hour)
	return service
}
// StartGC spawns a goroutine that runs gc once per interval and returns
// immediately. The goroutine has no stop condition.
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
	sweepForever := func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			<-ticker.C
			ix.gc()
		}
	}
	go sweepForever()
}
func ( ix * LongLivedStreamRecordedService [ T ] ) gc ( ) {
ix . StreamMU . Lock ( )
defer ix . StreamMU . Unlock ( )
2026-02-03 15:25:15 +01:00
now := time . Now ( ) . UTC ( )
if ix . StreamRecords [ ProtocolHeartbeat ] == nil {
2026-01-30 16:57:36 +01:00
ix . StreamRecords [ ProtocolHeartbeat ] = map [ pp . ID ] * StreamRecord [ T ] { }
return
}
2026-02-03 15:25:15 +01:00
streams := ix . StreamRecords [ ProtocolHeartbeat ]
2026-01-30 16:57:36 +01:00
for pid , rec := range streams {
if now . After ( rec . HeartbeatStream . Expiry ) || now . Sub ( rec . LastSeen ) > 2 * rec . HeartbeatStream . Expiry . Sub ( now ) {
for _ , sstreams := range ix . StreamRecords {
if sstreams [ pid ] != nil {
delete ( sstreams , pid )
}
}
ix . PubsubMu . Lock ( )
if ix . LongLivedPubSubs [ TopicPubSubNodeActivity ] != nil {
2026-02-09 13:28:00 +01:00
if b , err := json . Marshal ( TopicNodeActivityPub {
Disposer : "/ip4/" + conf . GetConfig ( ) . Hostname + "/tcp/" + fmt . Sprintf ( "%v" , conf . GetConfig ( ) . NodeEndpointPort ) + "/p2p/" + ix . Host . ID ( ) . String ( ) ,
Name : rec . HeartbeatStream . Name ,
DID : rec . HeartbeatStream . DID ,
PeerID : pid . String ( ) ,
NodeActivity : peer . OFFLINE . EnumIndex ( ) ,
} ) ; err == nil {
ix . LongLivedPubSubs [ TopicPubSubNodeActivity ] . Publish ( context . Background ( ) , b )
2026-01-30 16:57:36 +01:00
}
2026-02-09 13:28:00 +01:00
2026-01-30 16:57:36 +01:00
}
ix . PubsubMu . Unlock ( )
}
}
}
// Snapshot spawns a goroutine that, once per interval, logs the DID of every
// record returned by snapshot at info level. It returns immediately and the
// goroutine has no stop condition.
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
	go func() {
		logger := oclib.GetLogger()
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			<-ticker.C
			for _, rec := range ix.snapshot() {
				logger.Info().Msg(" -> " + rec.DID)
			}
		}
	}()
}
// -------- Snapshot / Query --------
// snapshot returns a flat slice holding the records of every protocol.
//
// Only the slice is new: the elements are the same *StreamRecord pointers that
// live in StreamRecords. The walk is read-only, so it takes the read lock and
// does not block other readers.
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
	ix.StreamMU.RLock()
	defer ix.StreamMU.RUnlock()

	// Size the result by the number of records, not the number of protocols.
	total := 0
	for _, streams := range ix.StreamRecords {
		total += len(streams)
	}

	out := make([]*StreamRecord[T], 0, total)
	for _, streams := range ix.StreamRecords {
		for _, stream := range streams {
			out = append(out, stream)
		}
	}
	return out
}
func ( ix * LongLivedStreamRecordedService [ T ] ) HandleNodeHeartbeat ( s network . Stream ) {
2026-02-03 15:25:15 +01:00
defer s . Close ( )
for {
pid , hb , err := CheckHeartbeat ( ix . Host , s , ix . maxNodesConn )
if err != nil {
continue
}
ix . StreamMU . Lock ( )
if ix . StreamRecords [ ProtocolHeartbeat ] == nil {
ix . StreamRecords [ ProtocolHeartbeat ] = map [ pp . ID ] * StreamRecord [ T ] { }
}
streams := ix . StreamRecords [ ProtocolHeartbeat ]
// if record already seen update last seen
if rec , ok := streams [ * pid ] ; ok {
rec . DID = hb . DID
rec . Stream = s
rec . HeartbeatStream = hb . Stream
rec . LastSeen = time . Now ( ) . UTC ( )
} else {
streams [ * pid ] = & StreamRecord [ T ] {
DID : hb . DID ,
HeartbeatStream : hb . Stream ,
Stream : s ,
LastSeen : time . Now ( ) . UTC ( ) ,
}
}
ix . StreamMU . Unlock ( )
2026-01-30 16:57:36 +01:00
}
}
func CheckHeartbeat ( h host . Host , s network . Stream , maxNodes int ) ( * pp . ID , * Heartbeat , error ) {
if len ( h . Network ( ) . Peers ( ) ) >= maxNodes {
return nil , nil , fmt . Errorf ( "too many connections, try another indexer" )
}
var hb Heartbeat
if err := json . NewDecoder ( s ) . Decode ( & hb ) ; err != nil {
return nil , nil , err
}
pid , err := pp . Decode ( hb . PeerID )
2026-02-03 15:25:15 +01:00
hb . Stream = & Stream {
2026-02-04 11:35:19 +01:00
Name : hb . Name ,
2026-02-03 15:25:15 +01:00
DID : hb . DID ,
Stream : s ,
Expiry : time . Now ( ) . UTC ( ) . Add ( 2 * time . Minute ) ,
} // here is the long-lived bidirectionnal heart bit.
2026-01-30 16:57:36 +01:00
return & pid , & hb , err
}
// StreamRecord is what the service remembers about one peer for one protocol.
type StreamRecord[T any] struct {
	// DID is copied from the peer's heartbeat.
	DID string
	// HeartbeatStream is the Stream built by CheckHeartbeat; its Expiry is
	// read by the garbage collector.
	HeartbeatStream *Stream
	// Stream is the network stream the last heartbeat arrived on.
	Stream network.Stream
	// Record is the caller-defined payload; this part of the file never
	// writes it.
	Record T
	// LastSeen is the UTC time of the last heartbeat, used to check expiry.
	LastSeen time.Time
}
type Stream struct {
2026-02-04 11:35:19 +01:00
Name string ` json:"name" `
2026-01-30 16:57:36 +01:00
DID string ` json:"did" `
Stream network . Stream
Expiry time . Time ` json:"expiry" `
}
// NewStream wraps s in a Stream tagged with did that expires two minutes from
// now (UTC). The Name field is left empty. The record argument and the type
// parameter T are accepted but not used.
func NewStream[T any](s network.Stream, did string, record T) *Stream {
	expiresAt := time.Now().UTC().Add(2 * time.Minute)
	st := new(Stream)
	st.DID = did
	st.Stream = s
	st.Expiry = expiresAt
	return st
}
// ProtocolStream indexes open streams by protocol ID, then by peer ID.
// Its methods mutate the map in place and do no locking of their own.
type ProtocolStream map [ protocol . ID ] map [ pp . ID ] * Stream
// Get returns the peer-to-stream map registered for protocol, creating and
// storing an empty one first when none exists. The returned map is the live
// one, not a copy.
func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream {
	byPeer := ps[protocol]
	if byPeer == nil {
		byPeer = make(map[pp.ID]*Stream)
		ps[protocol] = byPeer
	}
	return byPeer
}
// Add registers s as the stream of peerID for protocol, replacing any previous
// entry. The per-protocol map is created when missing, even if nothing ends up
// being stored. A nil peerID is a silent no-op; a nil s with a non-nil peerID
// is an error.
func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error {
	byPeer := ps[protocol]
	if byPeer == nil {
		byPeer = map[pp.ID]*Stream{}
		ps[protocol] = byPeer
	}
	if peerID == nil {
		return nil
	}
	if s == nil {
		return errors.New("unable to add stream : stream missing")
	}
	byPeer[*peerID] = s
	return nil
}
// Delete closes and unregisters streams of protocol.
//
// With a non-nil peerID only that peer's entry is removed; an unknown peer is
// a no-op. With a nil peerID every stream of protocol is closed and the whole
// protocol entry is dropped. Streams of other protocols are never touched.
// Entries whose Stream is nil (left behind by a failed heartbeat send) are
// removed without a Close call.
func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) {
	streams, ok := ps[protocol]
	if !ok {
		return
	}

	closeEntry := func(st *Stream) {
		if st != nil && st.Stream != nil {
			// Best effort: a failure to close does not prevent removal.
			_ = st.Stream.Close()
		}
	}

	if peerID != nil {
		if st, found := streams[*peerID]; found {
			closeEntry(st)
			delete(streams, *peerID)
		}
		return
	}

	for _, st := range streams {
		closeEntry(st)
	}
	delete(ps, protocol)
}
// Protocol identifiers of the record streams. ConnectToIndexers opens one
// stream per identifier towards every configured indexer.
const (
// ProtocolPublish identifies the record publish protocol, version 1.0.
ProtocolPublish = "/opencloud/record/publish/1.0"
// ProtocolGet identifies the record get protocol, version 1.0.
ProtocolGet = "/opencloud/record/get/1.0"
)
// StaticIndexers lists the indexers ConnectToIndexers managed to reach.
// It is package-level mutable state with no synchronization of its own.
var StaticIndexers [ ] * pp . AddrInfo = [ ] * pp . AddrInfo { }
// StreamIndexers holds the streams opened towards the indexers, per protocol
// and per peer. It is also unsynchronized package-level state.
var StreamIndexers ProtocolStream = ProtocolStream { }
func ConnectToIndexers ( h host . Host , minIndexer int , maxIndexer int , myPID pp . ID ) {
logger := oclib . GetLogger ( )
addresses := strings . Split ( conf . GetConfig ( ) . IndexerAddresses , "," )
if len ( addresses ) > maxIndexer {
addresses = addresses [ 0 : maxIndexer ]
}
for _ , indexerAddr := range addresses {
2026-02-05 11:23:11 +01:00
fmt . Println ( "GENERATE ADDR" , indexerAddr )
2026-01-30 16:57:36 +01:00
ad , err := pp . AddrInfoFromString ( indexerAddr )
if err != nil {
2026-02-05 11:23:11 +01:00
fmt . Println ( "ADDR ERR" , err )
2026-01-30 16:57:36 +01:00
logger . Err ( err )
continue
}
2026-02-05 11:23:11 +01:00
force := false
2026-01-30 16:57:36 +01:00
if h . Network ( ) . Connectedness ( ad . ID ) != network . Connected {
2026-02-05 11:23:11 +01:00
force = true
if err := h . Connect ( context . Background ( ) , * ad ) ; err != nil {
2026-02-05 08:56:55 +01:00
fmt . Println ( err )
2026-01-30 16:57:36 +01:00
logger . Err ( err )
continue
}
}
StaticIndexers = append ( StaticIndexers , ad )
// make a privilege streams with indexer.
for _ , proto := range [ ] protocol . ID { ProtocolPublish , ProtocolGet , ProtocolHeartbeat } {
2026-02-05 11:23:11 +01:00
AddStreamProtocol ( nil , StreamIndexers , h , proto , ad . ID , myPID , force , nil )
2026-01-30 16:57:36 +01:00
}
}
if len ( StaticIndexers ) == 0 {
2026-02-02 09:05:58 +01:00
logger . Err ( errors . New ( "you run a node without indexers... your gonna be isolated." ) )
2026-01-30 16:57:36 +01:00
}
if len ( StaticIndexers ) < minIndexer {
// TODO : ask for unknown indexer.
}
2026-02-05 11:23:11 +01:00
SendHeartbeat ( context . Background ( ) , ProtocolHeartbeat , conf . GetConfig ( ) . Name , h , StreamIndexers , StaticIndexers , 20 * time . Second ) // your indexer is just like a node for the next indexer.
2026-01-30 16:57:36 +01:00
}
2026-02-03 15:25:15 +01:00
func AddStreamProtocol ( ctx * context . Context , protoS ProtocolStream , h host . Host , proto protocol . ID , id pp . ID , mypid pp . ID , force bool , onStreamCreated * func ( network . Stream ) ) ProtocolStream {
2026-01-30 16:57:36 +01:00
if onStreamCreated == nil {
f := func ( s network . Stream ) {
protoS [ proto ] [ id ] = & Stream {
Stream : s ,
2026-02-03 15:25:15 +01:00
Expiry : time . Now ( ) . UTC ( ) . Add ( 2 * time . Minute ) ,
2026-01-30 16:57:36 +01:00
}
}
onStreamCreated = & f
}
f := * onStreamCreated
2026-02-03 15:25:15 +01:00
if mypid > id || force {
2026-01-30 16:57:36 +01:00
if ctx == nil {
c := context . Background ( )
ctx = & c
}
if protoS [ proto ] == nil {
protoS [ proto ] = map [ pp . ID ] * Stream { }
}
if protoS [ proto ] [ id ] != nil {
protoS [ proto ] [ id ] . Expiry = time . Now ( ) . Add ( 2 * time . Minute )
} else {
2026-02-05 11:23:11 +01:00
fmt . Println ( "NEW STREAM" , proto , id )
2026-01-30 16:57:36 +01:00
s , err := h . NewStream ( * ctx , id , proto )
if err != nil {
panic ( err . Error ( ) )
}
f ( s )
}
}
return protoS
}
type Heartbeat struct {
2026-02-04 11:35:19 +01:00
Name string ` json:"name" `
2026-01-30 16:57:36 +01:00
Stream * Stream ` json:"stream" `
DID string ` json:"did" `
PeerID string ` json:"peer_id" `
Timestamp int64 ` json:"timestamp" `
}
// HeartbeatInfo is a slice of entries that each carry one opaque byte payload
// under the JSON key "info". Nothing in this part of the file uses it.
type HeartbeatInfo [ ] struct {
Info [ ] byte ` json:"info" `
}
// ProtocolHeartbeat identifies the heartbeat protocol, version 1.0. It is also
// the key under which heartbeat records and heartbeat streams are stored.
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"
2026-02-04 11:35:19 +01:00
func SendHeartbeat ( ctx context . Context , proto protocol . ID , name string , h host . Host , ps ProtocolStream , peers [ ] * pp . AddrInfo , interval time . Duration ) {
2026-01-30 16:57:36 +01:00
peerID , err := oclib . GenerateNodeID ( )
if err == nil {
panic ( "can't heartbeat daemon failed to start" )
}
go func ( ) {
t := time . NewTicker ( interval )
defer t . Stop ( )
for {
select {
case <- t . C :
hb := Heartbeat {
2026-02-04 11:35:19 +01:00
Name : name ,
2026-01-30 16:57:36 +01:00
DID : peerID ,
PeerID : h . ID ( ) . String ( ) ,
2026-02-03 15:25:15 +01:00
Timestamp : time . Now ( ) . UTC ( ) . Unix ( ) ,
2026-01-30 16:57:36 +01:00
}
for _ , ix := range peers {
_ = sendHeartbeat ( ctx , h , proto , ix , hb , ps , interval * time . Second )
}
case <- ctx . Done ( ) :
return
}
}
} ( )
}
2026-02-03 15:25:15 +01:00
func sendHeartbeat ( ctx context . Context , h host . Host , proto protocol . ID , p * pp . AddrInfo ,
hb Heartbeat , ps ProtocolStream , interval time . Duration ) error {
2026-01-30 16:57:36 +01:00
streams := ps . Get ( proto )
if len ( streams ) == 0 {
return errors . New ( "no stream for protocol heartbeat founded" )
}
pss , exists := streams [ p . ID ]
ctxTTL , _ := context . WithTimeout ( ctx , 3 * interval )
// Connect si nécessaire
if h . Network ( ) . Connectedness ( p . ID ) != network . Connected {
_ = h . Connect ( ctxTTL , * p )
exists = false // on devra recréer le stream
}
// Crée le stream si inexistant ou fermé
if ! exists || pss . Stream == nil {
s , err := h . NewStream ( ctx , p . ID , proto )
if err != nil {
return err
}
pss = & Stream {
Stream : s ,
Expiry : time . Now ( ) . UTC ( ) . Add ( 2 * time . Minute ) ,
}
streams [ p . ID ] = pss
}
// Envoie le heartbeat
ss := json . NewEncoder ( pss . Stream )
err := ss . Encode ( & hb )
if err != nil {
pss . Stream . Close ( )
pss . Stream = nil // recréera au prochain tick
return err
}
pss . Expiry = time . Now ( ) . UTC ( ) . Add ( 2 * time . Minute )
return nil
}