Compare commits
10 Commits
feature/dh
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| fa914958b6 | |||
| 1c0b2b4312 | |||
| 631e2846fe | |||
| d985d8339a | |||
| ea14ad3933 | |||
| 2e31df89c2 | |||
| 425cbdfe7d | |||
| 8ee5b84e21 | |||
| 552bb17e2b | |||
| 88e29073a2 |
13
Dockerfile
13
Dockerfile
@@ -21,11 +21,6 @@ RUN go mod download
|
|||||||
FROM golang:alpine AS builder
|
FROM golang:alpine AS builder
|
||||||
ARG CONF_NUM
|
ARG CONF_NUM
|
||||||
|
|
||||||
# Fail fast if CONF_NUM missing
|
|
||||||
RUN test -n "$CONF_NUM"
|
|
||||||
|
|
||||||
RUN apk add --no-cache git
|
|
||||||
|
|
||||||
WORKDIR /oc-discovery
|
WORKDIR /oc-discovery
|
||||||
|
|
||||||
# Reuse Go cache
|
# Reuse Go cache
|
||||||
@@ -55,13 +50,13 @@ WORKDIR /app
|
|||||||
|
|
||||||
RUN mkdir ./pem
|
RUN mkdir ./pem
|
||||||
|
|
||||||
COPY --from=builder /app/extracted/pem/private${CONF_NUM}.pem ./pem/private.pem
|
COPY --from=builder /app/extracted/pem/private${CONF_NUM:-1}.pem ./pem/private.pem
|
||||||
COPY --from=builder /app/extracted/psk ./psk
|
COPY --from=builder /app/extracted/psk ./psk
|
||||||
COPY --from=builder /app/extracted/pem/public${CONF_NUM}.pem ./pem/public.pem
|
COPY --from=builder /app/extracted/pem/public${CONF_NUM:-1}.pem ./pem/public.pem
|
||||||
|
|
||||||
COPY --from=builder /app/extracted/oc-discovery /usr/bin/oc-discovery
|
COPY --from=builder /app/extracted/oc-discovery /usr/bin/oc-discovery
|
||||||
COPY --from=builder /app/extracted/docker_discovery${CONF_NUM}.json /etc/oc/discovery.json
|
COPY --from=builder /app/extracted/docker_discovery${CONF_NUM:-1}.json /etc/oc/discovery.json
|
||||||
|
|
||||||
EXPOSE 400${CONF_NUM}
|
EXPOSE 400${CONF_NUM:-1}
|
||||||
|
|
||||||
ENTRYPOINT ["oc-discovery"]
|
ENTRYPOINT ["oc-discovery"]
|
||||||
12
Makefile
12
Makefile
@@ -10,15 +10,17 @@ clean:
|
|||||||
rm -rf oc-discovery
|
rm -rf oc-discovery
|
||||||
|
|
||||||
docker:
|
docker:
|
||||||
DOCKER_BUILDKIT=1 docker build -t oc/oc-discovery:0.0.1 -f Dockerfile .
|
DOCKER_BUILDKIT=1 docker build -t oc-discovery -f Dockerfile .
|
||||||
docker tag oc/oc-discovery:0.0.1 oc/oc-discovery:latest
|
docker tag oc-discovery opencloudregistry/oc-discovery:latest
|
||||||
|
|
||||||
publish-kind:
|
publish-kind:
|
||||||
kind load docker-image oc/oc-discovery:0.0.1 --name opencloud
|
kind load docker-image opencloudregistry/oc-discovery:latest --name opencloud
|
||||||
|
|
||||||
publish-registry:
|
publish-registry:
|
||||||
@echo "TODO"
|
docker push opencloudregistry/oc-discovery:latest
|
||||||
|
|
||||||
all: docker publish-kind publish-registry
|
all: docker publish-kind
|
||||||
|
|
||||||
|
ci: docker publish-registry
|
||||||
|
|
||||||
.PHONY: build run clean docker publish-kind publish-registry
|
.PHONY: build run clean docker publish-kind publish-registry
|
||||||
@@ -14,3 +14,4 @@ If default Swagger page is displayed instead of tyour api, change url in swagger
|
|||||||
|
|
||||||
url: "swagger.json"
|
url: "swagger.json"
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -28,7 +28,7 @@ type Event struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func NewEvent(name string, from string, dt *tools.DataType, user string, payload []byte) *Event {
|
func NewEvent(name string, from string, dt *tools.DataType, user string, payload []byte) *Event {
|
||||||
priv, err := LoadKeyFromFilePrivate() // your node private key
|
priv, err := tools.LoadKeyFromFilePrivate() // your node private key
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -88,8 +88,8 @@ func (event *Event) Verify(p *peer.Peer) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type TopicNodeActivityPub struct {
|
type TopicNodeActivityPub struct {
|
||||||
NodeActivity peer.PeerState
|
NodeActivity int `json:"node_activity"`
|
||||||
Disposer pp.AddrInfo `json:"disposer_address"`
|
Disposer string `json:"disposer_address"`
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
DID string `json:"did"` // real PEER ID
|
DID string `json:"did"` // real PEER ID
|
||||||
PeerID string `json:"peer_id"`
|
PeerID string `json:"peer_id"`
|
||||||
@@ -159,20 +159,16 @@ func (s *LongLivedPubSubService) SubscribeToSearch(ps *pubsub.PubSub, f *func(co
|
|||||||
func SubscribeEvents[T interface{}](s *LongLivedPubSubService,
|
func SubscribeEvents[T interface{}](s *LongLivedPubSubService,
|
||||||
ctx context.Context, proto string, timeout int, f func(context.Context, T, string),
|
ctx context.Context, proto string, timeout int, f func(context.Context, T, string),
|
||||||
) error {
|
) error {
|
||||||
s.PubsubMu.Lock()
|
|
||||||
if s.LongLivedPubSubs[proto] == nil {
|
if s.LongLivedPubSubs[proto] == nil {
|
||||||
s.PubsubMu.Unlock()
|
|
||||||
return errors.New("no protocol subscribed in pubsub")
|
return errors.New("no protocol subscribed in pubsub")
|
||||||
}
|
}
|
||||||
topic := s.LongLivedPubSubs[proto]
|
topic := s.LongLivedPubSubs[proto]
|
||||||
s.PubsubMu.Unlock()
|
|
||||||
|
|
||||||
sub, err := topic.Subscribe() // then subscribe to it
|
sub, err := topic.Subscribe() // then subscribe to it
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// launch loop waiting for results.
|
// launch loop waiting for results.
|
||||||
go waitResults[T](s, ctx, sub, proto, timeout, f)
|
go waitResults(s, ctx, sub, proto, timeout, f)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -207,10 +203,5 @@ func waitResults[T interface{}](s *LongLivedPubSubService, ctx context.Context,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
f(ctx, evt, fmt.Sprintf("%v", proto))
|
f(ctx, evt, fmt.Sprintf("%v", proto))
|
||||||
/*if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 {
|
|
||||||
if err := ps.processEvent(ctx, p[0], &evt, topicName); err != nil {
|
|
||||||
logger.Err(err)
|
|
||||||
}
|
|
||||||
}*/
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -68,18 +68,16 @@ func (ix *LongLivedStreamRecordedService[T]) gc() {
|
|||||||
}
|
}
|
||||||
ix.PubsubMu.Lock()
|
ix.PubsubMu.Lock()
|
||||||
if ix.LongLivedPubSubs[TopicPubSubNodeActivity] != nil {
|
if ix.LongLivedPubSubs[TopicPubSubNodeActivity] != nil {
|
||||||
ad, err := pp.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + ix.Host.ID().String())
|
|
||||||
if err == nil {
|
|
||||||
if b, err := json.Marshal(TopicNodeActivityPub{
|
if b, err := json.Marshal(TopicNodeActivityPub{
|
||||||
Disposer: *ad,
|
Disposer: "/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String(),
|
||||||
Name: rec.HeartbeatStream.Name,
|
Name: rec.HeartbeatStream.Name,
|
||||||
DID: rec.HeartbeatStream.DID,
|
DID: rec.HeartbeatStream.DID,
|
||||||
PeerID: pid.String(),
|
PeerID: pid.String(),
|
||||||
NodeActivity: peer.OFFLINE,
|
NodeActivity: peer.OFFLINE.EnumIndex(),
|
||||||
}); err == nil {
|
}); err == nil {
|
||||||
ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b)
|
ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ix.PubsubMu.Unlock()
|
ix.PubsubMu.Unlock()
|
||||||
}
|
}
|
||||||
@@ -235,7 +233,6 @@ var StreamIndexers ProtocolStream = ProtocolStream{}
|
|||||||
|
|
||||||
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
|
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
|
||||||
logger := oclib.GetLogger()
|
logger := oclib.GetLogger()
|
||||||
ctx := context.Background()
|
|
||||||
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
|
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
|
||||||
|
|
||||||
if len(addresses) > maxIndexer {
|
if len(addresses) > maxIndexer {
|
||||||
@@ -243,13 +240,17 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, indexerAddr := range addresses {
|
for _, indexerAddr := range addresses {
|
||||||
|
fmt.Println("GENERATE ADDR", indexerAddr)
|
||||||
ad, err := pp.AddrInfoFromString(indexerAddr)
|
ad, err := pp.AddrInfoFromString(indexerAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
fmt.Println("ADDR ERR", err)
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
force := false
|
||||||
if h.Network().Connectedness(ad.ID) != network.Connected {
|
if h.Network().Connectedness(ad.ID) != network.Connected {
|
||||||
if err := h.Connect(ctx, *ad); err != nil {
|
force = true
|
||||||
|
if err := h.Connect(context.Background(), *ad); err != nil {
|
||||||
fmt.Println(err)
|
fmt.Println(err)
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
continue
|
continue
|
||||||
@@ -258,7 +259,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
|
|||||||
StaticIndexers = append(StaticIndexers, ad)
|
StaticIndexers = append(StaticIndexers, ad)
|
||||||
// make a privilege streams with indexer.
|
// make a privilege streams with indexer.
|
||||||
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
|
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
|
||||||
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, true, nil)
|
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, force, nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(StaticIndexers) == 0 {
|
if len(StaticIndexers) == 0 {
|
||||||
@@ -268,7 +269,7 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
|
|||||||
if len(StaticIndexers) < minIndexer {
|
if len(StaticIndexers) < minIndexer {
|
||||||
// TODO : ask for unknown indexer.
|
// TODO : ask for unknown indexer.
|
||||||
}
|
}
|
||||||
SendHeartbeat(ctx, ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
|
SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
|
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
|
||||||
@@ -294,7 +295,7 @@ func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host,
|
|||||||
if protoS[proto][id] != nil {
|
if protoS[proto][id] != nil {
|
||||||
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
|
protoS[proto][id].Expiry = time.Now().Add(2 * time.Minute)
|
||||||
} else {
|
} else {
|
||||||
fmt.Println("GENERATE STREAM", proto, id)
|
fmt.Println("NEW STREAM", proto, id)
|
||||||
s, err := h.NewStream(*ctx, id, proto)
|
s, err := h.NewStream(*ctx, id, proto)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err.Error())
|
panic(err.Error())
|
||||||
@@ -385,18 +386,3 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad
|
|||||||
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
|
pss.Expiry = time.Now().UTC().Add(2 * time.Minute)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
func SearchPeer(search string) ([]*peer.Peer, error) {
|
|
||||||
ps := []*peer.Peer{}
|
|
||||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
|
||||||
peers := access.Search(nil, search, false)
|
|
||||||
if len(peers.Data) == 0 {
|
|
||||||
return ps, errors.New("no self available")
|
|
||||||
}
|
|
||||||
for _, p := range peers.Data {
|
|
||||||
ps = append(ps, p.(*peer.Peer))
|
|
||||||
}
|
|
||||||
return ps, nil
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|||||||
@@ -2,12 +2,8 @@ package common
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/ed25519"
|
|
||||||
"crypto/x509"
|
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/pem"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"oc-discovery/conf"
|
"oc-discovery/conf"
|
||||||
"oc-discovery/models"
|
"oc-discovery/models"
|
||||||
"os"
|
"os"
|
||||||
@@ -47,45 +43,6 @@ func Verify(pub crypto.PubKey, data, sig []byte) (bool, error) {
|
|||||||
return pub.Verify(data, sig)
|
return pub.Verify(data, sig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadKeyFromFilePrivate() (crypto.PrivKey, error) {
|
|
||||||
path := conf.GetConfig().PrivateKeyPath
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
block, _ := pem.Decode(data)
|
|
||||||
keyAny, err := x509.ParsePKCS8PrivateKey(block.Bytes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
edKey, ok := keyAny.(ed25519.PrivateKey)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("not an ed25519 key")
|
|
||||||
}
|
|
||||||
return crypto.UnmarshalEd25519PrivateKey(edKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoadKeyFromFilePublic() (crypto.PubKey, error) {
|
|
||||||
path := conf.GetConfig().PublicKeyPath
|
|
||||||
data, err := os.ReadFile(path)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
block, _ := pem.Decode(data)
|
|
||||||
keyAny, err := x509.ParsePKIXPublicKey(block.Bytes)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
edKey, ok := keyAny.(ed25519.PublicKey)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("not an ed25519 key")
|
|
||||||
}
|
|
||||||
// Try to unmarshal as libp2p private key (supports ed25519, rsa, etc.)
|
|
||||||
return crypto.UnmarshalEd25519PublicKey(edKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func LoadPSKFromFile() (pnet.PSK, error) {
|
func LoadPSKFromFile() (pnet.PSK, error) {
|
||||||
path := conf.GetConfig().PSKPath
|
path := conf.GetConfig().PSKPath
|
||||||
data, err := os.ReadFile(path)
|
data, err := os.ReadFile(path)
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ type PeerRecord struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *PeerRecord) Sign() error {
|
func (p *PeerRecord) Sign() error {
|
||||||
priv, err := common.LoadKeyFromFilePrivate()
|
priv, err := tools.LoadKeyFromFilePrivate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -53,7 +53,6 @@ func (p *PeerRecord) Sign() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *PeerRecord) Verify() (crypto.PubKey, error) {
|
func (p *PeerRecord) Verify() (crypto.PubKey, error) {
|
||||||
fmt.Println(p.PubKey)
|
|
||||||
pubKey, err := crypto.UnmarshalPublicKey(p.PubKey) // retrieve pub key in message
|
pubKey, err := crypto.UnmarshalPublicKey(p.PubKey) // retrieve pub key in message
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("UnmarshalPublicKey")
|
fmt.Println("UnmarshalPublicKey")
|
||||||
@@ -79,7 +78,6 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, nil, err
|
return false, nil, err
|
||||||
}
|
}
|
||||||
fmt.Println("ExtractPeer MarshalPublicKey")
|
|
||||||
rel := pp.NONE
|
rel := pp.NONE
|
||||||
if ourkey == key { // at this point is PeerID is same as our... we are... thats our peer INFO
|
if ourkey == key { // at this point is PeerID is same as our... we are... thats our peer INFO
|
||||||
rel = pp.SELF
|
rel = pp.SELF
|
||||||
@@ -106,7 +104,8 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return pp.SELF == p.Relation, nil, err
|
return pp.SELF == p.Relation, nil, err
|
||||||
}
|
}
|
||||||
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
|
fmt.Println("SENDPEER SELF")
|
||||||
|
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_PEER, tools.NATSResponse{
|
||||||
FromApp: "oc-discovery",
|
FromApp: "oc-discovery",
|
||||||
Datatype: tools.PEER,
|
Datatype: tools.PEER,
|
||||||
Method: int(tools.CREATE_PEER),
|
Method: int(tools.CREATE_PEER),
|
||||||
@@ -128,6 +127,7 @@ type GetResponse struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ix *IndexerService) initNodeHandler() {
|
func (ix *IndexerService) initNodeHandler() {
|
||||||
|
fmt.Println("Node activity")
|
||||||
ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat)
|
ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat)
|
||||||
ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish)
|
ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish)
|
||||||
ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet)
|
ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet)
|
||||||
@@ -182,18 +182,17 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if ix.LongLivedPubSubs[common.TopicPubSubNodeActivity] != nil && !rec.NoPub {
|
if ix.LongLivedPubSubs[common.TopicPubSubNodeActivity] != nil && !rec.NoPub {
|
||||||
ad, err := peer.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + " /tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + " /p2p/" + ix.Host.ID().String())
|
|
||||||
if err == nil {
|
|
||||||
if b, err := json.Marshal(common.TopicNodeActivityPub{
|
if b, err := json.Marshal(common.TopicNodeActivityPub{
|
||||||
Disposer: *ad,
|
Disposer: "/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String(),
|
||||||
DID: rec.DID,
|
DID: rec.DID,
|
||||||
Name: rec.Name,
|
Name: rec.Name,
|
||||||
PeerID: pid.String(),
|
PeerID: pid.String(),
|
||||||
NodeActivity: pp.ONLINE,
|
NodeActivity: pp.ONLINE.EnumIndex(),
|
||||||
}); err == nil {
|
}); err == nil {
|
||||||
ix.LongLivedPubSubs[common.TopicPubSubNodeActivity].Publish(context.Background(), b)
|
ix.LongLivedPubSubs[common.TopicPubSubNodeActivity].Publish(context.Background(), b)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if rec.TTL > 0 {
|
if rec.TTL > 0 {
|
||||||
@@ -256,7 +255,8 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if ix.Host.Network().Connectedness(pid) != network.Connected {
|
if ix.Host.Network().Connectedness(pid) != network.Connected {
|
||||||
_ = ix.Host.Connect(ctxTTL, dsp.Disposer)
|
if ad, err := peer.AddrInfoFromString(dsp.Disposer); err == nil {
|
||||||
|
_ = ix.Host.Connect(ctxTTL, *ad)
|
||||||
str, err := ix.Host.NewStream(ctxTTL, pid, common.ProtocolGet)
|
str, err := ix.Host.NewStream(ctxTTL, pid, common.ProtocolGet)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
@@ -281,6 +281,7 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
// Not found
|
// Not found
|
||||||
_ = json.NewEncoder(s).Encode(resp)
|
_ = json.NewEncoder(s).Encode(resp)
|
||||||
ix.StreamMU.Unlock()
|
ix.StreamMU.Unlock()
|
||||||
|
|||||||
@@ -48,12 +48,15 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ
|
|||||||
}
|
}
|
||||||
f := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) {
|
f := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) {
|
||||||
ix.mu.Lock()
|
ix.mu.Lock()
|
||||||
if evt.NodeActivity == pp.OFFLINE {
|
if pid, err := peer.Decode(evt.PeerID); err == nil {
|
||||||
delete(ix.DisposedPeers, evt.Disposer.ID)
|
if evt.NodeActivity == pp.OFFLINE.EnumIndex() {
|
||||||
|
delete(ix.DisposedPeers, pid)
|
||||||
}
|
}
|
||||||
if evt.NodeActivity == pp.ONLINE {
|
if evt.NodeActivity == pp.ONLINE.EnumIndex() {
|
||||||
ix.DisposedPeers[evt.Disposer.ID] = &evt
|
ix.DisposedPeers[pid] = &evt
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
ix.mu.Unlock()
|
ix.mu.Unlock()
|
||||||
}
|
}
|
||||||
ix.SubscribeToNodeActivity(ix.PS, &f) // now we subscribe to a long run topic named node-activity, to relay message.
|
ix.SubscribeToNodeActivity(ix.PS, &f) // now we subscribe to a long run topic named node-activity, to relay message.
|
||||||
|
|||||||
@@ -4,12 +4,93 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"oc-discovery/daemons/node/common"
|
||||||
|
"oc-discovery/daemons/node/stream"
|
||||||
|
|
||||||
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/config"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ListenNATS(n Node) {
|
func ListenNATS(n *Node) {
|
||||||
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
||||||
|
tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) {
|
||||||
|
if resp.FromApp == config.GetAppName() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if res, err := resources.ToResource(resp.Datatype.EnumIndex(), resp.Payload); err == nil {
|
||||||
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
|
p := access.LoadOne(res.GetCreatorID())
|
||||||
|
realP := p.ToPeer()
|
||||||
|
if realP == nil {
|
||||||
|
return
|
||||||
|
} else if realP.Relation == peer.SELF {
|
||||||
|
pubKey, err := common.PubKeyFromString(realP.PublicKey) // extract pubkey from pubkey str
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ok, _ := pubKey.Verify(resp.Payload, res.GetSignature())
|
||||||
|
if b, err := json.Marshal(stream.Verify{
|
||||||
|
IsVerified: ok,
|
||||||
|
}); err == nil {
|
||||||
|
tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{
|
||||||
|
FromApp: "oc-discovery",
|
||||||
|
Method: int(tools.VERIFY_RESOURCE),
|
||||||
|
Payload: b,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
} else if realP.Relation != peer.BLACKLIST {
|
||||||
|
n.StreamService.PublishVerifyResources(&resp.Datatype, resp.User, realP.PeerID, resp.Payload)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
tools.CREATE_PEER: func(resp tools.NATSResponse) {
|
||||||
|
if resp.FromApp == config.GetAppName() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger := oclib.GetLogger()
|
||||||
|
m := map[string]interface{}{}
|
||||||
|
err := json.Unmarshal(resp.Payload, &m)
|
||||||
|
if err != nil {
|
||||||
|
logger.Err(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p := &peer.Peer{}
|
||||||
|
p = p.Deserialize(m, p).(*peer.Peer)
|
||||||
|
|
||||||
|
ad, err := pp.AddrInfoFromString(p.PeerID)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
n.StreamService.Mu.Lock()
|
||||||
|
defer n.StreamService.Mu.Unlock()
|
||||||
|
|
||||||
|
n.Mu.Lock()
|
||||||
|
n.Peers[ad.ID] = p.State == peer.ONLINE
|
||||||
|
n.Mu.Unlock()
|
||||||
|
|
||||||
|
if p.Relation == peer.PARTNER {
|
||||||
|
n.StreamService.ConnectToPartner(ad.ID, ad)
|
||||||
|
} else {
|
||||||
|
ps := common.ProtocolStream{}
|
||||||
|
for p, s := range n.StreamService.Streams {
|
||||||
|
m := map[pp.ID]*common.Stream{}
|
||||||
|
for k := range s {
|
||||||
|
if ad.ID != k {
|
||||||
|
m[k] = s[k]
|
||||||
|
} else {
|
||||||
|
s[k].Stream.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ps[p] = m
|
||||||
|
}
|
||||||
|
n.StreamService.Streams = ps
|
||||||
|
}
|
||||||
|
|
||||||
|
},
|
||||||
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
||||||
var propalgation tools.PropalgationMessage
|
var propalgation tools.PropalgationMessage
|
||||||
err := json.Unmarshal(resp.Payload, &propalgation)
|
err := json.Unmarshal(resp.Payload, &propalgation)
|
||||||
|
|||||||
@@ -11,10 +11,12 @@ import (
|
|||||||
"oc-discovery/daemons/node/indexer"
|
"oc-discovery/daemons/node/indexer"
|
||||||
"oc-discovery/daemons/node/pubsub"
|
"oc-discovery/daemons/node/pubsub"
|
||||||
"oc-discovery/daemons/node/stream"
|
"oc-discovery/daemons/node/stream"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
oclib "cloud.o-forge.io/core/oc-lib"
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
"github.com/libp2p/go-libp2p"
|
"github.com/libp2p/go-libp2p"
|
||||||
pubsubs "github.com/libp2p/go-libp2p-pubsub"
|
pubsubs "github.com/libp2p/go-libp2p-pubsub"
|
||||||
@@ -30,6 +32,9 @@ type Node struct {
|
|||||||
StreamService *stream.StreamService
|
StreamService *stream.StreamService
|
||||||
PeerID pp.ID
|
PeerID pp.ID
|
||||||
isIndexer bool
|
isIndexer bool
|
||||||
|
|
||||||
|
Mu sync.RWMutex
|
||||||
|
Peers map[pp.ID]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
||||||
@@ -38,7 +43,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
|||||||
}
|
}
|
||||||
logger := oclib.GetLogger()
|
logger := oclib.GetLogger()
|
||||||
logger.Info().Msg("retrieving private key...")
|
logger.Info().Msg("retrieving private key...")
|
||||||
priv, err := common.LoadKeyFromFilePrivate() // your node private key
|
priv, err := tools.LoadKeyFromFilePrivate() // your node private key
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -60,6 +65,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
|||||||
return nil, errors.New("no host no node")
|
return nil, errors.New("no host no node")
|
||||||
}
|
}
|
||||||
node := &Node{
|
node := &Node{
|
||||||
|
Peers: map[pp.ID]bool{},
|
||||||
PeerID: h.ID(),
|
PeerID: h.ID(),
|
||||||
isIndexer: isIndexer,
|
isIndexer: isIndexer,
|
||||||
LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000, false),
|
LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000, false),
|
||||||
@@ -95,13 +101,42 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
node.SubscribeToSearch(node.PS, &f)
|
node.SubscribeToSearch(node.PS, &f)
|
||||||
|
ff := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) {
|
||||||
|
node.Mu.Lock()
|
||||||
|
defer node.Mu.Unlock()
|
||||||
|
if pid, err := pp.Decode(evt.PeerID); err == nil {
|
||||||
|
if _, ok := node.Peers[pid]; !ok {
|
||||||
|
node.Peers[pid] = evt.NodeActivity == peer.ONLINE.EnumIndex()
|
||||||
|
m := map[string]interface{}{
|
||||||
|
"id": evt.DID,
|
||||||
|
"state": evt.NodeActivity,
|
||||||
|
}
|
||||||
|
if b, err := json.Marshal(m); err == nil {
|
||||||
|
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_PEER, tools.NATSResponse{
|
||||||
|
FromApp: "oc-discovery",
|
||||||
|
Datatype: tools.PEER,
|
||||||
|
Method: int(tools.CREATE_PEER),
|
||||||
|
Payload: b,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
node.SubscribeToNodeActivity(node.PS, &ff)
|
||||||
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
|
peers := access.LoadAll(false)
|
||||||
|
for _, p := range peers.Data { // fill cache.
|
||||||
|
if pid, err := pp.Decode(p.(*peer.Peer).PeerID); err == nil {
|
||||||
|
node.Peers[pid] = p.(*peer.Peer).State == peer.ONLINE
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if isIndexer {
|
if isIndexer {
|
||||||
logger.Info().Msg("generate opencloud indexer...")
|
logger.Info().Msg("generate opencloud indexer...")
|
||||||
node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5)
|
node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5)
|
||||||
}
|
}
|
||||||
logger.Info().Msg("connect to NATS")
|
logger.Info().Msg("connect to NATS")
|
||||||
ListenNATS(*node)
|
ListenNATS(node)
|
||||||
logger.Info().Msg("Node is actually running.")
|
logger.Info().Msg("Node is actually running.")
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
@@ -118,7 +153,7 @@ func (d *Node) Close() {
|
|||||||
func (d *Node) publishPeerRecord(
|
func (d *Node) publishPeerRecord(
|
||||||
rec *indexer.PeerRecord,
|
rec *indexer.PeerRecord,
|
||||||
) error {
|
) error {
|
||||||
priv, err := common.LoadKeyFromFilePrivate() // your node private key
|
priv, err := tools.LoadKeyFromFilePrivate() // your node private key
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -203,11 +238,11 @@ func (d *Node) claimInfo(
|
|||||||
return nil, errors.New("no endpoint found for peer")
|
return nil, errors.New("no endpoint found for peer")
|
||||||
}
|
}
|
||||||
peerID := uuid.New().String()
|
peerID := uuid.New().String()
|
||||||
priv, err := common.LoadKeyFromFilePrivate()
|
priv, err := tools.LoadKeyFromFilePrivate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
pub, err := common.LoadKeyFromFilePublic()
|
pub, err := tools.LoadKeyFromFilePublic()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"oc-discovery/daemons/node/common"
|
|
||||||
"oc-discovery/models"
|
"oc-discovery/models"
|
||||||
|
|
||||||
oclib "cloud.o-forge.io/core/oc-lib"
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
@@ -54,7 +53,7 @@ func (ps *PubSubService) publishEvent(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
priv, err := common.LoadKeyFromFilePrivate()
|
priv, err := tools.LoadKeyFromFilePrivate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package stream
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/subtle"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"oc-discovery/daemons/node/common"
|
"oc-discovery/daemons/node/common"
|
||||||
@@ -13,24 +14,60 @@ import (
|
|||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (ps *StreamService) getTopicName(topicName string) tools.PubSubAction {
|
type Verify struct {
|
||||||
|
IsVerified bool `json:"is_verified"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ps *StreamService) getTopicName(topicName string) string {
|
||||||
ns := strings.Split(topicName, ".")
|
ns := strings.Split(topicName, ".")
|
||||||
if len(ns) > 0 {
|
if len(ns) > 0 {
|
||||||
return tools.GetActionString(ns[0])
|
return ns[0]
|
||||||
}
|
}
|
||||||
return tools.NONE
|
return tools.NONE.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *StreamService) handleEvent(topicName string, evt *common.Event) error {
|
func (ps *StreamService) handleEvent(topicName string, evt *common.Event) error {
|
||||||
action := ps.getTopicName(topicName)
|
action := ps.getTopicName(topicName)
|
||||||
if err := ps.handleEventFromPartner(evt, action); err != nil {
|
ps.handleEventFromPartner(evt, action)
|
||||||
|
if action == "verify_resource" {
|
||||||
|
if evt.DataType == -1 {
|
||||||
|
tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{
|
||||||
|
FromApp: "oc-discovery",
|
||||||
|
Method: int(tools.VERIFY_RESOURCE),
|
||||||
|
Payload: evt.Payload,
|
||||||
|
})
|
||||||
|
} else if err := ps.verifyResponse(evt); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if action == tools.PB_SEARCH_RESPONSE {
|
}
|
||||||
|
if action == tools.PB_SEARCH_RESPONSE.String() {
|
||||||
if err := ps.retrieveResponse(evt); err != nil {
|
if err := ps.retrieveResponse(evt); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return errors.New("no action authorized available : " + action)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (abs *StreamService) verifyResponse(event *common.Event) error { //
|
||||||
|
res, err := resources.ToResource(int(event.DataType), event.Payload)
|
||||||
|
if err != nil || res == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
verify := Verify{
|
||||||
|
IsVerified: false,
|
||||||
|
}
|
||||||
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil)
|
||||||
|
data := access.LoadOne(res.GetID())
|
||||||
|
if data.Err == "" && data.Data != nil {
|
||||||
|
if b, err := json.Marshal(data.Data); err == nil {
|
||||||
|
if res2, err := resources.ToResource(int(event.DataType), b); err == nil {
|
||||||
|
verify.IsVerified = subtle.ConstantTimeCompare(res.GetSignature(), res2.GetSignature()) == 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if b, err := json.Marshal(verify); err == nil {
|
||||||
|
abs.PublishVerifyResources(nil, "", event.From, b)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,10 +86,7 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { //
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools.PubSubAction) error {
|
func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string) error {
|
||||||
if !(action == tools.PB_CREATE || action == tools.PB_UPDATE || action == tools.PB_DELETE) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
resource, err := resources.ToResource(int(evt.DataType), evt.Payload)
|
resource, err := resources.ToResource(int(evt.DataType), evt.Payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -62,7 +96,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools.
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
switch action {
|
switch action {
|
||||||
case tools.PB_SEARCH:
|
case tools.PB_SEARCH.String():
|
||||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
peers := access.Search(nil, evt.From, false)
|
peers := access.Search(nil, evt.From, false)
|
||||||
if len(peers.Data) > 0 {
|
if len(peers.Data) > 0 {
|
||||||
@@ -72,15 +106,15 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools.
|
|||||||
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID
|
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID
|
||||||
ps.SendResponse(p[0], evt)
|
ps.SendResponse(p[0], evt)
|
||||||
}
|
}
|
||||||
case tools.PB_CREATE:
|
case tools.PB_CREATE.String():
|
||||||
case tools.PB_UPDATE:
|
case tools.PB_UPDATE.String():
|
||||||
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
|
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
|
||||||
FromApp: "oc-discovery",
|
FromApp: "oc-discovery",
|
||||||
Datatype: tools.DataType(evt.DataType),
|
Datatype: tools.DataType(evt.DataType),
|
||||||
Method: int(tools.CREATE_RESOURCE),
|
Method: int(tools.CREATE_RESOURCE),
|
||||||
Payload: b,
|
Payload: b,
|
||||||
})
|
})
|
||||||
case tools.PB_DELETE:
|
case tools.PB_DELETE.String():
|
||||||
go tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{
|
go tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{
|
||||||
FromApp: "oc-discovery",
|
FromApp: "oc-discovery",
|
||||||
Datatype: tools.DataType(evt.DataType),
|
Datatype: tools.DataType(evt.DataType),
|
||||||
@@ -88,7 +122,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools.
|
|||||||
Payload: b,
|
Payload: b,
|
||||||
})
|
})
|
||||||
default:
|
default:
|
||||||
return errors.New("no action authorized available : " + action.String())
|
return errors.New("no action authorized available : " + action)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,20 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p/core/protocol"
|
"github.com/libp2p/go-libp2p/core/protocol"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (ps *StreamService) PublishVerifyResources(dt *tools.DataType, user string, toPeerID string, resource []byte) (*common.Stream, error) {
|
||||||
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
|
p := access.LoadOne(toPeerID)
|
||||||
|
if p.Err != "" {
|
||||||
|
return nil, errors.New(p.Err)
|
||||||
|
} else {
|
||||||
|
ad, err := pp.AddrInfoFromString(p.Data.(*peer.Peer).StreamAddress)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return ps.write("verify_resource", toPeerID, ad, dt, user, resource, ProtocolVerifyResource, p.Data.(*peer.Peer).Relation == peer.PARTNER)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPeerID string, resource []byte) error {
|
func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPeerID string, resource []byte) error {
|
||||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||||
p := access.LoadOne(toPeerID)
|
p := access.LoadOne(toPeerID)
|
||||||
@@ -27,7 +41,7 @@ func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPee
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ps.write(tools.PB_SEARCH, toPeerID, ad, dt, user, resource, ProtocolSearchResource, p.Data.(*peer.Peer).Relation == peer.PARTNER)
|
ps.write(tools.PB_SEARCH.String(), toPeerID, ad, dt, user, resource, ProtocolSearchResource, p.Data.(*peer.Peer).Relation == peer.PARTNER)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -55,7 +69,7 @@ func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, p.(*peer.Peer).Relation == peer.PARTNER)
|
ps.write(tools.PB_SEARCH.String(), p.GetID(), ad, dt, user, b, ProtocolSearchResource, p.(*peer.Peer).Relation == peer.PARTNER)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -74,7 +88,7 @@ func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user str
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, true)
|
ps.write(tools.PB_SEARCH.String(), p.GetID(), ad, dt, user, b, ProtocolSearchResource, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -91,8 +105,8 @@ func (ps *StreamService) ToPartnerPublishEvent(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
ps.mu.Lock()
|
ps.Mu.Lock()
|
||||||
defer ps.mu.Unlock()
|
defer ps.Mu.Unlock()
|
||||||
if p.Relation == peer.PARTNER {
|
if p.Relation == peer.PARTNER {
|
||||||
if ps.Streams[ProtocolHeartbeatPartner] == nil {
|
if ps.Streams[ProtocolHeartbeatPartner] == nil {
|
||||||
ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
ps.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
||||||
@@ -116,7 +130,7 @@ func (ps *StreamService) ToPartnerPublishEvent(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
ps.write(action, p.GetID(), ad, dt, user, payload, protocol, true)
|
ps.write(action.String(), p.GetID(), ad, dt, user, payload, protocol, true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -124,22 +138,22 @@ func (ps *StreamService) ToPartnerPublishEvent(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) write(
|
func (s *StreamService) write(
|
||||||
action tools.PubSubAction,
|
action string,
|
||||||
did string,
|
did string,
|
||||||
peerID *pp.AddrInfo,
|
peerID *pp.AddrInfo,
|
||||||
dt *tools.DataType,
|
dt *tools.DataType,
|
||||||
user string,
|
user string,
|
||||||
payload []byte,
|
payload []byte,
|
||||||
proto protocol.ID,
|
proto protocol.ID,
|
||||||
isAPartner bool) error {
|
isAPartner bool) (*common.Stream, error) {
|
||||||
logger := oclib.GetLogger()
|
logger := oclib.GetLogger()
|
||||||
|
|
||||||
name := action.String() + "#" + peerID.ID.String()
|
name := action + "#" + peerID.ID.String()
|
||||||
if dt != nil {
|
if dt != nil {
|
||||||
name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String()
|
name = action + "." + (*dt).String() + "#" + peerID.ID.String()
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.Mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.Mu.Unlock()
|
||||||
if s.Streams[proto] == nil {
|
if s.Streams[proto] == nil {
|
||||||
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
||||||
}
|
}
|
||||||
@@ -169,10 +183,9 @@ func (s *StreamService) write(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
|
return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
|
||||||
}
|
}
|
||||||
stream := s.Streams[proto][peerID.ID]
|
stream := s.Streams[proto][peerID.ID]
|
||||||
|
|
||||||
@@ -182,7 +195,7 @@ func (s *StreamService) write(
|
|||||||
if err := enc.Encode(evt); err != nil {
|
if err := enc.Encode(evt); err != nil {
|
||||||
stream.Stream.Close()
|
stream.Stream.Close()
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
return nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
return nil
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -26,6 +26,8 @@ const ProtocolCreateResource = "/opencloud/resource/create/1.0"
|
|||||||
const ProtocolUpdateResource = "/opencloud/resource/update/1.0"
|
const ProtocolUpdateResource = "/opencloud/resource/update/1.0"
|
||||||
const ProtocolDeleteResource = "/opencloud/resource/delete/1.0"
|
const ProtocolDeleteResource = "/opencloud/resource/delete/1.0"
|
||||||
|
|
||||||
|
const ProtocolVerifyResource = "/opencloud/resource/verify/1.0"
|
||||||
|
|
||||||
const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0"
|
const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0"
|
||||||
|
|
||||||
var protocols = []protocol.ID{
|
var protocols = []protocol.ID{
|
||||||
@@ -33,6 +35,7 @@ var protocols = []protocol.ID{
|
|||||||
ProtocolCreateResource,
|
ProtocolCreateResource,
|
||||||
ProtocolUpdateResource,
|
ProtocolUpdateResource,
|
||||||
ProtocolDeleteResource,
|
ProtocolDeleteResource,
|
||||||
|
ProtocolVerifyResource,
|
||||||
}
|
}
|
||||||
|
|
||||||
type StreamService struct {
|
type StreamService struct {
|
||||||
@@ -41,7 +44,7 @@ type StreamService struct {
|
|||||||
Node common.DiscoveryPeer
|
Node common.DiscoveryPeer
|
||||||
Streams common.ProtocolStream
|
Streams common.ProtocolStream
|
||||||
maxNodesConn int
|
maxNodesConn int
|
||||||
mu sync.Mutex
|
Mu sync.Mutex
|
||||||
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
|
// Stream map[protocol.ID]map[pp.ID]*daemons.Stream
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -67,8 +70,8 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
s.Mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.Mu.Unlock()
|
||||||
|
|
||||||
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
||||||
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
s.Streams[ProtocolHeartbeatPartner] = map[pp.ID]*common.Stream{}
|
||||||
@@ -89,6 +92,20 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) connectToPartners() error {
|
func (s *StreamService) connectToPartners() error {
|
||||||
|
for _, proto := range protocols {
|
||||||
|
f := func(ss network.Stream) {
|
||||||
|
if s.Streams[proto] == nil {
|
||||||
|
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
||||||
|
}
|
||||||
|
s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{
|
||||||
|
Stream: ss,
|
||||||
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
||||||
|
}
|
||||||
|
go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()])
|
||||||
|
}
|
||||||
|
fmt.Println("SetStreamHandler", proto)
|
||||||
|
s.Host.SetStreamHandler(proto, f)
|
||||||
|
}
|
||||||
peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
|
peers, err := s.searchPeer(fmt.Sprintf("%v", peer.PARTNER.EnumIndex()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -105,26 +122,13 @@ func (s *StreamService) connectToPartners() error {
|
|||||||
s.ConnectToPartner(pid, ad)
|
s.ConnectToPartner(pid, ad)
|
||||||
// heartbeat your partner.
|
// heartbeat your partner.
|
||||||
}
|
}
|
||||||
for _, proto := range protocols {
|
|
||||||
f := func(ss network.Stream) {
|
|
||||||
if s.Streams[proto] == nil {
|
|
||||||
s.Streams[proto] = map[pp.ID]*common.Stream{}
|
|
||||||
}
|
|
||||||
s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{
|
|
||||||
Stream: ss,
|
|
||||||
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
|
||||||
}
|
|
||||||
s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()])
|
|
||||||
}
|
|
||||||
fmt.Println("SetStreamHandler", proto)
|
|
||||||
s.Host.SetStreamHandler(proto, f)
|
|
||||||
}
|
|
||||||
// TODO if handle... from partner then HeartBeat back
|
// TODO if handle... from partner then HeartBeat back
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
||||||
logger := oclib.GetLogger()
|
logger := oclib.GetLogger()
|
||||||
|
force := false
|
||||||
for _, proto := range protocols {
|
for _, proto := range protocols {
|
||||||
f := func(ss network.Stream) {
|
f := func(ss network.Stream) {
|
||||||
if s.Streams[proto] == nil {
|
if s.Streams[proto] == nil {
|
||||||
@@ -134,15 +138,16 @@ func (s *StreamService) ConnectToPartner(pid pp.ID, ad *pp.AddrInfo) {
|
|||||||
Stream: ss,
|
Stream: ss,
|
||||||
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
Expiry: time.Now().UTC().Add(2 * time.Minute),
|
||||||
}
|
}
|
||||||
s.readLoop(s.Streams[proto][pid])
|
go s.readLoop(s.Streams[proto][pid])
|
||||||
}
|
}
|
||||||
if s.Host.Network().Connectedness(ad.ID) != network.Connected {
|
if s.Host.Network().Connectedness(ad.ID) != network.Connected {
|
||||||
|
force = true
|
||||||
if err := s.Host.Connect(context.Background(), *ad); err != nil {
|
if err := s.Host.Connect(context.Background(), *ad); err != nil {
|
||||||
logger.Err(err)
|
logger.Err(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, false, &f)
|
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, pid, s.Key, force, &f)
|
||||||
}
|
}
|
||||||
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name,
|
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name,
|
||||||
s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second)
|
s.Host, s.Streams, []*pp.AddrInfo{ad}, 20*time.Second)
|
||||||
@@ -153,14 +158,15 @@ func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) {
|
|||||||
ps := []*peer.Peer{}
|
ps := []*peer.Peer{}
|
||||||
if conf.GetConfig().PeerIDS != "" {
|
if conf.GetConfig().PeerIDS != "" {
|
||||||
for _, peerID := range strings.Split(conf.GetConfig().PeerIDS, ",") {
|
for _, peerID := range strings.Split(conf.GetConfig().PeerIDS, ",") {
|
||||||
ppID := strings.Split(peerID, ":")
|
ppID := strings.Split(peerID, "/")
|
||||||
|
fmt.Println(ppID, peerID)
|
||||||
ps = append(ps, &peer.Peer{
|
ps = append(ps, &peer.Peer{
|
||||||
AbstractObject: utils.AbstractObject{
|
AbstractObject: utils.AbstractObject{
|
||||||
UUID: uuid.New().String(),
|
UUID: uuid.New().String(),
|
||||||
Name: ppID[1],
|
Name: ppID[1],
|
||||||
},
|
},
|
||||||
PeerID: ppID[1],
|
PeerID: ppID[len(ppID)-1],
|
||||||
StreamAddress: "/ip4/127.0.0.1/tcp/" + ppID[0] + "/p2p/" + ppID[1],
|
StreamAddress: peerID,
|
||||||
State: peer.ONLINE,
|
State: peer.ONLINE,
|
||||||
Relation: peer.PARTNER,
|
Relation: peer.PARTNER,
|
||||||
})
|
})
|
||||||
@@ -194,8 +200,8 @@ func (s *StreamService) StartGC(interval time.Duration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *StreamService) gc() {
|
func (s *StreamService) gc() {
|
||||||
s.mu.Lock()
|
s.Mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.Mu.Unlock()
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
|
||||||
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
if s.Streams[ProtocolHeartbeatPartner] == nil {
|
||||||
|
|||||||
@@ -4,5 +4,5 @@
|
|||||||
"NATS_URL": "nats://nats:4222",
|
"NATS_URL": "nats://nats:4222",
|
||||||
"NODE_MODE": "indexer",
|
"NODE_MODE": "indexer",
|
||||||
"NODE_ENDPOINT_PORT": 4002,
|
"NODE_ENDPOINT_PORT": 4002,
|
||||||
"INDEXER_ADDRESSES": "/ip4/oc-discovery1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
"INDEXER_ADDRESSES": "/ip4/172.19.0.2/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
||||||
}
|
}
|
||||||
@@ -4,5 +4,5 @@
|
|||||||
"NATS_URL": "nats://nats:4222",
|
"NATS_URL": "nats://nats:4222",
|
||||||
"NODE_MODE": "node",
|
"NODE_MODE": "node",
|
||||||
"NODE_ENDPOINT_PORT": 4003,
|
"NODE_ENDPOINT_PORT": 4003,
|
||||||
"INDEXER_ADDRESSES": "/ip4/oc-discovery2/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u"
|
"INDEXER_ADDRESSES": "/ip4/172.19.0.3/tcp/4002/p2p/12D3KooWC3GNStak8KCYtJq11Dxiq45EJV53z1ZvKetMcZBeBX6u"
|
||||||
}
|
}
|
||||||
@@ -4,6 +4,6 @@
|
|||||||
"NATS_URL": "nats://nats:4222",
|
"NATS_URL": "nats://nats:4222",
|
||||||
"NODE_MODE": "node",
|
"NODE_MODE": "node",
|
||||||
"NODE_ENDPOINT_PORT": 4004,
|
"NODE_ENDPOINT_PORT": 4004,
|
||||||
"INDEXER_ADDRESSES": "/ip4/oc-discovery1/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu",
|
"INDEXER_ADDRESSES": "/ip4/172.19.0.2/tcp/4001/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu",
|
||||||
"PEER_IDS": "/ip4/oc-discovery3/tcp/4003/p2p/12D3KooWGn3j4XqTSrjJDGGpTQERdDV5TPZdhQp87rAUnvQssvQu"
|
"PEER_IDS": "/ip4/172.19.0.4/tcp/4003/p2p/12D3KooWBh9kZrekBAE5G33q4jCLNRAzygem3gP1mMdK8mhoCTaw"
|
||||||
}
|
}
|
||||||
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module oc-discovery
|
|||||||
go 1.24.6
|
go 1.24.6
|
||||||
|
|
||||||
require (
|
require (
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209113703-b9c9b6678099
|
||||||
github.com/beego/beego v1.12.13
|
github.com/beego/beego v1.12.13
|
||||||
github.com/beego/beego/v2 v2.3.8
|
github.com/beego/beego/v2 v2.3.8
|
||||||
github.com/go-redis/redis v6.15.9+incompatible
|
github.com/go-redis/redis v6.15.9+incompatible
|
||||||
|
|||||||
12
go.sum
12
go.sum
@@ -34,6 +34,18 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260203150123-4258f6b58083 h1:nKiU4AfeX+axS
|
|||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150123-4258f6b58083/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150123-4258f6b58083/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995 h1:ZDRvnzTTNHgMm5hYmseHdEPqQ6rn/4v+P9f/JIxPaNw=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995 h1:ZDRvnzTTNHgMm5hYmseHdEPqQ6rn/4v+P9f/JIxPaNw=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260205131048-425cd2a9ba2f h1:Ku6u+SeoNXHMBzckekGyXCHLDJPh20Y8GayO6fXEcZE=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260205131048-425cd2a9ba2f/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260205131630-342451db2581 h1:V9eANWFEkoEPg3nWCvYXnLYbKDdAm3/Y7uCw1nt22Cc=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260205131630-342451db2581/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209090340-c2aa2fedaa02 h1:sPVOuXArsUhtBecqyu8PB+/UJUsLHJfzX8tkFtkGbTs=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209090340-c2aa2fedaa02/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209095010-bafeee0d0590 h1:SPw0rHNwgSKtcvzUSCz97zV11iKO8bDBCAokqkTMpvw=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209095010-bafeee0d0590/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209095536-b767afb30168 h1:HHmfg0ktsJ5aTIXjmMhY8s6Pxb3F94OuVPQl+vhQ5Xs=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209095536-b767afb30168/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209113703-b9c9b6678099 h1:HczicbRtjiU51McjpDkmCsrQVs406bHybbLd+ZkqTo0=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260209113703-b9c9b6678099/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
|
github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
|
||||||
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||||
|
|||||||
4
main.go
4
main.go
@@ -21,14 +21,12 @@ func main() {
|
|||||||
oclib.InitDaemon(appname)
|
oclib.InitDaemon(appname)
|
||||||
// get the right config file
|
// get the right config file
|
||||||
|
|
||||||
o := oclib.GetConfLoader()
|
o := oclib.GetConfLoader(appname)
|
||||||
|
|
||||||
conf.GetConfig().Name = o.GetStringDefault("NAME", "opencloud-demo")
|
conf.GetConfig().Name = o.GetStringDefault("NAME", "opencloud-demo")
|
||||||
conf.GetConfig().Hostname = o.GetStringDefault("HOSTNAME", "127.0.0.1")
|
conf.GetConfig().Hostname = o.GetStringDefault("HOSTNAME", "127.0.0.1")
|
||||||
conf.GetConfig().PSKPath = o.GetStringDefault("PSK_PATH", "./psk/psk.key")
|
conf.GetConfig().PSKPath = o.GetStringDefault("PSK_PATH", "./psk/psk.key")
|
||||||
conf.GetConfig().NodeEndpointPort = o.GetInt64Default("NODE_ENDPOINT_PORT", 4001)
|
conf.GetConfig().NodeEndpointPort = o.GetInt64Default("NODE_ENDPOINT_PORT", 4001)
|
||||||
conf.GetConfig().PublicKeyPath = o.GetStringDefault("PUBLIC_KEY_PATH", "./pem/public.pem")
|
|
||||||
conf.GetConfig().PrivateKeyPath = o.GetStringDefault("PRIVATE_KEY_PATH", "./pem/private.pem")
|
|
||||||
conf.GetConfig().IndexerAddresses = o.GetStringDefault("INDEXER_ADDRESSES", "")
|
conf.GetConfig().IndexerAddresses = o.GetStringDefault("INDEXER_ADDRESSES", "")
|
||||||
|
|
||||||
conf.GetConfig().PeerIDS = o.GetStringDefault("PEER_IDS", "")
|
conf.GetConfig().PeerIDS = o.GetStringDefault("PEER_IDS", "")
|
||||||
|
|||||||
Reference in New Issue
Block a user