Files
oc-peer/infrastructure/dht.go

447 lines
10 KiB
Go
Raw Normal View History

2026-01-15 13:35:11 +01:00
package infrastructure
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"oc-peer/conf"
2026-01-26 11:23:04 +01:00
"slices"
2026-01-26 15:45:13 +01:00
"sync"
2026-01-15 13:35:11 +01:00
"time"
oclib "cloud.o-forge.io/core/oc-lib"
2026-01-26 11:23:04 +01:00
pp "cloud.o-forge.io/core/oc-lib/models/peer"
2026-01-15 13:35:11 +01:00
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/host"
2026-01-27 16:23:28 +01:00
"github.com/libp2p/go-libp2p/core/peer"
2026-01-15 13:35:11 +01:00
)
// DHTRecord is the JSON document published in the DHT for one peer.
//
// The signature written by ClaimName covers the JSON encoding of a record
// holding only Name, PeerID and PubKey; the remaining fields are filled in
// after signing and are therefore not authenticated by Signature.
//
// Field order is significant: encoding/json emits fields in declaration
// order, and that encoding is what gets signed and verified.
type DHTRecord struct {
	Name       string    `json:"name"`        // peer name
	State      int       `json:"state"`       // numeric peer state (an EnumIndex value)
	DID        string    `json:"did"`         // not written by any code in this file
	PeerID     string    `json:"peer_id"`     // node ID of the publishing peer
	PubKey     []byte    `json:"pub_key"`     // publisher's public key bytes
	URL        string    `json:"url"`         // peer endpoint
	NATSUrl    string    `json:"nats_url"`    // peer NATS endpoint
	Wallet     string    `json:"wallet"`      // wallet address
	Signature  []byte    `json:"signature"`   // signature over {Name, PeerID, PubKey}
	ExpiryDate time.Time `json:"expiry_date"` // instant after which the record is considered expired
}
type DHTService struct {
2026-01-26 11:23:04 +01:00
Key string
Host host.Host
DHT *dht.IpfsDHT
Cache []string
2026-01-26 15:45:13 +01:00
mutex sync.RWMutex
2026-01-15 13:35:11 +01:00
}
// singletonService holds the service created by Init; it is nil until Init
// has reached the point where the service is registered.
var singletonService *DHTService

// GetDHTService returns the package-level DHT service, or nil when Init has
// not registered one yet.
func GetDHTService() *DHTService {
return singletonService
}
func Init(ctx context.Context) (*DHTService, error) {
service := &DHTService{}
2026-01-26 11:23:04 +01:00
priv, err := LoadKeyFromFile(false)
2026-01-15 13:35:11 +01:00
if err != nil {
return nil, err
}
2026-01-27 15:49:57 +01:00
psk, err := LoadPSKFromFile()
if err != nil {
return nil, err
}
2026-01-15 13:35:11 +01:00
h, err := libp2p.New(
2026-01-27 15:49:57 +01:00
libp2p.PrivateNetwork(psk),
2026-01-15 13:35:11 +01:00
libp2p.Identity(priv),
libp2p.ListenAddrStrings(
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().DHTEndpointPort),
),
)
if err != nil {
return nil, err
}
service.Host = h
service.DHT, err = dht.New(ctx, h, dht.MaxRecordAge(24*time.Hour)) // every day DHT will purge expired data... if not used.
if err != nil {
return nil, err
}
2026-01-26 11:23:04 +01:00
err = service.DHT.Bootstrap(ctx)
if err != nil {
return nil, err
}
2026-01-27 16:23:28 +01:00
pi, err := peer.AddrInfoFromString(conf.GetConfig().BootstrapAddress)
if err != nil {
return nil, err
}
logger := oclib.GetLogger()
if err := h.Connect(ctx, *pi); err != nil {
logger.Err(fmt.Errorf("Failed to connect to MAIN bootstrap peer %s: %s", pi.ID, err))
} else {
logger.Info().Msg(fmt.Sprintf("Connected to MAIN bootstrap peer %s", pi.ID))
}
2026-01-15 13:35:11 +01:00
singletonService = service
2026-01-23 08:07:17 +01:00
if VerifyPubWithPriv() {
if _, err := singletonService.ClaimName(context.Background(),
conf.GetConfig().Name,
conf.GetConfig().Hostname); err == nil {
2026-01-26 11:23:04 +01:00
go service.RefreshKeys(ctx, 30*time.Minute)
}
}
return service, nil
}
func (d *DHTService) RefreshKeys(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
go func() {
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s := []string{}
2026-01-26 15:45:13 +01:00
d.mutex.Lock()
2026-01-26 11:23:04 +01:00
s = append(s, d.Cache...)
2026-01-26 15:45:13 +01:00
d.mutex.Unlock()
2026-01-26 11:23:04 +01:00
for _, key := range s {
_, _ = d.GetValue(ctx, key)
2026-01-23 08:07:17 +01:00
}
2026-01-26 11:23:04 +01:00
}
2026-01-15 13:35:11 +01:00
}
2026-01-26 11:23:04 +01:00
}()
}
func (d *DHTService) PutValue(
ctx context.Context,
key string,
value []byte,
) error {
err := d.DHT.PutValue(ctx, key, value)
if err != nil {
return err
}
2026-01-26 15:45:13 +01:00
d.mutex.Lock()
2026-01-26 11:23:04 +01:00
if !slices.Contains(d.Cache, key) {
d.Cache = append(d.Cache, key)
2026-01-15 13:35:11 +01:00
}
2026-01-26 15:45:13 +01:00
d.mutex.Unlock()
2026-01-26 11:23:04 +01:00
return nil
}
2026-01-23 08:07:17 +01:00
2026-01-26 11:23:04 +01:00
func (d *DHTService) GetValue(
ctx context.Context,
key string,
) (*DHTRecord, error) {
dht, err := d.DHT.GetValue(ctx, key)
if err != nil {
cache := []string{}
2026-01-26 15:45:13 +01:00
d.mutex.Lock()
2026-01-26 11:23:04 +01:00
for _, c := range d.Cache {
if c != key {
cache = append(cache, c)
}
}
d.Cache = cache
2026-01-26 15:45:13 +01:00
d.mutex.Unlock()
2026-01-26 11:23:04 +01:00
return nil, err
}
2026-01-26 15:45:13 +01:00
d.mutex.Lock()
2026-01-26 11:23:04 +01:00
if !slices.Contains(d.Cache, key) {
d.Cache = append(d.Cache, key)
}
2026-01-26 15:45:13 +01:00
d.mutex.Unlock()
var data DHTRecord
json.Unmarshal(dht, &data)
2026-01-26 11:23:04 +01:00
2026-01-26 15:45:13 +01:00
peerID, err := oclib.GenerateNodeID()
if err != nil {
return nil, err
2026-01-26 11:23:04 +01:00
}
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
p := &pp.Peer{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: data.Name,
},
State: pp.ONLINE,
Relation: pp.SELF,
2026-01-26 15:45:13 +01:00
PeerID: peerID,
2026-01-26 11:23:04 +01:00
PublicKey: string(data.PubKey),
Url: data.URL,
NATSUrl: oclib.GetConfig().NATSUrl,
WalletAddress: data.Wallet,
}
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil && f.(*pp.Peer).Relation != pp.BLACKLIST {
f.(*pp.Peer).State = pp.ONLINE
f.(*pp.Peer).NATSUrl = p.NATSUrl
f.(*pp.Peer).Url = p.Url
f.(*pp.Peer).PeerID = p.PeerID
f.(*pp.Peer).Relation = p.Relation
f.(*pp.Peer).WalletAddress = p.WalletAddress
access.UpdateOne(f, f.GetID())
}
2026-01-26 15:45:13 +01:00
return &data, err
2026-01-26 11:23:04 +01:00
}
2026-01-26 15:45:13 +01:00
func (d *DHTService) generateKey() (string, error) {
s, err := oclib.GenerateNodeID()
if err != nil {
return s, err
}
return "/opencloud/peer/" + s, nil
2026-01-15 13:35:11 +01:00
}
// Create your peer.
func (d *DHTService) ClaimName(
ctx context.Context,
name string,
endPoint string,
2026-01-26 11:23:04 +01:00
) (*pp.Peer, error) {
2026-01-15 13:35:11 +01:00
if endPoint == "" {
return nil, errors.New("no endpoint found for peer" + name)
}
2026-01-26 15:45:13 +01:00
peerID, err := oclib.GenerateNodeID()
if err != nil {
return nil, err
}
2026-01-15 13:35:11 +01:00
pub := d.Host.Peerstore().PubKey(d.Host.ID())
pubBytes, _ := pub.Raw()
now := time.Now()
expiry := now.Add(1 * time.Hour)
2026-01-26 15:45:13 +01:00
2026-01-15 13:35:11 +01:00
rec := DHTRecord{
Name: name,
2026-01-26 15:45:13 +01:00
PeerID: peerID,
2026-01-15 13:35:11 +01:00
PubKey: pubBytes,
}
payload, _ := json.Marshal(rec)
sig, _ := sign(d.Host.Peerstore().PrivKey(d.Host.ID()), payload)
rec.Signature = sig
rec.URL = endPoint
2026-01-23 08:07:17 +01:00
rec.NATSUrl = oclib.GetConfig().NATSUrl
2026-01-26 11:23:04 +01:00
rec.State = pp.ONLINE.EnumIndex()
2026-01-15 13:35:11 +01:00
rec.ExpiryDate = expiry
data, _ := json.Marshal(rec)
2026-01-26 15:45:13 +01:00
key, err := d.generateKey()
if err != nil {
return nil, err
}
2026-01-15 13:35:11 +01:00
// retrieve your key name in standard
2026-01-26 11:23:04 +01:00
old, err := d.GetValue(ctx, key)
if err == nil {
2026-01-26 15:45:13 +01:00
if old.PeerID != peerID { // check if someone claims your name before
2026-01-15 13:35:11 +01:00
return nil, errors.New("name already claimed by another peer")
2026-01-26 11:23:04 +01:00
// TODO : can be fragile if 2 peers connect at the same time
2026-01-15 13:35:11 +01:00
}
if now.After(old.ExpiryDate) {
payload, _ := json.Marshal(rec)
2026-01-26 11:23:04 +01:00
d.PutValue(ctx, key, payload)
2026-01-15 13:35:11 +01:00
}
}
2026-01-26 11:23:04 +01:00
if err := d.PutValue(ctx, key, data); err != nil {
2026-01-15 13:35:11 +01:00
return nil, err
}
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
d.Key = key
2026-01-26 11:23:04 +01:00
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
2026-01-26 15:45:13 +01:00
if err != nil {
return nil, err
}
2026-01-26 11:23:04 +01:00
p := &pp.Peer{
2026-01-15 13:35:11 +01:00
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: name,
},
2026-01-26 11:23:04 +01:00
State: pp.ONLINE,
Relation: pp.SELF,
2026-01-26 15:45:13 +01:00
PeerID: peerID,
2026-01-15 13:35:11 +01:00
PublicKey: pubStr,
Url: endPoint,
2026-01-23 08:07:17 +01:00
NATSUrl: oclib.GetConfig().NATSUrl,
2026-01-15 13:35:11 +01:00
WalletAddress: "my-wallet",
}
2026-01-26 11:23:04 +01:00
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
2026-01-15 13:35:11 +01:00
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
2026-01-26 15:45:13 +01:00
peerID, err := oclib.GenerateNodeID()
if err != nil {
return nil, err
}
2026-01-26 11:23:04 +01:00
f.(*pp.Peer).Name = name
2026-01-26 15:45:13 +01:00
f.(*pp.Peer).PeerID = peerID
2026-01-26 11:23:04 +01:00
f.(*pp.Peer).State = pp.ONLINE
f.(*pp.Peer).Url = endPoint
f.(*pp.Peer).NATSUrl = oclib.GetConfig().NATSUrl
f.(*pp.Peer).Relation = pp.SELF
2026-01-15 13:35:11 +01:00
access.UpdateOne(f, f.GetID())
}
return p, nil
}
2026-01-26 11:23:04 +01:00
func (d *DHTService) treatPeer(ctx context.Context, key string, rec *DHTRecord) (*pp.Peer, error) {
2026-01-15 13:35:11 +01:00
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
if err != nil {
return nil, err
}
now := time.Now()
dht := DHTRecord{
Name: rec.Name,
PeerID: rec.PeerID,
PubKey: rec.PubKey,
}
payload, _ := json.Marshal(dht)
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
return nil, errors.New("invalid signature")
}
pubBytes, _ := pubKey.Raw()
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
2026-01-26 11:23:04 +01:00
rel := pp.NONE
2026-01-15 13:35:11 +01:00
if d.Key == key {
2026-01-26 11:23:04 +01:00
rel = pp.SELF
2026-01-15 13:35:11 +01:00
}
2026-01-26 11:23:04 +01:00
p := &pp.Peer{
2026-01-15 13:35:11 +01:00
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: rec.Name,
},
2026-01-26 11:23:04 +01:00
State: pp.ONLINE,
Relation: rel,
PeerID: rec.PeerID,
PublicKey: pubStr,
Url: rec.URL,
NATSUrl: rec.NATSUrl,
WalletAddress: rec.Wallet,
}
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
2026-01-15 13:35:11 +01:00
if now.After(rec.ExpiryDate) {
2026-01-26 11:23:04 +01:00
rec.State = pp.OFFLINE.EnumIndex()
p.State = pp.OFFLINE
2026-01-15 13:35:11 +01:00
payload, _ := json.Marshal(rec)
2026-01-26 11:23:04 +01:00
d.PutValue(ctx, key, payload)
2026-01-15 13:35:11 +01:00
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
access.StoreOne(p)
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
2026-01-26 11:23:04 +01:00
f.(*pp.Peer).State = pp.OFFLINE
2026-01-15 13:35:11 +01:00
access.UpdateOne(f, f.GetID())
}
return nil, errors.New("peer " + key + " is expired")
}
2026-01-26 11:23:04 +01:00
if rec.State == pp.OFFLINE.EnumIndex() {
2026-01-15 13:35:11 +01:00
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
2026-01-26 11:23:04 +01:00
ppp, _, err := access.StoreOne(p)
2026-01-15 13:35:11 +01:00
if err == nil {
2026-01-26 11:23:04 +01:00
return ppp.(*pp.Peer), nil
2026-01-15 13:35:11 +01:00
}
} else {
2026-01-26 11:23:04 +01:00
f, ok := pp.CheckPeerStatus(founded[0].GetID(), "")
2026-01-15 13:35:11 +01:00
if ok {
2026-01-26 11:23:04 +01:00
f.State = pp.ONLINE
2026-01-15 13:35:11 +01:00
} else {
2026-01-26 11:23:04 +01:00
f.State = pp.OFFLINE
2026-01-15 13:35:11 +01:00
}
2026-01-26 11:23:04 +01:00
ppp, _, err := access.UpdateOne(f, f.GetID())
2026-01-15 13:35:11 +01:00
if err == nil && ok {
2026-01-26 11:23:04 +01:00
return ppp.(*pp.Peer), nil
2026-01-15 13:35:11 +01:00
}
}
}
return nil, errors.New("peer " + key + " is offline")
}
// Discover a specific Peer
func (d *DHTService) DiscoverPeers(
ctx context.Context,
name string,
2026-01-26 11:23:04 +01:00
) ([]*pp.Peer, error) {
peers := []*pp.Peer{}
2026-01-26 15:45:13 +01:00
key, err := d.generateKey()
if err != nil {
return nil, err
}
2026-01-15 13:35:11 +01:00
datas, err := d.DHT.SearchValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
for data := range datas {
2026-01-26 11:23:04 +01:00
var dht *DHTRecord
json.Unmarshal(data, dht)
if p, err := d.treatPeer(ctx, key, dht); err == nil {
2026-01-15 13:35:11 +01:00
peers = append(peers, p)
}
}
return peers, nil
}
2026-01-26 11:23:04 +01:00
func (d *DHTService) GetPeer(
ctx context.Context,
name string,
) (*pp.Peer, error) {
2026-01-26 15:45:13 +01:00
key, err := d.generateKey()
if err != nil {
return nil, err
}
2026-01-26 11:23:04 +01:00
data, err := d.GetValue(ctx, key)
if err != nil {
return nil, errors.New("no DHT peer not found")
}
return d.treatPeer(ctx, key, data)
}
2026-01-15 13:35:11 +01:00
func (d *DHTService) existsDHT(ctx context.Context) (*DHTRecord, error) {
if d.Key == "" {
return nil, errors.New("no self peer found")
}
2026-01-26 11:23:04 +01:00
rec, err := d.GetValue(ctx, d.Key)
2026-01-15 13:35:11 +01:00
if err != nil {
return nil, errors.New("no DHT peer found")
}
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
if err != nil {
2026-01-26 11:23:04 +01:00
return rec, err
2026-01-15 13:35:11 +01:00
}
dht := DHTRecord{
Name: rec.Name,
PeerID: rec.PeerID,
PubKey: rec.PubKey,
}
payload, _ := json.Marshal(dht)
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
2026-01-26 11:23:04 +01:00
return rec, err
2026-01-15 13:35:11 +01:00
}
2026-01-26 11:23:04 +01:00
return rec, nil
2026-01-15 13:35:11 +01:00
}