simplify call to peer

This commit is contained in:
mr
2024-08-23 09:53:37 +02:00
parent 29c2ab0e4c
commit e8acf8a127
6 changed files with 98 additions and 124 deletions

View File

@@ -2,6 +2,7 @@ package peer
import (
"encoding/json"
"fmt"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/static"
@@ -11,9 +12,33 @@ import (
type Peer struct {
utils.AbstractObject
Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"`
PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"`
Services map[string]int `json:"services,omitempty" bson:"services,omitempty"`
Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"`
PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"`
Services map[string]int `json:"services,omitempty" bson:"services,omitempty"`
FailedExecution []PeerExecution `json:"failed_execution,omitempty" bson:"failed_execution,omitempty"`
}
func (ao *Peer) AddExecution(exec PeerExecution) {
found := false
for _, v := range ao.FailedExecution {
if v.Url == exec.Url && v.Method == exec.Method && fmt.Sprint(v.Body) == fmt.Sprint(exec.Body) {
found = true
break
}
}
if !found {
ao.FailedExecution = append(ao.FailedExecution, exec)
}
}
func (ao *Peer) RemoveExecution(exec PeerExecution) {
new := []PeerExecution{}
for i, v := range ao.FailedExecution {
if !(v.Url == exec.Url && v.Method == exec.Method && fmt.Sprint(v.Body) == fmt.Sprint(exec.Body)) {
new = append(new, ao.FailedExecution[i])
}
}
ao.FailedExecution = new
}
func (ao *Peer) IsMySelf() bool {
@@ -21,9 +46,9 @@ func (ao *Peer) IsMySelf() bool {
return ao.UUID == id
}
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
p.UUID = peerID
return (&PeerCache{}).LaunchPeerExecution(peerID, p.IsMySelf(), dataID, url, dt, method, body, caller)
return (&PeerCache{}).LaunchPeerExecution(peerID, dataID, dt, method, body, caller)
}
func (ao *Peer) GetID() string {

View File

@@ -6,27 +6,17 @@ import (
"fmt"
"regexp"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
var currentRountine = 0
var singleton = &PeerCache{
Executions: []*PeerExecution{},
}
type PeerExecution struct {
Method tools.METHOD
Url string
Body map[string]interface{}
IsMySelf bool
Caller tools.HTTPCaller
PeerID string
DataType utils.DataType
DataID string
Method string `json:"method" bson:"method"`
Url string `json:"url" bson:"url"`
Body map[string]interface{} `json:"body" bson:"body"`
DataType int `json:"data_type" bson:"data_type"`
DataID string `json:"data_id" bson:"data_id"`
}
type PeerCache struct {
@@ -47,22 +37,22 @@ func (p *PeerCache) urlFormat(url string, dt utils.DataType) string {
return url
}
func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) bool {
func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) (*Peer, bool) {
api := tools.API{}
access := (&Peer{}).GetAccessor(nil)
res, code, _ := access.LoadOne(peerID)
if code != 200 {
return false
return nil, false
}
methods := caller.URLS[utils.PEER.String()]
fmt.Println("PEER AFT 3", methods)
if methods == nil {
return false
return res.(*Peer), false
}
meth := methods[tools.POST]
fmt.Println("PEER AFT 4", meth)
if meth == "" {
return false
return res.(*Peer), false
}
url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER)
fmt.Println("PEER AFT 5", url)
@@ -70,7 +60,7 @@ func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) boo
res.(*Peer).Services = services
access.UpdateOne(res, peerID)
fmt.Printf("Peer %v is %v\n", peerID, state)
return state != tools.DEAD
return res.(*Peer), state != tools.DEAD
}
func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
@@ -79,7 +69,7 @@ func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
return data
}
func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID string, url string,
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
var err error
b := []byte{}
@@ -93,10 +83,28 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID str
} else {
meth = strings.ReplaceAll(meth, ":id", dataID)
}
url = p.urlFormat(url+meth, dt)
fmt.Println("LaunchPeerExecution AFT 3", url)
if !p.checkPeerStatus(peerID, caller) {
url := ""
pexec := &PeerExecution{
Method: method.String(),
Url: url + methods[method],
Body: body,
DataType: dt.EnumIndex(),
DataID: dataID,
}
if mypeer, ok := p.checkPeerStatus(peerID, caller); !ok {
mypeer.AddExecution(*pexec)
return nil, errors.New("peer is not reachable")
} else {
url = p.urlFormat((mypeer.Url)+meth, dt)
fmt.Println("LaunchPeerExecution AFT 3", url)
tmp := []PeerExecution{}
for _, v := range mypeer.FailedExecution {
tmp = append(tmp, v)
}
mypeer.FailedExecution = []PeerExecution{}
for _, v := range tmp {
go p.LaunchPeerExecution(peerID, v.DataID, utils.DataType(v.DataType), tools.ToMethod(v.Method), v.Body, caller)
}
}
if method == tools.POST {
b, err = caller.CallPost(url, "", body)
@@ -110,22 +118,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID str
var m map[string]interface{}
json.Unmarshal(b, &m)
if err != nil {
pexec := &PeerExecution{
Method: method,
Url: url + methods[method],
Body: body,
IsMySelf: isMySelf,
Caller: *caller,
PeerID: peerID,
DataType: dt,
DataID: dataID,
}
singleton.Executions = append(singleton.Executions, pexec)
/*if currentRountine == 0 {
currentRountine++
go p.retryPeerExecution()
}*/
return pexec, err
return nil, err
}
fmt.Println("LaunchPeerExecution AFT 3", m, url)
@@ -134,23 +127,3 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID str
}
return nil, err
}
func (p *PeerCache) retryPeerExecution() {
execs := []*PeerExecution{}
for _, v := range singleton.Executions {
d, err := p.LaunchPeerExecution(v.PeerID, v.IsMySelf, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller)
if err == nil {
execs = append(execs, d)
} else {
logs.GetLogger().With().Err(err)
}
}
singleton.Executions = execs
if len(singleton.Executions) > 0 {
time.Sleep(60 * time.Second)
p.retryPeerExecution()
} else {
currentRountine = 0
}
}