diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index b5ca48d..54c5991 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -1,4 +1,4 @@ -package oclib +package workflow import ( "encoding/json" diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index 4a46fe2..a528ad7 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -1,4 +1,4 @@ -package oclib +package workflow import ( "errors" @@ -13,6 +13,7 @@ import ( "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workspace" + "cloud.o-forge.io/core/oc-lib/models/workspace/shared/shallow_shared_workspace" "cloud.o-forge.io/core/oc-lib/tools" cron "github.com/robfig/cron/v3" ) @@ -86,6 +87,7 @@ func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err if res != nil && code == 200 { wfa.execute(res.(*Workflow), false) } + wfa.share(res.(*Workflow), true, wfa.Caller) return res, code, err } @@ -134,6 +136,35 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w return nil } +func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) { + if realData.Shared == nil || len(realData.Shared) == 0 { + return + } + for _, sharedID := range realData.Shared { + access := (&shallow_shared_workspace.ShallowSharedWorkspace{}).GetAccessor(nil) + res, code, _ := access.LoadOne(sharedID) + if code != 200 { + continue + } + var err error + paccess := &peer.Peer{} + for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers { + paccess.UUID = p + if paccess.IsMySelf() { + continue + } + if delete { + _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.DELETE, map[string]interface{}{}, caller) + } else { + _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.PUT, res.Serialize(), caller) + } + } + if err != nil { + wfa.Logger.Error().Msg(err.Error()) + } + } +} + func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) { var err error nats := tools.NewNATSCaller() @@ -200,16 +231,11 @@ func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (util } } wfa.execute(res.(*Workflow), false) + wfa.share(res.(*Workflow), false, wfa.Caller) return res, code, err } func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { - /*new := data.(*Workflow) - for _, i := range new.Graph.Items { - if i.Datacenter == nil && i.Processing == nil && i.Storage == nil && i.Workflow == nil && i.Data == nil { - return nil, 422, errors.New("graph item should have at least one resource data is corrupted") - } - }*/ res, code, err := wfa.GenericStoreOne(data, wfa) if err != nil { return nil, code, err diff --git a/models/workflow/workflow_schedule.go b/models/workflow/workflow_schedule.go index 333f71f..39861ac 100644 --- a/models/workflow/workflow_schedule.go +++ b/models/workflow/workflow_schedule.go @@ -1,4 +1,4 @@ -package oclib +package workflow import "time" diff --git a/models/workflow/workflow_test.go b/models/workflow/workflow_test.go index fb81bab..8a3320a 100644 --- a/models/workflow/workflow_test.go +++ b/models/workflow/workflow_test.go @@ -1,4 +1,4 @@ -package oclib +package workflow import ( "testing" diff --git a/models/workspace/shared/shallow_shared_workspace/shallow_shared_workspace.go b/models/workspace/shared/shallow_shared_workspace/shallow_shared_workspace.go new file mode 100644 index 0000000..f6a4085 --- /dev/null +++ b/models/workspace/shared/shallow_shared_workspace/shallow_shared_workspace.go @@ -0,0 +1,61 @@ +package shallow_shared_workspace + +import ( + "encoding/json" + + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/google/uuid" +) + +type ShallowSharedWorkspace struct { + utils.AbstractObject + IsSent bool `json:"is_sent" bson:"-"` + CreatorID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` + Version string `json:"version,omitempty" bson:"version,omitempty"` + Description string `json:"description,omitempty" bson:"description,omitempty" validate:"required"` + Attributes map[string]interface{} `json:"attributes,omitempty" bson:"attributes,omitempty"` + Workspaces []string `json:"workspaces,omitempty" bson:"workspaces,omitempty"` + Workflows []string `json:"workflows,omitempty" bson:"workflows,omitempty"` + Peers []string `json:"peers,omitempty" bson:"peers,omitempty"` + Rules []string `json:"rules,omitempty" bson:"rules,omitempty"` +} + +func (ao *ShallowSharedWorkspace) GetID() string { + return ao.UUID +} + +func (r *ShallowSharedWorkspace) GenerateID() { + if r.UUID == "" { + r.UUID = uuid.New().String() + } +} + +func (d *ShallowSharedWorkspace) GetName() string { + return d.Name +} + +func (d *ShallowSharedWorkspace) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { + data := New() + data.Init(utils.SHARED_WORKSPACE, caller) + return data +} + +func (dma *ShallowSharedWorkspace) Deserialize(j map[string]interface{}) utils.DBObject { + b, err := json.Marshal(j) + if err != nil { + return nil + } + json.Unmarshal(b, dma) + return dma +} + +func (dma *ShallowSharedWorkspace) Serialize() map[string]interface{} { + var m map[string]interface{} + b, err := json.Marshal(dma) + if err != nil { + return nil + } + json.Unmarshal(b, &m) + return m +} diff --git a/models/workspace/shared/shallow_shared_workspace/shallow_shared_workspace_mongo_accessor.go b/models/workspace/shared/shallow_shared_workspace/shallow_shared_workspace_mongo_accessor.go new file mode 100644 index 0000000..cf78151 --- /dev/null +++ b/models/workspace/shared/shallow_shared_workspace/shallow_shared_workspace_mongo_accessor.go @@ -0,0 +1,83 @@ +package shallow_shared_workspace + +import ( + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/utils" +) + +type shallowSharedWorkspaceMongoAccessor struct { + utils.AbstractAccessor +} + +func New() *shallowSharedWorkspaceMongoAccessor { + return &shallowSharedWorkspaceMongoAccessor{} +} + +func (wfa *shallowSharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { + return wfa.GenericDeleteOne(id, wfa) +} + +func (wfa *shallowSharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { + return wfa.GenericUpdateOne(set.(*ShallowSharedWorkspace), id, wfa, &ShallowSharedWorkspace{}) +} + +func (wfa *shallowSharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data.(*ShallowSharedWorkspace), wfa) +} + +func (wfa *shallowSharedWorkspaceMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.StoreOne(data) +} + +func (wfa *shallowSharedWorkspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { + var sharedWorkspace ShallowSharedWorkspace + res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error()) + return nil, code, err + } + res_mongo.Decode(&sharedWorkspace) + return &sharedWorkspace, 200, nil +} + +func (wfa shallowSharedWorkspaceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error()) + return nil, code, err + } + var results []ShallowSharedWorkspace + if err = res_mongo.All(mongo.MngoCtx, &results); err != nil { + return nil, 404, err + } + for _, r := range results { + objs = append(objs, &r) + } + return objs, 200, nil +} + +func (wfa *shallowSharedWorkspaceMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { + filters = &dbs.Filters{ + Or: map[string][]dbs.Filter{ + "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, + }, + } + } + res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not store to db. Error: " + err.Error()) + return nil, code, err + } + var results []ShallowSharedWorkspace + if err = res_mongo.All(mongo.MngoCtx, &results); err != nil { + return nil, 404, err + } + for _, r := range results { + objs = append(objs, &r) + } + return objs, 200, nil +} diff --git a/models/workspace/workspace_mongo_accessor.go b/models/workspace/workspace_mongo_accessor.go index 8dcd4ba..251ef18 100644 --- a/models/workspace/workspace_mongo_accessor.go +++ b/models/workspace/workspace_mongo_accessor.go @@ -5,12 +5,15 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources/data" "cloud.o-forge.io/core/oc-lib/models/resources/datacenter" "cloud.o-forge.io/core/oc-lib/models/resources/processing" "cloud.o-forge.io/core/oc-lib/models/resources/storage" w "cloud.o-forge.io/core/oc-lib/models/resources/workflow" "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/workspace/shared/shallow_shared_workspace" + "cloud.o-forge.io/core/oc-lib/tools" ) type workspaceMongoAccessor struct { @@ -22,7 +25,9 @@ func New() *workspaceMongoAccessor { } func (wfa *workspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { - return wfa.GenericDeleteOne(id, wfa) + res, code, err := wfa.GenericDeleteOne(id, wfa) + wfa.share(res.(*Workspace), true, wfa.Caller) + return res, code, err } func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { @@ -43,7 +48,9 @@ func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (uti } } } - return wfa.GenericUpdateOne(set, id, wfa, &Workspace{}) + res, code, err := wfa.GenericUpdateOne(set, id, wfa, &Workspace{}) + wfa.share(res.(*Workspace), false, wfa.Caller) + return res, code, err } func (wfa *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { @@ -170,3 +177,30 @@ func (wfa *workspaceMongoAccessor) Search(filters *dbs.Filters, search string) ( } return objs, 200, nil } + +func (wfa *workspaceMongoAccessor) share(realData *Workspace, delete bool, caller *tools.HTTPCaller) { + if realData.Shared == "" { + return + } + access := (&shallow_shared_workspace.ShallowSharedWorkspace{}).GetAccessor(nil) + res, code, _ := access.LoadOne(realData.Shared) + if code != 200 { + return + } + var err error + paccess := &peer.Peer{} + for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers { + paccess.UUID = p + if paccess.IsMySelf() { + continue + } + if delete { + _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.DELETE, map[string]interface{}{}, caller) + } else { + _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.PUT, res.Serialize(), caller) + } + } + if err != nil { + wfa.Logger.Error().Msg(err.Error()) + } +}