From 2d9b4587ac957af643719fc038aaa552140e7148 Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 13 Aug 2024 14:33:26 +0200 Subject: [PATCH] peers logic --- models/peer/peer.go | 10 ++ models/peer/peer_cache.go | 105 ++++++++++++++++ models/workflow/workflow.go | 7 +- models/workflow/workflow_mongo_accessor.go | 40 +++--- .../workflow_execution/workflow_execution.go | 1 + .../shared/shared_workspace_mongo_accessor.go | 116 ++++++------------ 6 files changed, 180 insertions(+), 99 deletions(-) create mode 100644 models/peer/peer_cache.go diff --git a/models/peer/peer.go b/models/peer/peer.go index 8cff1cc..29c5cfa 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -4,6 +4,7 @@ import ( "encoding/json" "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/static" "cloud.o-forge.io/core/oc-lib/tools" "github.com/google/uuid" ) @@ -14,6 +15,15 @@ type Peer struct { PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` } +func (ao *Peer) IsMySelf() bool { + id, _ := static.GetMyLocalJsonPeer() + return ao.UUID == id +} + +func (p *Peer) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { + return (&PeerCache{}).LaunchPeerExecution(peerID, dataID, url, dt, method, body, caller) +} + func (ao *Peer) GetID() string { return ao.UUID } diff --git a/models/peer/peer_cache.go b/models/peer/peer_cache.go new file mode 100644 index 0000000..69384f9 --- /dev/null +++ b/models/peer/peer_cache.go @@ -0,0 +1,105 @@ +package peer + +import ( + "encoding/json" + "errors" + "fmt" + "strings" + "time" + + "cloud.o-forge.io/core/oc-lib/logs" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/tools" +) + +var currentRountine = 0 +var singleton = &PeerCache{ + Executions: []*PeerExecution{}, +} + +type PeerExecution struct { + Method tools.METHOD + Url string + Body map[string]interface{} + Caller tools.HTTPCaller + PeerID string + DataType utils.DataType + DataID string +} + +type PeerCache struct { + Executions []*PeerExecution +} + +func (p *PeerCache) checkPeerStatus(peerID string) bool { + return true +} + +func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { + data := New() + data.Init(utils.PEER, caller) + return data +} + +func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { + var err error + b := []byte{} + methods := caller.URLS[dt.String()] + if _, ok := methods[method]; !ok { + return nil, errors.New("no path found") + } + if !p.checkPeerStatus(peerID) { + return nil, err + } + if method == tools.POST { + b, err = caller.CallPost(url, methods[method], body) + } + if method == tools.GET { + b, err = caller.CallGet(url, strings.ReplaceAll(methods[method], ":id", dataID)) + } + if method == tools.DELETE { + b, err = caller.CallDelete(url, strings.ReplaceAll(methods[method], ":id", dataID)) + } + var m map[string]interface{} + json.Unmarshal(b, &m) + if err != nil { + pexec := &PeerExecution{ + Method: method, + Url: url + methods[method], + Body: body, + Caller: *caller, + PeerID: peerID, + DataType: dt, + DataID: dataID, + } + singleton.Executions = append(singleton.Executions, pexec) + if currentRountine == 0 { + currentRountine++ + go p.retryPeerExecution() + } + return pexec, err + } + if _, ok := m["error"]; !ok { + return nil, errors.New(fmt.Sprintf("%v", m["error"])) + } + return nil, err +} + +func (p *PeerCache) retryPeerExecution() { + execs := []*PeerExecution{} + for _, v := range singleton.Executions { + d, err := p.LaunchPeerExecution(v.PeerID, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller) + if err == nil { + execs = append(execs, d) + } else { + logs.GetLogger().With().Err(err) + } + } + singleton.Executions = execs + if len(singleton.Executions) > 0 { + time.Sleep(60 * time.Second) + p.retryPeerExecution() + } else { + currentRountine = 0 + } +} diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 7750048..ee7f7d1 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -18,9 +18,10 @@ import ( type AbstractWorkflow struct { resources.ResourceSet - Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` - Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` - Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` + Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` + ScheduleActive bool `bson:"schedule_active,omitempty" json:"schedule_active,omitempty"` + Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` + Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` } func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) { diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index a42c2c9..ee576da 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -1,9 +1,7 @@ package oclib import ( - "encoding/json" "errors" - "fmt" "strings" "cloud.o-forge.io/core/oc-lib/dbs" @@ -80,6 +78,9 @@ func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*w } func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { + wfa.execution(id, &Workflow{ + AbstractWorkflow: AbstractWorkflow{ScheduleActive: false}, + }, true) return wfa.GenericDeleteOne(id, wfa) } @@ -91,9 +92,6 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w if _, ok := methods[tools.POST]; !ok { return errors.New("no path found") } - if realData.Schedule == nil { - return nil - } res, _, _ := wfa.LoadOne(id) r := res.(*Workflow) g := r.Graph @@ -113,22 +111,20 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w if peerID == "" { continue } - p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(peerID) + paccess := (&peer.Peer{}) + p, code, _ := paccess.GetAccessor(nil).LoadOne(peerID) if code != 200 { continue } - resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], (&workflow_execution.WorkflowExecutions{ - ResourceID: dc_id, - Executions: execs, - }).Serialize()) - if err != nil { + b, err := paccess.LaunchPeerExecution(p.GetID(), "", p.(*peer.Peer).Url, utils.BOOKING, tools.POST, + (&workflow_execution.WorkflowExecutions{ + WorkflowID: id, + ResourceID: dc_id, + Executions: execs, + }).Serialize(), wfa.Caller) + if err != nil && b == nil { return err } - var response map[string]interface{} - json.Unmarshal(resp, &response) - if code, ok := response["code"]; ok && code != 200 { - return errors.New(fmt.Sprintf("%v", response["error"])) - } } } } @@ -136,9 +132,17 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w } func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) { - if realData.Schedule == nil { + if realData.Schedule == nil && realData.ScheduleActive { return 200, nil } + if realData.Schedule == nil && !realData.ScheduleActive { + mongo.MONGOService.DeleteMultiple(map[string]interface{}{ + "state": 1, + "workflow_id": id, + }, utils.WORKFLOW_EXECUTION.String()) + err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) + return 200, err + } res, _, _ := wfa.LoadOne(id) r := res.(*Workflow) if r.Schedule != nil && r.Schedule.Start == realData.Schedule.Start && r.Schedule.End == realData.Schedule.End && r.Schedule.Cron == realData.Schedule.Cron { @@ -159,7 +163,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet "state": 1, }, utils.WORKFLOW_EXECUTION.String()) } - if err == nil && len(execs) > 0 { + if len(execs) > 0 { for _, obj := range execs { _, code, err := accessor.StoreOne(obj) if code != 200 { diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 6183df3..f68cec9 100644 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -40,6 +40,7 @@ func (d ScheduledType) EnumIndex() int { } type WorkflowExecutions struct { + WorkflowID string `json:"workflow_id" bson:"workflow_id"` ResourceID string `json:"resource_id" bson:"resource_id"` Executions []*WorkflowExecution `json:"executions" bson:"executions"` } diff --git a/models/workspace/shared/shared_workspace_mongo_accessor.go b/models/workspace/shared/shared_workspace_mongo_accessor.go index e2a0227..3560a46 100644 --- a/models/workspace/shared/shared_workspace_mongo_accessor.go +++ b/models/workspace/shared/shared_workspace_mongo_accessor.go @@ -1,9 +1,7 @@ package shared_workspace import ( - "encoding/json" "slices" - "strings" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs/mongo" @@ -23,6 +21,10 @@ func New() *sharedWorkspaceMongoAccessor { } func (wfa *sharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { + set, code, _ := wfa.LoadOne(id) + if code == 200 { + wfa.deleteToPeer(set.(*SharedWorkspace)) + } wfa.sharedWorkflow(&SharedWorkspace{}, id) wfa.sharedWorkspace(&SharedWorkspace{}, id) return wfa.GenericDeleteOne(id, wfa) @@ -39,22 +41,14 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil { continue } - methods := wfa.Caller.URLS[utils.WORKSPACE.String()] - if _, ok := methods[tools.DELETE]; !ok { - continue - } + paccess := (&peer.Peer{}) for _, p := range shared.Peers { - pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) if code == 200 { - resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v)) - if err != nil { + b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller) + if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) } - var r map[string]interface{} - json.Unmarshal(resp, &r) - if e, ok := r["error"]; ok && e != "" { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) - } } } } @@ -66,25 +60,17 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil { continue } - methods := wfa.Caller.URLS[utils.WORKSPACE.String()] - if _, ok := methods[tools.POST]; !ok { - continue - } for _, p := range shared.Peers { if code != 200 { continue } - pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + paccess := (&peer.Peer{}) + pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) if code == 200 { - resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workspace.Serialize()) - if err != nil { + b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.POST, workspace.Serialize(), wfa.Caller) + if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) } - var r map[string]interface{} - json.Unmarshal(resp, &r) - if e, ok := r["error"]; ok && e != "" { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) - } } } } @@ -113,22 +99,15 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil { continue } - methods := wfa.Caller.URLS[utils.WORKFLOW.String()] - if _, ok := methods[tools.DELETE]; !ok { - continue - } + paccess := (&peer.Peer{}) for _, p := range shared.Peers { - pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) - if code == 200 { - resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v)) - if err != nil { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) - } - var r map[string]interface{} - json.Unmarshal(resp, &r) - if e, ok := r["error"]; ok && e != "" { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) - } + pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) + if code != 200 { + continue + } + b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller) + if err != nil && b == nil { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) } } } @@ -146,25 +125,17 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil { continue } - methods := wfa.Caller.URLS[utils.WORKFLOW.String()] - if _, ok := methods[tools.POST]; !ok { - continue - } + paccess := (&peer.Peer{}) for _, p := range shared.Peers { if code != 200 { continue } - pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) if code == 200 { - resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workflow.Serialize()) - if err != nil { + b, err := paccess.LaunchPeerExecution(p, shared.UUID, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.POST, workflow.Serialize(), wfa.Caller) + if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) } - var r map[string]interface{} - json.Unmarshal(resp, &r) - if e, ok := r["error"]; ok && e != "" { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) - } } } } @@ -177,24 +148,19 @@ func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) { if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil { return } - methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] - if _, ok := methods[tools.DELETE]; !ok { - return - } + paccess := (&peer.Peer{}) for _, v := range shared.Peers { - p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v) + if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() { + continue + } + p, code, _ := paccess.GetAccessor(nil).LoadOne(v) if code != 200 { continue } - resp, err := wfa.Caller.CallDelete(p.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", shared.GetID())) - if err != nil { + b, err := paccess.LaunchPeerExecution(p.GetID(), shared.UUID, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.DELETE, nil, wfa.Caller) + if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error()) } - var r map[string]interface{} - json.Unmarshal(resp, &r) - if e, ok := r["error"]; ok && e != "" { - wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string)) - } } } @@ -202,24 +168,19 @@ func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) { if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil { return } - methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] - if _, ok := methods[tools.POST]; !ok { - return - } + paccess := (&peer.Peer{}) for _, v := range shared.Peers { - p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v) + if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() { + continue + } + p, code, _ := paccess.GetAccessor(nil).LoadOne(v) if code != 200 { continue } - resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], shared.Serialize()) - if err != nil { + b, err := paccess.LaunchPeerExecution(p.GetID(), v, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.POST, shared.Serialize(), wfa.Caller) + if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error()) } - var r map[string]interface{} - json.Unmarshal(resp, &r) - if e, ok := r["error"]; ok && e != "" { - wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string)) - } } } @@ -232,7 +193,6 @@ func (wfa *sharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string } func (wfa *sharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { - wfa.deleteToPeer(data.(*SharedWorkspace)) d, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa) if code == 200 { wfa.sharedWorkflow(d.(*SharedWorkspace), d.GetID())