This commit is contained in:
mr
2024-11-28 13:19:01 +01:00
parent b388e43f30
commit 9ea342c4c4
37 changed files with 198 additions and 122 deletions

View File

@@ -63,7 +63,7 @@ func (ao *Peer) RemoveExecution(exec PeerExecution) {
// IsMySelf checks if the peer is the local peer
func (ao *Peer) IsMySelf() (bool, string) {
d, code, err := New().Search(nil, SELF.String())
d, code, err := New(tools.PEER, "", nil, nil).Search(nil, SELF.String())
if code != 200 || err != nil || len(d) == 0 {
return false, ""
}
@@ -77,7 +77,6 @@ func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt tools.DataTy
return cache.LaunchPeerExecution(peerID, dataID, dt, method, body, caller) // Launch the execution on the peer through the cache
}
func (d *Peer) GetAccessor(peerID string, groups []string, caller *tools.HTTPCaller) utils.Accessor {
data := New() // Create a new instance of the accessor
data.Init(tools.PEER, peerID, groups, caller) // Initialize the accessor with the PEER model type
data := New(tools.PEER, peerID, groups, caller) // Create a new instance of the accessor
return data
}

View File

@@ -56,7 +56,7 @@ func (p *PeerCache) urlFormat(url string, dt tools.DataType) string {
// checkPeerStatus checks the status of a peer
func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) {
api := tools.API{}
access := New()
access := NewShallow()
res, code, _ := access.LoadOne(peerID) // Load the peer from db
if code != 200 { // no peer no party
return nil, false
@@ -101,7 +101,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
DataID: dataID,
}
mypeer.AddExecution(*pexec)
New().UpdateOne(mypeer, peerID) // Update the peer in the db
NewShallow().UpdateOne(mypeer, peerID) // Update the peer in the db
return nil, errors.New("peer is not reachable")
} else {
if mypeer == nil {
@@ -111,7 +111,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL
tmp := mypeer.FailedExecution // Get the failed executions list
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
New().UpdateOne(mypeer, peerID) // Update the peer in the db
NewShallow().UpdateOne(mypeer, peerID) // Update the peer in the db
for _, v := range tmp { // Retry the failed executions
go p.exec(v.Url, tools.ToMethod(v.Method), v.Body, caller)
}

View File

@@ -5,7 +5,9 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type peerMongoAccessor struct {
@@ -13,8 +15,25 @@ type peerMongoAccessor struct {
}
// New creates a new instance of the peerMongoAccessor
func New() *peerMongoAccessor {
return &peerMongoAccessor{}
func NewShallow() *peerMongoAccessor {
return &peerMongoAccessor{
utils.AbstractAccessor{
Logger: logs.CreateLogger(tools.PEER.String()), // Create a logger with the data type
Type: tools.PEER.String(),
},
}
}
func New(t tools.DataType, peerID string, groups []string, caller *tools.HTTPCaller) *peerMongoAccessor {
return &peerMongoAccessor{
utils.AbstractAccessor{
Logger: logs.CreateLogger(t.String()), // Create a logger with the data type
Caller: caller,
PeerID: peerID,
Groups: groups, // Set the caller
Type: t.String(),
},
}
}
/*