// Package common holds the stream bookkeeping shared by nodes and indexers:
// heartbeat emission and verification, per-protocol stream registries and the
// long-lived stream record service.
package common

import (
	"bytes"
	"context"
	cr "crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math/rand"
	"net"
	"slices"
	"strings"
	"sync"
	"time"

	"oc-discovery/conf"

	oclib "cloud.o-forge.io/core/oc-lib"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	pp "github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/core/protocol"
)

// LongLivedStreamRecordedService keeps, per protocol and per peer, the record
// attached to a long-lived stream. StreamMU guards StreamRecords.
type LongLivedStreamRecordedService[T any] struct {
	*LongLivedPubSubService
	StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
	StreamMU      sync.RWMutex
	maxNodesConn  int
}

// NewStreamRecordedService builds the service on top of h and starts its two
// background loops: garbage collection every 30 seconds and a logging
// snapshot every hour. maxNodesConn is the connection cap enforced by
// CheckHeartbeat.
func NewStreamRecordedService[T any](h host.Host, maxNodesConn int) *LongLivedStreamRecordedService[T] {
	service := &LongLivedStreamRecordedService[T]{
		LongLivedPubSubService: NewLongLivedPubSubService(h),
		StreamRecords:          map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
		maxNodesConn:           maxNodesConn,
	}
	// Both helpers already spawn their own goroutine, so they are called
	// directly instead of being wrapped in a second one.
	// Garbage collection is needed on every map of long-lived streams; it may
	// deserve a top-level redesign.
	service.StartGC(30 * time.Second)
	service.Snapshot(1 * time.Hour)
	return service
}

// StartGC launches a goroutine that runs gc on every tick of interval.
// The goroutine is never stopped.
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for range t.C {
			ix.gc()
		}
	}()
}

// gc removes, from every protocol map, the peers whose heartbeat record is
// stale. A record is stale when its stream expiry has passed, or when the time
// elapsed since the peer was last seen exceeds twice the time remaining before
// expiry. Records without a heartbeat stream are dropped as well.
func (ix *LongLivedStreamRecordedService[T]) gc() {
	ix.StreamMU.Lock()
	defer ix.StreamMU.Unlock()

	heartbeats := ix.StreamRecords[ProtocolHeartbeat]
	if heartbeats == nil {
		ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		return
	}

	now := time.Now().UTC()
	for pid, rec := range heartbeats {
		stale := rec == nil || rec.HeartbeatStream == nil
		if !stale {
			hs := rec.HeartbeatStream
			stale = now.After(hs.Expiry)
			if !stale && hs.UptimeTracker != nil {
				stale = now.Sub(hs.UptimeTracker.LastSeen) > 2*hs.Expiry.Sub(now)
			}
		}
		if !stale {
			continue
		}
		// Deleting while ranging over a map is allowed in Go; delete is a
		// no-op for protocols that do not know this peer.
		for _, records := range ix.StreamRecords {
			delete(records, pid)
		}
	}
}

// Snapshot launches a goroutine that logs the DID of every recorded stream on
// every tick of interval. The goroutine is never stopped.
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
	go func() {
		logger := oclib.GetLogger()
		t := time.NewTicker(interval)
		defer t.Stop()
		for range t.C {
			infos := ix.snapshot()
			for _, inf := range infos {
				logger.Info().Msg(" -> " + inf.DID)
			}
		}
	}()
}

// -------- Snapshot / Query --------

// snapshot returns the records of every protocol flattened into one slice.
// The returned pointers are shared with the live maps.
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
	// Read-only pass: the shared lock is enough.
	ix.StreamMU.RLock()
	defer ix.StreamMU.RUnlock()

	out := make([]*StreamRecord[T], 0, len(ix.StreamRecords))
	for _, streams := range ix.StreamRecords {
		for _, stream := range streams {
			out = append(out, stream)
		}
	}
	return out
}

// HandleNodeHeartbeat is the stream handler for incoming heartbeats. It loops
// on s, validating each heartbeat with CheckHeartbeat and creating or
// refreshing the peer's record. The handler returns (and closes s) on the
// first error reported by CheckHeartbeat.
func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) {
	defer s.Close()
	for {
		ix.StreamMU.Lock()
		if ix.StreamRecords[ProtocolHeartbeat] == nil {
			ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		}
		streams := ix.StreamRecords[ProtocolHeartbeat]
		// CheckHeartbeat only needs the uptime tracker of each record, so it
		// receives a copy of the map typed on the narrow interface.
		known := make(map[pp.ID]HeartBeatStreamed, len(streams))
		for k, v := range streams {
			known[k] = v
		}
		ix.StreamMU.Unlock()

		pid, hb, err := CheckHeartbeat(ix.Host, s, known, &ix.StreamMU, ix.maxNodesConn)
		if err != nil {
			// Retrying here would spin forever: a closed stream keeps
			// returning the same read error, and the connection-cap check
			// fails before anything is read from the stream.
			return
		}

		now := time.Now().UTC()
		ix.StreamMU.Lock()
		if rec, ok := streams[*pid]; ok && rec != nil {
			// Known peer: carry the existing tracker over to the fresh stream
			// built by CheckHeartbeat (which has none) so FirstSeen survives.
			tracker := rec.GetUptimeTracker()
			if tracker == nil {
				tracker = &UptimeTracker{FirstSeen: now}
			}
			tracker.LastSeen = now
			hb.Stream.UptimeTracker = tracker
			rec.DID = hb.DID
			rec.HeartbeatStream = hb.Stream
		} else {
			hb.Stream.UptimeTracker = &UptimeTracker{
				FirstSeen: now,
				LastSeen:  now,
			}
			streams[*pid] = &StreamRecord[T]{
				DID:             hb.DID,
				HeartbeatStream: hb.Stream,
			}
		}
		ix.StreamMU.Unlock()
	}
}

// CheckHeartbeat reads one heartbeat from s and decides whether the sender is
// accepted. In order, it rejects the heartbeat when the host already has
// maxNodes peers or more, when the JSON cannot be decoded, when the bandwidth
// challenge fails, when the peer ID cannot be decoded, and when the computed
// score is below 75. On success it returns the decoded peer ID and the
// heartbeat, whose Stream field wraps s with a two-minute expiry and no uptime
// tracker. lock guards the trackers referenced by streams.
func CheckHeartbeat(h host.Host, s network.Stream, streams map[pp.ID]HeartBeatStreamed, lock *sync.RWMutex, maxNodes int) (*pp.ID, *Heartbeat, error) {
	if len(h.Network().Peers()) >= maxNodes {
		return nil, nil, fmt.Errorf("too many connections, try another indexer")
	}

	var hb Heartbeat
	if err := json.NewDecoder(s).Decode(&hb); err != nil {
		return nil, nil, err
	}

	// Random payload size in [MinPayloadChallenge, MaxPayloadChallenge).
	payloadSize := MinPayloadChallenge + int(rand.Float64()*(MaxPayloadChallenge-MinPayloadChallenge))
	ok, bpms, err := getBandwidthChallengeRate(payloadSize, s)
	if err != nil {
		return nil, nil, err
	}
	if !ok {
		return nil, nil, fmt.Errorf("Not a proper peer")
	}

	pid, err := pp.Decode(hb.PeerID)
	if err != nil {
		return nil, nil, err
	}

	// Uptime is expressed as a fraction of the time elapsed since TimeWatcher.
	upTime := float64(0)
	lock.Lock()
	if rec, known := streams[pid]; known && rec.GetUptimeTracker() != nil {
		upTime = rec.GetUptimeTracker().Uptime().Hours() / float64(time.Since(TimeWatcher).Hours())
	}
	lock.Unlock()

	diversity := getDiversityRate(h, hb.IndexersBinded)
	hb.ComputeIndexerScore(upTime, bpms, diversity)
	// NOTE(review): the score weights sum to 1.0, so the score only reaches 75
	// if its inputs go far above 1 — confirm the intended scale.
	if hb.Score < 75 {
		return nil, nil, errors.New("not enough trusting value")
	}

	// This is the long-lived bidirectional heartbeat stream.
	hb.Stream = &Stream{
		Name:   hb.Name,
		DID:    hb.DID,
		Stream: s,
		Expiry: time.Now().UTC().Add(2 * time.Minute),
	}
	return &pid, &hb, nil
}

// getDiversityRate returns the number of distinct /24 networks among the
// reachable peers divided by the number of reachable peers. It returns 0 when
// no peer is reachable.
func getDiversityRate(h host.Host, peers []string) float64 {
	peers, _ = checkPeers(h, peers)
	if len(peers) == 0 {
		// Nothing reachable: avoid dividing by zero.
		return 0
	}
	var subnets []string
	for _, p := range peers {
		ip, err := ExtractIP(p)
		if err != nil {
			continue
		}
		subnet := ip.Mask(net.CIDRMask(24, 32)).String()
		if !slices.Contains(subnets, subnet) {
			subnets = append(subnets, subnet)
		}
	}
	// Convert before dividing; an integer division would only yield 0 or 1.
	return float64(len(subnets)) / float64(len(peers))
}

// checkPeers filters peers down to the addresses that parse and for which
// PeerIsAlive reports true. It returns those addresses and, for each one whose
// IP can be extracted, its /24-masked form.
func checkPeers(h host.Host, peers []string) ([]string, []string) {
	concretePeer := []string{}
	ips := []string{}
	for _, p := range peers {
		ad, err := pp.AddrInfoFromString(p)
		if err != nil {
			continue
		}
		if !PeerIsAlive(h, *ad) {
			continue
		}
		concretePeer = append(concretePeer, p)
		if ip, err := ExtractIP(p); err == nil {
			ips = append(ips, ip.Mask(net.CIDRMask(24, 32)).String())
		}
	}
	return concretePeer, ips
}

// Parameters of the bandwidth challenge.
const (
	// MaxExpectedMbps is the throughput used to normalise the measured rate.
	MaxExpectedMbps = 50.0
	// MinPayloadChallenge is the smallest challenge payload, in bytes.
	MinPayloadChallenge = 512
	// MaxPayloadChallenge is the upper bound of the challenge payload, in bytes.
	MaxPayloadChallenge = 2048
	// BaseRoundTrip is the fixed part of the tolerated round-trip time.
	BaseRoundTrip = 400 * time.Millisecond
)

// getBandwidthChallengeRate writes payloadSize random bytes on s, reads the
// same number of bytes back and times the round trip. It returns whether the
// peer passed, the measured throughput divided by MaxExpectedMbps, and any I/O
// error. A peer fails when the echoed bytes differ, when the round trip
// exceeds BaseRoundTrip plus 100ms per payload byte, or when the throughput is
// below 5 Mbps. No deadline is set on s, so an unresponsive peer blocks.
func getBandwidthChallengeRate(payloadSize int, s network.Stream) (bool, float64, error) {
	// Random payload generation.
	payload := make([]byte, payloadSize)
	if _, err := cr.Read(payload); err != nil {
		return false, 0, err
	}

	start := time.Now()
	// Send the challenge on the heartbeat stream.
	if _, err := s.Write(payload); err != nil {
		return false, 0, err
	}
	// Read the echo back.
	response := make([]byte, payloadSize)
	if _, err := io.ReadFull(s, response); err != nil {
		return false, 0, err
	}
	duration := time.Since(start)

	// Verify the content: a mismatch means a transport problem or a bad peer.
	if !bytes.Equal(payload, response) {
		return false, 0, nil
	}

	maxRoundTrip := BaseRoundTrip + (time.Duration(payloadSize) * (100 * time.Millisecond))
	mbps := float64(payloadSize*8) / duration.Seconds() / 1e6
	rate := mbps / MaxExpectedMbps
	if duration > maxRoundTrip || mbps < 5.0 {
		return false, rate, nil
	}
	return true, rate, nil
}

// UptimeTracker records when a peer was first and last seen.
type UptimeTracker struct {
	FirstSeen time.Time
	LastSeen  time.Time
}

// Uptime returns the time elapsed since FirstSeen.
func (u *UptimeTracker) Uptime() time.Duration {
	return time.Since(u.FirstSeen)
}

// IsEligible reports whether the uptime is at least min.
func (u *UptimeTracker) IsEligible(min time.Duration) bool {
	return u.Uptime() >= min
}

// StreamRecord associates a peer's heartbeat stream with a payload of type T.
type StreamRecord[T any] struct {
	DID             string
	HeartbeatStream *Stream
	Record          T
}

// GetUptimeTracker returns the tracker of the heartbeat stream, or nil when
// the record has no heartbeat stream.
func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker {
	if s.HeartbeatStream == nil {
		return nil
	}
	return s.HeartbeatStream.UptimeTracker
}

// Stream wraps a libp2p stream with its owner identity, expiry and optional
// uptime tracker.
type Stream struct {
	Name          string `json:"name"`
	DID           string `json:"did"`
	Stream        network.Stream
	Expiry        time.Time `json:"expiry"`
	UptimeTracker *UptimeTracker
}

// GetUptimeTracker returns the stream's uptime tracker, which may be nil.
func (s *Stream) GetUptimeTracker() *UptimeTracker {
	return s.UptimeTracker
}

// NewStream wraps s for did with a two-minute expiry. record is not used.
func NewStream[T any](s network.Stream, did string, record T) *Stream {
	return &Stream{
		DID:    did,
		Stream: s,
		Expiry: time.Now().UTC().Add(2 * time.Minute),
	}
}

// ProtocolStream indexes streams by protocol, then by peer. It does no locking
// of its own.
type ProtocolStream map[protocol.ID]map[pp.ID]*Stream

// Get returns the peer map of protocol, creating it when missing.
func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream {
	if ps[protocol] == nil {
		ps[protocol] = map[pp.ID]*Stream{}
	}
	return ps[protocol]
}

// Add registers s for peerID under protocol, creating the protocol map when
// missing. A nil peerID is a no-op; a nil s with a non-nil peerID is an error.
func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error {
	if ps[protocol] == nil {
		ps[protocol] = map[pp.ID]*Stream{}
	}
	if peerID == nil {
		return nil
	}
	if s == nil {
		return errors.New("unable to add stream : stream missing")
	}
	ps[protocol][*peerID] = s
	return nil
}

// Delete closes and removes streams of protocol. With a non-nil peerID only
// that peer's stream is affected; with a nil peerID every stream of the
// protocol is closed and the protocol entry is removed. Streams of other
// protocols are never touched.
func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) {
	streams, ok := ps[protocol]
	if !ok {
		return
	}
	if peerID != nil {
		if s := streams[*peerID]; s != nil && s.Stream != nil {
			// Best-effort close: the entry is dropped regardless.
			_ = s.Stream.Close()
		}
		delete(streams, *peerID)
		return
	}
	for _, s := range streams {
		if s != nil && s.Stream != nil {
			// Best-effort close: the entry is dropped regardless.
			_ = s.Stream.Close()
		}
	}
	delete(ps, protocol)
}

// Record protocol identifiers.
const (
	ProtocolPublish = "/opencloud/record/publish/1.0"
	ProtocolGet     = "/opencloud/record/get/1.0"
)

// Package-level indexer state.
var (
	// TimeWatcher is set by ConnectToIndexers and is the reference instant
	// for uptime ratios.
	TimeWatcher time.Time
	// StaticIndexers maps a configured indexer address to its parsed form.
	StaticIndexers map[string]*pp.AddrInfo = map[string]*pp.AddrInfo{}
	// StreamMuIndexes is locked around removals from the indexer stream maps.
	StreamMuIndexes sync.RWMutex
	// StreamIndexers holds the streams opened toward indexers.
	StreamIndexers ProtocolStream = ProtocolStream{}
)

// ConnectToIndexers parses the configured indexer addresses (at most
// maxIndexer of them), stores them in StaticIndexers and starts the heartbeat
// loop toward them with a 20-second interval. When native indexer addresses
// are configured it delegates to ConnectToNatives instead. It returns an error
// when fewer than minIndexer addresses are registered.
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error {
	TimeWatcher = time.Now().UTC()
	logger := oclib.GetLogger()

	// If native addresses are configured, bypass static indexer addresses.
	if conf.GetConfig().NativeIndexerAddresses != "" {
		return ConnectToNatives(h, minIndexer, maxIndexer, myPID)
	}

	addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
	if len(addresses) > maxIndexer {
		addresses = addresses[0:maxIndexer]
	}
	for _, indexerAddr := range addresses {
		fmt.Println("GENERATE ADDR", indexerAddr)
		ad, err := pp.AddrInfoFromString(indexerAddr)
		if err != nil {
			// The log event is only emitted once Msg is called.
			logger.Err(err).Msg("invalid indexer address " + indexerAddr)
			continue
		}
		StaticIndexers[indexerAddr] = ad
	}

	// Your indexer is just like a node for the next indexer.
	SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second)
	if len(StaticIndexers) < minIndexer {
		return errors.New("you run a node without indexers... your gonna be isolated.")
	}
	return nil
}

// AddStreamProtocol makes sure protoS holds a stream to id for proto. It acts
// only when mypid > id or force is set: an existing entry gets its expiry
// pushed two minutes ahead, otherwise a new stream is opened and handed to
// onStreamCreated. A nil onStreamCreated defaults to storing the stream in
// protoS with a two-minute expiry; a nil ctx defaults to a background context.
// It panics when the stream cannot be opened.
// NOTE(review): panicking on a network error is harsh, but the signature has
// no error return and callers may rely on the entry existing afterwards.
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
	if onStreamCreated == nil {
		store := func(s network.Stream) {
			protoS[proto][id] = &Stream{
				Stream: s,
				Expiry: time.Now().UTC().Add(2 * time.Minute),
			}
		}
		onStreamCreated = &store
	}
	created := *onStreamCreated

	if mypid <= id && !force {
		return protoS
	}
	if ctx == nil {
		c := context.Background()
		ctx = &c
	}
	if protoS[proto] == nil {
		protoS[proto] = map[pp.ID]*Stream{}
	}
	if existing := protoS[proto][id]; existing != nil {
		existing.Expiry = time.Now().Add(2 * time.Minute)
		return protoS
	}

	fmt.Println("NEW STREAM", proto, id)
	s, err := h.NewStream(*ctx, id, proto)
	if err != nil {
		panic(err.Error())
	}
	created(s)
	return protoS
}

// Heartbeat is the message a node sends to an indexer. Score is filled in by
// ComputeIndexerScore; Stream is overwritten by CheckHeartbeat on reception.
type Heartbeat struct {
	Name           string   `json:"name"`
	Stream         *Stream  `json:"stream"`
	DID            string   `json:"did"`
	PeerID         string   `json:"peer_id"`
	Timestamp      int64    `json:"timestamp"`
	IndexersBinded []string `json:"indexers_binded"`
	Score          float64
}

// ComputeIndexerScore stores in hb.Score the weighted sum
// 0.4*uptimeHours + 0.4*bpms + 0.2*diversity.
func (hb *Heartbeat) ComputeIndexerScore(uptimeHours float64, bpms float64, diversity float64) {
	hb.Score = (0.4 * uptimeHours) + (0.4 * bpms) + (0.2 * diversity)
}

// HeartbeatInfo is a list of raw info payloads.
type HeartbeatInfo []struct {
	Info []byte `json:"info"`
}

// ProtocolHeartbeat is the protocol identifier of the heartbeat stream.
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"

// SendHeartbeat starts a goroutine that, on every tick of interval, sends a
// heartbeat to each entry of peers over the streams held in ps. A peer whose
// send fails has its entry removed from ps[proto]. The goroutine stops when
// ctx is done. It panics when the node ID cannot be generated.
func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers map[string]*pp.AddrInfo, interval time.Duration) {
	peerID, err := oclib.GenerateNodeID()
	if err != nil {
		panic("can't heartbeat daemon failed to start: " + err.Error())
	}
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		for {
			select {
			case <-t.C:
				addrs := []string{}
				for addr := range StaticIndexers {
					addrs = append(addrs, addr)
				}
				hb := Heartbeat{
					Name:           name,
					DID:            peerID,
					PeerID:         h.ID().String(),
					Timestamp:      time.Now().UTC().Unix(),
					IndexersBinded: addrs,
				}
				for _, ix := range peers {
					if ix == nil {
						continue
					}
					// interval is already a Duration and is passed as is.
					if err := sendHeartbeat(ctx, h, proto, ix, hb, ps, interval); err != nil {
						// Drop the entry from the registry that was actually
						// used for the send.
						StreamMuIndexes.Lock()
						delete(ps[proto], ix.ID)
						StreamMuIndexes.Unlock()
					}
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}

// ProtocolInfo describes how a protocol's streams are handled.
type ProtocolInfo struct {
	PersistantStream bool
	WaitResponse     bool
	TTL              time.Duration
}

// TempStream makes sure streams holds a short-lived stream to ad for proto.
// The lifetime is pts[proto].TTL when configured, otherwise two seconds. It
// connects to the peer when needed, reuses an existing entry, or opens a new
// stream and schedules the removal of its entry once the lifetime has elapsed.
// mu guards streams. The entry is removed from the map on expiry but the
// underlying stream is not closed here.
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, pts map[protocol.ID]*ProtocolInfo, mu *sync.RWMutex) (ProtocolStream, error) {
	expiry := 2 * time.Second
	if pts[proto] != nil {
		expiry = pts[proto].TTL
	}

	ctxTTL, cancel := context.WithTimeout(context.Background(), expiry)
	if h.Network().Connectedness(ad.ID) != network.Connected {
		if err := h.Connect(ctxTTL, ad); err != nil {
			cancel()
			return streams, err
		}
	}

	// Indexing a nil inner map yields nil, so one lookup covers both cases.
	mu.RLock()
	existing := streams[proto][ad.ID]
	mu.RUnlock()
	if existing != nil {
		cancel()
		return streams, nil
	}

	s, err := h.NewStream(ctxTTL, ad.ID, proto)
	if err != nil {
		cancel()
		return streams, err
	}

	entry := &Stream{
		DID:    did,
		Stream: s,
		Expiry: time.Now().UTC().Add(expiry),
	}
	mu.Lock()
	if streams[proto] == nil {
		streams[proto] = map[pp.ID]*Stream{}
	}
	streams[proto][ad.ID] = entry
	mu.Unlock()

	time.AfterFunc(expiry, func() {
		// The context is released only now so it stays valid for the whole
		// lifetime of the temporary stream.
		defer cancel()
		mu.Lock()
		defer mu.Unlock()
		// Leave the slot alone if another stream replaced this one meanwhile.
		if streams[proto][ad.ID] == entry {
			delete(streams[proto], ad.ID)
		}
	})
	return streams, nil
}

// sendHeartbeat encodes hb as JSON on the stream registered in ps for peer p,
// connecting to the peer and (re)creating the stream when needed. The connect
// attempt is bounded by three times interval. On an encode error the stream is
// closed and cleared so that it is recreated on the next call. On success the
// stream expiry is pushed two minutes ahead.
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo, hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
	streams := ps.Get(proto)
	// NOTE(review): this returns before any stream can be created when the
	// protocol map is empty — confirm that callers seed the map beforehand.
	if len(streams) == 0 {
		return errors.New("no stream for protocol heartbeat founded")
	}
	pss, exists := streams[p.ID]

	ctxTTL, cancel := context.WithTimeout(ctx, 3*interval)
	defer cancel()

	// Connect if necessary.
	if h.Network().Connectedness(p.ID) != network.Connected {
		if err := h.Connect(ctxTTL, *p); err != nil {
			return err
		}
		exists = false // the stream has to be recreated
	}

	// Create the stream when it is missing or was closed. The caller's ctx is
	// used so the stream is not opened under the soon-to-be-cancelled ctxTTL.
	if !exists || pss == nil || pss.Stream == nil {
		s, err := h.NewStream(ctx, p.ID, proto)
		if err != nil {
			return err
		}
		pss = &Stream{
			Stream: s,
			Expiry: time.Now().UTC().Add(2 * time.Minute),
		}
		streams[p.ID] = pss
	}

	// Send the heartbeat.
	if err := json.NewEncoder(pss.Stream).Encode(&hb); err != nil {
		pss.Stream.Close()
		pss.Stream = nil // recreated on the next tick
		return err
	}
	pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
	return nil
}