peers logic

commit 2d9b4587ac (parent 4911e32ec2)
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 
 	"cloud.o-forge.io/core/oc-lib/models/utils"
+	"cloud.o-forge.io/core/oc-lib/static"
 	"cloud.o-forge.io/core/oc-lib/tools"
 	"github.com/google/uuid"
 )
@@ -14,6 +15,15 @@ type Peer struct {
 	PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"`
 }
 
+func (ao *Peer) IsMySelf() bool {
+	id, _ := static.GetMyLocalJsonPeer()
+	return ao.UUID == id
+}
+
+func (p *Peer) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
+	return (&PeerCache{}).LaunchPeerExecution(peerID, dataID, url, dt, method, body, caller)
+}
+
 func (ao *Peer) GetID() string {
 	return ao.UUID
 }
models/peer/peer_cache.go (new file, 105 lines)
@@ -0,0 +1,105 @@
+package peer
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+	"time"
+
+	"cloud.o-forge.io/core/oc-lib/logs"
+	"cloud.o-forge.io/core/oc-lib/models/utils"
+	"cloud.o-forge.io/core/oc-lib/tools"
+)
+
+var currentRountine = 0
+var singleton = &PeerCache{
+	Executions: []*PeerExecution{},
+}
+
+type PeerExecution struct {
+	Method   tools.METHOD
+	Url      string
+	Body     map[string]interface{}
+	Caller   tools.HTTPCaller
+	PeerID   string
+	DataType utils.DataType
+	DataID   string
+}
+
+type PeerCache struct {
+	Executions []*PeerExecution
+}
+
+func (p *PeerCache) checkPeerStatus(peerID string) bool {
+	return true
+}
+
+func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
+	data := New()
+	data.Init(utils.PEER, caller)
+	return data
+}
+
+func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
+	var err error
+	b := []byte{}
+	methods := caller.URLS[dt.String()]
+	if _, ok := methods[method]; !ok {
+		return nil, errors.New("no path found")
+	}
+	if !p.checkPeerStatus(peerID) {
+		return nil, err
+	}
+	if method == tools.POST {
+		b, err = caller.CallPost(url, methods[method], body)
+	}
+	if method == tools.GET {
+		b, err = caller.CallGet(url, strings.ReplaceAll(methods[method], ":id", dataID))
+	}
+	if method == tools.DELETE {
+		b, err = caller.CallDelete(url, strings.ReplaceAll(methods[method], ":id", dataID))
+	}
+	var m map[string]interface{}
+	json.Unmarshal(b, &m)
+	if err != nil {
+		pexec := &PeerExecution{
+			Method:   method,
+			Url:      url + methods[method],
+			Body:     body,
+			Caller:   *caller,
+			PeerID:   peerID,
+			DataType: dt,
+			DataID:   dataID,
+		}
+		singleton.Executions = append(singleton.Executions, pexec)
+		if currentRountine == 0 {
+			currentRountine++
+			go p.retryPeerExecution()
+		}
+		return pexec, err
+	}
+	if _, ok := m["error"]; !ok {
+		return nil, errors.New(fmt.Sprintf("%v", m["error"]))
+	}
+	return nil, err
+}
+
+func (p *PeerCache) retryPeerExecution() {
+	execs := []*PeerExecution{}
+	for _, v := range singleton.Executions {
+		d, err := p.LaunchPeerExecution(v.PeerID, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller)
+		if err == nil {
+			execs = append(execs, d)
+		} else {
+			logs.GetLogger().With().Err(err)
+		}
+	}
+	singleton.Executions = execs
+	if len(singleton.Executions) > 0 {
+		time.Sleep(60 * time.Second)
+		p.retryPeerExecution()
+	} else {
+		currentRountine = 0
+	}
+}
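For illustration only (not part of this commit): a minimal sketch of how a service might drive the new peer logic. It assumes HTTPCaller exposes a URLS map of data-type name to METHOD to path, as peer_cache.go reads it, and that Peer carries Url alongside the embedded AbstractObject; the peer URL, route paths and IDs below are made up.

package main

import (
	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
)

func main() {
	// Route table consulted by PeerCache.LaunchPeerExecution via caller.URLS[dt.String()][method];
	// ":id" is replaced with dataID for GET and DELETE calls.
	caller := &tools.HTTPCaller{
		URLS: map[string]map[tools.METHOD]string{
			utils.WORKSPACE.String(): {
				tools.POST:   "/workspace",
				tools.DELETE: "/workspace/:id",
			},
		},
	}
	remote := &peer.Peer{
		AbstractObject: utils.AbstractObject{UUID: "peer-uuid"}, // hypothetical peer ID
		Url:            "http://peer.example.org",               // hypothetical peer URL
	}
	// Delegates to the shared PeerCache; if the HTTP call fails, the execution is
	// cached and retried every 60 seconds by the background goroutine.
	exec, err := remote.LaunchPeerExecution(remote.GetID(), "some-workspace-id",
		remote.Url, utils.WORKSPACE, tools.DELETE, nil, caller)
	_, _ = exec, err
}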
@@ -18,9 +18,10 @@ import (
 
 type AbstractWorkflow struct {
 	resources.ResourceSet
 	Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"`
-	Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"`
-	Shared []string `json:"shared,omitempty" bson:"shared,omitempty"`
+	ScheduleActive bool `bson:"schedule_active,omitempty" json:"schedule_active,omitempty"`
+	Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"`
+	Shared []string `json:"shared,omitempty" bson:"shared,omitempty"`
 }
 
 func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) {
@@ -1,9 +1,7 @@
 package oclib
 
 import (
-	"encoding/json"
 	"errors"
-	"fmt"
 	"strings"
 
 	"cloud.o-forge.io/core/oc-lib/dbs"
@@ -80,6 +78,9 @@ func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*w
 }
 
 func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
+	wfa.execution(id, &Workflow{
+		AbstractWorkflow: AbstractWorkflow{ScheduleActive: false},
+	}, true)
 	return wfa.GenericDeleteOne(id, wfa)
 }
 
@@ -91,9 +92,6 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
 	if _, ok := methods[tools.POST]; !ok {
 		return errors.New("no path found")
 	}
-	if realData.Schedule == nil {
-		return nil
-	}
 	res, _, _ := wfa.LoadOne(id)
 	r := res.(*Workflow)
 	g := r.Graph
@@ -113,22 +111,20 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
 			if peerID == "" {
 				continue
 			}
-			p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(peerID)
+			paccess := (&peer.Peer{})
+			p, code, _ := paccess.GetAccessor(nil).LoadOne(peerID)
 			if code != 200 {
 				continue
 			}
-			resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], (&workflow_execution.WorkflowExecutions{
-				ResourceID: dc_id,
-				Executions: execs,
-			}).Serialize())
-			if err != nil {
+			b, err := paccess.LaunchPeerExecution(p.GetID(), "", p.(*peer.Peer).Url, utils.BOOKING, tools.POST,
+				(&workflow_execution.WorkflowExecutions{
+					WorkflowID: id,
+					ResourceID: dc_id,
+					Executions: execs,
+				}).Serialize(), wfa.Caller)
+			if err != nil && b == nil {
 				return err
 			}
-			var response map[string]interface{}
-			json.Unmarshal(resp, &response)
-			if code, ok := response["code"]; ok && code != 200 {
-				return errors.New(fmt.Sprintf("%v", response["error"]))
-			}
 		}
 	}
 }
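Illustration (not part of the diff) of the return contract these call sites rely on: LaunchPeerExecution returns (nil, err) when the request cannot be routed, and a non-nil *PeerExecution together with the transport error when the HTTP call failed and the request was cached for the 60-second retry loop, which is why only err != nil && b == nil is treated as fatal. A hedged sketch, with notifyBooking and its arguments being hypothetical names:

package example // illustrative only

import (
	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// notifyBooking mirrors how book() above reads LaunchPeerExecution's two return values.
func notifyBooking(target *peer.Peer, payload map[string]interface{}, caller *tools.HTTPCaller) error {
	paccess := &peer.Peer{}
	b, err := paccess.LaunchPeerExecution(target.GetID(), "", target.Url, utils.BOOKING, tools.POST, payload, caller)
	if err != nil && b == nil {
		// No BOOKING/POST route was configured, so nothing could be sent or queued.
		return err
	}
	// err != nil && b != nil: delivery failed, but the PeerCache queued the execution
	// and retries it in the background, so the caller does not fail hard here.
	return nil
}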
@@ -136,9 +132,17 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
 }
 
 func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
-	if realData.Schedule == nil {
+	if realData.Schedule == nil && realData.ScheduleActive {
 		return 200, nil
 	}
+	if realData.Schedule == nil && !realData.ScheduleActive {
+		mongo.MONGOService.DeleteMultiple(map[string]interface{}{
+			"state":       1,
+			"workflow_id": id,
+		}, utils.WORKFLOW_EXECUTION.String())
+		err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{})
+		return 200, err
+	}
 	res, _, _ := wfa.LoadOne(id)
 	r := res.(*Workflow)
 	if r.Schedule != nil && r.Schedule.Start == realData.Schedule.Start && r.Schedule.End == realData.Schedule.End && r.Schedule.Cron == realData.Schedule.Cron {
@@ -159,7 +163,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet
 			"state": 1,
 		}, utils.WORKFLOW_EXECUTION.String())
 	}
-	if err == nil && len(execs) > 0 {
+	if len(execs) > 0 {
 		for _, obj := range execs {
 			_, code, err := accessor.StoreOne(obj)
 			if code != 200 {
@@ -40,6 +40,7 @@ func (d ScheduledType) EnumIndex() int {
 }
 
 type WorkflowExecutions struct {
+	WorkflowID string `json:"workflow_id" bson:"workflow_id"`
 	ResourceID string `json:"resource_id" bson:"resource_id"`
 	Executions []*WorkflowExecution `json:"executions" bson:"executions"`
 }
@@ -1,9 +1,7 @@
 package shared_workspace
 
 import (
-	"encoding/json"
 	"slices"
-	"strings"
 
 	"cloud.o-forge.io/core/oc-lib/dbs"
 	"cloud.o-forge.io/core/oc-lib/dbs/mongo"
@@ -23,6 +21,10 @@ func New() *sharedWorkspaceMongoAccessor {
 }
 
 func (wfa *sharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
+	set, code, _ := wfa.LoadOne(id)
+	if code == 200 {
+		wfa.deleteToPeer(set.(*SharedWorkspace))
+	}
 	wfa.sharedWorkflow(&SharedWorkspace{}, id)
 	wfa.sharedWorkspace(&SharedWorkspace{}, id)
 	return wfa.GenericDeleteOne(id, wfa)
@@ -39,22 +41,14 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace
 		if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil {
 			continue
 		}
-		methods := wfa.Caller.URLS[utils.WORKSPACE.String()]
-		if _, ok := methods[tools.DELETE]; !ok {
-			continue
-		}
+		paccess := (&peer.Peer{})
 		for _, p := range shared.Peers {
-			pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
+			pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
 			if code == 200 {
-				resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v))
-				if err != nil {
+				b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller)
+				if err != nil && b == nil {
 					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
 				}
-				var r map[string]interface{}
-				json.Unmarshal(resp, &r)
-				if e, ok := r["error"]; ok && e != "" {
-					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
-				}
 			}
 		}
 	}
@@ -66,25 +60,17 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace
 		if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil {
 			continue
 		}
-		methods := wfa.Caller.URLS[utils.WORKSPACE.String()]
-		if _, ok := methods[tools.POST]; !ok {
-			continue
-		}
 		for _, p := range shared.Peers {
 			if code != 200 {
 				continue
 			}
-			pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
+			paccess := (&peer.Peer{})
+			pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
 			if code == 200 {
-				resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workspace.Serialize())
-				if err != nil {
+				b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.POST, workspace.Serialize(), wfa.Caller)
+				if err != nil && b == nil {
 					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
 				}
-				var r map[string]interface{}
-				json.Unmarshal(resp, &r)
-				if e, ok := r["error"]; ok && e != "" {
-					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
-				}
 			}
 		}
 	}
@@ -113,22 +99,15 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace,
 		if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil {
 			continue
 		}
-		methods := wfa.Caller.URLS[utils.WORKFLOW.String()]
-		if _, ok := methods[tools.DELETE]; !ok {
-			continue
-		}
+		paccess := (&peer.Peer{})
 		for _, p := range shared.Peers {
-			pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
-			if code == 200 {
-				resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v))
-				if err != nil {
-					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
-				}
-				var r map[string]interface{}
-				json.Unmarshal(resp, &r)
-				if e, ok := r["error"]; ok && e != "" {
-					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
-				}
+			pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
+			if code != 200 {
+				continue
+			}
+			b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller)
+			if err != nil && b == nil {
+				wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
 			}
 		}
 	}
@@ -146,25 +125,17 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace,
 		if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil {
 			continue
 		}
-		methods := wfa.Caller.URLS[utils.WORKFLOW.String()]
-		if _, ok := methods[tools.POST]; !ok {
-			continue
-		}
+		paccess := (&peer.Peer{})
 		for _, p := range shared.Peers {
 			if code != 200 {
 				continue
 			}
-			pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
+			pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
 			if code == 200 {
-				resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workflow.Serialize())
-				if err != nil {
+				b, err := paccess.LaunchPeerExecution(p, shared.UUID, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.POST, workflow.Serialize(), wfa.Caller)
+				if err != nil && b == nil {
 					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
 				}
-				var r map[string]interface{}
-				json.Unmarshal(resp, &r)
-				if e, ok := r["error"]; ok && e != "" {
-					wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
-				}
 			}
 		}
 	}
@@ -177,24 +148,19 @@ func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) {
 	if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil {
 		return
 	}
-	methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()]
-	if _, ok := methods[tools.DELETE]; !ok {
-		return
-	}
+	paccess := (&peer.Peer{})
 	for _, v := range shared.Peers {
-		p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v)
+		if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() {
+			continue
+		}
+		p, code, _ := paccess.GetAccessor(nil).LoadOne(v)
 		if code != 200 {
 			continue
 		}
-		resp, err := wfa.Caller.CallDelete(p.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", shared.GetID()))
-		if err != nil {
+		b, err := paccess.LaunchPeerExecution(p.GetID(), shared.UUID, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.DELETE, nil, wfa.Caller)
+		if err != nil && b == nil {
 			wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error())
 		}
-		var r map[string]interface{}
-		json.Unmarshal(resp, &r)
-		if e, ok := r["error"]; ok && e != "" {
-			wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string))
-		}
 	}
 }
 
@@ -202,24 +168,19 @@ func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) {
 	if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil {
 		return
 	}
-	methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()]
-	if _, ok := methods[tools.POST]; !ok {
-		return
-	}
+	paccess := (&peer.Peer{})
 	for _, v := range shared.Peers {
-		p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v)
+		if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() {
+			continue
+		}
+		p, code, _ := paccess.GetAccessor(nil).LoadOne(v)
 		if code != 200 {
 			continue
 		}
-		resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], shared.Serialize())
-		if err != nil {
+		b, err := paccess.LaunchPeerExecution(p.GetID(), v, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.POST, shared.Serialize(), wfa.Caller)
+		if err != nil && b == nil {
 			wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error())
 		}
-		var r map[string]interface{}
-		json.Unmarshal(resp, &r)
-		if e, ok := r["error"]; ok && e != "" {
-			wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string))
-		}
 	}
 }
 
@@ -232,7 +193,6 @@ func (wfa *sharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string
 }
 
 func (wfa *sharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
-	wfa.deleteToPeer(data.(*SharedWorkspace))
 	d, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa)
 	if code == 200 {
 		wfa.sharedWorkflow(d.(*SharedWorkspace), d.GetID())