136 lines
5.0 KiB
Go
136 lines
5.0 KiB
Go
package peer
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"regexp"
|
|
"strings"
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
)
|
|
|
|
// PeerExecution describes one call addressed to a peer.
//
// Within this file it is built by LaunchPeerExecution when a peer cannot be
// reached, stored on the peer, and replayed later from its Url, Method and
// Body fields.
type PeerExecution struct {
	// Method is the textual name of the method to call.
	Method string `json:"method" bson:"method"`

	// Url is the fully formatted target URL of the call.
	Url string `json:"url" bson:"url"`

	// Body is the payload sent along with the call.
	Body map[string]interface{} `json:"body" bson:"body"`

	// DataType is the enum index of the data type the call targets.
	DataType int `json:"data_type" bson:"data_type"`

	// DataID is the identifier of the data the call targets.
	DataID string `json:"data_id" bson:"data_id"`
}
|
|
|
|
var cache = &PeerCache{} // Singleton instance of the peer cache
|
|
// PeerCache is a struct that represents a peer cache
|
|
type PeerCache struct {
|
|
Executions []*PeerExecution
|
|
}
|
|
|
|
// urlFormat formats the URL of the peer with the data type API function
|
|
func (p *PeerCache) urlFormat(url string, dt utils.DataType) string {
|
|
// localhost is replaced by the local peer URL
|
|
// because localhost must collide on a web request security protocol
|
|
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
|
|
url = strings.ReplaceAll(url, "localhost", dt.API())
|
|
url = strings.ReplaceAll(url, "127.0.0.1", dt.API())
|
|
r := regexp.MustCompile("(:[0-9]+)")
|
|
t := r.FindString(url)
|
|
if t != "" {
|
|
url = strings.Replace(url, t, ":8080", -1)
|
|
}
|
|
r.ReplaceAllString(url, ":8080")
|
|
}
|
|
return url
|
|
}
|
|
|
|
// checkPeerStatus checks the status of a peer
|
|
func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) {
|
|
api := tools.API{}
|
|
access := (&Peer{}).GetAccessor(nil)
|
|
res, code, _ := access.LoadOne(peerID) // Load the peer from db
|
|
if code != 200 { // no peer no party
|
|
return nil, false
|
|
}
|
|
methods := caller.URLS[utils.PEER.String()] // Get the methods url of the peer
|
|
if methods == nil {
|
|
return res.(*Peer), false
|
|
}
|
|
meth := methods[tools.POST] // Get the POST method to check status
|
|
if meth == "" {
|
|
return res.(*Peer), false
|
|
}
|
|
url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER) // Format the URL
|
|
state, services := api.CheckRemotePeer(url) // Check the status of the peer
|
|
res.(*Peer).Services = services // Update the services states of the peer
|
|
access.UpdateOne(res, peerID) // Update the peer in the db
|
|
return res.(*Peer), state != tools.DEAD && services[appName] == 0 // Return the peer and its status
|
|
}
|
|
|
|
// LaunchPeerExecution launches an execution on a peer
|
|
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
|
|
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
|
methods := caller.URLS[dt.String()] // Get the methods url of the data type
|
|
if _, ok := methods[method]; !ok {
|
|
return nil, errors.New("no path found")
|
|
}
|
|
meth := methods[method] // Get the method url to execute
|
|
if meth == "" {
|
|
return nil, errors.New("no path found")
|
|
} else {
|
|
meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC)
|
|
}
|
|
url := ""
|
|
// Check the status of the peer
|
|
if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok {
|
|
// If the peer is not reachable, add the execution to the failed executions list
|
|
pexec := &PeerExecution{
|
|
Method: method.String(),
|
|
Url: p.urlFormat((mypeer.Url)+meth, dt),
|
|
Body: body,
|
|
DataType: dt.EnumIndex(),
|
|
DataID: dataID,
|
|
}
|
|
mypeer.AddExecution(*pexec)
|
|
mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db
|
|
return nil, errors.New("peer is not reachable")
|
|
} else {
|
|
// If the peer is reachable, launch the execution
|
|
url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL
|
|
tmp := mypeer.FailedExecution // Get the failed executions list
|
|
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
|
|
mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db
|
|
for _, v := range tmp { // Retry the failed executions
|
|
go p.exec(v.Url, tools.ToMethod(v.Method), v.Body, caller)
|
|
}
|
|
}
|
|
return nil, p.exec(url, method, body, caller) // Execute the method
|
|
}
|
|
|
|
// exec executes the method on the peer
|
|
func (p *PeerCache) exec(url string, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) error {
|
|
var b []byte
|
|
var err error
|
|
if method == tools.POST { // Execute the POST method if it's a POST method
|
|
b, err = caller.CallPost(url, "", body)
|
|
}
|
|
if method == tools.GET { // Execute the GET method if it's a GET method
|
|
b, err = caller.CallGet(url, "")
|
|
}
|
|
if method == tools.DELETE { // Execute the DELETE method if it's a DELETE method
|
|
b, err = caller.CallDelete(url, "")
|
|
}
|
|
var m map[string]interface{}
|
|
json.Unmarshal(b, &m)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if e, ok := m["error"]; !ok && e != "" { // Check if there is an error in the response
|
|
return errors.New(fmt.Sprintf("%v", m["error"]))
|
|
}
|
|
return err
|
|
}
|