package peer

import (
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
	"strings"

	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// peerCacheLocalPort is the port substituted into URLs that pointed at the
// local host before they were rewritten by urlFormat.
const peerCacheLocalPort = ":8080"

// peerCachePortPattern matches a ":<digits>" segment of a URL. It is compiled
// once at package scope instead of on every urlFormat call.
var peerCachePortPattern = regexp.MustCompile("(:[0-9]+)")

// PeerExecution describes one HTTP call addressed to a peer. It is the record
// handed to Peer.AddExecution when the peer cannot be reached.
type PeerExecution struct {
	Method   string                 `json:"method" bson:"method"`       // method.String() of the requested call
	Url      string                 `json:"url" bson:"url"`             // route as registered in the caller (":id" not substituted)
	Body     map[string]interface{} `json:"body" bson:"body"`           // payload passed to the call
	DataType int                    `json:"data_type" bson:"data_type"` // dt.EnumIndex() of the targeted data type
	DataID   string                 `json:"data_id" bson:"data_id"`     // identifier substituted for ":id" in the route
}

// PeerCache launches executions on peers and records the ones that fail so
// they can be replayed later.
type PeerCache struct {
	Executions []*PeerExecution
}

// urlFormat rewrites a URL that targets the local host ("localhost" or
// "127.0.0.1"): the host is replaced by dt.API() and the first ":<digits>"
// segment is replaced by ":8080". URLs that do not mention the local host are
// returned unchanged.
func (p *PeerCache) urlFormat(url string, dt utils.DataType) string {
	if !strings.Contains(url, "localhost") && !strings.Contains(url, "127.0.0.1") {
		return url
	}
	url = strings.ReplaceAll(url, "localhost", dt.API())
	url = strings.ReplaceAll(url, "127.0.0.1", dt.API())
	// Replace the matched span by position. A textual replace of the matched
	// port would also rewrite any later occurrence of the same digits (":80"
	// inside ":8000"), corrupting the rest of the URL.
	if loc := peerCachePortPattern.FindStringIndex(url); loc != nil {
		url = url[:loc[0]] + peerCacheLocalPort + url[loc[1]:]
	}
	return url
}

// checkPeerStatus loads the peer identified by peerID, probes it through
// tools.API.CheckRemotePeer, stores the reported services on the peer and
// persists it.
//
// It returns the loaded peer (nil when it could not be loaded) and true only
// when the probe reports a state other than tools.DEAD. When the caller has no
// POST route registered for utils.PEER, the peer is returned with false and
// no probe is made.
func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) (*Peer, bool) {
	api := tools.API{}
	access := (&Peer{}).GetAccessor(nil)
	res, code, _ := access.LoadOne(peerID)
	if code != 200 {
		return nil, false
	}
	// Assert once, with the checked form: an unexpected payload must not panic.
	peer, ok := res.(*Peer)
	if !ok || peer == nil {
		return nil, false
	}
	if caller == nil {
		return peer, false
	}
	methods := caller.URLS[utils.PEER.String()]
	fmt.Println("PEER AFT 3", methods)
	if methods == nil {
		return peer, false
	}
	meth := methods[tools.POST]
	fmt.Println("PEER AFT 4", meth)
	if meth == "" {
		return peer, false
	}
	url := p.urlFormat(peer.Url+meth, utils.PEER)
	fmt.Println("PEER AFT 5", url)
	state, services := api.CheckRemotePeer(url)
	peer.Services = services
	// Best effort: the probe result is returned even if persisting it fails.
	access.UpdateOne(peer, peerID)
	fmt.Printf("Peer %v is %v\n", peerID, state)
	return peer, state != tools.DEAD
}

// GetAccessor returns a new accessor initialised for utils.PEER with the
// given caller.
func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
	data := New()
	data.Init(utils.PEER, caller)
	return data
}

// LaunchPeerExecution performs the HTTP call registered in caller for the
// (dt, method) pair against the peer identified by peerID, substituting dataID
// for ":id" in the route.
//
// If the peer is not reachable, the execution is recorded on the peer through
// AddExecution and an error is returned. If it is reachable, the executions
// previously recorded in FailedExecution are cleared and replayed, each in its
// own goroutine whose result is not awaited, before the requested call is made.
//
// The returned *PeerExecution is always nil. The error is non-nil when:
//   - no route is registered for (dt, method), or caller is nil;
//   - the peer cannot be loaded or is not reachable;
//   - method is not POST, GET or DELETE;
//   - the HTTP call fails;
//   - the response body carries a non-empty "error" field.
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
	if caller == nil {
		return nil, errors.New("no path found")
	}
	// Indexing a missing (nil) inner map yields "", so one lookup covers both
	// "unknown data type" and "unknown method".
	route := caller.URLS[dt.String()][method]
	if route == "" {
		return nil, errors.New("no path found")
	}
	path := strings.ReplaceAll(route, ":id", dataID)
	pexec := &PeerExecution{
		Method:   method.String(),
		Url:      route,
		Body:     body,
		DataType: dt.EnumIndex(),
		DataID:   dataID,
	}

	mypeer, reachable := p.checkPeerStatus(peerID, caller)
	if mypeer == nil {
		// The peer could not be loaded: there is nothing to record the failed
		// execution on, and using the nil peer would be unsafe.
		return nil, errors.New("peer not found")
	}
	if !reachable {
		mypeer.AddExecution(*pexec)
		mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID)
		return nil, errors.New("peer is not reachable")
	}

	url := p.urlFormat(mypeer.Url+path, dt)
	fmt.Println("LaunchPeerExecution AFT 3", url)

	// The peer is back: clear the backlog first so each replay starts from an
	// empty list, then replay every execution that failed earlier.
	pending := mypeer.FailedExecution
	mypeer.FailedExecution = []PeerExecution{}
	mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID)
	for _, v := range pending {
		go p.LaunchPeerExecution(peerID, v.DataID, utils.DataType(v.DataType), tools.ToMethod(v.Method), v.Body, caller)
	}

	var (
		b   []byte
		err error
	)
	switch method {
	case tools.POST:
		b, err = caller.CallPost(url, "", body)
	case tools.GET:
		b, err = caller.CallGet(url, "")
	case tools.DELETE:
		b, err = caller.CallDelete(url, "")
	default:
		return nil, fmt.Errorf("unsupported method %v", method.String())
	}
	if err != nil {
		return nil, err
	}

	// Decoding is best effort: a body that is empty or not a JSON object is
	// treated as carrying no "error" field, so the decode error is ignored.
	var m map[string]interface{}
	_ = json.Unmarshal(b, &m)
	fmt.Println("LaunchPeerExecution AFT 3", m, url)
	// Report the remote error only when the field is present and non-empty.
	if e, ok := m["error"]; ok && e != nil && e != "" {
		return nil, fmt.Errorf("%v", e)
	}
	return nil, nil
}