Pass + Doc
This commit is contained in:
@@ -469,8 +469,18 @@ func SendHeartbeat(ctx context.Context, proto protocol.ID, name string, h host.H
|
||||
}()
|
||||
}
|
||||
|
||||
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, mu *sync.RWMutex) (ProtocolStream, error) {
|
||||
if ctxTTL, err := context.WithTimeout(context.Background(), 2*time.Second); err == nil {
|
||||
type ProtocolInfo struct {
|
||||
PersistantStream bool
|
||||
WaitResponse bool
|
||||
TTL time.Duration
|
||||
}
|
||||
|
||||
func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, streams ProtocolStream, pts map[protocol.ID]*ProtocolInfo, mu *sync.RWMutex) (ProtocolStream, error) {
|
||||
expiry := 2 * time.Second
|
||||
if pts[proto] != nil {
|
||||
expiry = pts[proto].TTL
|
||||
}
|
||||
if ctxTTL, err := context.WithTimeout(context.Background(), expiry); err == nil {
|
||||
if h.Network().Connectedness(ad.ID) != network.Connected {
|
||||
if err := h.Connect(ctxTTL, ad); err != nil {
|
||||
return streams, err
|
||||
@@ -484,7 +494,7 @@ func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, stre
|
||||
streams[proto] = map[pp.ID]*Stream{}
|
||||
}
|
||||
mu.Unlock()
|
||||
time.AfterFunc(2*time.Second, func() {
|
||||
time.AfterFunc(expiry, func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
delete(streams[proto], ad.ID)
|
||||
@@ -492,7 +502,7 @@ func TempStream(h host.Host, ad pp.AddrInfo, proto protocol.ID, did string, stre
|
||||
streams[ProtocolPublish][ad.ID] = &Stream{
|
||||
DID: did,
|
||||
Stream: s,
|
||||
Expiry: time.Now().UTC().Add(2 * time.Second),
|
||||
Expiry: time.Now().UTC().Add(expiry),
|
||||
}
|
||||
mu.Unlock()
|
||||
return streams, nil
|
||||
|
||||
@@ -5,14 +5,24 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"oc-discovery/daemons/node/common"
|
||||
"oc-discovery/daemons/node/stream"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/config"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
pp "github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
)
|
||||
|
||||
type configPayload struct {
|
||||
PeerID string `json:"source_peer_id"`
|
||||
}
|
||||
|
||||
type executionConsidersPayload struct {
|
||||
PeerIDs []string `json:"peer_ids"`
|
||||
}
|
||||
|
||||
func ListenNATS(n *Node) {
|
||||
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
||||
/*tools.VERIFY_RESOURCE: func(resp tools.NATSResponse) {
|
||||
@@ -46,7 +56,7 @@ func ListenNATS(n *Node) {
|
||||
}
|
||||
},*/
|
||||
tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
|
||||
if resp.FromApp == config.GetAppName() && resp.Datatype != tools.PEER {
|
||||
if resp.FromApp == config.GetAppName() && resp.Datatype != tools.PEER && resp.Datatype != tools.WORKFLOW {
|
||||
return
|
||||
}
|
||||
logger := oclib.GetLogger()
|
||||
@@ -86,6 +96,9 @@ func ListenNATS(n *Node) {
|
||||
|
||||
},
|
||||
tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
|
||||
if resp.FromApp == config.GetAppName() {
|
||||
return
|
||||
}
|
||||
var propalgation tools.PropalgationMessage
|
||||
err := json.Unmarshal(resp.Payload, &propalgation)
|
||||
var dt *tools.DataType
|
||||
@@ -95,6 +108,20 @@ func ListenNATS(n *Node) {
|
||||
}
|
||||
if err == nil {
|
||||
switch propalgation.Action {
|
||||
case tools.PB_ADMIRALTY_CONFIG:
|
||||
case tools.PB_MINIO_CONFIG:
|
||||
var m configPayload
|
||||
var proto protocol.ID = stream.ProtocolAdmiraltyConfigResource
|
||||
if propalgation.Action == tools.PB_MINIO_CONFIG {
|
||||
proto = stream.ProtocolMinioConfigResource
|
||||
}
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
peers, _ := n.GetPeerRecord(context.Background(), m.PeerID)
|
||||
for _, p := range peers {
|
||||
n.StreamService.PublishCommon(&resp.Datatype, resp.User,
|
||||
p.PeerID, proto, resp.Payload)
|
||||
}
|
||||
}
|
||||
case tools.PB_CREATE:
|
||||
case tools.PB_UPDATE:
|
||||
case tools.PB_DELETE:
|
||||
@@ -104,16 +131,83 @@ func ListenNATS(n *Node) {
|
||||
dt, resp.User,
|
||||
propalgation.Payload,
|
||||
)
|
||||
case tools.PB_SEARCH:
|
||||
case tools.PB_CONSIDERS:
|
||||
switch resp.Datatype {
|
||||
case tools.BOOKING:
|
||||
case tools.PURCHASE_RESOURCE:
|
||||
case tools.WORKFLOW_EXECUTION:
|
||||
var m executionConsidersPayload
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
for _, p := range m.PeerIDs {
|
||||
peers, _ := n.GetPeerRecord(context.Background(), p)
|
||||
for _, pp := range peers {
|
||||
n.StreamService.PublishCommon(&resp.Datatype, resp.User,
|
||||
pp.PeerID, stream.ProtocolConsidersResource, resp.Payload)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
case tools.PB_PLANNER:
|
||||
m := map[string]interface{}{}
|
||||
json.Unmarshal(propalgation.Payload, &m)
|
||||
n.PubSubService.SearchPublishEvent(
|
||||
context.Background(),
|
||||
dt,
|
||||
fmt.Sprintf("%v", m["type"]),
|
||||
resp.User,
|
||||
fmt.Sprintf("%v", m["search"]),
|
||||
)
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
b := []byte{}
|
||||
if len(m) > 1 {
|
||||
b = resp.Payload
|
||||
}
|
||||
if m["peer_id"] == nil { // send to every active stream
|
||||
n.StreamService.Mu.Lock()
|
||||
if n.StreamService.Streams[stream.ProtocolSendPlanner] != nil {
|
||||
for pid := range n.StreamService.Streams[stream.ProtocolSendPlanner] {
|
||||
n.StreamService.PublishCommon(nil, resp.User, pid.String(), stream.ProtocolSendPlanner, b)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
n.StreamService.PublishCommon(nil, resp.User, fmt.Sprintf("%v", m["peer_id"]), stream.ProtocolSendPlanner, b)
|
||||
}
|
||||
n.StreamService.Mu.Unlock()
|
||||
}
|
||||
case tools.PB_CLOSE_PLANNER:
|
||||
m := map[string]interface{}{}
|
||||
if err := json.Unmarshal(resp.Payload, &m); err == nil {
|
||||
n.StreamService.Mu.Lock()
|
||||
if pid, err := pp.Decode(fmt.Sprintf("%v", m["peer_id"])); err == nil {
|
||||
if n.StreamService.Streams[stream.ProtocolSendPlanner] != nil && n.StreamService.Streams[stream.ProtocolSendPlanner][pid] != nil {
|
||||
n.StreamService.Streams[stream.ProtocolSendPlanner][pid].Stream.Close()
|
||||
delete(n.StreamService.Streams[stream.ProtocolSendPlanner], pid)
|
||||
}
|
||||
}
|
||||
n.StreamService.Mu.Unlock()
|
||||
}
|
||||
case tools.PB_SEARCH:
|
||||
if propalgation.DataType == int(tools.PEER) {
|
||||
m := map[string]interface{}{}
|
||||
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
|
||||
if peers, err := n.GetPeerRecord(context.Background(), fmt.Sprintf("%v", m["search"])); err == nil {
|
||||
for _, p := range peers {
|
||||
if b, err := json.Marshal(p); err == nil {
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(tools.PEER),
|
||||
Method: int(tools.SEARCH_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
m := map[string]interface{}{}
|
||||
if err := json.Unmarshal(propalgation.Payload, &m); err == nil {
|
||||
n.PubSubService.SearchPublishEvent(
|
||||
context.Background(),
|
||||
dt,
|
||||
fmt.Sprintf("%v", m["type"]),
|
||||
resp.User,
|
||||
fmt.Sprintf("%v", m["search"]),
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
@@ -22,6 +22,7 @@ import (
|
||||
pubsubs "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/crypto"
|
||||
pp "github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
)
|
||||
|
||||
type Node struct {
|
||||
@@ -128,7 +129,8 @@ func (d *Node) publishPeerRecord(
|
||||
}
|
||||
for _, ad := range common.StaticIndexers {
|
||||
var err error
|
||||
if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolPublish, "", common.StreamIndexers, &common.StreamMuIndexes); err != nil {
|
||||
if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolPublish, "", common.StreamIndexers, map[protocol.ID]*common.ProtocolInfo{},
|
||||
&common.StreamMuIndexes); err != nil {
|
||||
continue
|
||||
}
|
||||
stream := common.StreamIndexers[common.ProtocolPublish][ad.ID]
|
||||
@@ -165,7 +167,7 @@ func (d *Node) GetPeerRecord(
|
||||
var info map[string]indexer.PeerRecord
|
||||
for _, ad := range common.StaticIndexers {
|
||||
if common.StreamIndexers, err = common.TempStream(d.Host, *ad, common.ProtocolGet, "",
|
||||
common.StreamIndexers, &common.StreamMuIndexes); err != nil {
|
||||
common.StreamIndexers, map[protocol.ID]*common.ProtocolInfo{}, &common.StreamMuIndexes); err != nil {
|
||||
continue
|
||||
}
|
||||
pidR, err := pp.Decode(pid)
|
||||
|
||||
@@ -20,7 +20,7 @@ func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 c
|
||||
evt *common.Event,
|
||||
action tools.PubSubAction,
|
||||
) error {
|
||||
if !(action == tools.PB_SEARCH_RESPONSE || action == tools.PB_SEARCH) {
|
||||
if !(action == tools.PB_SEARCH) {
|
||||
return nil
|
||||
}
|
||||
if p, err := ps.Node.GetPeerRecord(ctx, evt.From); err == nil && len(p) > 0 { // peerFrom is Unique
|
||||
@@ -32,7 +32,6 @@ func (ps *PubSubService) handleEventSearch( // only : on partner followings. 3 c
|
||||
if err := ps.StreamService.SendResponse(p[0], evt); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -30,25 +30,12 @@ func (ps *PubSubService) SearchPublishEvent(
|
||||
|
||||
func (ps *PubSubService) searchPublishEvent(
|
||||
ctx context.Context, dt *tools.DataType, user string, payload []byte) error {
|
||||
id, err := oclib.GenerateNodeID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ps.subscribeEvents(ctx, dt, tools.PB_SEARCH_RESPONSE, id, 60); err != nil { // TODO Catpure Event !
|
||||
return err
|
||||
}
|
||||
return ps.publishEvent(ctx, dt, tools.PB_SEARCH, user, "", payload, false)
|
||||
return ps.publishEvent(ctx, dt, tools.PB_SEARCH, user, payload)
|
||||
}
|
||||
|
||||
func (ps *PubSubService) publishEvent(
|
||||
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, user string,
|
||||
peerID string, payload []byte, chanNamedByDt bool,
|
||||
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, user string, payload []byte,
|
||||
) error {
|
||||
name := action.String() + "#" + peerID
|
||||
if chanNamedByDt && dt != nil { // if a datatype is precised then : app.action.datatype#peerID
|
||||
name = action.String() + "." + (*dt).String() + "#" + peerID
|
||||
}
|
||||
|
||||
from, err := oclib.GenerateNodeID()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -57,8 +44,8 @@ func (ps *PubSubService) publishEvent(
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
msg, _ := json.Marshal(models.NewEvent(name, from, dt, user, payload, priv))
|
||||
topic, err := ps.PS.Join(name)
|
||||
msg, _ := json.Marshal(models.NewEvent(action.String(), from, dt, user, payload, priv))
|
||||
topic, err := ps.PS.Join(action.String())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -10,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error {
|
||||
if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, "", -1); err != nil {
|
||||
if err := ps.subscribeEvents(ctx, nil, tools.PB_SEARCH, ""); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
@@ -18,7 +18,7 @@ func (ps *PubSubService) initSubscribeEvents(ctx context.Context) error {
|
||||
|
||||
// generic function to subscribe to DHT flow of event
|
||||
func (ps *PubSubService) subscribeEvents(
|
||||
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string, timeout int,
|
||||
ctx context.Context, dt *tools.DataType, action tools.PubSubAction, peerID string,
|
||||
) error {
|
||||
logger := oclib.GetLogger()
|
||||
// define a name app.action#peerID
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"oc-discovery/daemons/node/common"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/models/booking/planner"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
@@ -30,11 +31,31 @@ func (ps *StreamService) handleEvent(protocol string, evt *common.Event) error {
|
||||
return err
|
||||
}
|
||||
}*/
|
||||
if protocol == ProtocolSendPlanner {
|
||||
if err := ps.sendPlanner(evt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolSearchResource && evt.DataType > -1 {
|
||||
if err := ps.retrieveResponse(evt); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolConsidersResource && evt.DataType > -1 {
|
||||
if err := ps.pass(evt, tools.PB_CONSIDERS); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolAdmiraltyConfigResource {
|
||||
if err := ps.pass(evt, tools.PB_ADMIRALTY_CONFIG); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if protocol == ProtocolMinioConfigResource {
|
||||
if err := ps.pass(evt, tools.PB_MINIO_CONFIG); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return errors.New("no action authorized available : " + protocol)
|
||||
}
|
||||
|
||||
@@ -56,7 +77,41 @@ func (abs *StreamService) verifyResponse(event *common.Event) error { //
|
||||
}
|
||||
}
|
||||
if b, err := json.Marshal(verify); err == nil {
|
||||
abs.PublishVerifyResources(nil, "", event.From, b)
|
||||
abs.PublishCommon(nil, "", event.From, ProtocolVerifyResource, b)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (abs *StreamService) sendPlanner(event *common.Event) error { //
|
||||
if len(event.Payload) == 0 {
|
||||
if plan, err := planner.GenerateShallow(&tools.APIRequest{Admin: true}); err == nil {
|
||||
if b, err := json.Marshal(plan); err == nil {
|
||||
abs.PublishCommon(nil, event.User, event.From, ProtocolSendPlanner, b)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
m := map[string]interface{}{}
|
||||
if err := json.Unmarshal(event.Payload, &m); err == nil {
|
||||
m["peer_id"] = event.From
|
||||
if pl, err := json.Marshal(m); err == nil {
|
||||
if b, err := json.Marshal(tools.PropalgationMessage{
|
||||
DataType: -1,
|
||||
Action: tools.PB_PLANNER,
|
||||
Payload: pl,
|
||||
}); err == nil {
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(oclib.BOOKING),
|
||||
Method: int(tools.PROPALGATION_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -67,15 +122,31 @@ func (abs *StreamService) retrieveResponse(event *common.Event) error { //
|
||||
return nil
|
||||
}
|
||||
b, err := json.Marshal(res.Serialize(res))
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.CATALOG_SEARCH_EVENT, tools.NATSResponse{
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.SEARCH_EVENT, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(event.DataType),
|
||||
Method: int(tools.CATALOG_SEARCH_EVENT),
|
||||
Method: int(tools.SEARCH_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (abs *StreamService) pass(event *common.Event, action tools.PubSubAction) error { //
|
||||
if b, err := json.Marshal(&tools.PropalgationMessage{
|
||||
Action: action,
|
||||
DataType: int(event.DataType),
|
||||
Payload: event.Payload,
|
||||
}); err == nil {
|
||||
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
||||
FromApp: "oc-discovery",
|
||||
Datatype: tools.DataType(event.DataType),
|
||||
Method: int(tools.PROPALGATION_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ps *StreamService) handleEventFromPartner(evt *common.Event, protocol string) error {
|
||||
resource, err := resources.ToResource(int(evt.DataType), evt.Payload)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
)
|
||||
|
||||
func (ps *StreamService) PublishVerifyResources(dt *tools.DataType, user string, toPeerID string, resource []byte) (*common.Stream, error) {
|
||||
func (ps *StreamService) PublishCommon(dt *tools.DataType, user string, toPeerID string, proto protocol.ID, resource []byte) (*common.Stream, error) {
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
p := access.LoadOne(toPeerID)
|
||||
if p.Err != "" {
|
||||
@@ -25,7 +25,7 @@ func (ps *StreamService) PublishVerifyResources(dt *tools.DataType, user string,
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ps.write(toPeerID, ad, dt, user, resource, ProtocolVerifyResource)
|
||||
return ps.write(toPeerID, ad, dt, user, resource, proto)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,8 +144,15 @@ func (s *StreamService) write(
|
||||
proto protocol.ID) (*common.Stream, error) {
|
||||
logger := oclib.GetLogger()
|
||||
var err error
|
||||
pts := map[protocol.ID]*common.ProtocolInfo{}
|
||||
for k, v := range protocols {
|
||||
pts[k] = v
|
||||
}
|
||||
for k, v := range protocolsPartners {
|
||||
pts[k] = v
|
||||
}
|
||||
// should create a very temp stream
|
||||
if s.Streams, err = common.TempStream(s.Host, *peerID, proto, did, s.Streams, &s.Mu); err != nil {
|
||||
if s.Streams, err = common.TempStream(s.Host, *peerID, proto, did, s.Streams, pts, &s.Mu); err != nil {
|
||||
return nil, errors.New("no stream available for protocol " + fmt.Sprintf("%v", proto) + " from PID " + peerID.ID.String())
|
||||
|
||||
}
|
||||
|
||||
@@ -22,28 +22,32 @@ import (
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
)
|
||||
|
||||
const ProtocolConsidersResource = "/opencloud/resource/considers/1.0"
|
||||
const ProtocolMinioConfigResource = "/opencloud/minio/config/1.0"
|
||||
const ProtocolAdmiraltyConfigResource = "/opencloud/admiralty/config/1.0"
|
||||
|
||||
const ProtocolSearchResource = "/opencloud/resource/search/1.0"
|
||||
const ProtocolCreateResource = "/opencloud/resource/create/1.0"
|
||||
const ProtocolUpdateResource = "/opencloud/resource/update/1.0"
|
||||
const ProtocolDeleteResource = "/opencloud/resource/delete/1.0"
|
||||
|
||||
const ProtocolSendPlanner = "/opencloud/resource/planner/1.0"
|
||||
const ProtocolVerifyResource = "/opencloud/resource/verify/1.0"
|
||||
const ProtocolHeartbeatPartner = "/opencloud/resource/heartbeat/partner/1.0"
|
||||
|
||||
type ProtocolInfo struct {
|
||||
PersistantStream bool
|
||||
WaitResponse bool
|
||||
var protocols = map[protocol.ID]*common.ProtocolInfo{
|
||||
ProtocolConsidersResource: {WaitResponse: false, TTL: 3 * time.Second},
|
||||
ProtocolSendPlanner: {WaitResponse: true, TTL: 24 * time.Hour},
|
||||
ProtocolSearchResource: {WaitResponse: true, TTL: 1 * time.Minute},
|
||||
ProtocolVerifyResource: {WaitResponse: true, TTL: 1 * time.Minute},
|
||||
ProtocolMinioConfigResource: {WaitResponse: true, TTL: 1 * time.Minute},
|
||||
ProtocolAdmiraltyConfigResource: {WaitResponse: true, TTL: 1 * time.Minute},
|
||||
}
|
||||
|
||||
var protocols = map[protocol.ID]*ProtocolInfo{
|
||||
ProtocolSearchResource: {WaitResponse: true},
|
||||
ProtocolVerifyResource: {WaitResponse: true},
|
||||
}
|
||||
|
||||
var protocolsPartners = map[protocol.ID]*ProtocolInfo{
|
||||
ProtocolCreateResource: {},
|
||||
ProtocolUpdateResource: {},
|
||||
ProtocolDeleteResource: {},
|
||||
var protocolsPartners = map[protocol.ID]*common.ProtocolInfo{
|
||||
ProtocolCreateResource: {TTL: 3 * time.Second},
|
||||
ProtocolUpdateResource: {TTL: 3 * time.Second},
|
||||
ProtocolDeleteResource: {TTL: 3 * time.Second},
|
||||
}
|
||||
|
||||
type StreamService struct {
|
||||
@@ -82,9 +86,17 @@ func (s *StreamService) HandleResponse(stream network.Stream) {
|
||||
if s.Streams[stream.Protocol()] == nil {
|
||||
s.Streams[stream.Protocol()] = map[pp.ID]*common.Stream{}
|
||||
}
|
||||
expiry := 1 * time.Minute
|
||||
|
||||
if protocols[stream.Protocol()] != nil {
|
||||
expiry = protocols[stream.Protocol()].TTL
|
||||
} else if protocolsPartners[stream.Protocol()] != nil {
|
||||
expiry = protocolsPartners[stream.Protocol()].TTL
|
||||
}
|
||||
|
||||
s.Streams[stream.Protocol()][stream.Conn().RemotePeer()] = &common.Stream{
|
||||
Stream: stream,
|
||||
Expiry: time.Now().UTC().Add(1 * time.Minute),
|
||||
Expiry: time.Now().UTC().Add(expiry + 1*time.Minute),
|
||||
}
|
||||
s.Mu.Unlock()
|
||||
|
||||
@@ -221,7 +233,7 @@ func (s *StreamService) gc() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID, protocolInfo *ProtocolInfo) {
|
||||
func (ps *StreamService) readLoop(s *common.Stream, id pp.ID, proto protocol.ID, protocolInfo *common.ProtocolInfo) {
|
||||
defer s.Stream.Close()
|
||||
defer func() {
|
||||
ps.Mu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user