package peer import ( "encoding/json" "errors" "fmt" "regexp" "strings" "cloud.o-forge.io/core/oc-lib/tools" ) /* * PeerExecution is a struct that represents an execution on a peer * it defines the execution data */ type PeerExecution struct { Method string `json:"method" bson:"method"` Url string `json:"url" bson:"url"` Body map[string]interface{} `json:"body" bson:"body"` DataType int `json:"data_type" bson:"data_type"` DataID string `json:"data_id" bson:"data_id"` } var cache = &PeerCache{} // Singleton instance of the peer cache // PeerCache is a struct that represents a peer cache type PeerCache struct { Executions []*PeerExecution } // urlFormat formats the URL of the peer with the data type API function func (p *PeerCache) urlFormat(url string, dt tools.DataType) string { // localhost is replaced by the local peer URL // because localhost must collide on a web request security protocol localhost := "" if strings.Contains(url, "localhost") { localhost = "localhost" } if strings.Contains(url, "127.0.0.1") { localhost = "127.0.0.1" } if localhost != "" { r := regexp.MustCompile("(" + localhost + ":[0-9]+)") t := r.FindString(url) if t != "" { url = strings.Replace(url, t, dt.API()+":8080/oc", -1) } else { url = strings.ReplaceAll(url, localhost, dt.API()+":8080/oc") } } else { url = url + "/" + dt.API() } return url } // checkPeerStatus checks the status of a peer func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) { api := tools.API{} access := (&Peer{}).GetAccessor(nil) res, code, _ := access.LoadOne(peerID) // Load the peer from db if code != 200 { // no peer no party return nil, false } methods := caller.URLS[tools.PEER] // Get the methods url of the peer if methods == nil { return res.(*Peer), false } meth := methods[tools.POST] // Get the POST method to check status if meth == "" { return res.(*Peer), false } url := p.urlFormat(res.(*Peer).Url, tools.PEER) + meth // Format the URL fmt.Println("Checking peer status on", url, "...") state, services := api.CheckRemotePeer(url) fmt.Println("Checking peer status on", url, state, services) // Check the status of the peer res.(*Peer).Services = services // Update the services states of the peer access.UpdateOne(res, peerID) // Update the peer in the db return res.(*Peer), state != tools.DEAD && services[appName] == 0 // Return the peer and its status } // LaunchPeerExecution launches an execution on a peer func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, dt tools.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { fmt.Println("Launching peer execution on", caller.URLS, dt, method) methods := caller.URLS[dt] // Get the methods url of the data type if m, ok := methods[method]; !ok || m == "" { return nil, errors.New("no path found") } meth := methods[method] // Get the method url to execute meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC) url := "" // Check the status of the peer if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok { // If the peer is not reachable, add the execution to the failed executions list pexec := &PeerExecution{ Method: method.String(), Url: p.urlFormat((mypeer.Url)+meth, dt), Body: body, DataType: dt.EnumIndex(), DataID: dataID, } mypeer.AddExecution(*pexec) mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db return nil, errors.New("peer is not reachable") } else { // If the peer is reachable, launch the execution url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL tmp := mypeer.FailedExecution // Get the failed executions list mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db for _, v := range tmp { // Retry the failed executions go p.exec(v.Url, tools.ToMethod(v.Method), v.Body, caller) } } fmt.Println("URL exec", url) return nil, p.exec(url, method, body, caller) // Execute the method } // exec executes the method on the peer func (p *PeerCache) exec(url string, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) error { var b []byte var err error if method == tools.POST { // Execute the POST method if it's a POST method b, err = caller.CallPost(url, "", body) } if method == tools.GET { // Execute the GET method if it's a GET method b, err = caller.CallGet(url, "") } if method == tools.DELETE { // Execute the DELETE method if it's a DELETE method b, err = caller.CallDelete(url, "") } var m map[string]interface{} json.Unmarshal(b, &m) if err != nil { return err } if e, ok := m["error"]; ok && e != "" && e != "" { // Check if there is an error in the response return errors.New(fmt.Sprintf("%v", m["error"])) } return nil }