shared peer conn

This commit is contained in:
mr
2024-08-13 09:49:42 +02:00
parent 6fe862a9b5
commit e71bd3544f
10 changed files with 237 additions and 24 deletions

View File

@@ -24,7 +24,9 @@ func (ao *SharedWorkspace) GetID() string {
}
func (r *SharedWorkspace) GenerateID() {
r.UUID = uuid.New().String()
if r.UUID == "" {
r.UUID = uuid.New().String()
}
}
func (d *SharedWorkspace) GetName() string {

View File

@@ -1,13 +1,17 @@
package shared_workspace
import (
"encoding/json"
"slices"
"strings"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils"
w "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workspace"
"cloud.o-forge.io/core/oc-lib/tools"
)
type sharedWorkspaceMongoAccessor struct {
@@ -26,18 +30,63 @@ func (wfa *sharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, i
func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace, id string) {
eldest, code, _ := wfa.LoadOne(id)
accessor := (&workspace.Workspace{}).GetAccessor(nil)
if code == 200 {
eld := eldest.(*SharedWorkspace)
accessor := (&workspace.Workspace{}).GetAccessor(nil)
if eld.Workspaces != nil {
for _, v := range eld.Workspaces {
accessor.UpdateOne(&workspace.Workspace{Shared: false}, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil {
continue
}
methods := wfa.Caller.URLS[utils.WORKSPACE.String()]
if _, ok := methods[tools.DELETE]; !ok {
continue
}
for _, p := range shared.Peers {
pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
if code == 200 {
resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v))
if err != nil {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
}
var r map[string]interface{}
json.Unmarshal(resp, &r)
if e, ok := r["error"]; ok && e != "" {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
}
}
}
}
}
}
if shared.Workspaces != nil {
for _, v := range shared.Workspaces {
wfa.UpdateOne(&workspace.Workspace{Shared: false}, v)
workspace, code, _ := accessor.UpdateOne(&workspace.Workspace{Shared: true}, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil {
continue
}
methods := wfa.Caller.URLS[utils.WORKSPACE.String()]
if _, ok := methods[tools.POST]; !ok {
continue
}
for _, p := range shared.Peers {
if code != 200 {
continue
}
pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
if code == 200 {
resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workspace.Serialize())
if err != nil {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
}
var r map[string]interface{}
json.Unmarshal(resp, &r)
if e, ok := r["error"]; ok && e != "" {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
}
}
}
}
}
}
@@ -61,6 +110,27 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace,
n := &w.Workflow{}
n.Shared = new
accessor.UpdateOne(n, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil {
continue
}
methods := wfa.Caller.URLS[utils.WORKFLOW.String()]
if _, ok := methods[tools.DELETE]; !ok {
continue
}
for _, p := range shared.Peers {
pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
if code == 200 {
resp, err := wfa.Caller.CallDelete(pp.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", v))
if err != nil {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
}
var r map[string]interface{}
json.Unmarshal(resp, &r)
if e, ok := r["error"]; ok && e != "" {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
}
}
}
}
}
}
@@ -72,24 +142,102 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace,
s := data.(*w.Workflow)
if !slices.Contains(s.Shared, id) {
s.Shared = append(s.Shared, id)
accessor.UpdateOne(s, v)
workflow, code, _ := accessor.UpdateOne(s, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil {
continue
}
methods := wfa.Caller.URLS[utils.WORKFLOW.String()]
if _, ok := methods[tools.POST]; !ok {
continue
}
for _, p := range shared.Peers {
if code != 200 {
continue
}
pp, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(p)
if code == 200 {
resp, err := wfa.Caller.CallPost(pp.(*peer.Peer).Url, methods[tools.POST], workflow.Serialize())
if err != nil {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
}
var r map[string]interface{}
json.Unmarshal(resp, &r)
if e, ok := r["error"]; ok && e != "" {
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error" + e.(string))
}
}
}
}
}
}
}
}
func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) {
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil {
return
}
methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()]
if _, ok := methods[tools.DELETE]; !ok {
return
}
for _, v := range shared.Peers {
p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v)
if code != 200 {
continue
}
resp, err := wfa.Caller.CallDelete(p.(*peer.Peer).Url, strings.ReplaceAll(methods[tools.DELETE], ":id", shared.GetID()))
if err != nil {
wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error())
}
var r map[string]interface{}
json.Unmarshal(resp, &r)
if e, ok := r["error"]; ok && e != "" {
wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string))
}
}
}
func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) {
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil {
return
}
methods := wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()]
if _, ok := methods[tools.POST]; !ok {
return
}
for _, v := range shared.Peers {
p, code, _ := (&peer.Peer{}).GetAccessor(nil).LoadOne(v)
if code != 200 {
continue
}
resp, err := wfa.Caller.CallPost(p.(*peer.Peer).Url, methods[tools.POST], shared.Serialize())
if err != nil {
wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error())
}
var r map[string]interface{}
json.Unmarshal(resp, &r)
if e, ok := r["error"]; ok && e != "" {
wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error" + e.(string))
}
}
}
func (wfa *sharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
wfa.deleteToPeer(set.(*SharedWorkspace))
wfa.sharedWorkflow(set.(*SharedWorkspace), id)
wfa.sharedWorkspace(set.(*SharedWorkspace), id)
wfa.sendToPeer(set.(*SharedWorkspace))
return wfa.GenericUpdateOne(set.(*SharedWorkspace), id, wfa, &SharedWorkspace{})
}
func (wfa *sharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
data, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa)
wfa.deleteToPeer(data.(*SharedWorkspace))
d, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa)
if code == 200 {
wfa.sharedWorkflow(data.(*SharedWorkspace), data.GetID())
wfa.sharedWorkspace(data.(*SharedWorkspace), data.GetID())
wfa.sharedWorkflow(d.(*SharedWorkspace), d.GetID())
wfa.sharedWorkspace(d.(*SharedWorkspace), d.GetID())
wfa.sendToPeer(d.(*SharedWorkspace))
}
return data, code, err
}

View File

@@ -22,7 +22,9 @@ func (ao *Workspace) GetID() string {
}
func (r *Workspace) GenerateID() {
r.UUID = uuid.New().String()
if r.UUID == "" {
r.UUID = uuid.New().String()
}
}
func (d *Workspace) GetName() string {