Indexer Quality Score TrustLess

This commit is contained in:
mr
2026-02-17 13:11:22 +01:00
parent fa914958b6
commit 6a5ffb9a92
10 changed files with 294 additions and 74 deletions

View File

@@ -1,11 +1,17 @@
package common
import (
"bytes"
"context"
cr "crypto/rand"
"encoding/json"
"errors"
"fmt"
"io"
"math/rand"
"net"
"oc-discovery/conf"
"slices"
"strings"
"sync"
"time"
@@ -60,7 +66,7 @@ func (ix *LongLivedStreamRecordedService[T]) gc() {
streams := ix.StreamRecords[ProtocolHeartbeat]
for pid, rec := range streams {
if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) {
if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.HeartbeatStream.UptimeTracker.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) {
for _, sstreams := range ix.StreamRecords {
if sstreams[pid] != nil {
delete(sstreams, pid)
@@ -115,34 +121,44 @@ func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) {
defer s.Close()
for {
pid, hb, err := CheckHeartbeat(ix.Host, s, ix.maxNodesConn)
if err != nil {
continue
}
ix.StreamMU.Lock()
if ix.StreamRecords[ProtocolHeartbeat] == nil {
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
}
streams := ix.StreamRecords[ProtocolHeartbeat]
streamsAnonym := map[pp.ID]HeartBeatStreamed{}
for k, v := range streams {
streamsAnonym[k] = v
}
ix.StreamMU.Unlock()
pid, hb, err := CheckHeartbeat(ix.Host, s, streamsAnonym, &ix.StreamMU, ix.maxNodesConn)
if err != nil {
continue
}
ix.StreamMU.Lock()
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Stream = s
rec.HeartbeatStream = hb.Stream
rec.LastSeen = time.Now().UTC()
rec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
} else {
hb.Stream.UptimeTracker = &UptimeTracker{
FirstSeen: time.Now().UTC(),
LastSeen: time.Now().UTC(),
}
streams[*pid] = &StreamRecord[T]{
DID: hb.DID,
HeartbeatStream: hb.Stream,
Stream: s,
LastSeen: time.Now().UTC(),
}
}
ix.StreamMU.Unlock()
}
}
func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heartbeat, error) {
func CheckHeartbeat(h host.Host, s network.Stream, streams map[pp.ID]HeartBeatStreamed, lock *sync.RWMutex, maxNodes int) (*pp.ID, *Heartbeat, error) {
if len(h.Network().Peers()) >= maxNodes {
return nil, nil, fmt.Errorf("too many connections, try another indexer")
}
@@ -150,14 +166,118 @@ func CheckHeartbeat(h host.Host, s network.Stream, maxNodes int) (*pp.ID, *Heart
if err := json.NewDecoder(s).Decode(&hb); err != nil {
return nil, nil, err
}
pid, err := pp.Decode(hb.PeerID)
hb.Stream = &Stream{
Name: hb.Name,
DID: hb.DID,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
} // here is the long-lived bidirectionnal heart bit.
return &pid, &hb, err
if ok, bpms, err := getBandwidthChallengeRate(MinPayloadChallenge+int(rand.Float64()*(MaxPayloadChallenge-MinPayloadChallenge)), s); err != nil {
return nil, nil, err
} else if !ok {
return nil, nil, fmt.Errorf("Not a proper peer")
} else {
pid, err := pp.Decode(hb.PeerID)
if err != nil {
return nil, nil, err
}
upTime := float64(0)
lock.Lock()
if rec, ok := streams[pid]; ok && rec.GetUptimeTracker() != nil {
upTime = rec.GetUptimeTracker().Uptime().Hours() / float64(time.Since(TimeWatcher).Hours())
}
lock.Unlock()
diversity := getDiversityRate(h, hb.IndexersBinded)
hb.ComputeIndexerScore(upTime, bpms, diversity)
if hb.Score < 75 {
return nil, nil, errors.New("not enough trusting value")
}
hb.Stream = &Stream{
Name: hb.Name,
DID: hb.DID,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Minute),
} // here is the long-lived bidirectionnal heart bit.
return &pid, &hb, err
}
}
func getDiversityRate(h host.Host, peers []string) float64 {
peers, _ = checkPeers(h, peers)
diverse := []string{}
for _, p := range peers {
ip, err := ExtractIP(p)
if err != nil {
continue
}
div := ip.Mask(net.CIDRMask(24, 32)).String()
if !slices.Contains(diverse, div) {
diverse = append(diverse, div)
}
}
return float64(len(diverse) / len(peers))
}
func checkPeers(h host.Host, peers []string) ([]string, []string) {
concretePeer := []string{}
ips := []string{}
for _, p := range peers {
ad, err := pp.AddrInfoFromString(p)
if err != nil {
continue
}
if PeerIsAlive(h, *ad) {
concretePeer = append(concretePeer, p)
if ip, err := ExtractIP(p); err == nil {
ips = append(ips, ip.Mask(net.CIDRMask(24, 32)).String())
}
}
}
return concretePeer, ips
}
const MaxExpectedMbps = 50.0
const MinPayloadChallenge = 512
const MaxPayloadChallenge = 2048
const BaseRoundTrip = 400 * time.Millisecond
func getBandwidthChallengeRate(payloadSize int, s network.Stream) (bool, float64, error) {
// Génération payload aléatoire
payload := make([]byte, payloadSize)
_, err := cr.Read(payload)
if err != nil {
return false, 0, err
}
start := time.Now()
// send on heartbeat stream the challenge
if _, err = s.Write(payload); err != nil {
return false, 0, err
}
// read back
response := make([]byte, payloadSize)
_, err = io.ReadFull(s, response)
if err != nil {
return false, 0, err
}
duration := time.Since(start)
// Verify content
if !bytes.Equal(payload, response) {
return false, 0, nil // pb or a sadge peer.
}
maxRoundTrip := BaseRoundTrip + (time.Duration(payloadSize) * (100 * time.Millisecond))
mbps := float64(payloadSize*8) / duration.Seconds() / 1e6
if duration > maxRoundTrip || mbps < 5.0 {
return false, float64(mbps / MaxExpectedMbps), nil
}
return true, float64(mbps / MaxExpectedMbps), nil
}
type UptimeTracker struct {
FirstSeen time.Time
LastSeen time.Time
}
func (u *UptimeTracker) Uptime() time.Duration {
return time.Since(u.FirstSeen)
}
func (u *UptimeTracker) IsEligible(min time.Duration) bool {
return u.Uptime() >= min
}
type StreamRecord[T interface{}] struct {
@@ -165,14 +285,25 @@ type StreamRecord[T interface{}] struct {
HeartbeatStream *Stream
Stream network.Stream
Record T
LastSeen time.Time // to check expiry
}
func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker {
if s.HeartbeatStream == nil {
return nil
}
return s.HeartbeatStream.UptimeTracker
}
type Stream struct {
Name string `json:"name"`
DID string `json:"did"`
Stream network.Stream
Expiry time.Time `json:"expiry"`
Name string `json:"name"`
DID string `json:"did"`
Stream network.Stream
Expiry time.Time `json:"expiry"`
UptimeTracker *UptimeTracker
}
func (s *Stream) GetUptimeTracker() *UptimeTracker {
return s.UptimeTracker
}
func NewStream[T interface{}](s network.Stream, did string, record T) *Stream {
@@ -228,10 +359,13 @@ const (
ProtocolGet = "/opencloud/record/get/1.0"
)
var StaticIndexers []*pp.AddrInfo = []*pp.AddrInfo{}
var TimeWatcher time.Time
var StaticIndexers map[string]*pp.AddrInfo = map[string]*pp.AddrInfo{}
var StreamIndexers ProtocolStream = ProtocolStream{}
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
TimeWatcher = time.Now().UTC()
logger := oclib.GetLogger()
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
@@ -243,7 +377,6 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
fmt.Println("GENERATE ADDR", indexerAddr)
ad, err := pp.AddrInfoFromString(indexerAddr)
if err != nil {
fmt.Println("ADDR ERR", err)
logger.Err(err)
continue
}
@@ -256,7 +389,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
continue
}
}
StaticIndexers = append(StaticIndexers, ad)
StaticIndexers[indexerAddr] = ad
// make a privilege streams with indexer.
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, force, nil)
@@ -307,11 +440,19 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host,
}
type Heartbeat struct {
Name string `json:"name"`
Stream *Stream `json:"stream"`
DID string `json:"did"`
PeerID string `json:"peer_id"`
Timestamp int64 `json:"timestamp"`
Name string `json:"name"`
Stream *Stream `json:"stream"`
DID string `json:"did"`
PeerID string `json:"peer_id"`
Timestamp int64 `json:"timestamp"`
IndexersBinded []string `json:"indexers_binded"`
Score float64
}
func (hb *Heartbeat) ComputeIndexerScore(uptimeHours float64, bpms float64, diversity float64) {
hb.Score = (0.4 * uptimeHours) +
(0.4 * bpms) +
(0.2 * diversity)
}
type HeartbeatInfo []struct {
@@ -320,7 +461,7 @@ type HeartbeatInfo []struct {
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"
func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers []*pp.AddrInfo, interval time.Duration) {
func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers map[string]*pp.AddrInfo, interval time.Duration) {
peerID, err := oclib.GenerateNodeID()
if err == nil {
panic("can't heartbeat daemon failed to start")
@@ -331,11 +472,16 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H
for {
select {
case <-t.C:
addrs := []string{}
for addr := range StaticIndexers {
addrs = append(addrs, addr)
}
hb := Heartbeat{
Name: name,
DID: peerID,
PeerID: h.ID().String(),
Timestamp: time.Now().UTC().Unix(),
Name: name,
DID: peerID,
PeerID: h.ID().String(),
Timestamp: time.Now().UTC().Unix(),
IndexersBinded: addrs,
}
for _, ix := range peers {
_ = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second)

View File

@@ -6,6 +6,10 @@ import (
"cloud.o-forge.io/core/oc-lib/models/peer"
)
type HeartBeatStreamed interface {
GetUptimeTracker() *UptimeTracker
}
type DiscoveryPeer interface {
GetPeerRecord(ctx context.Context, key string) ([]*peer.Peer, error)
}

View File

@@ -0,0 +1,40 @@
package common
import (
"context"
"fmt"
"net"
"time"
"github.com/libp2p/go-libp2p/core/host"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
)
func PeerIsAlive(h host.Host, ad pp.AddrInfo) bool {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
err := h.Connect(ctx, ad)
return err == nil
}
func ExtractIP(addr string) (net.IP, error) {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return nil, err
}
ips, err := ma.ValueForProtocol(multiaddr.P_IP4) // or P_IP6
if err != nil {
return nil, err
}
host, _, err := net.SplitHostPort(ips)
if err != nil {
return nil, err
}
ip := net.ParseIP(host)
if ip == nil {
return nil, fmt.Errorf("invalid IP: %s", host)
}
return ip, nil
}

View File

@@ -171,15 +171,13 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
fmt.Println("UPDATE PUBLISH", pid)
srec.DID = rec.DID
srec.Record = rec
srec.LastSeen = time.Now().UTC()
srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
} else {
fmt.Println("CREATE PUBLISH", pid)
streams[pid] = &common.StreamRecord[PeerRecord]{ // HeartBeat wil
DID: rec.DID,
Record: rec,
LastSeen: time.Now().UTC(),
}
ix.StreamMU.Unlock()
logger.Err(errors.New("no heartbeat"))
continue
}
ix.StreamMU.Unlock()
if ix.LongLivedPubSubs[common.TopicPubSubNodeActivity] != nil && !rec.NoPub {
@@ -211,7 +209,6 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
}
}
}
ix.StreamMU.Unlock()
}
}

View File

@@ -61,7 +61,7 @@ func ListenNATS(n *Node) {
p := &peer.Peer{}
p = p.Deserialize(m, p).(*peer.Peer)
ad, err := pp.AddrInfoFromString(p.PeerID)
ad, err := pp.AddrInfoFromString(p.StreamAddress)
if err != nil {
return
}
@@ -73,7 +73,7 @@ func ListenNATS(n *Node) {
n.Mu.Unlock()
if p.Relation == peer.PARTNER {
n.StreamService.ConnectToPartner(ad.ID, ad)
n.StreamService.ConnectToPartner(p.StreamAddress)
} else {
ps := common.ProtocolStream{}
for p, s := range n.StreamService.Streams {

View File

@@ -101,7 +101,7 @@ func (ps *StreamService) ToPartnerPublishEvent(
if err := json.Unmarshal(payload, &p); err != nil {
return err
}
ad, err := pp.AddrInfoFromString(p.StreamAddress)
pid, err := pp.Decode(p.PeerID)
if err != nil {
return err
}
@@ -111,11 +111,11 @@ func (ps *StreamService) ToPartnerPublishEvent(
if ps.Streams[ProtocolHeartbeatPartner] == nil {
ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
}
ps.ConnectToPartner(ad.ID, ad)
} else if ps.Streams[ProtocolHeartbeatPartner] != nil && ps.Streams[ProtocolHeartbeatPartner][ad.ID] != nil {
ps.ConnectToPartner(p.StreamAddress)
} else if ps.Streams[ProtocolHeartbeatPartner] != nil && ps.Streams[ProtocolHeartbeatPartner][pid] != nil {
for _, pids := range ps.Streams {
if pids[ad.ID] != nil {
delete(pids, ad.ID)
if pids[pid] != nil {
delete(pids, pid)
}
}
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
)
const ProtocolSearchResource = "/opencloud/resource/search/1.0"
@@ -44,7 +45,7 @@ type StreamService struct {
Node common.DiscoveryPeer
Streams common.ProtocolStream
maxNodesConn int
Mu sync.Mutex
Mu sync.RWMutex
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
}
@@ -66,26 +67,30 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
}
func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
pid, hb, err := common.CheckHeartbeat(s.Host, stream, s.maxNodesConn)
s.Mu.Lock()
if s.Streams[ProtocolHeartbeatPartner] == nil {
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
}
streams := s.Streams[ProtocolHeartbeatPartner]
streamsAnonym := map[pp.ID]common.HeartBeatStreamed{}
for k, v := range streams {
streamsAnonym[k] = v
}
s.Mu.Unlock()
pid, hb, err := common.CheckHeartbeat(s.Host, stream, streamsAnonym, &s.Mu, s.maxNodesConn)
if err != nil {
return
}
s.Mu.Lock()
defer s.Mu.Unlock()
if s.Streams[ProtocolHeartbeatPartner] == nil {
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
}
streams := s.Streams[ProtocolHeartbeatPartner]
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Expiry = time.Now().UTC().Add(2 * time.Minute)
} else { // if not in stream ?
pid := stream.Conn().RemotePeer()
ai, err := pp.AddrInfoFromP2pAddr(stream.Conn().RemoteMultiaddr())
val, err := stream.Conn().RemoteMultiaddr().ValueForProtocol(ma.P_IP4)
if err == nil {
s.ConnectToPartner(pid, ai)
s.ConnectToPartner(val)
}
}
go s.StartGC(30 * time.Second)
@@ -111,22 +116,18 @@ func (s *StreamService) connectToPartners() error {
return err
}
for _, p := range peers {
ad, err := pp.AddrInfoFromString(p.StreamAddress)
if err != nil {
continue
}
pid, err := pp.Decode(p.PeerID)
if err != nil {
continue
}
s.ConnectToPartner(pid, ad)
s.ConnectToPartner(p.StreamAddress)
// heartbeat your partner.
}
// TODO if handle... from partner then HeartBeat back
return nil
}
func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
func (s *StreamService) ConnectToPartner(address string) {
ad, err := pp.AddrInfoFromString(address)
if err != nil {
return
}
logger := oclib.GetLogger()
force := false
for _, proto := range protocols {
@@ -134,11 +135,11 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
if s.Streams[proto] == nil {
s.Streams[proto] = map[pp.ID]*common.Stream{}
}
s.Streams[proto][pid] = &common.Stream{
s.Streams[proto][ad.ID] = &common.Stream{
Stream: ss,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
go s.readLoop(s.Streams[proto][pid])
go s.readLoop(s.Streams[proto][ad.ID])
}
if s.Host.Network().Connectedness(ad.ID) != network.Connected {
force = true
@@ -147,10 +148,10 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
continue
}
}
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, force, &f)
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, ad.ID, s.Key, force, &f)
}
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name,
s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second)
s.Host, s.Streams, map[string]*pp.AddrInfo{address: ad}, 20*time.Second)
}
func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) {