add shared logic on peer

This commit is contained in:
mr 2024-08-30 09:14:03 +02:00
parent 2e8246fb2f
commit db78c70dc3
7 changed files with 216 additions and 12 deletions

View File

@ -1,4 +1,4 @@
package oclib package workflow
import ( import (
"encoding/json" "encoding/json"

View File

@ -1,4 +1,4 @@
package oclib package workflow
import ( import (
"errors" "errors"
@ -13,6 +13,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/models/workspace" "cloud.o-forge.io/core/oc-lib/models/workspace"
"cloud.o-forge.io/core/oc-lib/models/workspace/shared/shallow_shared_workspace"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
cron "github.com/robfig/cron/v3" cron "github.com/robfig/cron/v3"
) )
@ -86,6 +87,7 @@ func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err
if res != nil && code == 200 { if res != nil && code == 200 {
wfa.execute(res.(*Workflow), false) wfa.execute(res.(*Workflow), false)
} }
wfa.share(res.(*Workflow), true, wfa.Caller)
return res, code, err return res, code, err
} }
@ -134,6 +136,35 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
return nil return nil
} }
func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) {
if realData.Shared == nil || len(realData.Shared) == 0 {
return
}
for _, sharedID := range realData.Shared {
access := (&shallow_shared_workspace.ShallowSharedWorkspace{}).GetAccessor(nil)
res, code, _ := access.LoadOne(sharedID)
if code != 200 {
continue
}
var err error
paccess := &peer.Peer{}
for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers {
paccess.UUID = p
if paccess.IsMySelf() {
continue
}
if delete {
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.DELETE, map[string]interface{}{}, caller)
} else {
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.PUT, res.Serialize(), caller)
}
}
if err != nil {
wfa.Logger.Error().Msg(err.Error())
}
}
}
func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) { func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
var err error var err error
nats := tools.NewNATSCaller() nats := tools.NewNATSCaller()
@ -200,16 +231,11 @@ func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (util
} }
} }
wfa.execute(res.(*Workflow), false) wfa.execute(res.(*Workflow), false)
wfa.share(res.(*Workflow), false, wfa.Caller)
return res, code, err return res, code, err
} }
func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
/*new := data.(*Workflow)
for _, i := range new.Graph.Items {
if i.Datacenter == nil && i.Processing == nil && i.Storage == nil && i.Workflow == nil && i.Data == nil {
return nil, 422, errors.New("graph item should have at least one resource data is corrupted")
}
}*/
res, code, err := wfa.GenericStoreOne(data, wfa) res, code, err := wfa.GenericStoreOne(data, wfa)
if err != nil { if err != nil {
return nil, code, err return nil, code, err

View File

@ -1,4 +1,4 @@
package oclib package workflow
import "time" import "time"

View File

@ -1,4 +1,4 @@
package oclib package workflow
import ( import (
"testing" "testing"

View File

@ -0,0 +1,61 @@
package shallow_shared_workspace
import (
"encoding/json"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
)
type ShallowSharedWorkspace struct {
utils.AbstractObject
IsSent bool `json:"is_sent" bson:"-"`
CreatorID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"`
Version string `json:"version,omitempty" bson:"version,omitempty"`
Description string `json:"description,omitempty" bson:"description,omitempty" validate:"required"`
Attributes map[string]interface{} `json:"attributes,omitempty" bson:"attributes,omitempty"`
Workspaces []string `json:"workspaces,omitempty" bson:"workspaces,omitempty"`
Workflows []string `json:"workflows,omitempty" bson:"workflows,omitempty"`
Peers []string `json:"peers,omitempty" bson:"peers,omitempty"`
Rules []string `json:"rules,omitempty" bson:"rules,omitempty"`
}
func (ao *ShallowSharedWorkspace) GetID() string {
return ao.UUID
}
func (r *ShallowSharedWorkspace) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String()
}
}
func (d *ShallowSharedWorkspace) GetName() string {
return d.Name
}
func (d *ShallowSharedWorkspace) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
data := New()
data.Init(utils.SHARED_WORKSPACE, caller)
return data
}
func (dma *ShallowSharedWorkspace) Deserialize(j map[string]interface{}) utils.DBObject {
b, err := json.Marshal(j)
if err != nil {
return nil
}
json.Unmarshal(b, dma)
return dma
}
func (dma *ShallowSharedWorkspace) Serialize() map[string]interface{} {
var m map[string]interface{}
b, err := json.Marshal(dma)
if err != nil {
return nil
}
json.Unmarshal(b, &m)
return m
}

View File

@ -0,0 +1,83 @@
package shallow_shared_workspace
import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/utils"
)
type shallowSharedWorkspaceMongoAccessor struct {
utils.AbstractAccessor
}
func New() *shallowSharedWorkspaceMongoAccessor {
return &shallowSharedWorkspaceMongoAccessor{}
}
func (wfa *shallowSharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
return wfa.GenericDeleteOne(id, wfa)
}
func (wfa *shallowSharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
return wfa.GenericUpdateOne(set.(*ShallowSharedWorkspace), id, wfa, &ShallowSharedWorkspace{})
}
func (wfa *shallowSharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
return wfa.GenericStoreOne(data.(*ShallowSharedWorkspace), wfa)
}
func (wfa *shallowSharedWorkspaceMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return wfa.StoreOne(data)
}
func (wfa *shallowSharedWorkspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
var sharedWorkspace ShallowSharedWorkspace
res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType())
if err != nil {
wfa.Logger.Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error())
return nil, code, err
}
res_mongo.Decode(&sharedWorkspace)
return &sharedWorkspace, 200, nil
}
func (wfa shallowSharedWorkspaceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType())
if err != nil {
wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error())
return nil, code, err
}
var results []ShallowSharedWorkspace
if err = res_mongo.All(mongo.MngoCtx, &results); err != nil {
return nil, 404, err
}
for _, r := range results {
objs = append(objs, &r)
}
return objs, 200, nil
}
func (wfa *shallowSharedWorkspaceMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" {
filters = &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}
res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType())
if err != nil {
wfa.Logger.Error().Msg("Could not store to db. Error: " + err.Error())
return nil, code, err
}
var results []ShallowSharedWorkspace
if err = res_mongo.All(mongo.MngoCtx, &results); err != nil {
return nil, 404, err
}
for _, r := range results {
objs = append(objs, &r)
}
return objs, 200, nil
}

View File

@ -5,12 +5,15 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/data" "cloud.o-forge.io/core/oc-lib/models/resources/data"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter" "cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/processing" "cloud.o-forge.io/core/oc-lib/models/resources/processing"
"cloud.o-forge.io/core/oc-lib/models/resources/storage" "cloud.o-forge.io/core/oc-lib/models/resources/storage"
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow" w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workspace/shared/shallow_shared_workspace"
"cloud.o-forge.io/core/oc-lib/tools"
) )
type workspaceMongoAccessor struct { type workspaceMongoAccessor struct {
@ -22,7 +25,9 @@ func New() *workspaceMongoAccessor {
} }
func (wfa *workspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { func (wfa *workspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
return wfa.GenericDeleteOne(id, wfa) res, code, err := wfa.GenericDeleteOne(id, wfa)
wfa.share(res.(*Workspace), true, wfa.Caller)
return res, code, err
} }
func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
@ -43,7 +48,9 @@ func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (uti
} }
} }
} }
return wfa.GenericUpdateOne(set, id, wfa, &Workspace{}) res, code, err := wfa.GenericUpdateOne(set, id, wfa, &Workspace{})
wfa.share(res.(*Workspace), false, wfa.Caller)
return res, code, err
} }
func (wfa *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { func (wfa *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
@ -170,3 +177,30 @@ func (wfa *workspaceMongoAccessor) Search(filters *dbs.Filters, search string) (
} }
return objs, 200, nil return objs, 200, nil
} }
func (wfa *workspaceMongoAccessor) share(realData *Workspace, delete bool, caller *tools.HTTPCaller) {
if realData.Shared == "" {
return
}
access := (&shallow_shared_workspace.ShallowSharedWorkspace{}).GetAccessor(nil)
res, code, _ := access.LoadOne(realData.Shared)
if code != 200 {
return
}
var err error
paccess := &peer.Peer{}
for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers {
paccess.UUID = p
if paccess.IsMySelf() {
continue
}
if delete {
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.DELETE, map[string]interface{}{}, caller)
} else {
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.PUT, res.Serialize(), caller)
}
}
if err != nil {
wfa.Logger.Error().Msg(err.Error())
}
}