// Package infrastructure publishes this node's identity in a libp2p Kademlia
// DHT and mirrors the records it reads into the local peer store.
package infrastructure

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"errors"
	"fmt"
	"oc-peer/conf"
	"slices"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	pp "cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
	"github.com/google/uuid"
	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/crypto"
	"github.com/libp2p/go-libp2p/core/host"
)

// DHTRecord is the JSON document stored in the DHT for one peer.
//
// Only Name, PeerID and PubKey are covered by Signature: ClaimName signs a
// record holding just those three fields, and treatPeer/existsDHT rebuild the
// same reduced record before verifying.
type DHTRecord struct {
	Name       string    `json:"name"`
	State      int       `json:"state"`
	DID        string    `json:"did"`
	PeerID     string    `json:"peer_id"`
	PubKey     []byte    `json:"pub_key"`
	URL        string    `json:"url"`
	NATSUrl    string    `json:"nats_url"`
	Wallet     string    `json:"wallet"`
	Signature  []byte    `json:"signature"`
	ExpiryDate time.Time `json:"expiry_date"`
}

// DHTService bundles the libp2p host, the DHT handle and the list of keys
// this node has successfully written or read.
type DHTService struct {
	Key   string       // DHT key of this node's own record, set by ClaimName
	Host  host.Host    // libp2p host the DHT runs on
	DHT   *dht.IpfsDHT // Kademlia DHT handle
	Cache []string     // keys re-read periodically by RefreshKeys; guarded by mutex
	mutex sync.RWMutex
}

// singletonService is the service created by the last successful Init call.
var singletonService *DHTService

// GetDHTService returns the service created by Init, or nil if Init has not
// completed successfully yet.
func GetDHTService() *DHTService {
	return singletonService
}

// Init creates the libp2p host and the DHT, bootstraps it, registers the
// result as the package singleton and, when VerifyPubWithPriv reports true,
// claims the configured name and starts the periodic key refresh.
//
// A failed claim is not fatal: the service is still returned, only the
// refresh loop is not started.
func Init(ctx context.Context) (*DHTService, error) {
	priv, err := LoadKeyFromFile(false)
	if err != nil {
		return nil, err
	}
	h, err := libp2p.New(
		libp2p.Identity(priv),
		libp2p.ListenAddrStrings(
			fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().DHTEndpointPort),
		),
	)
	if err != nil {
		return nil, err
	}
	// every day DHT will purge expired data... if not used.
	kad, err := dht.New(ctx, h, dht.MaxRecordAge(24*time.Hour))
	if err != nil {
		// Do not leak the listening host when the DHT cannot be created.
		_ = h.Close()
		return nil, err
	}
	if err := kad.Bootstrap(ctx); err != nil {
		_ = kad.Close()
		_ = h.Close()
		return nil, err
	}

	service := &DHTService{Host: h, DHT: kad}
	singletonService = service
	if VerifyPubWithPriv() {
		if _, err := service.ClaimName(context.Background(), conf.GetConfig().Name, conf.GetConfig().Hostname); err == nil {
			// RefreshKeys spawns its own goroutine and returns immediately.
			service.RefreshKeys(ctx, 30*time.Minute)
		}
	}
	return service, nil
}

// RefreshKeys starts a background goroutine that re-reads every cached key
// each interval until ctx is cancelled. It returns immediately. A
// non-positive interval is ignored, since time.NewTicker would panic on it.
func (d *DHTService) RefreshKeys(ctx context.Context, interval time.Duration) {
	if interval <= 0 {
		return
	}
	ticker := time.NewTicker(interval)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Snapshot under the lock: GetValue mutates the cache itself.
				d.mutex.RLock()
				keys := slices.Clone(d.Cache)
				d.mutex.RUnlock()
				for _, key := range keys {
					// Best effort: GetValue already drops keys that fail.
					_, _ = d.GetValue(ctx, key)
				}
			}
		}
	}()
}

// rememberKey adds key to the refresh cache if it is not already there.
func (d *DHTService) rememberKey(key string) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	if !slices.Contains(d.Cache, key) {
		d.Cache = append(d.Cache, key)
	}
}

// forgetKey removes every occurrence of key from the refresh cache.
func (d *DHTService) forgetKey(key string) {
	d.mutex.Lock()
	defer d.mutex.Unlock()
	kept := make([]string, 0, len(d.Cache))
	for _, c := range d.Cache {
		if c != key {
			kept = append(kept, c)
		}
	}
	d.Cache = kept
}

// PutValue writes value under key in the DHT and, on success, adds key to the
// refresh cache.
func (d *DHTService) PutValue(
	ctx context.Context,
	key string,
	value []byte,
) error {
	if err := d.DHT.PutValue(ctx, key, value); err != nil {
		return err
	}
	d.rememberKey(key)
	return nil
}

// GetValue reads and decodes the record stored under key.
//
// On a DHT read failure the key is dropped from the refresh cache and the
// error is returned. On success the key is cached and the local SELF peer is
// created or updated from the record (a blacklisted entry is left untouched).
// A record that is not valid JSON is reported as an error instead of being
// treated as an empty record.
func (d *DHTService) GetValue(
	ctx context.Context,
	key string,
) (*DHTRecord, error) {
	raw, err := d.DHT.GetValue(ctx, key)
	if err != nil {
		d.forgetKey(key)
		return nil, err
	}
	d.rememberKey(key)

	var data DHTRecord
	if err := json.Unmarshal(raw, &data); err != nil {
		return nil, fmt.Errorf("decoding DHT record %q: %w", key, err)
	}
	peerID, err := oclib.GenerateNodeID()
	if err != nil {
		return nil, err
	}

	access := pp.NewAccessor(&tools.APIRequest{Admin: true})
	p := &pp.Peer{
		AbstractObject: utils.AbstractObject{
			UUID: uuid.New().String(),
			Name: data.Name,
		},
		State:         pp.ONLINE,
		Relation:      pp.SELF,
		PeerID:        peerID,
		PublicKey:     string(data.PubKey),
		Url:           data.URL,
		NATSUrl:       oclib.GetConfig().NATSUrl,
		WalletAddress: data.Wallet,
	}
	// Local persistence is best effort: its outcome does not affect the result.
	if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
		access.StoreOne(p)
	} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
		if self, ok := f.(*pp.Peer); ok && self.Relation != pp.BLACKLIST {
			self.State = pp.ONLINE
			self.NATSUrl = p.NATSUrl
			self.Url = p.Url
			self.PeerID = p.PeerID
			self.Relation = p.Relation
			self.WalletAddress = p.WalletAddress
			access.UpdateOne(self, self.GetID())
		}
	}
	return &data, nil
}

// generateKey builds the DHT key of this node from its node ID.
func (d *DHTService) generateKey() (string, error) {
	s, err := oclib.GenerateNodeID()
	if err != nil {
		return s, err
	}
	return "/opencloud/peer/" + s, nil
}

// ClaimName publishes this node's record in the DHT and creates or updates
// the local SELF peer. It fails when endPoint is empty, when the key is
// already held by a record with a different PeerID, or when the record cannot
// be signed or written.
//
// Create your peer.
func (d *DHTService) ClaimName(
	ctx context.Context,
	name string,
	endPoint string,
) (*pp.Peer, error) {
	if endPoint == "" {
		return nil, errors.New("no endpoint found for peer " + name)
	}
	peerID, err := oclib.GenerateNodeID()
	if err != nil {
		return nil, err
	}

	pub := d.Host.Peerstore().PubKey(d.Host.ID())
	if pub == nil {
		return nil, errors.New("no public key found for local host")
	}
	// NOTE(review): the key is published in Raw() form, while treatPeer and
	// existsDHT decode it with crypto.UnmarshalPublicKey, which appears to
	// expect the protobuf form produced by crypto.MarshalPublicKey — confirm.
	pubBytes, err := pub.Raw()
	if err != nil {
		return nil, fmt.Errorf("reading public key: %w", err)
	}

	now := time.Now()
	// Only the identity fields are signed; the mutable ones are filled in after.
	rec := DHTRecord{
		Name:   name,
		PeerID: peerID,
		PubKey: pubBytes,
	}
	payload, err := json.Marshal(rec)
	if err != nil {
		return nil, fmt.Errorf("encoding record to sign: %w", err)
	}
	sig, signErr := sign(d.Host.Peerstore().PrivKey(d.Host.ID()), payload)
	if signErr != nil {
		return nil, fmt.Errorf("signing record: %w", signErr)
	}
	rec.Signature = sig
	rec.URL = endPoint
	rec.NATSUrl = oclib.GetConfig().NATSUrl
	rec.State = pp.ONLINE.EnumIndex()
	rec.ExpiryDate = now.Add(1 * time.Hour)
	data, err := json.Marshal(rec)
	if err != nil {
		return nil, fmt.Errorf("encoding record: %w", err)
	}

	key, err := d.generateKey()
	if err != nil {
		return nil, err
	}
	// retrieve your key name in standard
	// A failed look-up is the normal "not claimed yet" case and must not leak
	// into the result, so its error stays scoped to this statement.
	if old, getErr := d.GetValue(ctx, key); getErr == nil && old.PeerID != peerID {
		// check if someone claims your name before
		// TODO : can be fragile if 2 peers connect at the same time
		return nil, errors.New("name already claimed by another peer")
	}
	// A single write covers both the fresh claim and the renewal of an
	// expired record: the payload is the same in both cases.
	if err := d.PutValue(ctx, key, data); err != nil {
		return nil, err
	}
	d.Key = key

	access := pp.NewAccessor(&tools.APIRequest{Admin: true})
	p := &pp.Peer{
		AbstractObject: utils.AbstractObject{
			UUID: uuid.New().String(),
			Name: name,
		},
		State:         pp.ONLINE,
		Relation:      pp.SELF,
		PeerID:        peerID,
		PublicKey:     base64.StdEncoding.EncodeToString(pubBytes),
		Url:           endPoint,
		NATSUrl:       oclib.GetConfig().NATSUrl,
		WalletAddress: "my-wallet", // NOTE(review): hard-coded value — looks like a placeholder, confirm
	}
	// Local persistence is best effort: its outcome does not affect the result.
	if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
		access.StoreOne(p)
	} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
		if self, ok := f.(*pp.Peer); ok {
			self.Name = name
			self.PeerID = peerID
			self.State = pp.ONLINE
			self.Url = endPoint
			self.NATSUrl = oclib.GetConfig().NATSUrl
			self.Relation = pp.SELF
			access.UpdateOne(self, self.GetID())
		}
	}
	return p, nil
}

// treatPeer validates a DHT record and turns it into a local peer.
//
// It returns an error when rec is nil, when the public key cannot be decoded,
// when the signature does not match, when the record is expired (the record
// is then re-published as OFFLINE and the local peer marked OFFLINE), or when
// the peer is not online. A record published as ONLINE is stored locally if
// unknown; otherwise the known peer is probed with pp.CheckPeerStatus and
// returned only if that probe succeeds.
func (d *DHTService) treatPeer(ctx context.Context, key string, rec *DHTRecord) (*pp.Peer, error) {
	if rec == nil {
		return nil, errors.New("no DHT record for " + key)
	}
	pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
	if err != nil {
		return nil, err
	}
	// Rebuild exactly the reduced record that ClaimName signed.
	signed := DHTRecord{
		Name:   rec.Name,
		PeerID: rec.PeerID,
		PubKey: rec.PubKey,
	}
	payload, err := json.Marshal(signed)
	if err != nil {
		return nil, fmt.Errorf("encoding record to verify: %w", err)
	}
	if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
		return nil, errors.New("invalid signature")
	}
	pubBytes, err := pubKey.Raw()
	if err != nil {
		return nil, fmt.Errorf("reading public key: %w", err)
	}

	rel := pp.NONE
	if d.Key == key {
		rel = pp.SELF
	}
	p := &pp.Peer{
		AbstractObject: utils.AbstractObject{
			UUID: uuid.New().String(),
			Name: rec.Name,
		},
		State:         pp.ONLINE,
		Relation:      rel,
		PeerID:        rec.PeerID,
		PublicKey:     base64.StdEncoding.EncodeToString(pubBytes),
		Url:           rec.URL,
		NATSUrl:       rec.NATSUrl,
		WalletAddress: rec.Wallet,
	}
	access := pp.NewAccessor(&tools.APIRequest{Admin: true})

	if time.Now().After(rec.ExpiryDate) {
		rec.State = pp.OFFLINE.EnumIndex()
		p.State = pp.OFFLINE
		// Best effort: advertise the expired peer as offline to other readers.
		if expired, err := json.Marshal(rec); err == nil {
			_ = d.PutValue(ctx, key, expired)
		}
		if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
			access.StoreOne(p)
		} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
			if known, ok := f.(*pp.Peer); ok {
				known.State = pp.OFFLINE
				access.UpdateOne(known, known.GetID())
			}
		}
		return nil, errors.New("peer " + key + " is expired")
	}

	// Only a record published as ONLINE can yield a peer. The previous check
	// was inverted: it accepted OFFLINE records and rejected every ONLINE one.
	if rec.State != pp.ONLINE.EnumIndex() {
		return nil, errors.New("peer " + key + " is offline")
	}
	if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
		stored, _, err := access.StoreOne(p)
		if err != nil {
			return nil, fmt.Errorf("storing peer %s: %w", key, err)
		}
		if sp, ok := stored.(*pp.Peer); ok {
			return sp, nil
		}
	} else {
		known, reachable := pp.CheckPeerStatus(founded[0].GetID(), "")
		if reachable {
			known.State = pp.ONLINE
		} else {
			known.State = pp.OFFLINE
		}
		updated, _, err := access.UpdateOne(known, known.GetID())
		if err == nil && reachable {
			if up, ok := updated.(*pp.Peer); ok {
				return up, nil
			}
		}
	}
	return nil, errors.New("peer " + key + " is offline")
}

// DiscoverPeers searches the DHT for records stored under this node's key and
// returns those that pass treatPeer. Records that cannot be decoded or
// validated are skipped.
//
// NOTE(review): name is not used; the key is always derived from the local
// node ID, so only this node's own record can be found — confirm intent.
//
// Discover a specific Peer
func (d *DHTService) DiscoverPeers(
	ctx context.Context,
	name string,
) ([]*pp.Peer, error) {
	peers := []*pp.Peer{}
	key, err := d.generateKey()
	if err != nil {
		return nil, err
	}
	records, err := d.DHT.SearchValue(ctx, key)
	if err != nil {
		return nil, errors.New("no DHT peer not found")
	}
	for raw := range records {
		// Decode into a value: unmarshalling into a nil *DHTRecord always
		// fails and made treatPeer dereference nil.
		var rec DHTRecord
		if err := json.Unmarshal(raw, &rec); err != nil {
			continue
		}
		if p, err := d.treatPeer(ctx, key, &rec); err == nil {
			peers = append(peers, p)
		}
	}
	return peers, nil
}

// GetPeer reads the record stored under this node's key and converts it with
// treatPeer.
//
// NOTE(review): name is not used; the key is always derived from the local
// node ID — confirm intent.
func (d *DHTService) GetPeer(
	ctx context.Context,
	name string,
) (*pp.Peer, error) {
	key, err := d.generateKey()
	if err != nil {
		return nil, err
	}
	data, err := d.GetValue(ctx, key)
	if err != nil {
		return nil, errors.New("no DHT peer not found")
	}
	return d.treatPeer(ctx, key, data)
}

// existsDHT reads this node's own record (d.Key) and checks its signature.
// It returns the record with a nil error only when the signature is valid.
// When the key decodes badly or the signature does not match, the record is
// still returned, alongside a non-nil error.
func (d *DHTService) existsDHT(ctx context.Context) (*DHTRecord, error) {
	if d.Key == "" {
		return nil, errors.New("no self peer found")
	}
	rec, err := d.GetValue(ctx, d.Key)
	if err != nil {
		return nil, errors.New("no DHT peer found")
	}
	pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
	if err != nil {
		return rec, err
	}
	// Rebuild exactly the reduced record that ClaimName signed.
	signed := DHTRecord{
		Name:   rec.Name,
		PeerID: rec.PeerID,
		PubKey: rec.PubKey,
	}
	payload, err := json.Marshal(signed)
	if err != nil {
		return rec, fmt.Errorf("encoding record to verify: %w", err)
	}
	if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
		// Previously this returned a nil error, reporting a forged record as valid.
		return rec, errors.New("invalid signature")
	}
	return rec, nil
}