oc-lib/models/peer/peer_cache.go

134 lines
3.7 KiB
Go
Raw Normal View History

2024-08-13 14:33:26 +02:00
package peer
import (
"encoding/json"
"errors"
"fmt"
2024-08-22 10:19:14 +02:00
"regexp"
2024-08-13 14:33:26 +02:00
"strings"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type PeerExecution struct {
2024-08-23 09:53:37 +02:00
Method string `json:"method" bson:"method"`
Url string `json:"url" bson:"url"`
Body map[string]interface{} `json:"body" bson:"body"`
DataType int `json:"data_type" bson:"data_type"`
DataID string `json:"data_id" bson:"data_id"`
2024-08-13 14:33:26 +02:00
}
type PeerCache struct {
Executions []*PeerExecution
}
2024-08-22 16:13:07 +02:00
func (p *PeerCache) urlFormat(url string, dt utils.DataType) string {
2024-08-22 15:18:59 +02:00
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
2024-08-22 16:13:07 +02:00
url = strings.ReplaceAll(url, "localhost", dt.API())
url = strings.ReplaceAll(url, "127.0.0.1", dt.API())
2024-08-22 15:18:59 +02:00
r := regexp.MustCompile("(:[0-9]+)")
t := r.FindString(url)
if t != "" {
url = strings.Replace(url, t, ":8080", -1)
}
r.ReplaceAllString(url, ":8080")
}
return url
}
2024-08-23 10:01:37 +02:00
func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) {
2024-08-21 15:46:16 +02:00
api := tools.API{}
access := (&Peer{}).GetAccessor(nil)
res, code, _ := access.LoadOne(peerID)
if code != 200 {
2024-08-23 09:53:37 +02:00
return nil, false
2024-08-21 15:46:16 +02:00
}
2024-08-21 16:03:56 +02:00
methods := caller.URLS[utils.PEER.String()]
2024-08-23 09:11:41 +02:00
fmt.Println("PEER AFT 3", methods)
2024-08-21 16:03:56 +02:00
if methods == nil {
2024-08-23 09:53:37 +02:00
return res.(*Peer), false
2024-08-21 16:03:56 +02:00
}
meth := methods[tools.POST]
2024-08-23 09:11:41 +02:00
fmt.Println("PEER AFT 4", meth)
2024-08-21 16:03:56 +02:00
if meth == "" {
2024-08-23 09:53:37 +02:00
return res.(*Peer), false
2024-08-21 16:03:56 +02:00
}
2024-08-22 16:13:07 +02:00
url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER)
2024-08-23 09:11:41 +02:00
fmt.Println("PEER AFT 5", url)
state, services := api.CheckRemotePeer(url)
res.(*Peer).Services = services
2024-08-23 08:23:24 +02:00
access.UpdateOne(res, peerID)
2024-08-23 15:43:12 +02:00
fmt.Println("PEER AFT 6", state, services, appName)
2024-08-23 10:01:37 +02:00
return res.(*Peer), state != tools.DEAD && services[appName] == 0
2024-08-13 14:33:26 +02:00
}
func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
data := New()
data.Init(utils.PEER, caller)
return data
}
2024-08-23 09:53:37 +02:00
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
2024-08-21 08:54:29 +02:00
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
2024-08-13 14:33:26 +02:00
methods := caller.URLS[dt.String()]
if _, ok := methods[method]; !ok {
return nil, errors.New("no path found")
}
2024-08-22 16:28:21 +02:00
meth := methods[method]
if meth == "" {
return nil, errors.New("no path found")
} else {
meth = strings.ReplaceAll(meth, ":id", dataID)
}
2024-08-23 09:53:37 +02:00
url := ""
2024-08-23 23:00:57 +02:00
2024-08-23 10:01:37 +02:00
if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok {
2024-08-23 23:00:57 +02:00
pexec := &PeerExecution{
Method: method.String(),
Url: p.urlFormat((mypeer.Url)+meth, dt),
Body: body,
DataType: dt.EnumIndex(),
DataID: dataID,
}
2024-08-23 09:53:37 +02:00
mypeer.AddExecution(*pexec)
2024-08-23 09:57:11 +02:00
mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID)
2024-08-22 12:29:38 +02:00
return nil, errors.New("peer is not reachable")
2024-08-23 09:53:37 +02:00
} else {
url = p.urlFormat((mypeer.Url)+meth, dt)
fmt.Println("LaunchPeerExecution AFT 3", url)
2024-08-23 09:57:52 +02:00
tmp := mypeer.FailedExecution
2024-08-23 09:53:37 +02:00
mypeer.FailedExecution = []PeerExecution{}
2024-08-23 09:57:11 +02:00
mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID)
2024-08-23 09:53:37 +02:00
for _, v := range tmp {
2024-08-23 23:00:57 +02:00
go p.exec(v.Url, tools.ToMethod(v.Method), v.Body, caller)
2024-08-23 09:53:37 +02:00
}
2024-08-13 14:33:26 +02:00
}
2024-08-23 23:00:57 +02:00
return nil, p.exec(url, method, body, caller)
}
func (p *PeerCache) exec(url string, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) error {
var b []byte
var err error
2024-08-13 14:33:26 +02:00
if method == tools.POST {
2024-08-22 16:28:21 +02:00
b, err = caller.CallPost(url, "", body)
2024-08-13 14:33:26 +02:00
}
if method == tools.GET {
2024-08-22 16:28:21 +02:00
b, err = caller.CallGet(url, "")
2024-08-13 14:33:26 +02:00
}
if method == tools.DELETE {
2024-08-22 16:28:21 +02:00
b, err = caller.CallDelete(url, "")
2024-08-13 14:33:26 +02:00
}
var m map[string]interface{}
json.Unmarshal(b, &m)
if err != nil {
2024-08-23 23:00:57 +02:00
return err
2024-08-13 14:33:26 +02:00
}
2024-08-22 16:47:15 +02:00
fmt.Println("LaunchPeerExecution AFT 3", m, url)
2024-08-22 08:50:24 +02:00
if e, ok := m["error"]; !ok && e != "" {
2024-08-23 23:00:57 +02:00
return errors.New(fmt.Sprintf("%v", m["error"]))
2024-08-13 14:33:26 +02:00
}
2024-08-23 23:00:57 +02:00
return err
2024-08-13 14:33:26 +02:00
}