peer improvment
This commit is contained in:
@@ -8,14 +8,11 @@ import (
|
||||
)
|
||||
|
||||
// now write a go enum for the state partner with self, blacklist, partner
|
||||
|
||||
type PeerState int
|
||||
|
||||
const (
|
||||
NONE PeerState = iota
|
||||
SELF
|
||||
PARTNER
|
||||
BLACKLIST
|
||||
OFFLINE PeerState = iota
|
||||
ONLINE
|
||||
)
|
||||
|
||||
func (m PeerState) String() string {
|
||||
@@ -26,6 +23,24 @@ func (m PeerState) EnumIndex() int {
|
||||
return int(m)
|
||||
}
|
||||
|
||||
type PeerRelation int
|
||||
|
||||
const (
|
||||
NONE PeerRelation = iota
|
||||
SELF
|
||||
PARTNER
|
||||
BLACKLIST
|
||||
UNKNOWN
|
||||
)
|
||||
|
||||
func (m PeerRelation) String() string {
|
||||
return [...]string{"NONE", "SELF", "PARTNER", "BLACKLIST"}[m]
|
||||
}
|
||||
|
||||
func (m PeerRelation) EnumIndex() int {
|
||||
return int(m)
|
||||
}
|
||||
|
||||
func GetSelf() (utils.ShallowDBObject, string) {
|
||||
d, code, err := NewAccessor(nil).Search(nil, SELF.String(), false)
|
||||
if code != 200 || err != nil || len(d) == 0 {
|
||||
@@ -47,10 +62,12 @@ func IsMySelf(peerID string) (bool, string) {
|
||||
// Peer is a struct that represents a peer
|
||||
type Peer struct {
|
||||
utils.AbstractObject
|
||||
PeerID string `json:"peer_id" bson:"peer_id" validate:"required"`
|
||||
Url string `json:"url" bson:"url" validate:"required"` // Url is the URL of the peer (base64url)
|
||||
WalletAddress string `json:"wallet_address" bson:"wallet_address" validate:"required"` // WalletAddress is the wallet address of the peer
|
||||
PublicKey string `json:"public_key" bson:"public_key" validate:"required"` // PublicKey is the public key of the peer
|
||||
State PeerState `json:"state" bson:"state" default:"0"`
|
||||
Relation PeerRelation `json:"relation" bson:"state" default:"0"`
|
||||
ServicesState map[string]int `json:"services_state,omitempty" bson:"services_state,omitempty"`
|
||||
FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried
|
||||
}
|
||||
|
||||
@@ -28,19 +28,19 @@ type PeerCache struct {
|
||||
}
|
||||
|
||||
// urlFormat formats the URL of the peer with the data type API function
|
||||
func (p *PeerCache) urlFormat(hostUrl string, dt tools.DataType) string {
|
||||
func urlFormat(hostUrl string, dt tools.DataType) string {
|
||||
return hostUrl + "/" + strings.ReplaceAll(dt.API(), "oc-", "")
|
||||
}
|
||||
|
||||
// checkPeerStatus checks the status of a peer
|
||||
func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool) {
|
||||
func CheckPeerStatus(peerID string, appName string) (*Peer, bool) {
|
||||
api := tools.API{}
|
||||
access := NewShallowAccessor()
|
||||
res, code, _ := access.LoadOne(peerID) // Load the peer from db
|
||||
if code != 200 { // no peer no party
|
||||
return nil, false
|
||||
}
|
||||
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + "/status" // Format the URL
|
||||
url := urlFormat(res.(*Peer).Url, tools.PEER) + "/status" // Format the URL
|
||||
state, services := api.CheckRemotePeer(url)
|
||||
res.(*Peer).ServicesState = services // Update the services states of the peer
|
||||
access.UpdateOne(res, peerID) // Update the peer in the db
|
||||
@@ -61,11 +61,11 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
|
||||
url := ""
|
||||
|
||||
// Check the status of the peer
|
||||
if mypeer, ok := p.checkPeerStatus(peerID, dt.API()); !ok && mypeer != nil {
|
||||
if mypeer, ok := CheckPeerStatus(peerID, dt.API()); !ok && mypeer != nil {
|
||||
// If the peer is not reachable, add the execution to the failed executions list
|
||||
pexec := &PeerExecution{
|
||||
Method: method.String(),
|
||||
Url: p.urlFormat((mypeer.Url), dt) + path, // the url is constitued of : host URL + resource path + action path (ex : mypeer.com/datacenter/resourcetype/path/to/action)
|
||||
Url: urlFormat((mypeer.Url), dt) + path, // the url is constitued of : host URL + resource path + action path (ex : mypeer.com/datacenter/resourcetype/path/to/action)
|
||||
Body: body,
|
||||
DataType: dt.EnumIndex(),
|
||||
DataID: dataID,
|
||||
@@ -78,7 +78,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
|
||||
return map[string]interface{}{}, errors.New("peer " + peerID + " not found")
|
||||
}
|
||||
// If the peer is reachable, launch the execution
|
||||
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL
|
||||
url = urlFormat((mypeer.Url), dt) + path // Format the URL
|
||||
tmp := mypeer.FailedExecution // Get the failed executions list
|
||||
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
|
||||
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
||||
|
||||
@@ -93,6 +93,7 @@ func (a *peerMongoAccessor) GetDefaultFilter(search string) *dbs.Filters {
|
||||
Or: map[string][]dbs.Filter{ // search by name if no filters are provided
|
||||
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
||||
"url": {{Operator: dbs.LIKE.String(), Value: search}},
|
||||
"relation": {{Operator: dbs.LIKE.String(), Value: search}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -51,7 +51,7 @@ func newTestPeer() *peer.Peer {
|
||||
Url: "http://localhost",
|
||||
WalletAddress: "0x123",
|
||||
PublicKey: "pubkey",
|
||||
State: peer.SELF,
|
||||
Relation: peer.SELF,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user