2026-02-02 12:14:01 +01:00
|
|
|
package node
|
2026-01-28 17:22:29 +01:00
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
2026-02-05 16:17:14 +01:00
|
|
|
"oc-discovery/daemons/node/common"
|
2026-02-09 13:28:00 +01:00
|
|
|
"oc-discovery/daemons/node/stream"
|
2026-01-28 17:22:29 +01:00
|
|
|
|
2026-02-05 16:17:14 +01:00
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/config"
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
2026-02-09 13:28:00 +01:00
|
|
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
2026-01-28 17:22:29 +01:00
|
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
2026-02-05 16:17:14 +01:00
|
|
|
pp "github.com/libp2p/go-libp2p/core/peer"
|
2026-01-28 17:22:29 +01:00
|
|
|
)
|
|
|
|
|
|
2026-02-09 13:28:00 +01:00
|
|
|
func ListenNATS(n *Node) {
|
2026-01-28 17:22:29 +01:00
|
|
|
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
2026-02-09 13:28:00 +01:00
|
|
|
tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) {
|
|
|
|
|
if resp.FromApp == config.GetAppName() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if res, err := resources.ToResource(resp.Datatype.EnumIndex(), resp.Payload); err == nil {
|
|
|
|
|
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
|
|
|
|
p := access.LoadOne(res.GetCreatorID())
|
|
|
|
|
realP := p.ToPeer()
|
|
|
|
|
if realP == nil {
|
|
|
|
|
return
|
|
|
|
|
} else if realP.Relation == peer.SELF {
|
|
|
|
|
pubKey, err := common.PubKeyFromString(realP.PublicKey) // extract pubkey from pubkey str
|
|
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
ok, _ := pubKey.Verify(resp.Payload, res.GetSignature())
|
|
|
|
|
if b, err := json.Marshal(stream.Verify{
|
|
|
|
|
IsVerified: ok,
|
|
|
|
|
}); err == nil {
|
|
|
|
|
tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{
|
|
|
|
|
FromApp: "oc-discovery",
|
|
|
|
|
Method: int(tools.VERIFY_RESOURCE),
|
|
|
|
|
Payload: b,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
} else if realP.Relation != peer.BLACKLIST {
|
|
|
|
|
n.StreamService.PublishVerifyResources(&resp.Datatype, resp.User, realP.PeerID, resp.Payload)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
2026-02-05 16:17:14 +01:00
|
|
|
tools.CREATE_PEER: func(resp tools.NATSResponse) {
|
|
|
|
|
if resp.FromApp == config.GetAppName() {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
logger := oclib.GetLogger()
|
|
|
|
|
m := map[string]interface{}{}
|
|
|
|
|
err := json.Unmarshal(resp.Payload, &m)
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Err(err)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
p := &peer.Peer{}
|
|
|
|
|
p = p.Deserialize(m, p).(*peer.Peer)
|
|
|
|
|
|
|
|
|
|
ad, err := pp.AddrInfoFromString(p.PeerID)
|
2026-02-09 13:28:00 +01:00
|
|
|
if err != nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
2026-02-05 16:17:14 +01:00
|
|
|
n.StreamService.Mu.Lock()
|
|
|
|
|
defer n.StreamService.Mu.Unlock()
|
|
|
|
|
|
2026-02-09 13:28:00 +01:00
|
|
|
n.Mu.Lock()
|
|
|
|
|
n.Peers[ad.ID] = p.State == peer.ONLINE
|
|
|
|
|
n.Mu.Unlock()
|
|
|
|
|
|
2026-02-05 16:17:14 +01:00
|
|
|
if p.Relation == peer.PARTNER {
|
|
|
|
|
n.StreamService.ConnectToPartner(ad.ID, ad)
|
|
|
|
|
} else {
|
|
|
|
|
ps := common.ProtocolStream{}
|
|
|
|
|
for p, s := range n.StreamService.Streams {
|
|
|
|
|
m := map[pp.ID]*common.Stream{}
|
|
|
|
|
for k := range s {
|
|
|
|
|
if ad.ID != k {
|
|
|
|
|
m[k] = s[k]
|
2026-02-05 16:17:33 +01:00
|
|
|
} else {
|
|
|
|
|
s[k].Stream.Close()
|
2026-02-05 16:17:14 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
ps[p] = m
|
|
|
|
|
}
|
|
|
|
|
n.StreamService.Streams = ps
|
|
|
|
|
}
|
2026-02-09 13:28:00 +01:00
|
|
|
|
2026-02-05 16:17:14 +01:00
|
|
|
},
|
2026-01-28 17:22:29 +01:00
|
|
|
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
|
|
|
|
var propalgation tools.PropalgationMessage
|
|
|
|
|
err := json.Unmarshal(resp.Payload, &propalgation)
|
|
|
|
|
var dt *tools.DataType
|
|
|
|
|
if propalgation.DataType > 0 {
|
|
|
|
|
dtt := tools.DataType(propalgation.DataType)
|
|
|
|
|
dt = &dtt
|
|
|
|
|
}
|
|
|
|
|
if err == nil {
|
|
|
|
|
switch propalgation.Action {
|
|
|
|
|
case tools.PB_CREATE:
|
|
|
|
|
case tools.PB_UPDATE:
|
|
|
|
|
case tools.PB_DELETE:
|
2026-01-30 16:57:36 +01:00
|
|
|
n.StreamService.ToPartnerPublishEvent(
|
2026-01-28 17:22:29 +01:00
|
|
|
context.Background(),
|
|
|
|
|
propalgation.Action,
|
2026-01-28 17:31:34 +01:00
|
|
|
dt, resp.User,
|
2026-01-28 17:22:29 +01:00
|
|
|
propalgation.Payload,
|
|
|
|
|
)
|
|
|
|
|
case tools.PB_SEARCH:
|
|
|
|
|
m := map[string]interface{}{}
|
|
|
|
|
json.Unmarshal(propalgation.Payload, &m)
|
2026-01-30 16:57:36 +01:00
|
|
|
n.PubSubService.SearchPublishEvent(
|
2026-01-28 17:22:29 +01:00
|
|
|
context.Background(),
|
|
|
|
|
dt,
|
|
|
|
|
fmt.Sprintf("%v", m["type"]),
|
2026-01-28 17:31:34 +01:00
|
|
|
resp.User,
|
2026-01-28 17:22:29 +01:00
|
|
|
fmt.Sprintf("%v", m["search"]),
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
})
|
|
|
|
|
}
|