From 00f25b48c0ecf0ded691f2b96c8ac6059e9c9e36 Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 23 Jul 2024 16:14:46 +0200 Subject: [PATCH] Execution --- dbs/mongo/mongo.go | 18 ++++ go.mod | 1 + go.sum | 2 + models/models.go | 2 + models/utils/enums.go | 1 + models/workflow/workflow_mongo_accessor.go | 58 ++++++++++++- models/workflow/workflow_schedule.go | 12 +-- .../workflow_execution/workflow_execution.go | 83 +++++++++++++++++++ .../workflow_execution_mongo_accessor.go | 52 ++++++++++++ 9 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 models/workflow_execution/workflow_execution.go create mode 100644 models/workflow_execution/workflow_execution_mongo_accessor.go diff --git a/dbs/mongo/mongo.go b/dbs/mongo/mongo.go index ae6f7be..2f23ec6 100644 --- a/dbs/mongo/mongo.go +++ b/dbs/mongo/mongo.go @@ -146,6 +146,24 @@ func (m *MongoDB) DeleteOne(id string, collection_name string) (int64, int, erro return result.DeletedCount, 200, nil } +func (m *MongoDB) DeleteMultiple(f map[string]interface{}, collection_name string) (int64, int, error) { + filter := bson.D{} + for k, v := range f { + filter = append(filter, bson.E{Key: k, Value: v}) + } + targetDBCollection := CollectionMap[collection_name] + opts := options.Delete().SetHint(bson.D{{Key: "_id", Value: 1}}) + MngoCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + result, err := targetDBCollection.DeleteMany(MngoCtx, filter, opts) + if err != nil { + m.Logger.Error().Msg("Couldn't insert resource: " + err.Error()) + return 0, 404, err + } + return result.DeletedCount, 200, nil +} + func (m *MongoDB) UpdateOne(set interface{}, id string, collection_name string) (string, int, error) { var doc map[string]interface{} b, _ := bson.Marshal(set) diff --git a/go.mod b/go.mod index 73db29f..0ec749a 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/leodido/go-urn v1.4.0 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/vk496/cron v1.2.0 github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect diff --git a/go.sum b/go.sum index 76fad78..537fec1 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/vk496/cron v1.2.0 h1:fDxb4qNi6Rmxh3h9snW1sKJ0nHgjpg3fYc0Oq+igbvk= +github.com/vk496/cron v1.2.0/go.mod h1:f8lpm+SIXbjvujp8Dix4S2B+GGva/q0yrRPQ8hwTtOc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= diff --git a/models/models.go b/models/models.go index 8ee20ad..dcfb12d 100644 --- a/models/models.go +++ b/models/models.go @@ -10,6 +10,7 @@ import ( w "cloud.o-forge.io/core/oc-lib/models/resources/workflow" "cloud.o-forge.io/core/oc-lib/models/utils" w2 "cloud.o-forge.io/core/oc-lib/models/workflow" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" ) var models = map[string]func() utils.DBObject{ @@ -19,6 +20,7 @@ var models = map[string]func() utils.DBObject{ utils.STORAGE_RESOURCE.String(): func() utils.DBObject { return &s.StorageResource{} }, utils.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &p.ProcessingResource{} }, utils.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} }, + utils.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} }, } func Model(model int) utils.DBObject { diff --git a/models/utils/enums.go b/models/utils/enums.go index 34e18f9..cc6268c 100644 --- a/models/utils/enums.go +++ b/models/utils/enums.go @@ -10,6 +10,7 @@ const ( DATACENTER_RESOURCE WORKFLOW_RESOURCE WORKFLOW + WORKFLOW_EXECUTION ) var str = [...]string{ diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index ea5bd78..4cf4b43 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -1,8 +1,12 @@ package oclib import ( + "errors" + "cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "github.com/vk496/cron" ) type WorkflowMongoAccessor struct { @@ -13,12 +17,64 @@ func (wfa *WorkflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err return wfa.GenericDeleteOne(id, wfa) } +func (wfa *WorkflowMongoAccessor) execution(realData *Workflow, delete bool) (int, error) { + if delete { + mongo.MONGOService.DeleteMultiple(map[string]interface{}{ + "workflow_id": realData.UUID, + }, wfa.GetType()) + } + if realData.Schedule != nil { + accessor := workflow_execution.WorkflowExecutionMongoAccessor{} + if realData.Schedule.Start.IsZero() { + return 422, errors.New("should get a start date on the scheduler.") + } + if len(realData.Schedule.Cron) > 0 { + if realData.Schedule.End.IsZero() { + return 422, errors.New("a cron task should got a end date.") + } + c, err := cron.Parse(realData.Schedule.Cron) + if err != nil { + return 422, errors.New("Bad cron message: " + err.Error()) + } + for s := c.Next(realData.Schedule.Start); !s.IsZero() && s.Before(realData.Schedule.End); s = c.Next(s) { + obj := &workflow_execution.WorkflowExecution{ + ExecDate: s, + EndDate: realData.Schedule.End, + State: workflow_execution.SCHEDULED, + WorkflowID: realData.UUID, + } + accessor.StoreOne(obj) + } + + } else { + obj := &workflow_execution.WorkflowExecution{ + ExecDate: realData.Schedule.Start, + EndDate: realData.Schedule.End, + State: workflow_execution.SCHEDULED, + WorkflowID: realData.UUID, + } + accessor.StoreOne(obj) + } + } + return 200, nil +} + func (wfa *WorkflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { + if code, err := wfa.execution(set.(*Workflow), true); err != nil { + return nil, code, err + } return wfa.GenericUpdateOne(set, id, wfa) } func (wfa *WorkflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { - return wfa.GenericStoreOne(data, wfa) + res, code, err := wfa.GenericStoreOne(data, wfa) + if err != nil { + return nil, code, err + } + if code, err := wfa.execution(res.(*Workflow), false); err != nil { + return nil, code, err + } + return res, code, err } func (wfa *WorkflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { diff --git a/models/workflow/workflow_schedule.go b/models/workflow/workflow_schedule.go index 7d5d5a1..83e88c4 100644 --- a/models/workflow/workflow_schedule.go +++ b/models/workflow/workflow_schedule.go @@ -3,13 +3,13 @@ package oclib import "time" type WorkflowSchedule struct { - Id string `json:"id"` - Start time.Time - End time.Time - Cron string + Id string `json:"id"` + Start time.Time `json:"start" bson:"start validate:"required""` + End time.Time `json:"end,omitempty" bson:"end,omitempty"` + Cron string `json:"cron,omitempty" bson:"cron,omitempty"` } -func (ws *WorkflowSchedule) GetAllDates() (timetable []time.Time){ +func (ws *WorkflowSchedule) GetAllDates() (timetable []time.Time) { // Return all the execution time generated by the Cron return -} \ No newline at end of file +} diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go new file mode 100644 index 0000000..14211cc --- /dev/null +++ b/models/workflow_execution/workflow_execution.go @@ -0,0 +1,83 @@ +package workflow_execution + +import ( + "encoding/json" + "time" + + "cloud.o-forge.io/core/oc-lib/models/utils" + "github.com/google/uuid" +) + +type ScheduledType int + +const ( + SCHEDULED ScheduledType = iota + STARTED + FAILURE + SUCCESS +) + +var str = [...]string{ + "scheduled", + "started", + "failure", + "success", +} + +func FromInt(i int) string { + return str[i] +} + +func (d ScheduledType) String() string { + return str[d] +} + +// EnumIndex - Creating common behavior - give the type a EnumIndex functio +func (d ScheduledType) EnumIndex() int { + return int(d) +} + +type WorkflowExecution struct { + UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"` + ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` + EndDate time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` + State ScheduledType `json:"state,omitempty" bson:"state,omitempty" ` + WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` +} + +func (ao *WorkflowExecution) GetID() string { + return ao.UUID +} + +func (r *WorkflowExecution) GenerateID() { + r.UUID = uuid.New().String() +} + +func (d *WorkflowExecution) GetName() string { + return d.UUID + "_" + d.ExecDate.String() +} + +func (d *WorkflowExecution) GetAccessor() utils.Accessor { + data := &WorkflowExecutionMongoAccessor{} + data.SetLogger(utils.WORKFLOW) + return data +} + +func (dma *WorkflowExecution) Deserialize(j map[string]interface{}) utils.DBObject { + b, err := json.Marshal(j) + if err != nil { + return nil + } + json.Unmarshal(b, dma) + return dma +} + +func (dma *WorkflowExecution) Serialize() map[string]interface{} { + var m map[string]interface{} + b, err := json.Marshal(dma) + if err != nil { + return nil + } + json.Unmarshal(b, dma) + return m +} diff --git a/models/workflow_execution/workflow_execution_mongo_accessor.go b/models/workflow_execution/workflow_execution_mongo_accessor.go new file mode 100644 index 0000000..7edb8ee --- /dev/null +++ b/models/workflow_execution/workflow_execution_mongo_accessor.go @@ -0,0 +1,52 @@ +package workflow_execution + +import ( + "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/utils" +) + +type WorkflowExecutionMongoAccessor struct { + utils.AbstractAccessor +} + +func (wfa *WorkflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { + return wfa.GenericDeleteOne(id, wfa) +} + +func (wfa *WorkflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { + return wfa.GenericUpdateOne(set, id, wfa) +} + +func (wfa *WorkflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data, wfa) +} + +func (wfa *WorkflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data, wfa) +} + +func (wfa *WorkflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { + var workflow WorkflowExecution + res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error()) + return nil, code, err + } + res_mongo.Decode(&workflow) + return &workflow, 200, nil +} + +func (wfa WorkflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error()) + return nil, code, err + } + for res_mongo.Next(mongo.MngoCtx) { + var obj utils.AbstractObject + res_mongo.Decode(&obj) + objs = append(objs, &obj) + } + return objs, 200, nil +}