package peer

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
	"sync"
	"time"

	"cloud.o-forge.io/core/oc-lib/logs"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// peerRetryInterval is the pause between two passes of the retry loop.
const peerRetryInterval = 60 * time.Second

// errUnsupportedMethod is returned when a request uses an HTTP verb other
// than POST, GET or DELETE. Such a request can never succeed, so it is never
// queued for retry.
var errUnsupportedMethod = errors.New("unsupported method")

// currentRountine is non-zero while the retry goroutine is running.
// It must only be read or written while holding peerCacheMu.
var currentRountine = 0

// singleton holds the process-wide queue of executions awaiting a retry.
// Its Executions slice must only be read or written while holding peerCacheMu.
var singleton = &PeerCache{
	Executions: []*PeerExecution{},
}

// peerCacheMu guards currentRountine and singleton.Executions, which are
// shared between callers of LaunchPeerExecution and the retry goroutine.
var peerCacheMu sync.Mutex

// PeerExecution describes one request sent to a peer, with everything needed
// to send it again later.
type PeerExecution struct {
	Method   tools.METHOD           // HTTP verb of the request
	Url      string                 // base URL followed by the route template (":id" is not substituted)
	Body     map[string]interface{} // payload sent with POST requests
	IsMySelf bool                   // value of the isMySelf argument the request was launched with
	Caller   tools.HTTPCaller       // copy of the caller used to send the request
	PeerID   string                 // identifier of the target peer
	DataType utils.DataType         // data type used to select the route in Caller.URLS
	DataID   string                 // value substituted for ":id" in GET and DELETE routes
}

// PeerCache stores the executions that failed and are waiting to be retried.
type PeerCache struct {
	Executions []*PeerExecution
}

// checkPeerStatus reports whether the peer identified by peerID answers on
// its POST peer route. It returns false when the caller is nil, the peer
// cannot be loaded, the route is not configured, or the remote check reports
// tools.DEAD.
func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) bool {
	if caller == nil {
		return false
	}
	access := (&Peer{}).GetAccessor(nil)
	// The error is deliberately dropped: any code other than 200 already
	// counts as "not reachable".
	res, code, _ := access.LoadOne(peerID)
	if code != 200 {
		return false
	}
	target, ok := res.(*Peer)
	if !ok || target == nil {
		return false
	}
	// Loopback addresses are replaced by the peer API host name.
	url := strings.ReplaceAll(target.Url, "localhost", utils.PEER.API())
	url = strings.ReplaceAll(url, "127.0.0.1", utils.PEER.API())

	methods := caller.URLS[utils.PEER.String()]
	if methods == nil {
		return false
	}
	meth := methods[tools.POST]
	if meth == "" {
		return false
	}
	api := tools.API{}
	return api.CheckRemotePeer(url+meth) != tools.DEAD
}

// GetAccessor returns a peer accessor initialised with the given caller.
func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
	data := New()
	data.Init(utils.PEER, caller)
	return data
}

// dispatchPeerCall sends one request to baseURL+path with the given verb.
// GET and DELETE substitute dataID for ":id" in path. Any other verb than
// POST, GET or DELETE yields errUnsupportedMethod.
func dispatchPeerCall(caller *tools.HTTPCaller, method tools.METHOD, baseURL, path, dataID string, body map[string]interface{}) ([]byte, error) {
	switch method {
	case tools.POST:
		return caller.CallPost(baseURL, path, body)
	case tools.GET:
		return caller.CallGet(baseURL, strings.ReplaceAll(path, ":id", dataID))
	case tools.DELETE:
		return caller.CallDelete(baseURL, strings.ReplaceAll(path, ":id", dataID))
	}
	return nil, errUnsupportedMethod
}

// peerResponseError extracts a failure from a peer response body. It returns
// nil for an empty body or a body without a non-empty "error" entry, and an
// error when the body is not a JSON object or carries an "error" value.
func peerResponseError(raw []byte) error {
	if len(raw) == 0 {
		return nil
	}
	var payload map[string]interface{}
	if err := json.Unmarshal(raw, &payload); err != nil {
		return fmt.Errorf("decoding peer response: %w", err)
	}
	if remote, ok := payload["error"]; ok && remote != nil && remote != "" {
		return fmt.Errorf("%v", remote)
	}
	return nil
}

// LaunchPeerExecution sends a request to a peer.
//
// When url points at a loopback address, it is rewritten to the API host of
// dt if isMySelf is true; otherwise the peer is reported as unreachable.
// The route is looked up in caller.URLS by data type and method.
//
// Return values:
//   - (nil, nil) when the request was sent and the response carries no error;
//   - (nil, err) when the request cannot be attempted (nil caller, missing
//     route, unsupported method, unreachable peer) or when the peer answered
//     with an error or an undecodable body;
//   - (exec, err) when sending failed: exec has been queued and will be
//     retried in the background every peerRetryInterval until it is sent.
func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
	if caller == nil {
		return nil, errors.New("no caller provided")
	}
	if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
		if !isMySelf {
			return nil, errors.New("Peer " + peerID + " is not reachable")
		}
		url = "http://" + dt.API() // default behavior on basic API container name
	}
	methods := caller.URLS[dt.String()]
	path, ok := methods[method]
	if !ok {
		return nil, errors.New("no path found")
	}
	if !p.checkPeerStatus(peerID, caller) {
		return nil, errors.New("Peer " + peerID + " is not reachable")
	}

	raw, err := dispatchPeerCall(caller, method, url, path, dataID, body)
	if errors.Is(err, errUnsupportedMethod) {
		return nil, err
	}
	if err != nil {
		pexec := &PeerExecution{
			Method:   method,
			Url:      url + path,
			Body:     body,
			IsMySelf: isMySelf,
			Caller:   *caller,
			PeerID:   peerID,
			DataType: dt,
			DataID:   dataID,
		}
		// Queue the execution and decide, under the same lock, whether the
		// retry goroutine has to be started.
		peerCacheMu.Lock()
		singleton.Executions = append(singleton.Executions, pexec)
		start := currentRountine == 0
		if start {
			currentRountine++
		}
		peerCacheMu.Unlock()
		if start {
			go p.retryPeerExecution()
		}
		return pexec, err
	}
	return nil, peerResponseError(raw)
}

// retryOne sends a queued execution again. keep reports whether the entry
// must stay in the queue: true when the peer is unreachable or sending
// failed, false when the request went through (even if the peer answered
// with an error) or can never succeed.
func (p *PeerCache) retryOne(exec *PeerExecution) (keep bool, err error) {
	path, ok := exec.Caller.URLS[exec.DataType.String()][exec.Method]
	if !ok {
		return false, errors.New("no path found")
	}
	// exec.Url stores base URL + route; strip the route to recover the base.
	baseURL := strings.TrimSuffix(exec.Url, path)
	if !p.checkPeerStatus(exec.PeerID, &exec.Caller) {
		return true, errors.New("Peer " + exec.PeerID + " is not reachable")
	}
	raw, err := dispatchPeerCall(&exec.Caller, exec.Method, baseURL, path, exec.DataID, exec.Body)
	if errors.Is(err, errUnsupportedMethod) {
		return false, err
	}
	if err != nil {
		return true, err
	}
	return false, peerResponseError(raw)
}

// retryPeerExecution is the body of the retry goroutine. Each pass takes the
// queued executions, sends them again and puts back the ones that still
// fail. It sleeps peerRetryInterval between passes and exits, resetting
// currentRountine, once the queue is empty.
func (p *PeerCache) retryPeerExecution() {
	logger := logs.GetLogger()
	for {
		// Take ownership of the current queue so requests are sent without
		// holding the lock.
		peerCacheMu.Lock()
		pending := singleton.Executions
		singleton.Executions = []*PeerExecution{}
		peerCacheMu.Unlock()

		remaining := make([]*PeerExecution, 0, len(pending))
		for _, exec := range pending {
			if exec == nil {
				continue
			}
			keep, err := p.retryOne(exec)
			if err != nil {
				logger.Error().Err(err).Msg("peer execution retry failed for peer " + exec.PeerID)
			}
			if keep {
				remaining = append(remaining, exec)
			}
		}

		peerCacheMu.Lock()
		// Entries queued while this pass was running are kept after the
		// ones that failed again.
		singleton.Executions = append(remaining, singleton.Executions...)
		if len(singleton.Executions) == 0 {
			currentRountine = 0
			peerCacheMu.Unlock()
			return
		}
		peerCacheMu.Unlock()
		time.Sleep(peerRetryInterval)
	}
}