From e71bd3544f807e73dfffd6aaec7eba8d710d44a9 Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 13 Aug 2024 09:49:42 +0200 Subject: [PATCH] shared peer conn --- dbs/dbs.go | 2 - dbs/mongo/mongo.go | 5 + models/utils/abstracts.go | 4 +- models/workflow/workflow.go | 14 +- models/workflow/workflow_mongo_accessor.go | 19 ++- models/workspace/shared/shared_workspace.go | 4 +- .../shared/shared_workspace_mongo_accessor.go | 160 +++++++++++++++++- models/workspace/workspace.go | 4 +- static/peer_static.go | 21 +++ tools/remote_caller.go | 28 ++- 10 files changed, 237 insertions(+), 24 deletions(-) create mode 100644 static/peer_static.go diff --git a/dbs/dbs.go b/dbs/dbs.go index 279da8f..ce4fdef 100644 --- a/dbs/dbs.go +++ b/dbs/dbs.go @@ -88,7 +88,6 @@ func (m Operator) ToMongoEOperator(k string, value interface{}) bson.E { default: return defaultValue } - return defaultValue } func (m Operator) ToMongoOperator(k string, value interface{}) bson.M { @@ -142,7 +141,6 @@ func (m Operator) ToMongoOperator(k string, value interface{}) bson.M { default: return defaultValue } - return defaultValue } func StringToOperator(s string) Operator { diff --git a/dbs/mongo/mongo.go b/dbs/mongo/mongo.go index afe42e4..77ed4d7 100644 --- a/dbs/mongo/mongo.go +++ b/dbs/mongo/mongo.go @@ -7,6 +7,7 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/logs" + "cloud.o-forge.io/core/oc-lib/static" "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson" @@ -104,6 +105,10 @@ func (m *MongoDB) prepareDB(list_collection []string, config MongoConf) { new_collection := mngoDB.Collection(collection_name) if _, exists := collectionMap[collection_name]; !exists { m.createCollection(collection_name, new_collection) + if collection_name == "peer" { + id, p := static.GetMyLocalBsonPeer() + m.StoreOne(p, id, collection_name) + } } else { CollectionMap[collection_name] = new_collection } diff --git a/models/utils/abstracts.go b/models/utils/abstracts.go index 4f3ef67..b556cae 100644 --- a/models/utils/abstracts.go +++ b/models/utils/abstracts.go @@ -52,7 +52,9 @@ func (dma *AbstractObject) Serialize() map[string]interface{} { } func (r *AbstractObject) GenerateID() { - r.UUID = uuid.New().String() + if r.UUID == "" { + r.UUID = uuid.New().String() + } } type AbstractAccessor struct { diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 6b06d78..7750048 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -8,6 +8,7 @@ import ( "strings" "time" + "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources/datacenter" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" @@ -55,11 +56,18 @@ func (wfa *Workflow) CheckBooking(subPath string) (bool, error) { continue } // CHECK BOOKING - url := dc.(*datacenter.DatacenterResource).SourceUrl - caller := tools.NewHTTPCaller("", "", "", "") + peerID := dc.(*datacenter.DatacenterResource).PeerID + if peerID == "" { + return false, errors.New("no peer id") + } + p, code, err := (&peer.Peer{}).GetAccessor(nil).LoadOne(peerID) + if code != 200 { + return false, err + } + caller := tools.NewHTTPCaller(map[string]map[tools.METHOD]string{}) subPath = strings.ReplaceAll(subPath, ":start_date", wfa.getFormat(wfa.Schedule.Start)) subPath = strings.ReplaceAll(subPath, ":end_date", wfa.getFormat(&e)) - resp, err := caller.CallGet(url, subPath) + resp, err := caller.CallGet(p.(*peer.Peer).Url, subPath) if err != nil { return false, err } diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index 215bdfc..8c9e835 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -8,11 +8,13 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources/datacenter" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workspace" + "cloud.o-forge.io/core/oc-lib/tools" cron "github.com/robfig/cron/v3" ) @@ -82,9 +84,13 @@ func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err } func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) error { - if wfa.Caller == nil { + if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW_EXECUTION.String()] == nil { return errors.New("no caller defined") } + methods := wfa.Caller.URLS[utils.WORKFLOW_EXECUTION.String()] + if _, ok := methods[tools.POST]; !ok { + return errors.New("no path found") + } if realData.Schedule == nil { return nil } @@ -103,8 +109,15 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w continue } // CHECK BOOKING - url := dc.(*datacenter.DatacenterResource).SourceUrl - resp, err := wfa.Caller.CallPost(url, wfa.Caller.DestSubPath, (&workflow_execution.WorkflowExecutions{ + peerID := dc.(*datacenter.DatacenterResource).PeerID + if peerID == "" { + continue + } + p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(peerID) + if code != 200 { + continue + } + resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], (&workflow_execution.WorkflowExecutions{ Executions: execs, }).Serialize()) if err != nil { diff --git a/models/workspace/shared/shared_workspace.go b/models/workspace/shared/shared_workspace.go index c9bfb42..9339afc 100644 --- a/models/workspace/shared/shared_workspace.go +++ b/models/workspace/shared/shared_workspace.go @@ -24,7 +24,9 @@ func (ao *SharedWorkspace) GetID() string { } func (r *SharedWorkspace) GenerateID() { - r.UUID = uuid.New().String() + if r.UUID == "" { + r.UUID = uuid.New().String() + } } func (d *SharedWorkspace) GetName() string { diff --git a/models/workspace/shared/shared_workspace_mongo_accessor.go b/models/workspace/shared/shared_workspace_mongo_accessor.go index 8d03edf..e2a0227 100644 --- a/models/workspace/shared/shared_workspace_mongo_accessor.go +++ b/models/workspace/shared/shared_workspace_mongo_accessor.go @@ -1,13 +1,17 @@ package shared_workspace import ( + "encoding/json" "slices" + "strings" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/utils" w "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workspace" + "cloud.o-forge.io/core/oc-lib/tools" ) type sharedWorkspaceMongoAccessor struct { @@ -26,18 +30,63 @@ func (wfa *sharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, i func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace, id string) { eldest, code, _ := wfa.LoadOne(id) + accessor := (&workspace.Workspace{}).GetAccessor(nil) if code == 200 { eld := eldest.(*SharedWorkspace) - accessor := (&workspace.Workspace{}).GetAccessor(nil) if eld.Workspaces != nil { for _, v := range eld.Workspaces { accessor.UpdateOne(&workspace.Workspace{Shared: false}, v) + if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil { + continue + } + methods := wfa.Caller.URLS[utils.WORKSPACE.String()] + if _, ok := methods[tools.DELETE]; !ok { + continue + } + for _, p := range shared.Peers { + pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + if code == 200 { + resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v)) + if err != nil { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) + } + var r map[string]interface{} + json.Unmarshal(resp, &r) + if e, ok := r["error"]; ok && e != "" { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) + } + } + } } } } if shared.Workspaces != nil { for _, v := range shared.Workspaces { - wfa.UpdateOne(&workspace.Workspace{Shared: false}, v) + workspace, code, _ := accessor.UpdateOne(&workspace.Workspace{Shared: true}, v) + if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil { + continue + } + methods := wfa.Caller.URLS[utils.WORKSPACE.String()] + if _, ok := methods[tools.POST]; !ok { + continue + } + for _, p := range shared.Peers { + if code != 200 { + continue + } + pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + if code == 200 { + resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workspace.Serialize()) + if err != nil { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) + } + var r map[string]interface{} + json.Unmarshal(resp, &r) + if e, ok := r["error"]; ok && e != "" { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) + } + } + } } } } @@ -61,6 +110,27 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, n := &w.Workflow{} n.Shared = new accessor.UpdateOne(n, v) + if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil { + continue + } + methods := wfa.Caller.URLS[utils.WORKFLOW.String()] + if _, ok := methods[tools.DELETE]; !ok { + continue + } + for _, p := range shared.Peers { + pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + if code == 200 { + resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v)) + if err != nil { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) + } + var r map[string]interface{} + json.Unmarshal(resp, &r) + if e, ok := r["error"]; ok && e != "" { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) + } + } + } } } } @@ -72,24 +142,102 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, s := data.(*w.Workflow) if !slices.Contains(s.Shared, id) { s.Shared = append(s.Shared, id) - accessor.UpdateOne(s, v) + workflow, code, _ := accessor.UpdateOne(s, v) + if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil { + continue + } + methods := wfa.Caller.URLS[utils.WORKFLOW.String()] + if _, ok := methods[tools.POST]; !ok { + continue + } + for _, p := range shared.Peers { + if code != 200 { + continue + } + pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p) + if code == 200 { + resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workflow.Serialize()) + if err != nil { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) + } + var r map[string]interface{} + json.Unmarshal(resp, &r) + if e, ok := r["error"]; ok && e != "" { + wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string)) + } + } + } } } } } } +func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) { + if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil { + return + } + methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] + if _, ok := methods[tools.DELETE]; !ok { + return + } + for _, v := range shared.Peers { + p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v) + if code != 200 { + continue + } + resp, err := wfa.Caller.CallDelete(p.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", shared.GetID())) + if err != nil { + wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error()) + } + var r map[string]interface{} + json.Unmarshal(resp, &r) + if e, ok := r["error"]; ok && e != "" { + wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string)) + } + } +} + +func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) { + if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil { + return + } + methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] + if _, ok := methods[tools.POST]; !ok { + return + } + for _, v := range shared.Peers { + p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v) + if code != 200 { + continue + } + resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], shared.Serialize()) + if err != nil { + wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error()) + } + var r map[string]interface{} + json.Unmarshal(resp, &r) + if e, ok := r["error"]; ok && e != "" { + wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string)) + } + } +} + func (wfa *sharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { + wfa.deleteToPeer(set.(*SharedWorkspace)) wfa.sharedWorkflow(set.(*SharedWorkspace), id) wfa.sharedWorkspace(set.(*SharedWorkspace), id) + wfa.sendToPeer(set.(*SharedWorkspace)) return wfa.GenericUpdateOne(set.(*SharedWorkspace), id, wfa, &SharedWorkspace{}) } func (wfa *sharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { - data, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa) + wfa.deleteToPeer(data.(*SharedWorkspace)) + d, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa) if code == 200 { - wfa.sharedWorkflow(data.(*SharedWorkspace), data.GetID()) - wfa.sharedWorkspace(data.(*SharedWorkspace), data.GetID()) + wfa.sharedWorkflow(d.(*SharedWorkspace), d.GetID()) + wfa.sharedWorkspace(d.(*SharedWorkspace), d.GetID()) + wfa.sendToPeer(d.(*SharedWorkspace)) } return data, code, err } diff --git a/models/workspace/workspace.go b/models/workspace/workspace.go index ede5cfb..b118a97 100644 --- a/models/workspace/workspace.go +++ b/models/workspace/workspace.go @@ -22,7 +22,9 @@ func (ao *Workspace) GetID() string { } func (r *Workspace) GenerateID() { - r.UUID = uuid.New().String() + if r.UUID == "" { + r.UUID = uuid.New().String() + } } func (d *Workspace) GetName() string { diff --git a/static/peer_static.go b/static/peer_static.go new file mode 100644 index 0000000..479f203 --- /dev/null +++ b/static/peer_static.go @@ -0,0 +1,21 @@ +package static + +func GetMyLocalBsonPeer() (string, map[string]interface{}) { + return "6fd0134c-fefc-427e-94c2-e01365fc5fb0", map[string]interface{}{ + "abstractobject": map[string]interface{}{ + "uuid": "6fd0134c-fefc-427e-94c2-e01365fc5fb0", + "name": "local_peer", + }, + "url": "http://localhost", + "public_key": "public_key_lulz", + } +} + +func GetMyLocalJsonPeer() (string, map[string]interface{}) { + return "6fd0134c-fefc-427e-94c2-e01365fc5fb0", map[string]interface{}{ + "uuid": "6fd0134c-fefc-427e-94c2-e01365fc5fb0", + "name": "local_peer", + "url": "http://localhost", + "public_key": "public_key_lulz", + } +} diff --git a/tools/remote_caller.go b/tools/remote_caller.go index e12184b..95856e9 100644 --- a/tools/remote_caller.go +++ b/tools/remote_caller.go @@ -7,19 +7,24 @@ import ( "net/http" ) +type METHOD int + +const ( + GET METHOD = iota + PUT + POST + DELETE +) + var HTTPCallerInstance = &HTTPCaller{} type HTTPCaller struct { - Origin string - OriginSubPath string - DestSubPath string + URLS map[string]map[METHOD]string } -func NewHTTPCaller(url string, origin string, originSubPath string, destSubPath string) *HTTPCaller { +func NewHTTPCaller(urls map[string]map[METHOD]string) *HTTPCaller { return &HTTPCaller{ - Origin: origin, - OriginSubPath: originSubPath, - DestSubPath: destSubPath, + URLS: urls, } } @@ -32,6 +37,15 @@ func (caller *HTTPCaller) CallGet(url string, subpath string) ([]byte, error) { return io.ReadAll(resp.Body) } +func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) { + resp, err := http.NewRequest("DELETE", url+subpath, nil) + if err != nil { + return nil, err + } + defer resp.Body.Close() + return io.ReadAll(resp.Body) +} + func (caller *HTTPCaller) CallPost(url string, subpath string, body map[string]interface{}) ([]byte, error) { postBody, _ := json.Marshal(body) responseBody := bytes.NewBuffer(postBody)