561 lines
15 KiB
Go
561 lines
15 KiB
Go
package common
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
cr "crypto/rand"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"net"
|
|
"oc-discovery/conf"
|
|
"slices"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
"github.com/libp2p/go-libp2p/core/network"
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/libp2p/go-libp2p/core/protocol"
|
|
)
|
|
|
|
// LongLivedStreamRecordedService couples a pub/sub service with a registry of
// long-lived streams, keyed first by protocol and then by peer ID. T is the
// per-peer record payload stored alongside each stream.
type LongLivedStreamRecordedService[T interface{}] struct {
	*LongLivedPubSubService
	// StreamRecords maps protocol -> peer -> stream record. Guarded by StreamMU.
	StreamRecords map[protocol.ID]map[pp.ID]*StreamRecord[T]
	// StreamMU guards StreamRecords and the records it holds.
	StreamMU sync.RWMutex
	// maxNodesConn caps how many connected peers HandleNodeHeartbeat accepts.
	maxNodesConn int
	// isBidirectionnal is stored at construction but not read in this file.
	isBidirectionnal bool
}
|
|
|
|
func NewStreamRecordedService[T interface{}](h host.Host, maxNodesConn int, isBidirectionnal bool) *LongLivedStreamRecordedService[T] {
|
|
service := &LongLivedStreamRecordedService[T]{
|
|
LongLivedPubSubService: NewLongLivedPubSubService(h),
|
|
StreamRecords: map[protocol.ID]map[pp.ID]*StreamRecord[T]{},
|
|
maxNodesConn: maxNodesConn,
|
|
isBidirectionnal: isBidirectionnal,
|
|
}
|
|
go service.StartGC(30 * time.Second)
|
|
// Garbage collection is needed on every Map of Long-Lived Stream... it may be a top level redesigned
|
|
go service.Snapshot(1 * time.Hour)
|
|
return service
|
|
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) StartGC(interval time.Duration) {
|
|
go func() {
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for range t.C {
|
|
ix.gc()
|
|
}
|
|
}()
|
|
}
|
|
|
|
// gc evicts heartbeat records that have expired or gone silent. A peer is
// removed when its heartbeat expiry has passed, or when the time since it was
// last seen exceeds twice the time remaining before expiry. Eviction removes
// the peer from every protocol's stream map, not only the heartbeat one.
func (ix *LongLivedStreamRecordedService[T]) gc() {
	ix.StreamMU.Lock()
	defer ix.StreamMU.Unlock()
	now := time.Now().UTC()
	// Lazily create the heartbeat map on first run; nothing to collect yet.
	if ix.StreamRecords[ProtocolHeartbeat] == nil {
		ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
		return
	}
	streams := ix.StreamRecords[ProtocolHeartbeat]

	for pid, rec := range streams {
		// NOTE(review): when Expiry is already in the past, Expiry.Sub(now) is
		// negative and the second condition is trivially true — but the first
		// condition already covers that case, so the staleness check only
		// matters for still-valid records. Confirm this heuristic is intended.
		if now.After(rec.HeartbeatStream.Expiry) || now.Sub(rec.HeartbeatStream.UptimeTracker.LastSeen) > 2*rec.HeartbeatStream.Expiry.Sub(now) {
			// Deleting from a map while ranging over it is safe in Go.
			for _, sstreams := range ix.StreamRecords {
				if sstreams[pid] != nil {
					delete(sstreams, pid)
				}
			}
		}
	}
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) Snapshot(interval time.Duration) {
|
|
go func() {
|
|
logger := oclib.GetLogger()
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for range t.C {
|
|
infos := ix.snapshot()
|
|
for _, inf := range infos {
|
|
logger.Info().Msg(" -> " + inf.DID)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// -------- Snapshot / Query --------
|
|
func (ix *LongLivedStreamRecordedService[T]) snapshot() []*StreamRecord[T] {
|
|
ix.StreamMU.Lock()
|
|
defer ix.StreamMU.Unlock()
|
|
|
|
out := make([]*StreamRecord[T], 0, len(ix.StreamRecords))
|
|
for _, streams := range ix.StreamRecords {
|
|
for _, stream := range streams {
|
|
out = append(out, stream)
|
|
}
|
|
}
|
|
return out
|
|
}
|
|
|
|
func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Stream) {
|
|
defer s.Close()
|
|
for {
|
|
ix.StreamMU.Lock()
|
|
if ix.StreamRecords[ProtocolHeartbeat] == nil {
|
|
ix.StreamRecords[ProtocolHeartbeat] = map[pp.ID]*StreamRecord[T]{}
|
|
}
|
|
streams := ix.StreamRecords[ProtocolHeartbeat]
|
|
streamsAnonym := map[pp.ID]HeartBeatStreamed{}
|
|
for k, v := range streams {
|
|
streamsAnonym[k] = v
|
|
}
|
|
ix.StreamMU.Unlock()
|
|
|
|
pid, hb, err := CheckHeartbeat(ix.Host, s, streamsAnonym, &ix.StreamMU, ix.maxNodesConn)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
ix.StreamMU.Lock()
|
|
// if record already seen update last seen
|
|
if rec, ok := streams[*pid]; ok {
|
|
rec.DID = hb.DID
|
|
rec.HeartbeatStream = hb.Stream
|
|
rec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
|
|
} else {
|
|
hb.Stream.UptimeTracker = &UptimeTracker{
|
|
FirstSeen: time.Now().UTC(),
|
|
LastSeen: time.Now().UTC(),
|
|
}
|
|
streams[*pid] = &StreamRecord[T]{
|
|
DID: hb.DID,
|
|
HeartbeatStream: hb.Stream,
|
|
}
|
|
}
|
|
ix.StreamMU.Unlock()
|
|
}
|
|
}
|
|
|
|
// CheckHeartbeat validates one heartbeat read from s: it refuses new peers
// when the host is already at maxNodes, decodes the heartbeat JSON, runs a
// random-size bandwidth challenge, scores the peer on uptime, bandwidth and
// indexer diversity, and rejects low scores. On success it returns the
// decoded peer ID and the heartbeat with a fresh two-minute Stream attached.
// streams is a view of already-known peers used for the uptime lookup; lock
// guards it.
func CheckHeartbeat(h host.Host, s network.Stream, streams map[pp.ID]HeartBeatStreamed, lock *sync.RWMutex, maxNodes int) (*pp.ID, *Heartbeat, error) {
	if len(h.Network().Peers()) >= maxNodes {
		return nil, nil, fmt.Errorf("too many connections, try another indexer")
	}
	var hb Heartbeat
	if err := json.NewDecoder(s).Decode(&hb); err != nil {
		return nil, nil, err
	}
	// Challenge with a random payload size in [MinPayloadChallenge, MaxPayloadChallenge).
	if ok, bpms, err := getBandwidthChallengeRate(MinPayloadChallenge+int(rand.Float64()*(MaxPayloadChallenge-MinPayloadChallenge)), s); err != nil {
		return nil, nil, err
	} else if !ok {
		return nil, nil, fmt.Errorf("Not a proper peer")
	} else {
		pid, err := pp.Decode(hb.PeerID)
		if err != nil {
			return nil, nil, err
		}
		// Fraction of the time since TimeWatcher during which this peer has
		// been known (0 for peers we have never seen).
		upTime := float64(0)
		lock.Lock()
		if rec, ok := streams[pid]; ok && rec.GetUptimeTracker() != nil {
			upTime = rec.GetUptimeTracker().Uptime().Hours() / float64(time.Since(TimeWatcher).Hours())
		}
		lock.Unlock()
		diversity := getDiversityRate(h, hb.IndexersBinded)
		hb.ComputeIndexerScore(upTime, bpms, diversity)
		// NOTE(review): upTime, bpms and diversity each appear to be ratios in
		// [0,1], so the weighted Score cannot exceed ~1 — yet the threshold
		// here is 75. Confirm the intended scale; as written every peer is
		// rejected.
		if hb.Score < 75 {
			return nil, nil, errors.New("not enough trusting value")
		}
		hb.Stream = &Stream{
			Name: hb.Name,
			DID: hb.DID,
			Stream: s,
			Expiry: time.Now().UTC().Add(2 * time.Minute),
		} // here is the long-lived bidirectional heartbeat stream.
		return &pid, &hb, err
	}
}
|
|
|
|
func getDiversityRate(h host.Host, peers []string) float64 {
|
|
peers, _ = checkPeers(h, peers)
|
|
diverse := []string{}
|
|
for _, p := range peers {
|
|
ip, err := ExtractIP(p)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
div := ip.Mask(net.CIDRMask(24, 32)).String()
|
|
if !slices.Contains(diverse, div) {
|
|
diverse = append(diverse, div)
|
|
}
|
|
}
|
|
return float64(len(diverse) / len(peers))
|
|
}
|
|
|
|
func checkPeers(h host.Host, peers []string) ([]string, []string) {
|
|
concretePeer := []string{}
|
|
ips := []string{}
|
|
for _, p := range peers {
|
|
ad, err := pp.AddrInfoFromString(p)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if PeerIsAlive(h, *ad) {
|
|
concretePeer = append(concretePeer, p)
|
|
if ip, err := ExtractIP(p); err == nil {
|
|
ips = append(ips, ip.Mask(net.CIDRMask(24, 32)).String())
|
|
}
|
|
}
|
|
}
|
|
return concretePeer, ips
|
|
}
|
|
|
|
// Bandwidth-challenge tuning.
const MaxExpectedMbps = 50.0 // throughput normalised to 1.0 at this rate

const MinPayloadChallenge = 512 // smallest random challenge payload, bytes

const MaxPayloadChallenge = 2048 // largest random challenge payload, bytes

const BaseRoundTrip = 400 * time.Millisecond // fixed latency allowance added to the size-dependent budget
|
|
|
|
func getBandwidthChallengeRate(payloadSize int, s network.Stream) (bool, float64, error) {
|
|
// Génération payload aléatoire
|
|
payload := make([]byte, payloadSize)
|
|
_, err := cr.Read(payload)
|
|
if err != nil {
|
|
return false, 0, err
|
|
}
|
|
start := time.Now()
|
|
// send on heartbeat stream the challenge
|
|
if _, err = s.Write(payload); err != nil {
|
|
return false, 0, err
|
|
}
|
|
// read back
|
|
response := make([]byte, payloadSize)
|
|
_, err = io.ReadFull(s, response)
|
|
if err != nil {
|
|
return false, 0, err
|
|
}
|
|
|
|
duration := time.Since(start)
|
|
// Verify content
|
|
if !bytes.Equal(payload, response) {
|
|
return false, 0, nil // pb or a sadge peer.
|
|
}
|
|
maxRoundTrip := BaseRoundTrip + (time.Duration(payloadSize) * (100 * time.Millisecond))
|
|
mbps := float64(payloadSize*8) / duration.Seconds() / 1e6
|
|
if duration > maxRoundTrip || mbps < 5.0 {
|
|
return false, float64(mbps / MaxExpectedMbps), nil
|
|
}
|
|
return true, float64(mbps / MaxExpectedMbps), nil
|
|
}
|
|
|
|
type UptimeTracker struct {
|
|
FirstSeen time.Time
|
|
LastSeen time.Time
|
|
}
|
|
|
|
func (u *UptimeTracker) Uptime() time.Duration {
|
|
return time.Since(u.FirstSeen)
|
|
}
|
|
|
|
func (u *UptimeTracker) IsEligible(min time.Duration) bool {
|
|
return u.Uptime() >= min
|
|
}
|
|
|
|
// StreamRecord ties a peer's DID and its heartbeat stream to an arbitrary
// per-peer record payload of type T.
type StreamRecord[T interface{}] struct {
	DID string
	// HeartbeatStream may be nil until the first heartbeat is recorded.
	HeartbeatStream *Stream
	Record T
}
|
|
|
|
func (s *StreamRecord[T]) GetUptimeTracker() *UptimeTracker {
|
|
if s.HeartbeatStream == nil {
|
|
return nil
|
|
}
|
|
return s.HeartbeatStream.UptimeTracker
|
|
}
|
|
|
|
// Stream wraps a libp2p network stream with identity and lifetime metadata.
// Only Name, DID and Expiry carry JSON tags; the raw network.Stream and the
// UptimeTracker do not round-trip through encoding/json meaningfully.
type Stream struct {
	Name string `json:"name"`
	DID string `json:"did"`
	Stream network.Stream
	// Expiry is the deadline after which the stream is considered stale.
	Expiry time.Time `json:"expiry"`
	// UptimeTracker may be nil; HandleNodeHeartbeat fills it in.
	UptimeTracker *UptimeTracker
}
|
|
|
|
// GetUptimeTracker exposes the stream's uptime tracker (may be nil).
func (s *Stream) GetUptimeTracker() *UptimeTracker {
	return s.UptimeTracker
}
|
|
|
|
// NewStream wraps s in a Stream that expires two minutes from now.
//
// NOTE(review): the record parameter is accepted but never used, and the
// returned Stream's Name field is left empty — confirm both are intentional
// before relying on this constructor.
func NewStream[T interface{}](s network.Stream, did string, record T) *Stream {
	return &Stream{
		DID: did,
		Stream: s,
		Expiry: time.Now().UTC().Add(2 * time.Minute),
	}
}
|
|
|
|
type ProtocolStream map[protocol.ID]map[pp.ID]*Stream
|
|
|
|
func (ps ProtocolStream) Get(protocol protocol.ID) map[pp.ID]*Stream {
|
|
if ps[protocol] == nil {
|
|
ps[protocol] = map[pp.ID]*Stream{}
|
|
}
|
|
|
|
return ps[protocol]
|
|
}
|
|
|
|
func (ps ProtocolStream) Add(protocol protocol.ID, peerID *pp.ID, s *Stream) error {
|
|
if ps[protocol] == nil {
|
|
ps[protocol] = map[pp.ID]*Stream{}
|
|
}
|
|
if peerID != nil {
|
|
if s != nil {
|
|
ps[protocol][*peerID] = s
|
|
} else {
|
|
return errors.New("unable to add stream : stream missing")
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (ps ProtocolStream) Delete(protocol protocol.ID, peerID *pp.ID) {
|
|
if streams, ok := ps[protocol]; ok {
|
|
if peerID != nil && streams[*peerID] != nil {
|
|
streams[*peerID].Stream.Close()
|
|
delete(streams, *peerID)
|
|
} else {
|
|
for _, s := range ps {
|
|
for _, v := range s {
|
|
v.Stream.Close()
|
|
}
|
|
}
|
|
delete(ps, protocol)
|
|
}
|
|
}
|
|
}
|
|
|
|
const (
	// ProtocolPublish is the libp2p protocol ID used to publish records.
	ProtocolPublish = "/opencloud/record/publish/1.0"
	// ProtocolGet is the libp2p protocol ID used to fetch records.
	ProtocolGet = "/opencloud/record/get/1.0"
)

// TimeWatcher marks when this node connected to its indexers; uptime ratios
// in CheckHeartbeat are computed relative to it.
var TimeWatcher time.Time

// StaticIndexers maps configured indexer multiaddrs to their parsed AddrInfo.
var StaticIndexers map[string]*pp.AddrInfo = map[string]*pp.AddrInfo{}

// StreamMuIndexes guards StreamIndexers.
var StreamMuIndexes sync.RWMutex

// StreamIndexers holds the live streams opened towards indexers, by protocol.
var StreamIndexers ProtocolStream = ProtocolStream{}
|
|
|
|
// ConnectToIndexers parses the configured indexer addresses (keeping at most
// maxIndexer of them), records them in StaticIndexers, starts the periodic
// heartbeat towards them, and errors when fewer than minIndexer indexers were
// resolved. myPID is currently unused in this function.
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error {
	TimeWatcher = time.Now().UTC()
	logger := oclib.GetLogger()
	addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")

	if len(addresses) > maxIndexer {
		addresses = addresses[0:maxIndexer]
	}

	for _, indexerAddr := range addresses {
		fmt.Println("GENERATE ADDR", indexerAddr)
		ad, err := pp.AddrInfoFromString(indexerAddr)
		if err != nil {
			// NOTE(review): if this logger is zerolog-style, logger.Err(err)
			// without a terminal .Msg()/.Send() emits nothing — confirm.
			logger.Err(err)
			continue
		}
		/*for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
			if stream, err := TempStream(h, *ad, proto); err == nil {
				StreamMuIndexes.Lock()
				if StreamIndexers[proto] == nil {
					StreamIndexers[proto] = map[pp.ID]*Stream{}
				}
				time.AfterFunc(2*time.Second, func() {
					StreamMuIndexes.Lock()
					defer StreamMuIndexes.Unlock()
					delete(StreamIndexers[proto], ad.ID)
				})
				StreamIndexers[proto][ad.ID] = &Stream{
					Stream: stream,
					Expiry: time.Now().UTC().Add(2 * time.Second),
				}
				StreamMuIndexes.Unlock()
			} else {
				continue
			}
		}*/
		StaticIndexers[indexerAddr] = ad
	}

	// NOTE(review): the heartbeat daemon starts before the minIndexer check
	// below — consider validating first so an under-provisioned node does not
	// begin heartbeating before erroring out.
	SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
	if len(StaticIndexers) < minIndexer {
		return errors.New("you run a node without indexers... your gonna be isolated.")
	}
	return nil
}
|
|
|
|
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
|
|
if onStreamCreated == nil {
|
|
f := func(s network.Stream) {
|
|
protoS[proto][id] = &Stream{
|
|
Stream: s,
|
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
|
}
|
|
}
|
|
onStreamCreated = &f
|
|
}
|
|
f := *onStreamCreated
|
|
if mypid > id || force {
|
|
if ctx == nil {
|
|
c := context.Background()
|
|
ctx = &c
|
|
}
|
|
if protoS[proto] == nil {
|
|
protoS[proto] = map[pp.ID]*Stream{}
|
|
}
|
|
|
|
if protoS[proto][id] != nil {
|
|
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
|
|
} else {
|
|
fmt.Println("NEW STREAM", proto, id)
|
|
s, err := h.NewStream(*ctx, id, proto)
|
|
if err != nil {
|
|
panic(err.Error())
|
|
}
|
|
f(s)
|
|
}
|
|
}
|
|
return protoS
|
|
}
|
|
|
|
// Heartbeat is the JSON payload exchanged on the heartbeat protocol. Stream
// and Score are filled in locally by CheckHeartbeat rather than read from the
// wire.
type Heartbeat struct {
	Name string `json:"name"`
	Stream *Stream `json:"stream"`
	DID string `json:"did"`
	PeerID string `json:"peer_id"`
	// Timestamp is the sender's UTC Unix time at emission.
	Timestamp int64 `json:"timestamp"`
	// IndexersBinded lists the multiaddrs of indexers the sender is bound to.
	IndexersBinded []string `json:"indexers_binded"`
	// Score is the computed trust score; see ComputeIndexerScore.
	Score float64
}
|
|
|
|
func (hb *Heartbeat) ComputeIndexerScore(uptimeHours float64, bpms float64, diversity float64) {
|
|
hb.Score = (0.4 * uptimeHours) +
|
|
(0.4 * bpms) +
|
|
(0.2 * diversity)
|
|
}
|
|
|
|
// HeartbeatInfo is a list of opaque per-peer info payloads.
type HeartbeatInfo []struct {
	Info []byte `json:"info"`
}

// ProtocolHeartbeat is the libp2p protocol ID for the heartbeat exchange.
const ProtocolHeartbeat = "/opencloud/heartbeat/1.0"
|
|
|
|
func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.Host, ps ProtocolStream, peers map[string]*pp.AddrInfo, interval time.Duration) {
|
|
peerID, err := oclib.GenerateNodeID()
|
|
if err == nil {
|
|
panic("can't heartbeat daemon failed to start")
|
|
}
|
|
go func() {
|
|
t := time.NewTicker(interval)
|
|
defer t.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
addrs := []string{}
|
|
for addr := range StaticIndexers {
|
|
addrs = append(addrs, addr)
|
|
}
|
|
hb := Heartbeat{
|
|
Name: name,
|
|
DID: peerID,
|
|
PeerID: h.ID().String(),
|
|
Timestamp: time.Now().UTC().Unix(),
|
|
IndexersBinded: addrs,
|
|
}
|
|
for _, ix := range peers {
|
|
if err = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second); err != nil {
|
|
StreamMuIndexes.Lock()
|
|
delete(StreamIndexers[proto], ix.ID)
|
|
StreamMuIndexes.Unlock()
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, mu *sync.RWMutex) (ProtocolStream, error) {
|
|
if ctxTTL, err := context.WithTimeout(context.Background(), 2*time.Second); err == nil {
|
|
if h.Network().Connectedness(ad.ID) != network.Connected {
|
|
if err := h.Connect(ctxTTL, ad); err != nil {
|
|
return streams, err
|
|
}
|
|
}
|
|
if streams[proto] != nil && streams[proto][ad.ID] != nil {
|
|
return streams, nil
|
|
} else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil {
|
|
mu.Lock()
|
|
if streams[proto] == nil {
|
|
streams[proto] = map[pp.ID]*Stream{}
|
|
}
|
|
mu.Unlock()
|
|
time.AfterFunc(2*time.Second, func() {
|
|
mu.Lock()
|
|
defer mu.Unlock()
|
|
delete(streams[proto], ad.ID)
|
|
})
|
|
streams[ProtocolPublish][ad.ID] = &Stream{
|
|
DID: did,
|
|
Stream: s,
|
|
Expiry: time.Now().UTC().Add(2 * time.Second),
|
|
}
|
|
mu.Unlock()
|
|
return streams, nil
|
|
} else {
|
|
return streams, err
|
|
}
|
|
}
|
|
return streams, errors.New("can't create a context")
|
|
}
|
|
|
|
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo,
|
|
hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
|
|
streams := ps.Get(proto)
|
|
if len(streams) == 0 {
|
|
return errors.New("no stream for protocol heartbeat founded")
|
|
}
|
|
pss, exists := streams[p.ID]
|
|
ctxTTL, _ := context.WithTimeout(ctx, 3*interval)
|
|
// Connect si nécessaire
|
|
if h.Network().Connectedness(p.ID) != network.Connected {
|
|
if err := h.Connect(ctxTTL, *p); err != nil {
|
|
return err
|
|
}
|
|
exists = false // on devra recréer le stream
|
|
}
|
|
// Crée le stream si inexistant ou fermé
|
|
if !exists || pss.Stream == nil {
|
|
s, err := h.NewStream(ctx, p.ID, proto)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pss = &Stream{
|
|
Stream: s,
|
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
|
}
|
|
streams[p.ID] = pss
|
|
}
|
|
|
|
// Envoie le heartbeat
|
|
ss := json.NewEncoder(pss.Stream)
|
|
err := ss.Encode(&hb)
|
|
if err != nil {
|
|
pss.Stream.Close()
|
|
pss.Stream = nil // recréera au prochain tick
|
|
return err
|
|
}
|
|
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
|
|
return nil
|
|
}
|