Peer Discovery
This commit is contained in:
@@ -1,81 +0,0 @@
|
|||||||
package controllers
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"oc-peer/infrastructure"
|
|
||||||
|
|
||||||
beego "github.com/beego/beego/v2/server/web"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Operations about workflow
|
|
||||||
type DistributedPeerController struct {
|
|
||||||
beego.Controller
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Title Search
|
|
||||||
// @Description search workspace
|
|
||||||
// @Param search path string true "the word search you want to get"
|
|
||||||
// @Param is_draft query string false
|
|
||||||
// @Success 200 {workspace} models.workspace
|
|
||||||
// @router /search/:search [get]
|
|
||||||
func (o *DistributedPeerController) Search() {
|
|
||||||
//user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
|
||||||
// store and return Id or post with UUIDLibDataEnum
|
|
||||||
search := o.Ctx.Input.Param(":search")
|
|
||||||
|
|
||||||
service := infrastructure.GetDHTService()
|
|
||||||
code := 400
|
|
||||||
err := "no DHT Service available"
|
|
||||||
if service != nil {
|
|
||||||
if p, errr := service.DiscoverPeers(context.Background(), search); errr == nil {
|
|
||||||
o.Data["json"] = map[string]interface{}{
|
|
||||||
"data": p,
|
|
||||||
"code": 200,
|
|
||||||
"error": nil,
|
|
||||||
}
|
|
||||||
o.ServeJSON()
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
err = errr.Error()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
o.Data["json"] = map[string]interface{}{
|
|
||||||
"data": []interface{}{},
|
|
||||||
"code": code,
|
|
||||||
"error": err,
|
|
||||||
}
|
|
||||||
o.ServeJSON()
|
|
||||||
}
|
|
||||||
|
|
||||||
// @Title Get
|
|
||||||
// @Description find peer by peerid
|
|
||||||
// @Param id path string true "the peer id you want to get"
|
|
||||||
// @Success 200 {peer} models.peer
|
|
||||||
// @router /:name [get]
|
|
||||||
func (o *DistributedPeerController) Get() {
|
|
||||||
name := o.Ctx.Input.Param(":name")
|
|
||||||
service := infrastructure.GetDHTService()
|
|
||||||
code := 400
|
|
||||||
err := "no DHT Service available"
|
|
||||||
if service != nil {
|
|
||||||
if p, errr := service.GetPeer(context.Background(), name); errr == nil {
|
|
||||||
o.Data["json"] = map[string]interface{}{
|
|
||||||
"data": p,
|
|
||||||
"code": 200,
|
|
||||||
"error": nil,
|
|
||||||
}
|
|
||||||
o.ServeJSON()
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
err = errr.Error()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
o.Data["json"] = map[string]interface{}{
|
|
||||||
"data": nil,
|
|
||||||
"code": code,
|
|
||||||
"error": err,
|
|
||||||
}
|
|
||||||
o.ServeJSON()
|
|
||||||
}
|
|
||||||
@@ -67,7 +67,7 @@ func (o *PeerController) Get() {
|
|||||||
// @Success 200 {peer} models.peer
|
// @Success 200 {peer} models.peer
|
||||||
// @router /:from/link/:relation [get]
|
// @router /:from/link/:relation [get]
|
||||||
func (o *PeerController) Link() {
|
func (o *PeerController) Link() {
|
||||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
_, peerID, _ := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||||
id := o.Ctx.Input.Param(":from")
|
id := o.Ctx.Input.Param(":from")
|
||||||
if ok, _ := oclib.IsMySelf(peerID); ok {
|
if ok, _ := oclib.IsMySelf(peerID); ok {
|
||||||
o.Data["json"] = map[string]interface{}{
|
o.Data["json"] = map[string]interface{}{
|
||||||
@@ -88,7 +88,7 @@ func (o *PeerController) Link() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
relation := o.Ctx.Input.Param(":relation") // as partner, blacklist, unknown
|
relation := o.Ctx.Input.Param(":relation") // as partner, blacklist, unknown
|
||||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
|
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
l := req.LoadOne(id)
|
l := req.LoadOne(id)
|
||||||
if p := l.ToPeer(); p != nil {
|
if p := l.ToPeer(); p != nil {
|
||||||
if peer.GetRelationPath(relation) != -1 {
|
if peer.GetRelationPath(relation) != -1 {
|
||||||
@@ -120,9 +120,9 @@ func (o *PeerController) Link() {
|
|||||||
// @Success 200 {peer} models.peer
|
// @Success 200 {peer} models.peer
|
||||||
// @router /:id/unknown [post]
|
// @router /:id/unknown [post]
|
||||||
func (o *PeerController) Unknown() {
|
func (o *PeerController) Unknown() {
|
||||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
//user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||||
id := o.Ctx.Input.Param(":id")
|
id := o.Ctx.Input.Param(":id")
|
||||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
|
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
data := req.LoadOne(id)
|
data := req.LoadOne(id)
|
||||||
o.changeRelation(data.ToPeer(), peer.NONE, req)
|
o.changeRelation(data.ToPeer(), peer.NONE, req)
|
||||||
}
|
}
|
||||||
@@ -133,9 +133,9 @@ func (o *PeerController) Unknown() {
|
|||||||
// @Success 200 {peer} models.peer
|
// @Success 200 {peer} models.peer
|
||||||
// @router /:id/partner [post]
|
// @router /:id/partner [post]
|
||||||
func (o *PeerController) Partner() {
|
func (o *PeerController) Partner() {
|
||||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
// user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||||
id := o.Ctx.Input.Param(":id")
|
id := o.Ctx.Input.Param(":id")
|
||||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
|
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
data := req.LoadOne(id)
|
data := req.LoadOne(id)
|
||||||
o.changeRelation(data.ToPeer(), peer.PARTNER, req)
|
o.changeRelation(data.ToPeer(), peer.PARTNER, req)
|
||||||
}
|
}
|
||||||
@@ -146,9 +146,9 @@ func (o *PeerController) Partner() {
|
|||||||
// @Success 200 {peer} models.peer
|
// @Success 200 {peer} models.peer
|
||||||
// @router /:id/blacklist [post]
|
// @router /:id/blacklist [post]
|
||||||
func (o *PeerController) Blacklist() {
|
func (o *PeerController) Blacklist() {
|
||||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
// user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||||
id := o.Ctx.Input.Param(":id")
|
id := o.Ctx.Input.Param(":id")
|
||||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), user, peerID, groups, nil)
|
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
data := req.LoadOne(id)
|
data := req.LoadOne(id)
|
||||||
o.changeRelation(data.ToPeer(), peer.BLACKLIST, req)
|
o.changeRelation(data.ToPeer(), peer.BLACKLIST, req)
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -5,7 +5,7 @@ go 1.24.6
|
|||||||
toolchain go1.24.11
|
toolchain go1.24.11
|
||||||
|
|
||||||
require (
|
require (
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260127143728-3c052bf16572
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128145248-d2c5d2031857
|
||||||
github.com/beego/beego/v2 v2.3.8
|
github.com/beego/beego/v2 v2.3.8
|
||||||
github.com/smartystreets/goconvey v1.7.2
|
github.com/smartystreets/goconvey v1.7.2
|
||||||
)
|
)
|
||||||
|
|||||||
6
go.sum
6
go.sum
@@ -24,6 +24,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260126120055-055e6c70cdd7 h1:LAK86efqe2HNV
|
|||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260126120055-055e6c70cdd7/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260126120055-055e6c70cdd7/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260127143728-3c052bf16572 h1:jrUHgs4DqNWLnLcb5nd4lrJim77+aGkJFACUfMogiu8=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260127143728-3c052bf16572 h1:jrUHgs4DqNWLnLcb5nd4lrJim77+aGkJFACUfMogiu8=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260127143728-3c052bf16572/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260127143728-3c052bf16572/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128140807-1c9d7b63c0b3 h1:zAT4ZulAaX+l28QdCMvuXh5XQxn+fU8x6YNJ1zmA7+Q=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128140807-1c9d7b63c0b3/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128144921-e3fe49c239f4 h1:V4dawzvOJjV1l5fCVzEdI4/fd9lOMTs3qsfxckBtp6U=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128144921-e3fe49c239f4/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128145248-d2c5d2031857 h1:SrvueDUww61xt//jPlDO8zeNLIMr9r7qRluy+Q8OHYA=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260128145248-d2c5d2031857/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc=
|
github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc=
|
||||||
github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg=
|
github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg=
|
||||||
|
|||||||
@@ -1,66 +0,0 @@
|
|||||||
package infrastructure
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"fmt"
|
|
||||||
"oc-peer/conf"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/crypto"
|
|
||||||
"github.com/libp2p/go-libp2p/core/pnet"
|
|
||||||
)
|
|
||||||
|
|
||||||
func sign(priv crypto.PrivKey, data []byte) ([]byte, error) {
|
|
||||||
return priv.Sign(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func verify(pub crypto.PubKey, data, sig []byte) (bool, error) {
|
|
||||||
return pub.Verify(data, sig)
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoadKeyFromFile(isPublic bool) (crypto.PrivKey, error) {
|
|
||||||
path := conf.GetConfig().PrivateKeyPath
|
|
||||||
if isPublic {
|
|
||||||
path = conf.GetConfig().PublicKeyPath
|
|
||||||
}
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to unmarshal as libp2p private key (supports ed25519, rsa, etc.)
|
|
||||||
priv, err := crypto.UnmarshalPrivateKey(data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return priv, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func VerifyPubWithPriv() bool {
|
|
||||||
priv, err := LoadKeyFromFile(false)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
pub, err := LoadKeyFromFile(true)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return priv.GetPublic().Equals(pub)
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoadPSKFromFile() (pnet.PSK, error) {
|
|
||||||
path := conf.GetConfig().PSKPath
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to unmarshal as libp2p private key (supports ed25519, rsa, etc.)
|
|
||||||
psk, err := pnet.DecodeV1PSK(bytes.NewReader(data))
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return psk, nil
|
|
||||||
}
|
|
||||||
@@ -1,446 +0,0 @@
|
|||||||
package infrastructure
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/base64"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"oc-peer/conf"
|
|
||||||
"slices"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
oclib "cloud.o-forge.io/core/oc-lib"
|
|
||||||
pp "cloud.o-forge.io/core/oc-lib/models/peer"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/libp2p/go-libp2p"
|
|
||||||
dht "github.com/libp2p/go-libp2p-kad-dht"
|
|
||||||
"github.com/libp2p/go-libp2p/core/crypto"
|
|
||||||
"github.com/libp2p/go-libp2p/core/host"
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DHTRecord struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
State int `json:"state"`
|
|
||||||
DID string `json:"did"`
|
|
||||||
PeerID string `json:"peer_id"`
|
|
||||||
PubKey []byte `json:"pub_key"`
|
|
||||||
URL string `json:"url"`
|
|
||||||
NATSUrl string `json:"nats_url"`
|
|
||||||
Wallet string `json:"wallet"`
|
|
||||||
Signature []byte `json:"signature"`
|
|
||||||
ExpiryDate time.Time `json:"expiry_date"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type DHTService struct {
|
|
||||||
Key string
|
|
||||||
Host host.Host
|
|
||||||
DHT *dht.IpfsDHT
|
|
||||||
Cache []string
|
|
||||||
mutex sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
var singletonService *DHTService
|
|
||||||
|
|
||||||
func GetDHTService() *DHTService {
|
|
||||||
return singletonService
|
|
||||||
}
|
|
||||||
|
|
||||||
func Init(ctx context.Context) (*DHTService, error) {
|
|
||||||
service := &DHTService{}
|
|
||||||
priv, err := LoadKeyFromFile(false)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
psk, err := LoadPSKFromFile()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
h, err := libp2p.New(
|
|
||||||
libp2p.PrivateNetwork(psk),
|
|
||||||
libp2p.Identity(priv),
|
|
||||||
libp2p.ListenAddrStrings(
|
|
||||||
fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", conf.GetConfig().DHTEndpointPort),
|
|
||||||
),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
service.Host = h
|
|
||||||
service.DHT, err = dht.New(ctx, h, dht.MaxRecordAge(24*time.Hour)) // every day DHT will purge expired data... if not used.
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = service.DHT.Bootstrap(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
pi, err := peer.AddrInfoFromString(conf.GetConfig().BootstrapAddress)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
logger := oclib.GetLogger()
|
|
||||||
if err := h.Connect(ctx, *pi); err != nil {
|
|
||||||
logger.Err(fmt.Errorf("Failed to connect to MAIN bootstrap peer %s: %s", pi.ID, err))
|
|
||||||
} else {
|
|
||||||
logger.Info().Msg(fmt.Sprintf("Connected to MAIN bootstrap peer %s", pi.ID))
|
|
||||||
}
|
|
||||||
|
|
||||||
singletonService = service
|
|
||||||
if VerifyPubWithPriv() {
|
|
||||||
if _, err := singletonService.ClaimName(context.Background(),
|
|
||||||
conf.GetConfig().Name,
|
|
||||||
conf.GetConfig().Hostname); err == nil {
|
|
||||||
go service.RefreshKeys(ctx, 30*time.Minute)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return service, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) RefreshKeys(ctx context.Context, interval time.Duration) {
|
|
||||||
ticker := time.NewTicker(interval)
|
|
||||||
go func() {
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
s := []string{}
|
|
||||||
d.mutex.Lock()
|
|
||||||
s = append(s, d.Cache...)
|
|
||||||
d.mutex.Unlock()
|
|
||||||
for _, key := range s {
|
|
||||||
_, _ = d.GetValue(ctx, key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) PutValue(
|
|
||||||
ctx context.Context,
|
|
||||||
key string,
|
|
||||||
value []byte,
|
|
||||||
) error {
|
|
||||||
err := d.DHT.PutValue(ctx, key, value)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
d.mutex.Lock()
|
|
||||||
if !slices.Contains(d.Cache, key) {
|
|
||||||
d.Cache = append(d.Cache, key)
|
|
||||||
}
|
|
||||||
d.mutex.Unlock()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) GetValue(
|
|
||||||
ctx context.Context,
|
|
||||||
key string,
|
|
||||||
) (*DHTRecord, error) {
|
|
||||||
dht, err := d.DHT.GetValue(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
cache := []string{}
|
|
||||||
d.mutex.Lock()
|
|
||||||
for _, c := range d.Cache {
|
|
||||||
if c != key {
|
|
||||||
cache = append(cache, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
d.Cache = cache
|
|
||||||
d.mutex.Unlock()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
d.mutex.Lock()
|
|
||||||
if !slices.Contains(d.Cache, key) {
|
|
||||||
d.Cache = append(d.Cache, key)
|
|
||||||
}
|
|
||||||
d.mutex.Unlock()
|
|
||||||
var data DHTRecord
|
|
||||||
json.Unmarshal(dht, &data)
|
|
||||||
|
|
||||||
peerID, err := oclib.GenerateNodeID()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
|
|
||||||
p := &pp.Peer{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Name: data.Name,
|
|
||||||
},
|
|
||||||
State: pp.ONLINE,
|
|
||||||
Relation: pp.SELF,
|
|
||||||
PeerID: peerID,
|
|
||||||
PublicKey: string(data.PubKey),
|
|
||||||
Url: data.URL,
|
|
||||||
NATSUrl: oclib.GetConfig().NATSUrl,
|
|
||||||
WalletAddress: data.Wallet,
|
|
||||||
}
|
|
||||||
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
|
|
||||||
access.StoreOne(p)
|
|
||||||
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil && f.(*pp.Peer).Relation != pp.BLACKLIST {
|
|
||||||
f.(*pp.Peer).State = pp.ONLINE
|
|
||||||
f.(*pp.Peer).NATSUrl = p.NATSUrl
|
|
||||||
f.(*pp.Peer).Url = p.Url
|
|
||||||
f.(*pp.Peer).PeerID = p.PeerID
|
|
||||||
f.(*pp.Peer).Relation = p.Relation
|
|
||||||
f.(*pp.Peer).WalletAddress = p.WalletAddress
|
|
||||||
access.UpdateOne(f, f.GetID())
|
|
||||||
}
|
|
||||||
|
|
||||||
return &data, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) generateKey() (string, error) {
|
|
||||||
s, err := oclib.GenerateNodeID()
|
|
||||||
if err != nil {
|
|
||||||
return s, err
|
|
||||||
}
|
|
||||||
return "/opencloud/peer/" + s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create your peer.
|
|
||||||
func (d *DHTService) ClaimName(
|
|
||||||
ctx context.Context,
|
|
||||||
name string,
|
|
||||||
endPoint string,
|
|
||||||
) (*pp.Peer, error) {
|
|
||||||
if endPoint == "" {
|
|
||||||
return nil, errors.New("no endpoint found for peer" + name)
|
|
||||||
}
|
|
||||||
|
|
||||||
peerID, err := oclib.GenerateNodeID()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pub := d.Host.Peerstore().PubKey(d.Host.ID())
|
|
||||||
pubBytes, _ := pub.Raw()
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
expiry := now.Add(1 * time.Hour)
|
|
||||||
|
|
||||||
rec := DHTRecord{
|
|
||||||
Name: name,
|
|
||||||
PeerID: peerID,
|
|
||||||
PubKey: pubBytes,
|
|
||||||
}
|
|
||||||
|
|
||||||
payload, _ := json.Marshal(rec)
|
|
||||||
sig, _ := sign(d.Host.Peerstore().PrivKey(d.Host.ID()), payload)
|
|
||||||
rec.Signature = sig
|
|
||||||
|
|
||||||
rec.URL = endPoint
|
|
||||||
rec.NATSUrl = oclib.GetConfig().NATSUrl
|
|
||||||
rec.State = pp.ONLINE.EnumIndex()
|
|
||||||
rec.ExpiryDate = expiry
|
|
||||||
|
|
||||||
data, _ := json.Marshal(rec)
|
|
||||||
|
|
||||||
key, err := d.generateKey()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// retrieve your key name in standard
|
|
||||||
old, err := d.GetValue(ctx, key)
|
|
||||||
if err == nil {
|
|
||||||
if old.PeerID != peerID { // check if someone claims your name before
|
|
||||||
return nil, errors.New("name already claimed by another peer")
|
|
||||||
// TODO : can be fragile if 2 peers connect at the same time
|
|
||||||
}
|
|
||||||
if now.After(old.ExpiryDate) {
|
|
||||||
payload, _ := json.Marshal(rec)
|
|
||||||
d.PutValue(ctx, key, payload)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := d.PutValue(ctx, key, data); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
|
|
||||||
d.Key = key
|
|
||||||
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
p := &pp.Peer{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Name: name,
|
|
||||||
},
|
|
||||||
State: pp.ONLINE,
|
|
||||||
Relation: pp.SELF,
|
|
||||||
PeerID: peerID,
|
|
||||||
PublicKey: pubStr,
|
|
||||||
Url: endPoint,
|
|
||||||
NATSUrl: oclib.GetConfig().NATSUrl,
|
|
||||||
WalletAddress: "my-wallet",
|
|
||||||
}
|
|
||||||
if founded, _, err := access.Search(nil, fmt.Sprintf("%v", pp.SELF.EnumIndex()), false); err != nil || len(founded) == 0 {
|
|
||||||
access.StoreOne(p)
|
|
||||||
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
|
|
||||||
peerID, err := oclib.GenerateNodeID()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
f.(*pp.Peer).Name = name
|
|
||||||
f.(*pp.Peer).PeerID = peerID
|
|
||||||
f.(*pp.Peer).State = pp.ONLINE
|
|
||||||
f.(*pp.Peer).Url = endPoint
|
|
||||||
f.(*pp.Peer).NATSUrl = oclib.GetConfig().NATSUrl
|
|
||||||
f.(*pp.Peer).Relation = pp.SELF
|
|
||||||
access.UpdateOne(f, f.GetID())
|
|
||||||
}
|
|
||||||
|
|
||||||
return p, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) treatPeer(ctx context.Context, key string, rec *DHTRecord) (*pp.Peer, error) {
|
|
||||||
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
dht := DHTRecord{
|
|
||||||
Name: rec.Name,
|
|
||||||
PeerID: rec.PeerID,
|
|
||||||
PubKey: rec.PubKey,
|
|
||||||
}
|
|
||||||
payload, _ := json.Marshal(dht)
|
|
||||||
|
|
||||||
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
|
|
||||||
return nil, errors.New("invalid signature")
|
|
||||||
}
|
|
||||||
pubBytes, _ := pubKey.Raw()
|
|
||||||
pubStr := base64.StdEncoding.EncodeToString(pubBytes)
|
|
||||||
|
|
||||||
rel := pp.NONE
|
|
||||||
if d.Key == key {
|
|
||||||
rel = pp.SELF
|
|
||||||
}
|
|
||||||
|
|
||||||
p := &pp.Peer{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Name: rec.Name,
|
|
||||||
},
|
|
||||||
State: pp.ONLINE,
|
|
||||||
Relation: rel,
|
|
||||||
PeerID: rec.PeerID,
|
|
||||||
PublicKey: pubStr,
|
|
||||||
Url: rec.URL,
|
|
||||||
NATSUrl: rec.NATSUrl,
|
|
||||||
WalletAddress: rec.Wallet,
|
|
||||||
}
|
|
||||||
access := pp.NewAccessor(&tools.APIRequest{Admin: true})
|
|
||||||
if now.After(rec.ExpiryDate) {
|
|
||||||
rec.State = pp.OFFLINE.EnumIndex()
|
|
||||||
p.State = pp.OFFLINE
|
|
||||||
payload, _ := json.Marshal(rec)
|
|
||||||
d.PutValue(ctx, key, payload)
|
|
||||||
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
|
|
||||||
access.StoreOne(p)
|
|
||||||
} else if f, _, err := access.LoadOne(founded[0].GetID()); err == nil {
|
|
||||||
f.(*pp.Peer).State = pp.OFFLINE
|
|
||||||
access.UpdateOne(f, f.GetID())
|
|
||||||
}
|
|
||||||
return nil, errors.New("peer " + key + " is expired")
|
|
||||||
}
|
|
||||||
if rec.State == pp.OFFLINE.EnumIndex() {
|
|
||||||
if founded, _, err := access.Search(nil, p.Name, false); err != nil || len(founded) == 0 {
|
|
||||||
ppp, _, err := access.StoreOne(p)
|
|
||||||
if err == nil {
|
|
||||||
return ppp.(*pp.Peer), nil
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
f, ok := pp.CheckPeerStatus(founded[0].GetID(), "")
|
|
||||||
if ok {
|
|
||||||
f.State = pp.ONLINE
|
|
||||||
} else {
|
|
||||||
f.State = pp.OFFLINE
|
|
||||||
}
|
|
||||||
ppp, _, err := access.UpdateOne(f, f.GetID())
|
|
||||||
if err == nil && ok {
|
|
||||||
return ppp.(*pp.Peer), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil, errors.New("peer " + key + " is offline")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Discover a specific Peer
|
|
||||||
func (d *DHTService) DiscoverPeers(
|
|
||||||
ctx context.Context,
|
|
||||||
name string,
|
|
||||||
) ([]*pp.Peer, error) {
|
|
||||||
peers := []*pp.Peer{}
|
|
||||||
key, err := d.generateKey()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
datas, err := d.DHT.SearchValue(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.New("no DHT peer not found")
|
|
||||||
}
|
|
||||||
for data := range datas {
|
|
||||||
var dht *DHTRecord
|
|
||||||
json.Unmarshal(data, dht)
|
|
||||||
if p, err := d.treatPeer(ctx, key, dht); err == nil {
|
|
||||||
peers = append(peers, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return peers, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) GetPeer(
|
|
||||||
ctx context.Context,
|
|
||||||
name string,
|
|
||||||
) (*pp.Peer, error) {
|
|
||||||
key, err := d.generateKey()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
data, err := d.GetValue(ctx, key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.New("no DHT peer not found")
|
|
||||||
}
|
|
||||||
return d.treatPeer(ctx, key, data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *DHTService) existsDHT(ctx context.Context) (*DHTRecord, error) {
|
|
||||||
if d.Key == "" {
|
|
||||||
return nil, errors.New("no self peer found")
|
|
||||||
}
|
|
||||||
rec, err := d.GetValue(ctx, d.Key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.New("no DHT peer found")
|
|
||||||
}
|
|
||||||
|
|
||||||
pubKey, err := crypto.UnmarshalPublicKey(rec.PubKey)
|
|
||||||
if err != nil {
|
|
||||||
return rec, err
|
|
||||||
}
|
|
||||||
|
|
||||||
dht := DHTRecord{
|
|
||||||
Name: rec.Name,
|
|
||||||
PeerID: rec.PeerID,
|
|
||||||
PubKey: rec.PubKey,
|
|
||||||
}
|
|
||||||
payload, _ := json.Marshal(dht)
|
|
||||||
|
|
||||||
if ok, _ := verify(pubKey, payload, rec.Signature); !ok {
|
|
||||||
return rec, err
|
|
||||||
}
|
|
||||||
return rec, nil
|
|
||||||
}
|
|
||||||
37
infrastructure/nats.go
Normal file
37
infrastructure/nats.go
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
package infrastructure
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ListenNATS() {
|
||||||
|
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
||||||
|
tools.CREATE_PEER: func(resp tools.NATSResponse) {
|
||||||
|
p := &peer.Peer{}
|
||||||
|
err := json.Unmarshal(resp.Payload, p)
|
||||||
|
if err == nil {
|
||||||
|
search := p.PeerID
|
||||||
|
if p.Relation == peer.SELF {
|
||||||
|
search = fmt.Sprintf("%v", peer.SELF.EnumIndex())
|
||||||
|
}
|
||||||
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
|
if data := access.Search(nil, search, false); len(data.Data) > 0 {
|
||||||
|
pp := data.Data[0].(*peer.Peer)
|
||||||
|
access.UpdateOne(pp.Serialize(pp), pp.GetID())
|
||||||
|
} else {
|
||||||
|
access.StoreOne(p.Serialize(p))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
tools.REMOVE_PEER: func(tools.NATSResponse) {
|
||||||
|
p := &peer.Peer{}
|
||||||
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
|
access.DeleteOne(p.GetID())
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
3
main.go
3
main.go
@@ -1,7 +1,6 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"oc-peer/conf"
|
"oc-peer/conf"
|
||||||
"oc-peer/infrastructure"
|
"oc-peer/infrastructure"
|
||||||
_ "oc-peer/routers"
|
_ "oc-peer/routers"
|
||||||
@@ -51,6 +50,6 @@ func main() {
|
|||||||
ExposeHeaders: []string{"Content-Length", "Content-Type"},
|
ExposeHeaders: []string{"Content-Length", "Content-Type"},
|
||||||
AllowCredentials: true,
|
AllowCredentials: true,
|
||||||
}))
|
}))
|
||||||
infrastructure.Init(context.Background())
|
infrastructure.ListenNATS()
|
||||||
beego.Run()
|
beego.Run()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -23,11 +23,6 @@ func init() {
|
|||||||
beego.NSInclude(
|
beego.NSInclude(
|
||||||
&controllers.PeerController{},
|
&controllers.PeerController{},
|
||||||
),
|
),
|
||||||
beego.NSNamespace("/distributed",
|
|
||||||
beego.NSInclude(
|
|
||||||
&controllers.DistributedPeerController{},
|
|
||||||
),
|
|
||||||
),
|
|
||||||
beego.NSNamespace("/version",
|
beego.NSNamespace("/version",
|
||||||
beego.NSInclude(
|
beego.NSInclude(
|
||||||
&controllers.VersionController{},
|
&controllers.VersionController{},
|
||||||
|
|||||||
Reference in New Issue
Block a user