2024-08-13 14:33:26 +02:00
|
|
|
package peer
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
2024-08-22 10:19:14 +02:00
|
|
|
"regexp"
|
2024-08-13 14:33:26 +02:00
|
|
|
"strings"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"cloud.o-forge.io/core/oc-lib/logs"
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
|
|
)
|
|
|
|
|
|
|
|
var currentRountine = 0
|
|
|
|
var singleton = &PeerCache{
|
|
|
|
Executions: []*PeerExecution{},
|
|
|
|
}
|
|
|
|
|
|
|
|
type PeerExecution struct {
|
|
|
|
Method tools.METHOD
|
|
|
|
Url string
|
|
|
|
Body map[string]interface{}
|
2024-08-21 08:54:29 +02:00
|
|
|
IsMySelf bool
|
2024-08-13 14:33:26 +02:00
|
|
|
Caller tools.HTTPCaller
|
|
|
|
PeerID string
|
|
|
|
DataType utils.DataType
|
|
|
|
DataID string
|
|
|
|
}
|
|
|
|
|
|
|
|
type PeerCache struct {
|
|
|
|
Executions []*PeerExecution
|
|
|
|
}
|
|
|
|
|
2024-08-21 16:03:56 +02:00
|
|
|
func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) bool {
|
2024-08-21 15:46:16 +02:00
|
|
|
api := tools.API{}
|
|
|
|
access := (&Peer{}).GetAccessor(nil)
|
|
|
|
res, code, _ := access.LoadOne(peerID)
|
|
|
|
if code != 200 {
|
|
|
|
return false
|
|
|
|
}
|
2024-08-21 16:03:56 +02:00
|
|
|
methods := caller.URLS[utils.PEER.String()]
|
2024-08-22 12:29:38 +02:00
|
|
|
fmt.Println("checkPeerStatus AFT 3", methods)
|
2024-08-22 08:50:24 +02:00
|
|
|
|
2024-08-21 16:03:56 +02:00
|
|
|
if methods == nil {
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
meth := methods[tools.POST]
|
2024-08-22 08:50:24 +02:00
|
|
|
fmt.Println("checkPeerStatus AFT 4", meth)
|
2024-08-21 16:03:56 +02:00
|
|
|
if meth == "" {
|
|
|
|
return false
|
|
|
|
}
|
2024-08-22 12:29:38 +02:00
|
|
|
url := res.(*Peer).Url + meth
|
|
|
|
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
|
|
|
|
url = strings.ReplaceAll(url, "localhost", utils.PEER.API())
|
|
|
|
url = strings.ReplaceAll(url, "127.0.0.1", utils.PEER.API())
|
|
|
|
r := regexp.MustCompile("(:[0-9]+)")
|
|
|
|
ttt := r.FindAllString(url, -1)
|
|
|
|
for _, t := range ttt {
|
|
|
|
url = strings.Replace(url, t, ":8080", -1)
|
|
|
|
}
|
|
|
|
r.ReplaceAllString(url, ":8080")
|
|
|
|
}
|
|
|
|
fmt.Println("checkPeerStatus AFT 4", url)
|
|
|
|
state := api.CheckRemotePeer(url)
|
|
|
|
fmt.Println("checkPeerStatus AFT 4", state, tools.DEAD)
|
2024-08-22 08:50:24 +02:00
|
|
|
|
2024-08-21 15:46:16 +02:00
|
|
|
return state != tools.DEAD
|
2024-08-13 14:33:26 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
|
|
|
|
data := New()
|
|
|
|
data.Init(utils.PEER, caller)
|
|
|
|
return data
|
|
|
|
}
|
|
|
|
|
2024-08-21 08:54:29 +02:00
|
|
|
func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID string, url string,
|
|
|
|
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
2024-08-21 16:38:29 +02:00
|
|
|
fmt.Println("LaunchPeerExecution", url, strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1"))
|
2024-08-21 08:54:29 +02:00
|
|
|
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
|
2024-08-21 16:38:29 +02:00
|
|
|
url = strings.ReplaceAll(url, "localhost", dt.API())
|
|
|
|
url = strings.ReplaceAll(url, "127.0.0.1", dt.API())
|
2024-08-22 10:39:03 +02:00
|
|
|
r := regexp.MustCompile("(:[0-9]+)")
|
2024-08-22 11:59:23 +02:00
|
|
|
ttt := r.FindAllString(url, -1)
|
|
|
|
for _, t := range ttt {
|
|
|
|
url = strings.Replace(url, t, ":8080", -1)
|
|
|
|
}
|
2024-08-22 10:19:14 +02:00
|
|
|
r.ReplaceAllString(url, ":8080")
|
2024-08-21 08:54:29 +02:00
|
|
|
}
|
2024-08-21 16:38:29 +02:00
|
|
|
fmt.Println("LaunchPeerExecution AFT", url, dt.API())
|
2024-08-13 14:33:26 +02:00
|
|
|
var err error
|
|
|
|
b := []byte{}
|
|
|
|
methods := caller.URLS[dt.String()]
|
2024-08-22 08:50:24 +02:00
|
|
|
fmt.Println("LaunchPeerExecution AFT 2", methods)
|
2024-08-13 14:33:26 +02:00
|
|
|
if _, ok := methods[method]; !ok {
|
|
|
|
return nil, errors.New("no path found")
|
|
|
|
}
|
2024-08-21 16:03:56 +02:00
|
|
|
if !p.checkPeerStatus(peerID, caller) {
|
2024-08-22 12:29:38 +02:00
|
|
|
return nil, errors.New("peer is not reachable")
|
2024-08-13 14:33:26 +02:00
|
|
|
}
|
|
|
|
if method == tools.POST {
|
|
|
|
b, err = caller.CallPost(url, methods[method], body)
|
|
|
|
}
|
|
|
|
if method == tools.GET {
|
|
|
|
b, err = caller.CallGet(url, strings.ReplaceAll(methods[method], ":id", dataID))
|
|
|
|
}
|
|
|
|
if method == tools.DELETE {
|
|
|
|
b, err = caller.CallDelete(url, strings.ReplaceAll(methods[method], ":id", dataID))
|
|
|
|
}
|
|
|
|
var m map[string]interface{}
|
|
|
|
json.Unmarshal(b, &m)
|
|
|
|
if err != nil {
|
|
|
|
pexec := &PeerExecution{
|
|
|
|
Method: method,
|
|
|
|
Url: url + methods[method],
|
|
|
|
Body: body,
|
2024-08-21 08:54:29 +02:00
|
|
|
IsMySelf: isMySelf,
|
2024-08-13 14:33:26 +02:00
|
|
|
Caller: *caller,
|
|
|
|
PeerID: peerID,
|
|
|
|
DataType: dt,
|
|
|
|
DataID: dataID,
|
|
|
|
}
|
|
|
|
singleton.Executions = append(singleton.Executions, pexec)
|
|
|
|
if currentRountine == 0 {
|
|
|
|
currentRountine++
|
|
|
|
go p.retryPeerExecution()
|
|
|
|
}
|
|
|
|
return pexec, err
|
|
|
|
}
|
2024-08-22 08:50:24 +02:00
|
|
|
fmt.Println("LaunchPeerExecution AFT 3", m)
|
|
|
|
|
|
|
|
if e, ok := m["error"]; !ok && e != "" {
|
2024-08-13 14:33:26 +02:00
|
|
|
return nil, errors.New(fmt.Sprintf("%v", m["error"]))
|
|
|
|
}
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *PeerCache) retryPeerExecution() {
|
|
|
|
execs := []*PeerExecution{}
|
|
|
|
for _, v := range singleton.Executions {
|
2024-08-21 08:54:29 +02:00
|
|
|
|
|
|
|
d, err := p.LaunchPeerExecution(v.PeerID, v.IsMySelf, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller)
|
2024-08-13 14:33:26 +02:00
|
|
|
if err == nil {
|
|
|
|
execs = append(execs, d)
|
|
|
|
} else {
|
|
|
|
logs.GetLogger().With().Err(err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
singleton.Executions = execs
|
|
|
|
if len(singleton.Executions) > 0 {
|
|
|
|
time.Sleep(60 * time.Second)
|
|
|
|
p.retryPeerExecution()
|
|
|
|
} else {
|
|
|
|
currentRountine = 0
|
|
|
|
}
|
|
|
|
}
|