// Source: oc-lib/models/peer/peer_cache.go

package peer
import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
var (
	// currentRountine is 0 when no retry goroutine is running; it is
	// incremented before retryPeerExecution is started and reset to 0 when
	// that goroutine finds nothing left to retry.
	currentRountine = 0

	// singleton is the package-wide cache holding the peer executions that
	// are waiting to be retried.
	singleton = &PeerCache{
		Executions: []*PeerExecution{},
	}
)
type PeerExecution struct {
Method tools.METHOD
Url string
Body map[string]interface{}
2024-08-21 08:54:29 +02:00
IsMySelf bool
2024-08-13 14:33:26 +02:00
Caller tools.HTTPCaller
PeerID string
DataType utils.DataType
DataID string
}
// PeerCache holds the peer executions that failed and are kept for a later
// retry.
type PeerCache struct {
	// Executions is the list of executions still waiting to be retried.
	Executions []*PeerExecution
}
2024-08-21 16:03:56 +02:00
func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) bool {
2024-08-21 15:46:16 +02:00
api := tools.API{}
access := (&Peer{}).GetAccessor(nil)
res, code, _ := access.LoadOne(peerID)
if code != 200 {
return false
}
url := res.(*Peer).Url
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
url = strings.ReplaceAll(url, "localhost", utils.PEER.API())
2024-08-21 16:07:57 +02:00
url = strings.ReplaceAll(url, "127.0.0.1", utils.PEER.API())
2024-08-21 15:46:16 +02:00
}
2024-08-21 16:03:56 +02:00
methods := caller.URLS[utils.PEER.String()]
if methods == nil {
return false
}
meth := methods[tools.POST]
if meth == "" {
return false
}
state := api.CheckRemotePeer(url + meth)
2024-08-21 15:46:16 +02:00
return state != tools.DEAD
2024-08-13 14:33:26 +02:00
}
// GetAccessor creates an accessor with New, initialises it for the PEER data
// type with the given caller, and returns it.
func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
	accessor := New()
	accessor.Init(utils.PEER, caller)
	return accessor
}
2024-08-21 08:54:29 +02:00
func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID string, url string,
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
if isMySelf {
url = "http://" + dt.API() // default behavior on basic API container name
} else {
return nil, errors.New("Peer " + peerID + " is not reachable")
}
}
2024-08-13 14:33:26 +02:00
var err error
b := []byte{}
methods := caller.URLS[dt.String()]
if _, ok := methods[method]; !ok {
return nil, errors.New("no path found")
}
2024-08-21 16:03:56 +02:00
if !p.checkPeerStatus(peerID, caller) {
2024-08-13 14:33:26 +02:00
return nil, err
}
if method == tools.POST {
b, err = caller.CallPost(url, methods[method], body)
}
if method == tools.GET {
b, err = caller.CallGet(url, strings.ReplaceAll(methods[method], ":id", dataID))
}
if method == tools.DELETE {
b, err = caller.CallDelete(url, strings.ReplaceAll(methods[method], ":id", dataID))
}
var m map[string]interface{}
json.Unmarshal(b, &m)
if err != nil {
pexec := &PeerExecution{
Method: method,
Url: url + methods[method],
Body: body,
2024-08-21 08:54:29 +02:00
IsMySelf: isMySelf,
2024-08-13 14:33:26 +02:00
Caller: *caller,
PeerID: peerID,
DataType: dt,
DataID: dataID,
}
singleton.Executions = append(singleton.Executions, pexec)
if currentRountine == 0 {
currentRountine++
go p.retryPeerExecution()
}
return pexec, err
}
if _, ok := m["error"]; !ok {
return nil, errors.New(fmt.Sprintf("%v", m["error"]))
}
return nil, err
}
func (p *PeerCache) retryPeerExecution() {
execs := []*PeerExecution{}
for _, v := range singleton.Executions {
2024-08-21 08:54:29 +02:00
d, err := p.LaunchPeerExecution(v.PeerID, v.IsMySelf, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller)
2024-08-13 14:33:26 +02:00
if err == nil {
execs = append(execs, d)
} else {
logs.GetLogger().With().Err(err)
}
}
singleton.Executions = execs
if len(singleton.Executions) > 0 {
time.Sleep(60 * time.Second)
p.retryPeerExecution()
} else {
currentRountine = 0
}
}