Peer Discovery -> DHT // no more pubsub state

This commit is contained in:
mr
2026-02-18 13:29:50 +01:00
parent 6a5ffb9a92
commit 0250c3b339
9 changed files with 318 additions and 292 deletions

View File

@@ -140,7 +140,6 @@ func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Strea
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Stream = s
rec.HeartbeatStream = hb.Stream
rec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
} else {
@@ -151,7 +150,6 @@ func (ix *LongLivedStreamRecordedService[T]) HandleNodeHeartbeat(s network.Strea
streams[*pid] = &StreamRecord[T]{
DID: hb.DID,
HeartbeatStream: hb.Stream,
Stream: s,
}
}
ix.StreamMU.Unlock()
@@ -283,7 +281,6 @@ func (u *UptimeTracker) IsEligible(min time.Duration) bool {
type StreamRecord[T interface{}] struct {
DID string
HeartbeatStream *Stream
Stream network.Stream
Record T
}
@@ -362,9 +359,10 @@ const (
var TimeWatcher time.Time
var StaticIndexers map[string]*pp.AddrInfo = map[string]*pp.AddrInfo{}
var StreamMuIndexes sync.RWMutex
var StreamIndexers ProtocolStream = ProtocolStream{}
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) {
func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID) error {
TimeWatcher = time.Now().UTC()
logger := oclib.GetLogger()
addresses := strings.Split(conf.GetConfig().IndexerAddresses, ",")
@@ -380,29 +378,34 @@ func ConnectToIndexers(h host.Host, minIndexer int, maxIndexer int, myPID pp.ID)
logger.Err(err)
continue
}
force := false
if h.Network().Connectedness(ad.ID) != network.Connected {
force = true
if err := h.Connect(context.Background(), *ad); err != nil {
fmt.Println(err)
logger.Err(err)
/*for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
if stream, err := TempStream(h, *ad, proto); err == nil {
StreamMuIndexes.Lock()
if StreamIndexers[proto] == nil {
StreamIndexers[proto] = map[pp.ID]*Stream{}
}
time.AfterFunc(2*time.Second, func() {
StreamMuIndexes.Lock()
defer StreamMuIndexes.Unlock()
delete(StreamIndexers[proto], ad.ID)
})
StreamIndexers[proto][ad.ID] = &Stream{
Stream: stream,
Expiry: time.Now().UTC().Add(2 * time.Second),
}
StreamMuIndexes.Unlock()
} else {
continue
}
}
}*/
StaticIndexers[indexerAddr] = ad
// make a privilege streams with indexer.
for _, proto := range []protocol.ID{ProtocolPublish, ProtocolGet, ProtocolHeartbeat} {
AddStreamProtocol(nil, StreamIndexers, h, proto, ad.ID, myPID, force, nil)
}
}
if len(StaticIndexers) == 0 {
logger.Err(errors.New("you run a node without indexers... your gonna be isolated."))
}
if len(StaticIndexers) < minIndexer {
// TODO : ask for unknown indexer.
}
SendHeartbeat(context.Background(), ProtocolHeartbeat, conf.GetConfig().Name, h, StreamIndexers, StaticIndexers, 20*time.Second) // your indexer is just like a node for the next indexer.
if len(StaticIndexers) < minIndexer {
return errors.New("you run a node without indexers... your gonna be isolated.")
}
return nil
}
func AddStreamProtocol(ctx *context.Context, protoS ProtocolStream, h host.Host, proto protocol.ID, id pp.ID, mypid pp.ID, force bool, onStreamCreated *func(network.Stream)) ProtocolStream {
@@ -484,7 +487,11 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H
IndexersBinded: addrs,
}
for _, ix := range peers {
_ = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second)
if err = sendHeartbeat(ctx, h, proto, ix, hb, ps, interval*time.Second); err != nil {
StreamMuIndexes.Lock()
delete(StreamIndexers[proto], ix.ID)
StreamMuIndexes.Unlock()
}
}
case <-ctx.Done():
return
@@ -493,6 +500,40 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H
}()
}
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, mu *sync.RWMutex) (ProtocolStream, error) {
if ctxTTL, err := context.WithTimeout(context.Background(), 2*time.Second); err == nil {
if h.Network().Connectedness(ad.ID) != network.Connected {
if err := h.Connect(ctxTTL, ad); err != nil {
return streams, err
}
}
if streams[proto] != nil && streams[proto][ad.ID] != nil {
return streams, nil
} else if s, err := h.NewStream(ctxTTL, ad.ID, proto); err == nil {
mu.Lock()
if streams[proto] == nil {
streams[proto] = map[pp.ID]*Stream{}
}
mu.Unlock()
time.AfterFunc(2*time.Second, func() {
mu.Lock()
defer mu.Unlock()
delete(streams[proto], ad.ID)
})
streams[ProtocolPublish][ad.ID] = &Stream{
DID: did,
Stream: s,
Expiry: time.Now().UTC().Add(2 * time.Second),
}
mu.Unlock()
return streams, nil
} else {
return streams, err
}
}
return streams, errors.New("can't create a context")
}
func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.AddrInfo,
hb Heartbeat, ps ProtocolStream, interval time.Duration) error {
streams := ps.Get(proto)
@@ -500,14 +541,14 @@ func sendHeartbeat(ctx context.Context, h host.Host, proto protocol.ID, p *pp.Ad
return errors.New("no stream for protocol heartbeat founded")
}
pss, exists := streams[p.ID]
ctxTTL, _ := context.WithTimeout(ctx, 3*interval)
// Connect si nécessaire
if h.Network().Connectedness(p.ID) != network.Connected {
_ = h.Connect(ctxTTL, *p)
if err := h.Connect(ctxTTL, *p); err != nil {
return err
}
exists = false // on devra recréer le stream
}
// Crée le stream si inexistant ou fermé
if !exists || pss.Stream == nil {
s, err := h.NewStream(ctx, p.ID, proto)

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"oc-discovery/conf"
"oc-discovery/daemons/node/common"
"time"
@@ -30,9 +29,6 @@ type PeerRecord struct {
WalletAddress string `json:"wallet_address"`
Signature []byte `json:"signature"`
ExpiryDate time.Time `json:"expiry_date"`
TTL int `json:"ttl"` // max of hop diffusion
NoPub bool `json:"no_pub"`
}
func (p *PeerRecord) Sign() error {
@@ -119,6 +115,7 @@ func (pr *PeerRecord) ExtractPeer(ourkey string, key string, pubKey crypto.PubKe
type GetValue struct {
Key string `json:"key"`
PeerID peer.ID `json:"peer_id"`
}
type GetResponse struct {
@@ -126,8 +123,11 @@ type GetResponse struct {
Records map[string]PeerRecord `json:"records,omitempty"`
}
func (ix *IndexerService) genKey(did string) string {
return "/node/" + did
}
func (ix *IndexerService) initNodeHandler() {
fmt.Println("Node activity")
ix.Host.SetStreamHandler(common.ProtocolHeartbeat, ix.HandleNodeHeartbeat)
ix.Host.SetStreamHandler(common.ProtocolPublish, ix.handleNodePublish)
ix.Host.SetStreamHandler(common.ProtocolGet, ix.handleNodeGet)
@@ -139,6 +139,7 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
for {
var rec PeerRecord
if err := json.NewDecoder(s).Decode(&rec); err != nil {
logger.Err(err)
continue
}
rec2 := PeerRecord{
@@ -152,7 +153,7 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
continue
}
if rec.PeerID == "" || rec.ExpiryDate.Before(time.Now().UTC()) { // already expired
logger.Warn().Msg(rec.PeerID + " is expired.")
logger.Err(errors.New(rec.PeerID + " is expired."))
continue
}
pid, err := peer.Decode(rec.PeerID)
@@ -162,13 +163,12 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
ix.StreamMU.Lock()
if ix.StreamRecords[common.ProtocolPublish] == nil {
ix.StreamRecords[common.ProtocolPublish] = map[peer.ID]*common.StreamRecord[PeerRecord]{}
if ix.StreamRecords[common.ProtocolHeartbeat] == nil {
ix.StreamRecords[common.ProtocolHeartbeat] = map[peer.ID]*common.StreamRecord[PeerRecord]{}
}
streams := ix.StreamRecords[common.ProtocolPublish]
streams := ix.StreamRecords[common.ProtocolHeartbeat]
if srec, ok := streams[pid]; ok {
fmt.Println("UPDATE PUBLISH", pid)
srec.DID = rec.DID
srec.Record = rec
srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
@@ -179,36 +179,22 @@ func (ix *IndexerService) handleNodePublish(s network.Stream) {
}
ix.StreamMU.Unlock()
if ix.LongLivedPubSubs[common.TopicPubSubNodeActivity] != nil && !rec.NoPub {
key := ix.genKey(rec.DID)
if b, err := json.Marshal(common.TopicNodeActivityPub{
Disposer: "/ip4/" + conf.GetConfig().Hostname + "/tcp/" + fmt.Sprintf("%v", conf.GetConfig().NodeEndpointPort) + "/p2p/" + ix.Host.ID().String(),
DID: rec.DID,
Name: rec.Name,
PeerID: pid.String(),
NodeActivity: pp.ONLINE.EnumIndex(),
}); err == nil {
ix.LongLivedPubSubs[common.TopicPubSubNodeActivity].Publish(context.Background(), b)
}
}
if rec.TTL > 0 {
rec.NoPub = true
for _, ad := range common.StaticIndexers {
if ad.ID == s.Conn().RemotePeer() {
data, err := json.Marshal(rec)
if err != nil {
logger.Err(err)
continue
}
if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
if err := ix.DHT.PutValue(ctx, key, data); err != nil {
logger.Err(err)
cancel()
continue
}
stream := common.StreamIndexers[common.ProtocolPublish][ad.ID]
rec.TTL -= 1
if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream
continue
}
}
}
cancel()
break // response... so quit
}
}
@@ -223,64 +209,40 @@ func (ix *IndexerService) handleNodeGet(s network.Stream) {
}
ix.StreamMU.Lock()
if ix.StreamRecords[common.ProtocolGet] == nil {
ix.StreamRecords[common.ProtocolGet] = map[peer.ID]*common.StreamRecord[PeerRecord]{}
if ix.StreamRecords[common.ProtocolHeartbeat] == nil {
ix.StreamRecords[common.ProtocolHeartbeat] = map[peer.ID]*common.StreamRecord[PeerRecord]{}
}
resp := GetResponse{
Found: false,
Records: map[string]PeerRecord{},
}
streams := ix.StreamRecords[common.ProtocolPublish]
streams := ix.StreamRecords[common.ProtocolHeartbeat]
key := ix.genKey(req.Key)
// simple lookup by PeerID (or DID)
for _, rec := range streams {
if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key || rec.Record.Name == req.Key { // OK
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
recBytes, err := ix.DHT.SearchValue(ctx, key)
if err != nil {
logger.Err(err).Msg("Failed to fetch PeerRecord from DHT")
cancel()
}
cancel()
for c := range recBytes {
var rec PeerRecord
if err := json.Unmarshal(c, &rec); err != nil || rec.PeerID != req.PeerID.String() {
continue
}
resp.Found = true
resp.Records[rec.Record.PeerID] = rec.Record
if rec.Record.DID == req.Key || rec.Record.PeerID == req.Key { // there unique... no need to proceed more...
_ = json.NewEncoder(s).Encode(resp)
ix.StreamMU.Unlock()
return
}
continue
}
}
// if not found ask to my neighboor indexers
for pid, dsp := range ix.DisposedPeers {
if _, ok := resp.Records[dsp.PeerID]; !ok && (dsp.Name == req.Key || dsp.DID == req.Key || dsp.PeerID == req.Key) {
ctxTTL, err := context.WithTimeout(context.Background(), 120*time.Second)
if err != nil {
continue
}
if ix.Host.Network().Connectedness(pid) != network.Connected {
if ad, err := peer.AddrInfoFromString(dsp.Disposer); err == nil {
_ = ix.Host.Connect(ctxTTL, *ad)
str, err := ix.Host.NewStream(ctxTTL, pid, common.ProtocolGet)
if err != nil {
continue
}
for {
if ctxTTL.Err() == context.DeadlineExceeded {
break
}
var subResp GetResponse
if err := json.NewDecoder(str).Decode(&resp); err != nil {
continue
}
if subResp.Found {
for k, v := range subResp.Records {
if _, ok := resp.Records[k]; !ok {
resp.Records[k] = v
}
}
break
}
}
}
}
resp.Records[rec.PeerID] = rec
if srec, ok := streams[req.PeerID]; ok {
srec.DID = rec.DID
srec.Record = rec
srec.HeartbeatStream.UptimeTracker.LastSeen = time.Now().UTC()
}
}
// Not found
_ = json.NewEncoder(s).Encode(resp)
ix.StreamMU.Unlock()
break // response... so quit
}
}

View File

@@ -6,8 +6,9 @@ import (
"sync"
oclib "cloud.o-forge.io/core/oc-lib"
pp "cloud.o-forge.io/core/oc-lib/models/peer"
dht "github.com/libp2p/go-libp2p-kad-dht"
pubsub "github.com/libp2p/go-libp2p-pubsub"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
)
@@ -16,9 +17,13 @@ import (
type IndexerService struct {
*common.LongLivedStreamRecordedService[PeerRecord]
PS *pubsub.PubSub
DHT *dht.IpfsDHT
isStrictIndexer bool
mu sync.RWMutex
DisposedPeers map[peer.ID]*common.TopicNodeActivityPub
SeenQueries map[string]bool
SeenMU sync.Mutex
}
// if a pubsub is given... indexer is also an active oc-node. If not... your a strict indexer
@@ -46,28 +51,25 @@ func NewIndexerService(h host.Host, ps *pubsub.PubSub, maxNode int) *IndexerServ
logger.Info().Msg("subscribe to decentralized search flow as strict indexer...")
ix.SubscribeToSearch(ix.PS, nil)
}
f := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) {
ix.mu.Lock()
if pid, err := peer.Decode(evt.PeerID); err == nil {
if evt.NodeActivity == pp.OFFLINE.EnumIndex() {
delete(ix.DisposedPeers, pid)
if ix.DHT, err = dht.New(
context.Background(),
ix.Host,
dht.Mode(dht.ModeServer),
dht.Validator(record.NamespacedValidator{
"node": PeerRecordValidator{},
}),
); err != nil {
return nil
}
if evt.NodeActivity == pp.ONLINE.EnumIndex() {
ix.DisposedPeers[pid] = &evt
}
}
ix.mu.Unlock()
}
ix.SubscribeToNodeActivity(ix.PS, &f) // now we subscribe to a long run topic named node-activity, to relay message.
ix.initNodeHandler() // then listen up on every protocol expected
return ix
}
func (ix *IndexerService) Close() {
ix.DHT.Close()
ix.PS.UnregisterTopicValidator(common.TopicPubSubSearch)
for _, s := range ix.StreamRecords {
for _, ss := range s {
ss.Stream.Close()
ss.HeartbeatStream.Stream.Close()
}
}

View File

@@ -0,0 +1,61 @@
package indexer
import (
"encoding/json"
"errors"
"time"
)
type PeerRecordValidator struct{}
func (v PeerRecordValidator) Validate(key string, value []byte) error {
var rec PeerRecord
if err := json.Unmarshal(value, &rec); err != nil {
return errors.New("invalid json")
}
// PeerID must exist
if rec.PeerID == "" {
return errors.New("missing peerID")
}
// Expiry check
if rec.ExpiryDate.Before(time.Now().UTC()) {
return errors.New("record expired")
}
// Signature verification
rec2 := PeerRecord{
Name: rec.Name,
DID: rec.DID,
PubKey: rec.PubKey,
PeerID: rec.PeerID,
}
if _, err := rec2.Verify(); err != nil {
return errors.New("invalid signature")
}
return nil
}
func (v PeerRecordValidator) Select(key string, values [][]byte) (int, error) {
var newest time.Time
index := 0
for i, val := range values {
var rec PeerRecord
if err := json.Unmarshal(val, &rec); err != nil {
continue
}
if rec.ExpiryDate.After(newest) {
newest = rec.ExpiryDate
index = i
}
}
return index, nil
}

View File

@@ -37,7 +37,7 @@ type Node struct {
Peers map[pp.ID]bool
}
func InitNode(isNode bool, isIndexer bool) (*Node, error) {
func InitNode(isNode bool, isIndexer bool, isNativeIndexer bool) (*Node, error) {
if !isNode && !isIndexer {
return nil, errors.New("wait... what ? your node need to at least something. Retry we can't be friend in that case")
}
@@ -101,28 +101,6 @@ func InitNode(isNode bool, isIndexer bool) (*Node, error) {
}
}
node.SubscribeToSearch(node.PS, &f)
ff := func(ctx context.Context, evt common.TopicNodeActivityPub, _ string) {
node.Mu.Lock()
defer node.Mu.Unlock()
if pid, err := pp.Decode(evt.PeerID); err == nil {
if _, ok := node.Peers[pid]; !ok {
node.Peers[pid] = evt.NodeActivity == peer.ONLINE.EnumIndex()
m := map[string]interface{}{
"id": evt.DID,
"state": evt.NodeActivity,
}
if b, err := json.Marshal(m); err == nil {
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_PEER, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.PEER,
Method: int(tools.CREATE_PEER),
Payload: b,
})
}
}
}
}
node.SubscribeToNodeActivity(node.PS, &ff)
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
peers := access.LoadAll(false)
for _, p := range peers.Data { // fill cache.
@@ -157,12 +135,10 @@ func (d *Node) publishPeerRecord(
if err != nil {
return err
}
if common.StreamIndexers[common.ProtocolPublish] == nil {
return errors.New("no protocol Publish is set up on the node")
}
for _, ad := range common.StaticIndexers {
if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil {
return errors.New("no protocol Publish for peer " + ad.ID.String() + " is set up on the node")
var err error
if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolPublish, "", common.StreamIndexers, &common.StreamMuIndexes); err != nil {
continue
}
stream := common.StreamIndexers[common.ProtocolPublish][ad.ID]
base := indexer.PeerRecord{
@@ -176,7 +152,6 @@ func (d *Node) publishPeerRecord(
rec.ExpiryDate = base.ExpiryDate
rec.Signature, err = priv.Sign(hash[:])
rec.TTL = 2
if err := json.NewEncoder(stream.Stream).Encode(&rec); err != nil { // then publish on stream
return err
}
@@ -186,22 +161,33 @@ func (d *Node) publishPeerRecord(
func (d *Node) GetPeerRecord(
ctx context.Context,
key string,
pidOrdid string,
) ([]*peer.Peer, error) {
did := pidOrdid // if known pidOrdid is did
pid := pidOrdid // if not known pidOrdid is pid
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
if data := access.Search(nil, did, true); len(data.Data) > 0 {
did = data.Data[0].GetID()
pid = data.Data[0].(*peer.Peer).PeerID
}
var err error
var info map[string]indexer.PeerRecord
if common.StreamIndexers[common.ProtocolPublish] == nil {
return nil, errors.New("no protocol Publish is set up on the node")
}
for _, ad := range common.StaticIndexers {
if common.StreamIndexers[common.ProtocolPublish][ad.ID] == nil {
return nil, errors.New("no protocol Publish for peer " + ad.ID.String() + " is set up on the node")
if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolGet, "",
common.StreamIndexers, &common.StreamMuIndexes); err != nil {
continue
}
stream := common.StreamIndexers[common.ProtocolPublish][ad.ID]
if err := json.NewEncoder(stream.Stream).Encode(indexer.GetValue{Key: key}); err != nil {
pidR, err := pp.Decode(pid)
if err != nil {
continue
}
stream := common.StreamIndexers[common.ProtocolGet][ad.ID]
if err := json.NewEncoder(stream.Stream).Encode(indexer.GetValue{
Key: did,
PeerID: pidR,
}); err != nil {
return nil, err
}
for {
var resp indexer.GetResponse
if err := json.NewDecoder(stream.Stream).Decode(&resp); err != nil {
@@ -217,7 +203,7 @@ func (d *Node) GetPeerRecord(
for _, pr := range info {
if pk, err := pr.Verify(); err != nil {
return nil, err
} else if ok, p, err := pr.ExtractPeer(d.PeerID.String(), key, pk); err != nil {
} else if ok, p, err := pr.ExtractPeer(d.PeerID.String(), did, pk); err != nil {
return nil, err
} else {
if ok {

View File

@@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"oc-discovery/daemons/node/common"
"strings"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/peer"
@@ -18,18 +17,9 @@ type Verify struct {
IsVerified bool `json:"is_verified"`
}
func (ps *StreamService) getTopicName(topicName string) string {
ns := strings.Split(topicName, ".")
if len(ns) > 0 {
return ns[0]
}
return tools.NONE.String()
}
func (ps *StreamService) handleEvent(topicName string, evt *common.Event) error {
action := ps.getTopicName(topicName)
ps.handleEventFromPartner(evt, action)
if action == "verify_resource" {
func (ps *StreamService) handleEvent(protocol string, evt *common.Event) error {
ps.handleEventFromPartner(evt, protocol)
if protocol == ProtocolVerifyResource {
if evt.DataType == -1 {
tools.NewNATSCaller().SetNATSPub(tools.VERIFY_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
@@ -40,12 +30,12 @@ func (ps *StreamService) handleEvent(topicName string, evt *common.Event) error
return err
}
}
if action == tools.PB_SEARCH_RESPONSE.String() {
if protocol == ProtocolSearchResource && evt.DataType > -1 {
if err := ps.retrieveResponse(evt); err != nil {
return err
}
}
return errors.New("no action authorized available : " + action)
return errors.New("no action authorized available : " + protocol)
}
func (abs *StreamService) verifyResponse(event *common.Event) error { //
@@ -86,7 +76,7 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { //
return nil
}
func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string) error {
func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol string) error {
resource, err := resources.ToResource(int(evt.DataType), evt.Payload)
if err != nil {
return err
@@ -95,8 +85,9 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string
if err != nil {
return err
}
switch action {
case tools.PB_SEARCH.String():
switch protocol {
case ProtocolSearchResource:
if evt.DataType < 0 {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
peers := access.Search(nil, evt.From, false)
if len(peers.Data) > 0 {
@@ -106,15 +97,16 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string
} else if p, err := ps.Node.GetPeerRecord(context.Background(), evt.From); err == nil && len(p) > 0 { // peer from is peerID
ps.SendResponse(p[0], evt)
}
case tools.PB_CREATE.String():
case tools.PB_UPDATE.String():
}
case ProtocolCreateResource:
case ProtocolUpdateResource:
go tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.DataType(evt.DataType),
Method: int(tools.CREATE_RESOURCE),
Payload: b,
})
case tools.PB_DELETE.String():
case ProtocolDeleteResource:
go tools.NewNATSCaller().SetNATSPub(tools.REMOVE_RESOURCE, tools.NATSResponse{
FromApp: "oc-discovery",
Datatype: tools.DataType(evt.DataType),
@@ -122,7 +114,7 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string
Payload: b,
})
default:
return errors.New("no action authorized available : " + action)
return errors.New("no action authorized available : " + protocol)
}
return nil
}
@@ -130,8 +122,12 @@ func (ps *StreamService) handleEventFromPartner(evt *common.Event, action string
func (abs *StreamService) SendResponse(p *peer.Peer, event *common.Event) error {
dts := []oclib.LibDataEnum{oclib.LibDataEnum(event.DataType)}
if event.DataType == -1 { // expect all resources
dts = []oclib.LibDataEnum{oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.DATA_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)}
dts = []oclib.LibDataEnum{
oclib.LibDataEnum(oclib.COMPUTE_RESOURCE),
oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
oclib.LibDataEnum(oclib.PROCESSING_RESOURCE),
oclib.LibDataEnum(oclib.DATA_RESOURCE),
oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE)}
}
var m map[string]string
err := json.Unmarshal(event.Payload, &m)

View File

@@ -6,13 +6,11 @@ import (
"errors"
"fmt"
"oc-discovery/daemons/node/common"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/libp2p/go-libp2p/core/network"
pp "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
)
@@ -27,7 +25,7 @@ func (ps *StreamService) PublishVerifyResources(dt *tools.DataType, user string,
if err != nil {
return nil, err
}
return ps.write("verify_resource", toPeerID, ad, dt, user, resource, ProtocolVerifyResource, p.Data.(*peer.Peer).Relation == peer.PARTNER)
return ps.write(toPeerID, ad, dt, user, resource, ProtocolVerifyResource)
}
}
@@ -41,7 +39,7 @@ func (ps *StreamService) PublishResources(dt *tools.DataType, user string, toPee
if err != nil {
return err
}
ps.write(tools.PB_SEARCH.String(), toPeerID, ad, dt, user, resource, ProtocolSearchResource, p.Data.(*peer.Peer).Relation == peer.PARTNER)
ps.write(toPeerID, ad, dt, user, resource, ProtocolSearchResource)
}
return nil
}
@@ -69,7 +67,7 @@ func (ps *StreamService) SearchKnownPublishEvent(dt *tools.DataType, user string
if err != nil {
continue
}
ps.write(tools.PB_SEARCH.String(), p.GetID(), ad, dt, user, b, ProtocolSearchResource, p.(*peer.Peer).Relation == peer.PARTNER)
ps.write(p.GetID(), ad, dt, user, b, ProtocolSearchResource)
}
}
return nil
@@ -88,7 +86,7 @@ func (ps *StreamService) SearchPartnersPublishEvent(dt *tools.DataType, user str
if err != nil {
continue
}
ps.write(tools.PB_SEARCH.String(), p.GetID(), ad, dt, user, b, ProtocolSearchResource, true)
ps.write(p.GetID(), ad, dt, user, b, ProtocolSearchResource)
}
}
return nil
@@ -125,12 +123,12 @@ func (ps *StreamService) ToPartnerPublishEvent(
return err
} else {
for _, p := range peers {
for _, protocol := range protocols {
for protocol := range protocolsPartners {
ad, err := pp.AddrInfoFromString(p.StreamAddress)
if err != nil {
continue
}
ps.write(action.String(), p.GetID(), ad, dt, user, payload, protocol, true)
ps.write(p.GetID(), ad, dt, user, payload, protocol)
}
}
}
@@ -138,61 +136,22 @@ func (ps *StreamService) ToPartnerPublishEvent(
}
func (s *StreamService) write(
action string,
did string,
peerID *pp.AddrInfo,
dt *tools.DataType,
user string,
payload []byte,
proto protocol.ID,
isAPartner bool) (*common.Stream, error) {
proto protocol.ID) (*common.Stream, error) {
logger := oclib.GetLogger()
name := action + "#" + peerID.ID.String()
if dt != nil {
name = action + "." + (*dt).String() + "#" + peerID.ID.String()
}
s.Mu.Lock()
defer s.Mu.Unlock()
if s.Streams[proto] == nil {
s.Streams[proto] = map[pp.ID]*common.Stream{}
}
if s.Streams[proto][peerID.ID] == nil {
var err error
// should create a very temp stream
ctxTTL, err := context.WithTimeout(context.Background(), 60*time.Second)
if err == nil {
if isAPartner {
ctxTTL = context.Background()
}
if s.Host.Network().Connectedness(peerID.ID) != network.Connected {
_ = s.Host.Connect(ctxTTL, *peerID)
str, err := s.Host.NewStream(ctxTTL, peerID.ID, ProtocolHeartbeatPartner)
if err == nil {
s.Streams[ProtocolHeartbeatPartner][peerID.ID] = &common.Stream{
DID: did,
Stream: str,
Expiry: time.Now().UTC().Add(5 * time.Second),
}
str2, err := s.Host.NewStream(ctxTTL, peerID.ID, proto)
if err == nil {
s.Streams[proto][peerID.ID] = &common.Stream{
DID: did,
Stream: str2,
Expiry: time.Now().UTC().Add(5 * time.Second),
}
}
}
}
}
if s.Streams, err = common.TempStream(s.Host, *peerID, proto, did, s.Streams, &s.Mu); err != nil {
return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
}
stream := s.Streams[proto][peerID.ID]
enc := json.NewEncoder(stream.Stream)
evt := common.NewEvent(name, peerID.ID.String(), dt, user, payload)
if err := enc.Encode(evt); err != nil {
evt := common.NewEvent(string(proto), peerID.ID.String(), dt, user, payload)
if err := json.NewEncoder(stream.Stream).Encode(evt); err != nil {
stream.Stream.Close()
logger.Err(err)
return stream, nil

View File

@@ -28,15 +28,22 @@ const ProtocolUpdateResource = "/opencloud/resource/update/1.0"
const ProtocolDeleteResource = "/opencloud/resource/delete/1.0"
const ProtocolVerifyResource = "/opencloud/resource/verify/1.0"
const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0"
var protocols = []protocol.ID{
ProtocolSearchResource,
ProtocolCreateResource,
ProtocolUpdateResource,
ProtocolDeleteResource,
ProtocolVerifyResource,
type ProtocolInfo struct {
PersistantStream bool
WaitResponse bool
}
var protocols = map[protocol.ID]*ProtocolInfo{
ProtocolSearchResource: {WaitResponse: true},
ProtocolVerifyResource: {WaitResponse: true},
}
var protocolsPartners = map[protocol.ID]*ProtocolInfo{
ProtocolCreateResource: {},
ProtocolUpdateResource: {},
ProtocolDeleteResource: {},
}
type StreamService struct {
@@ -60,12 +67,32 @@ func InitStream(ctx context.Context, h host.Host, key pp.ID, maxNode int, node c
}
logger.Info().Msg("handle to partner heartbeat protocol...")
service.Host.SetStreamHandler(ProtocolHeartbeatPartner, service.HandlePartnerHeartbeat)
for proto := range protocols {
service.Host.SetStreamHandler(proto, service.HandleResponse)
}
logger.Info().Msg("connect to partners...")
service.connectToPartners() // we set up a stream
go service.StartGC(30 * time.Second)
go service.StartGC(8 * time.Second)
return service, nil
}
func (s *StreamService) HandleResponse(stream network.Stream) {
s.Mu.Lock()
stream.Protocol()
if s.Streams[stream.Protocol()] == nil {
s.Streams[stream.Protocol()] = map[pp.ID]*common.Stream{}
}
s.Streams[stream.Protocol()][stream.Conn().RemotePeer()] = &common.Stream{
Stream: stream,
Expiry: time.Now().UTC().Add(1 * time.Minute),
}
s.Mu.Unlock()
go s.readLoop(s.Streams[stream.Protocol()][stream.Conn().RemotePeer()],
stream.Conn().RemotePeer(),
stream.Protocol(), protocols[stream.Protocol()])
}
func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
s.Mu.Lock()
if s.Streams[ProtocolHeartbeatPartner] == nil {
@@ -86,7 +113,7 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
// if record already seen update last seen
if rec, ok := streams[*pid]; ok {
rec.DID = hb.DID
rec.Expiry = time.Now().UTC().Add(2 * time.Minute)
rec.Expiry = time.Now().UTC().Add(10 * time.Second)
} else { // if not in stream ?
val, err := stream.Conn().RemoteMultiaddr().ValueForProtocol(ma.P_IP4)
if err == nil {
@@ -97,16 +124,16 @@ func (s *StreamService) HandlePartnerHeartbeat(stream network.Stream) {
}
func (s *StreamService) connectToPartners() error {
for _, proto := range protocols {
for proto, info := range protocolsPartners {
f := func(ss network.Stream) {
if s.Streams[proto] == nil {
s.Streams[proto] = map[pp.ID]*common.Stream{}
}
s.Streams[proto][ss.Conn().RemotePeer()] = &common.Stream{
Stream: ss,
Expiry: time.Now().UTC().Add(2 * time.Minute),
Expiry: time.Now().UTC().Add(10 * time.Second),
}
go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()])
go s.readLoop(s.Streams[proto][ss.Conn().RemotePeer()], ss.Conn().RemotePeer(), proto, info)
}
fmt.Println("SetStreamHandler", proto)
s.Host.SetStreamHandler(proto, f)
@@ -117,42 +144,16 @@ func (s *StreamService) connectToPartners() error {
}
for _, p := range peers {
s.ConnectToPartner(p.StreamAddress)
// heartbeat your partner.
}
// TODO if handle... from partner then HeartBeat back
return nil
}
func (s *StreamService) ConnectToPartner(address string) {
ad, err := pp.AddrInfoFromString(address)
if err != nil {
return
}
logger := oclib.GetLogger()
force := false
for _, proto := range protocols {
f := func(ss network.Stream) {
if s.Streams[proto] == nil {
s.Streams[proto] = map[pp.ID]*common.Stream{}
}
s.Streams[proto][ad.ID] = &common.Stream{
Stream: ss,
Expiry: time.Now().UTC().Add(2 * time.Minute),
}
go s.readLoop(s.Streams[proto][ad.ID])
}
if s.Host.Network().Connectedness(ad.ID) != network.Connected {
force = true
if err := s.Host.Connect(context.Background(), *ad); err != nil {
logger.Err(err)
continue
}
}
s.Streams = common.AddStreamProtocol(nil, s.Streams, s.Host, proto, ad.ID, s.Key, force, &f)
}
if ad, err := pp.AddrInfoFromString(address); err == nil {
common.SendHeartbeat(context.Background(), ProtocolHeartbeatPartner, conf.GetConfig().Name,
s.Host, s.Streams, map[string]*pp.AddrInfo{address: ad}, 20*time.Second)
}
}
func (s *StreamService) searchPeer(search string) ([]*peer.Peer, error) {
/* TODO FOR TEST ONLY A VARS THAT DEFINE ADDRESS... deserialize */
@@ -221,15 +222,32 @@ func (s *StreamService) gc() {
}
}
func (ps *StreamService) readLoop(s *common.Stream) {
func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID, protocolInfo *ProtocolInfo) {
defer s.Stream.Close()
defer func() {
ps.Mu.Lock()
defer ps.Mu.Unlock()
delete(ps.Streams[proto], id)
}()
loop := true
if !protocolInfo.PersistantStream && !protocolInfo.WaitResponse { // 2 sec is enough... to wait a response
time.AfterFunc(2*time.Second, func() {
loop = false
})
}
for {
if !loop {
break
}
var evt common.Event
if err := json.NewDecoder(s.Stream).Decode(&evt); err != nil {
s.Stream.Close()
continue
}
ps.handleEvent(evt.Type, &evt)
if protocolInfo.WaitResponse && !protocolInfo.PersistantStream {
break
}
}
}

View File

@@ -42,8 +42,9 @@ func main() {
fmt.Println(conf.GetConfig().NodeMode)
isNode := strings.Contains(conf.GetConfig().NodeMode, "node")
isIndexer := strings.Contains(conf.GetConfig().NodeMode, "indexer")
isNativeIndexer := strings.Contains(conf.GetConfig().NodeMode, "native-indexer")
if n, err := node.InitNode(isNode, isIndexer); err != nil {
if n, err := node.InitNode(isNode, isIndexer, isNativeIndexer); err != nil {
panic(err)
} else {
<-ctx.Done() // 👈 the only blocking point