From fa914958b688534edad9d9f208d8f4202488944f Mon Sep 17 00:00:00 2001 From: mr Date: Mon, 9 Feb 2026 13:28:00 +0100 Subject: [PATCH] Keep Peer Caching + Resource Verification. --- daemons/node/common/common_pubsub.go | 12 +++--- daemons/node/common/common_stream.go | 20 ++++----- daemons/node/common/crypto.go | 43 ------------------- daemons/node/indexer/handler.go | 60 +++++++++++++------------- daemons/node/indexer/service.go | 13 +++--- daemons/node/nats.go | 42 +++++++++++++++++- daemons/node/node.go | 45 ++++++++++++++++--- daemons/node/pubsub/publish.go | 3 +- daemons/node/stream/handler.go | 64 +++++++++++++++++++++------- daemons/node/stream/publish.go | 37 ++++++++++------ daemons/node/stream/service.go | 3 ++ go.mod | 2 +- go.sum | 8 ++++ main.go | 2 - 14 files changed, 221 insertions(+), 133 deletions(-) diff --git a/daemons/node/common/common_pubsub.go b/daemons/node/common/common_pubsub.go index 807c653..eecfcff 100644 --- a/daemons/node/common/common_pubsub.go +++ b/daemons/node/common/common_pubsub.go @@ -28,7 +28,7 @@ type Event struct { } func NewEvent(name string, from string, dt *tools.DataType, user string, payload []byte) *Event { - priv, err := LoadKeyFromFilePrivate() // your node private key + priv, err := tools.LoadKeyFromFilePrivate() // your node private key if err != nil { return nil } @@ -88,11 +88,11 @@ func (event *Event) Verify(p *peer.Peer) error { } type TopicNodeActivityPub struct { - NodeActivity peer.PeerState - Disposer pp.AddrInfo `json:"disposer_address"` - Name string `json:"name"` - DID string `json:"did"` // real PEER ID - PeerID string `json:"peer_id"` + NodeActivity int `json:"node_activity"` + Disposer string `json:"disposer_address"` + Name string `json:"name"` + DID string `json:"did"` // real PEER ID + PeerID string `json:"peer_id"` } type LongLivedPubSubService struct { diff --git a/daemons/node/common/common_stream.go b/daemons/node/common/common_stream.go index a8d557b..c701d6a 100644 --- a/daemons/node/common/common_stream.go +++ b/daemons/node/common/common_stream.go @@ -68,18 +68,16 @@ func (ix *LongLivedStreamRecordedService[T]) gc() { } ix.PubsubMu.Lock() if ix.LongLivedPubSubs[TopicPubSubNodeActivity] != nil { - ad, err := pp.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String()) - if err == nil { - if b, err := json.Marshal(TopicNodeActivityPub{ - Disposer: *ad, - Name: rec.HeartbeatStream.Name, - DID: rec.HeartbeatStream.DID, - PeerID: pid.String(), - NodeActivity: peer.OFFLINE, - }); err == nil { - ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b) - } + if b, err := json.Marshal(TopicNodeActivityPub{ + Disposer: "/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String(), + Name: rec.HeartbeatStream.Name, + DID: rec.HeartbeatStream.DID, + PeerID: pid.String(), + NodeActivity: peer.OFFLINE.EnumIndex(), + }); err == nil { + ix.LongLivedPubSubs[TopicPubSubNodeActivity].Publish(context.Background(), b) } + } ix.PubsubMu.Unlock() } diff --git a/daemons/node/common/crypto.go b/daemons/node/common/crypto.go index 854ca0d..065026c 100644 --- a/daemons/node/common/crypto.go +++ b/daemons/node/common/crypto.go @@ -2,12 +2,8 @@ package common import ( "bytes" - "crypto/ed25519" - "crypto/x509" "encoding/base64" - "encoding/pem" "errors" - "fmt" "oc-discovery/conf" "oc-discovery/models" "os" @@ -47,45 +43,6 @@ func Verify(pub crypto.PubKey, data, sig []byte) (bool, error) { return pub.Verify(data, sig) } -func LoadKeyFromFilePrivate() (crypto.PrivKey, error) { - path := conf.GetConfig().PrivateKeyPath - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - block, _ := pem.Decode(data) - keyAny, err := x509.ParsePKCS8PrivateKey(block.Bytes) - if err != nil { - return nil, err - } - - edKey, ok := keyAny.(ed25519.PrivateKey) - if !ok { - return nil, fmt.Errorf("not an ed25519 key") - } - return crypto.UnmarshalEd25519PrivateKey(edKey) -} - -func LoadKeyFromFilePublic() (crypto.PubKey, error) { - path := conf.GetConfig().PublicKeyPath - data, err := os.ReadFile(path) - if err != nil { - return nil, err - } - block, _ := pem.Decode(data) - keyAny, err := x509.ParsePKIXPublicKey(block.Bytes) - if err != nil { - return nil, err - } - - edKey, ok := keyAny.(ed25519.PublicKey) - if !ok { - return nil, fmt.Errorf("not an ed25519 key") - } - // Try to unmarshal as libp2p private key (supports ed25519, rsa, etc.) - return crypto.UnmarshalEd25519PublicKey(edKey) -} - func LoadPSKFromFile() (pnet.PSK, error) { path := conf.GetConfig().PSKPath data, err := os.ReadFile(path) diff --git a/daemons/node/indexer/handler.go b/daemons/node/indexer/handler.go index 7a1dbba..9d5536f 100644 --- a/daemons/node/indexer/handler.go +++ b/daemons/node/indexer/handler.go @@ -36,7 +36,7 @@ type PeerRecord struct { } func (p *PeerRecord) Sign() error { - priv, err := common.LoadKeyFromFilePrivate() + priv, err := tools.LoadKeyFromFilePrivate() if err != nil { return err } @@ -53,7 +53,6 @@ func (p *PeerRecord) Sign() error { } func (p *PeerRecord) Verify() (crypto.PubKey, error) { - fmt.Println(p.PubKey) pubKey, err := crypto.UnmarshalPublicKey(p.PubKey) // retrieve pub key in message if err != nil { fmt.Println("UnmarshalPublicKey") @@ -183,18 +182,17 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) { } if ix.LongLivedPubSubs[common.TopicPubSubNodeActivity] != nil && !rec.NoPub { - ad, err := peer.AddrInfoFromString("/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String()) - if err == nil { - if b, err := json.Marshal(common.TopicNodeActivityPub{ - Disposer: *ad, - DID: rec.DID, - Name: rec.Name, - PeerID: pid.String(), - NodeActivity: pp.ONLINE, - }); err == nil { - ix.LongLivedPubSubs[common.TopicPubSubNodeActivity].Publish(context.Background(), b) - } + + if b, err := json.Marshal(common.TopicNodeActivityPub{ + Disposer: "/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String(), + DID: rec.DID, + Name: rec.Name, + PeerID: pid.String(), + NodeActivity: pp.ONLINE.EnumIndex(), + }); err == nil { + ix.LongLivedPubSubs[common.TopicPubSubNodeActivity].Publish(context.Background(), b) } + } if rec.TTL > 0 { @@ -257,26 +255,28 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) { continue } if ix.Host.Network().Connectedness(pid) != network.Connected { - _ = ix.Host.Connect(ctxTTL, dsp.Disposer) - str, err := ix.Host.NewStream(ctxTTL, pid, common.ProtocolGet) - if err != nil { - continue - } - for { - if ctxTTL.Err() == context.DeadlineExceeded { - break - } - var subResp GetResponse - if err := json.NewDecoder(str).Decode(&resp); err != nil { + if ad, err := peer.AddrInfoFromString(dsp.Disposer); err == nil { + _ = ix.Host.Connect(ctxTTL, *ad) + str, err := ix.Host.NewStream(ctxTTL, pid, common.ProtocolGet) + if err != nil { continue } - if subResp.Found { - for k, v := range subResp.Records { - if _, ok := resp.Records[k]; !ok { - resp.Records[k] = v - } + for { + if ctxTTL.Err() == context.DeadlineExceeded { + break + } + var subResp GetResponse + if err := json.NewDecoder(str).Decode(&resp); err != nil { + continue + } + if subResp.Found { + for k, v := range subResp.Records { + if _, ok := resp.Records[k]; !ok { + resp.Records[k] = v + } + } + break } - break } } } diff --git a/daemons/node/indexer/service.go b/daemons/node/indexer/service.go index 379bbf7..d5fd840 100644 --- a/daemons/node/indexer/service.go +++ b/daemons/node/indexer/service.go @@ -48,12 +48,15 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ } f := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) { ix.mu.Lock() - if evt.NodeActivity == pp.OFFLINE { - delete(ix.DisposedPeers, evt.Disposer.ID) - } - if evt.NodeActivity == pp.ONLINE { - ix.DisposedPeers[evt.Disposer.ID] = &evt + if pid, err := peer.Decode(evt.PeerID); err == nil { + if evt.NodeActivity == pp.OFFLINE.EnumIndex() { + delete(ix.DisposedPeers, pid) + } + if evt.NodeActivity == pp.ONLINE.EnumIndex() { + ix.DisposedPeers[pid] = &evt + } } + ix.mu.Unlock() } ix.SubscribeToNodeActivity(ix.PS, &f) // now we subscribe to a long run topic named node-activity, to relay message. diff --git a/daemons/node/nats.go b/daemons/node/nats.go index 2120433..8814c75 100644 --- a/daemons/node/nats.go +++ b/daemons/node/nats.go @@ -5,16 +5,48 @@ import ( "encoding/json" "fmt" "oc-discovery/daemons/node/common" + "oc-discovery/daemons/node/stream" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/tools" pp "github.com/libp2p/go-libp2p/core/peer" ) -func ListenNATS(n Node) { +func ListenNATS(n *Node) { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ + tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) { + if resp.FromApp == config.GetAppName() { + return + } + if res, err := resources.ToResource(resp.Datatype.EnumIndex(), resp.Payload); err == nil { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + p := access.LoadOne(res.GetCreatorID()) + realP := p.ToPeer() + if realP == nil { + return + } else if realP.Relation == peer.SELF { + pubKey, err := common.PubKeyFromString(realP.PublicKey) // extract pubkey from pubkey str + if err != nil { + return + } + ok, _ := pubKey.Verify(resp.Payload, res.GetSignature()) + if b, err := json.Marshal(stream.Verify{ + IsVerified: ok, + }); err == nil { + tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{ + FromApp: "oc-discovery", + Method: int(tools.VERIFY_RESOURCE), + Payload: b, + }) + } + } else if realP.Relation != peer.BLACKLIST { + n.StreamService.PublishVerifyResources(&resp.Datatype, resp.User, realP.PeerID, resp.Payload) + } + } + }, tools.CREATE_PEER: func(resp tools.NATSResponse) { if resp.FromApp == config.GetAppName() { return @@ -30,9 +62,16 @@ func ListenNATS(n Node) { p = p.Deserialize(m, p).(*peer.Peer) ad, err := pp.AddrInfoFromString(p.PeerID) + if err != nil { + return + } n.StreamService.Mu.Lock() defer n.StreamService.Mu.Unlock() + n.Mu.Lock() + n.Peers[ad.ID] = p.State == peer.ONLINE + n.Mu.Unlock() + if p.Relation == peer.PARTNER { n.StreamService.ConnectToPartner(ad.ID, ad) } else { @@ -50,6 +89,7 @@ func ListenNATS(n Node) { } n.StreamService.Streams = ps } + }, tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) { var propalgation tools.PropalgationMessage diff --git a/daemons/node/node.go b/daemons/node/node.go index fd50428..89212ca 100644 --- a/daemons/node/node.go +++ b/daemons/node/node.go @@ -11,10 +11,12 @@ import ( "oc-discovery/daemons/node/indexer" "oc-discovery/daemons/node/pubsub" "oc-discovery/daemons/node/stream" + "sync" "time" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/tools" "github.com/google/uuid" "github.com/libp2p/go-libp2p" pubsubs "github.com/libp2p/go-libp2p-pubsub" @@ -30,6 +32,9 @@ type Node struct { StreamService *stream.StreamService PeerID pp.ID isIndexer bool + + Mu sync.RWMutex + Peers map[pp.ID]bool } func InitNode(isNode bool, isIndexer bool) (*Node, error) { @@ -38,7 +43,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { } logger := oclib.GetLogger() logger.Info().Msg("retrieving private key...") - priv, err := common.LoadKeyFromFilePrivate() // your node private key + priv, err := tools.LoadKeyFromFilePrivate() // your node private key if err != nil { return nil, err } @@ -60,6 +65,7 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { return nil, errors.New("no host no node") } node := &Node{ + Peers: map[pp.ID]bool{}, PeerID: h.ID(), isIndexer: isIndexer, LongLivedStreamRecordedService: common.NewStreamRecordedService[interface{}](h, 1000, false), @@ -95,13 +101,42 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) { } } node.SubscribeToSearch(node.PS, &f) + ff := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) { + node.Mu.Lock() + defer node.Mu.Unlock() + if pid, err := pp.Decode(evt.PeerID); err == nil { + if _, ok := node.Peers[pid]; !ok { + node.Peers[pid] = evt.NodeActivity == peer.ONLINE.EnumIndex() + m := map[string]interface{}{ + "id": evt.DID, + "state": evt.NodeActivity, + } + if b, err := json.Marshal(m); err == nil { + go tools.NewNATSCaller().SetNATSPub(tools.CREATE_PEER, tools.NATSResponse{ + FromApp: "oc-discovery", + Datatype: tools.PEER, + Method: int(tools.CREATE_PEER), + Payload: b, + }) + } + } + } + } + node.SubscribeToNodeActivity(node.PS, &ff) + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + peers := access.LoadAll(false) + for _, p := range peers.Data { // fill cache. + if pid, err := pp.Decode(p.(*peer.Peer).PeerID); err == nil { + node.Peers[pid] = p.(*peer.Peer).State == peer.ONLINE + } + } } if isIndexer { logger.Info().Msg("generate opencloud indexer...") node.IndexerService = indexer.NewIndexerService(node.Host, ps, 5) } logger.Info().Msg("connect to NATS") - ListenNATS(*node) + ListenNATS(node) logger.Info().Msg("Node is actually running.") return node, nil } @@ -118,7 +153,7 @@ func (d *Node) Close() { func (d *Node) publishPeerRecord( rec *indexer.PeerRecord, ) error { - priv, err := common.LoadKeyFromFilePrivate() // your node private key + priv, err := tools.LoadKeyFromFilePrivate() // your node private key if err != nil { return err } @@ -203,11 +238,11 @@ func (d *Node) claimInfo( return nil, errors.New("no endpoint found for peer") } peerID := uuid.New().String() - priv, err := common.LoadKeyFromFilePrivate() + priv, err := tools.LoadKeyFromFilePrivate() if err != nil { return nil, err } - pub, err := common.LoadKeyFromFilePublic() + pub, err := tools.LoadKeyFromFilePublic() if err != nil { return nil, err } diff --git a/daemons/node/pubsub/publish.go b/daemons/node/pubsub/publish.go index 9f63d5f..a614a8f 100644 --- a/daemons/node/pubsub/publish.go +++ b/daemons/node/pubsub/publish.go @@ -4,7 +4,6 @@ import ( "context" "encoding/json" "errors" - "oc-discovery/daemons/node/common" "oc-discovery/models" oclib "cloud.o-forge.io/core/oc-lib" @@ -54,7 +53,7 @@ func (ps *PubSubService) publishEvent( if err != nil { return err } - priv, err := common.LoadKeyFromFilePrivate() + priv, err := tools.LoadKeyFromFilePrivate() if err != nil { return err } diff --git a/daemons/node/stream/handler.go b/daemons/node/stream/handler.go index 80b0054..3b5b105 100644 --- a/daemons/node/stream/handler.go +++ b/daemons/node/stream/handler.go @@ -2,6 +2,7 @@ package stream import ( "context" + "crypto/subtle" "encoding/json" "errors" "oc-discovery/daemons/node/common" @@ -13,24 +14,60 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) -func (ps *StreamService) getTopicName(topicName string) tools.PubSubAction { +type Verify struct { + IsVerified bool `json:"is_verified"` +} + +func (ps *StreamService) getTopicName(topicName string) string { ns := strings.Split(topicName, ".") if len(ns) > 0 { - return tools.GetActionString(ns[0]) + return ns[0] } - return tools.NONE + return tools.NONE.String() } func (ps *StreamService) handleEvent(topicName string, evt *common.Event) error { action := ps.getTopicName(topicName) - if err := ps.handleEventFromPartner(evt, action); err != nil { - return err + ps.handleEventFromPartner(evt, action) + if action == "verify_resource" { + if evt.DataType == -1 { + tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{ + FromApp: "oc-discovery", + Method: int(tools.VERIFY_RESOURCE), + Payload: evt.Payload, + }) + } else if err := ps.verifyResponse(evt); err != nil { + return err + } } - if action == tools.PB_SEARCH_RESPONSE { + if action == tools.PB_SEARCH_RESPONSE.String() { if err := ps.retrieveResponse(evt); err != nil { return err } } + return errors.New("no action authorized available : " + action) +} + +func (abs *StreamService) verifyResponse(event *common.Event) error { // + res, err := resources.ToResource(int(event.DataType), event.Payload) + if err != nil || res == nil { + return nil + } + verify := Verify{ + IsVerified: false, + } + access := oclib.NewRequestAdmin(oclib.LibDataEnum(event.DataType), nil) + data := access.LoadOne(res.GetID()) + if data.Err == "" && data.Data != nil { + if b, err := json.Marshal(data.Data); err == nil { + if res2, err := resources.ToResource(int(event.DataType), b); err == nil { + verify.IsVerified = subtle.ConstantTimeCompare(res.GetSignature(), res2.GetSignature()) == 1 + } + } + } + if b, err := json.Marshal(verify); err == nil { + abs.PublishVerifyResources(nil, "", event.From, b) + } return nil } @@ -49,10 +86,7 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { // return nil } -func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools.PubSubAction) error { - if !(action == tools.PB_CREATE || action == tools.PB_UPDATE || action == tools.PB_DELETE) { - return nil - } +func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string) error { resource, err := resources.ToResource(int(evt.DataType), evt.Payload) if err != nil { return err @@ -62,7 +96,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools. return err } switch action { - case tools.PB_SEARCH: + case tools.PB_SEARCH.String(): access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) peers := access.Search(nil, evt.From, false) if len(peers.Data) > 0 { @@ -72,15 +106,15 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools. } else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID ps.SendResponse(p[0], evt) } - case tools.PB_CREATE: - case tools.PB_UPDATE: + case tools.PB_CREATE.String(): + case tools.PB_UPDATE.String(): go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(evt.DataType), Method: int(tools.CREATE_RESOURCE), Payload: b, }) - case tools.PB_DELETE: + case tools.PB_DELETE.String(): go tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{ FromApp: "oc-discovery", Datatype: tools.DataType(evt.DataType), @@ -88,7 +122,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action tools. Payload: b, }) default: - return errors.New("no action authorized available : " + action.String()) + return errors.New("no action authorized available : " + action) } return nil } diff --git a/daemons/node/stream/publish.go b/daemons/node/stream/publish.go index a588aa9..ca08fbb 100644 --- a/daemons/node/stream/publish.go +++ b/daemons/node/stream/publish.go @@ -17,6 +17,20 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" ) +func (ps *StreamService) PublishVerifyResources(dt *tools.DataType, user string, toPeerID string, resource []byte) (*common.Stream, error) { + access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) + p := access.LoadOne(toPeerID) + if p.Err != "" { + return nil, errors.New(p.Err) + } else { + ad, err := pp.AddrInfoFromString(p.Data.(*peer.Peer).StreamAddress) + if err != nil { + return nil, err + } + return ps.write("verify_resource", toPeerID, ad, dt, user, resource, ProtocolVerifyResource, p.Data.(*peer.Peer).Relation == peer.PARTNER) + } +} + func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPeerID string, resource []byte) error { access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil) p := access.LoadOne(toPeerID) @@ -27,7 +41,7 @@ func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPee if err != nil { return err } - ps.write(tools.PB_SEARCH, toPeerID, ad, dt, user, resource, ProtocolSearchResource, p.Data.(*peer.Peer).Relation == peer.PARTNER) + ps.write(tools.PB_SEARCH.String(), toPeerID, ad, dt, user, resource, ProtocolSearchResource, p.Data.(*peer.Peer).Relation == peer.PARTNER) } return nil } @@ -55,7 +69,7 @@ func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string if err != nil { continue } - ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, p.(*peer.Peer).Relation == peer.PARTNER) + ps.write(tools.PB_SEARCH.String(), p.GetID(), ad, dt, user, b, ProtocolSearchResource, p.(*peer.Peer).Relation == peer.PARTNER) } } return nil @@ -74,7 +88,7 @@ func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user str if err != nil { continue } - ps.write(tools.PB_SEARCH, p.GetID(), ad, dt, user, b, ProtocolSearchResource, true) + ps.write(tools.PB_SEARCH.String(), p.GetID(), ad, dt, user, b, ProtocolSearchResource, true) } } return nil @@ -116,7 +130,7 @@ func (ps *StreamService) ToPartnerPublishEvent( if err != nil { continue } - ps.write(action, p.GetID(), ad, dt, user, payload, protocol, true) + ps.write(action.String(), p.GetID(), ad, dt, user, payload, protocol, true) } } } @@ -124,19 +138,19 @@ func (ps *StreamService) ToPartnerPublishEvent( } func (s *StreamService) write( - action tools.PubSubAction, + action string, did string, peerID *pp.AddrInfo, dt *tools.DataType, user string, payload []byte, proto protocol.ID, - isAPartner bool) error { + isAPartner bool) (*common.Stream, error) { logger := oclib.GetLogger() - name := action.String() + "#" + peerID.ID.String() + name := action + "#" + peerID.ID.String() if dt != nil { - name = action.String() + "." + (*dt).String() + "#" + peerID.ID.String() + name = action + "." + (*dt).String() + "#" + peerID.ID.String() } s.Mu.Lock() defer s.Mu.Unlock() @@ -169,10 +183,9 @@ func (s *StreamService) write( } } } - } } - return errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String()) + return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String()) } stream := s.Streams[proto][peerID.ID] @@ -182,7 +195,7 @@ func (s *StreamService) write( if err := enc.Encode(evt); err != nil { stream.Stream.Close() logger.Err(err) - return nil + return stream, nil } - return nil + return stream, nil } diff --git a/daemons/node/stream/service.go b/daemons/node/stream/service.go index 56ecd12..8141c59 100644 --- a/daemons/node/stream/service.go +++ b/daemons/node/stream/service.go @@ -26,6 +26,8 @@ const ProtocolCreateResource = "/opencloud/resource/create/1.0" const ProtocolUpdateResource = "/opencloud/resource/update/1.0" const ProtocolDeleteResource = "/opencloud/resource/delete/1.0" +const ProtocolVerifyResource = "/opencloud/resource/verify/1.0" + const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0" var protocols = []protocol.ID{ @@ -33,6 +35,7 @@ var protocols = []protocol.ID{ ProtocolCreateResource, ProtocolUpdateResource, ProtocolDeleteResource, + ProtocolVerifyResource, } type StreamService struct { diff --git a/go.mod b/go.mod index 3bd8cd0..44cf874 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-discovery go 1.24.6 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260205131630-342451db2581 + cloud.o-forge.io/core/oc-lib v0.0.0-20260209113703-b9c9b6678099 github.com/beego/beego v1.12.13 github.com/beego/beego/v2 v2.3.8 github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index 0d96cf0..81f43a6 100644 --- a/go.sum +++ b/go.sum @@ -38,6 +38,14 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260205131048-425cd2a9ba2f h1:Ku6u+SeoNXHMB cloud.o-forge.io/core/oc-lib v0.0.0-20260205131048-425cd2a9ba2f/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= cloud.o-forge.io/core/oc-lib v0.0.0-20260205131630-342451db2581 h1:V9eANWFEkoEPg3nWCvYXnLYbKDdAm3/Y7uCw1nt22Cc= cloud.o-forge.io/core/oc-lib v0.0.0-20260205131630-342451db2581/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209090340-c2aa2fedaa02 h1:sPVOuXArsUhtBecqyu8PB+/UJUsLHJfzX8tkFtkGbTs= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209090340-c2aa2fedaa02/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209095010-bafeee0d0590 h1:SPw0rHNwgSKtcvzUSCz97zV11iKO8bDBCAokqkTMpvw= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209095010-bafeee0d0590/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209095536-b767afb30168 h1:HHmfg0ktsJ5aTIXjmMhY8s6Pxb3F94OuVPQl+vhQ5Xs= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209095536-b767afb30168/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209113703-b9c9b6678099 h1:HczicbRtjiU51McjpDkmCsrQVs406bHybbLd+ZkqTo0= +cloud.o-forge.io/core/oc-lib v0.0.0-20260209113703-b9c9b6678099/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/main.go b/main.go index 201b4a0..73b2fda 100644 --- a/main.go +++ b/main.go @@ -27,8 +27,6 @@ func main() { conf.GetConfig().Hostname = o.GetStringDefault("HOSTNAME", "127.0.0.1") conf.GetConfig().PSKPath = o.GetStringDefault("PSK_PATH", "./psk/psk.key") conf.GetConfig().NodeEndpointPort = o.GetInt64Default("NODE_ENDPOINT_PORT", 4001) - conf.GetConfig().PublicKeyPath = o.GetStringDefault("PUBLIC_KEY_PATH", "./pem/public.pem") - conf.GetConfig().PrivateKeyPath = o.GetStringDefault("PRIVATE_KEY_PATH", "./pem/private.pem") conf.GetConfig().IndexerAddresses = o.GetStringDefault("INDEXER_ADDRESSES", "") conf.GetConfig().PeerIDS = o.GetStringDefault("PEER_IDS", "")