Execution
This commit is contained in:
parent
806f5d0f20
commit
00f25b48c0
@ -146,6 +146,24 @@ func (m *MongoDB) DeleteOne(id string, collection_name string) (int64, int, erro
|
|||||||
return result.DeletedCount, 200, nil
|
return result.DeletedCount, 200, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MongoDB) DeleteMultiple(f map[string]interface{}, collection_name string) (int64, int, error) {
|
||||||
|
filter := bson.D{}
|
||||||
|
for k, v := range f {
|
||||||
|
filter = append(filter, bson.E{Key: k, Value: v})
|
||||||
|
}
|
||||||
|
targetDBCollection := CollectionMap[collection_name]
|
||||||
|
opts := options.Delete().SetHint(bson.D{{Key: "_id", Value: 1}})
|
||||||
|
MngoCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
result, err := targetDBCollection.DeleteMany(MngoCtx, filter, opts)
|
||||||
|
if err != nil {
|
||||||
|
m.Logger.Error().Msg("Couldn't insert resource: " + err.Error())
|
||||||
|
return 0, 404, err
|
||||||
|
}
|
||||||
|
return result.DeletedCount, 200, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MongoDB) UpdateOne(set interface{}, id string, collection_name string) (string, int, error) {
|
func (m *MongoDB) UpdateOne(set interface{}, id string, collection_name string) (string, int, error) {
|
||||||
var doc map[string]interface{}
|
var doc map[string]interface{}
|
||||||
b, _ := bson.Marshal(set)
|
b, _ := bson.Marshal(set)
|
||||||
|
1
go.mod
1
go.mod
@ -26,6 +26,7 @@ require (
|
|||||||
github.com/leodido/go-urn v1.4.0 // indirect
|
github.com/leodido/go-urn v1.4.0 // indirect
|
||||||
github.com/montanaflynn/stats v0.7.1 // indirect
|
github.com/montanaflynn/stats v0.7.1 // indirect
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||||
|
github.com/vk496/cron v1.2.0
|
||||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||||
github.com/xdg-go/scram v1.1.2 // indirect
|
github.com/xdg-go/scram v1.1.2 // indirect
|
||||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||||
|
2
go.sum
2
go.sum
@ -40,6 +40,8 @@ github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
|
|||||||
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
||||||
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
github.com/vk496/cron v1.2.0 h1:fDxb4qNi6Rmxh3h9snW1sKJ0nHgjpg3fYc0Oq+igbvk=
|
||||||
|
github.com/vk496/cron v1.2.0/go.mod h1:f8lpm+SIXbjvujp8Dix4S2B+GGva/q0yrRPQ8hwTtOc=
|
||||||
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
|
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
|
||||||
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
|
||||||
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
|
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
|
||||||
|
@ -10,6 +10,7 @@ import (
|
|||||||
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
|
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||||
w2 "cloud.o-forge.io/core/oc-lib/models/workflow"
|
w2 "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||||
)
|
)
|
||||||
|
|
||||||
var models = map[string]func() utils.DBObject{
|
var models = map[string]func() utils.DBObject{
|
||||||
@ -19,6 +20,7 @@ var models = map[string]func() utils.DBObject{
|
|||||||
utils.STORAGE_RESOURCE.String(): func() utils.DBObject { return &s.StorageResource{} },
|
utils.STORAGE_RESOURCE.String(): func() utils.DBObject { return &s.StorageResource{} },
|
||||||
utils.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &p.ProcessingResource{} },
|
utils.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &p.ProcessingResource{} },
|
||||||
utils.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
|
utils.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
|
||||||
|
utils.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
|
||||||
}
|
}
|
||||||
|
|
||||||
func Model(model int) utils.DBObject {
|
func Model(model int) utils.DBObject {
|
||||||
|
@ -10,6 +10,7 @@ const (
|
|||||||
DATACENTER_RESOURCE
|
DATACENTER_RESOURCE
|
||||||
WORKFLOW_RESOURCE
|
WORKFLOW_RESOURCE
|
||||||
WORKFLOW
|
WORKFLOW
|
||||||
|
WORKFLOW_EXECUTION
|
||||||
)
|
)
|
||||||
|
|
||||||
var str = [...]string{
|
var str = [...]string{
|
||||||
|
@ -1,8 +1,12 @@
|
|||||||
package oclib
|
package oclib
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
|
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||||
|
"github.com/vk496/cron"
|
||||||
)
|
)
|
||||||
|
|
||||||
type WorkflowMongoAccessor struct {
|
type WorkflowMongoAccessor struct {
|
||||||
@ -13,12 +17,64 @@ func (wfa *WorkflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err
|
|||||||
return wfa.GenericDeleteOne(id, wfa)
|
return wfa.GenericDeleteOne(id, wfa)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (wfa *WorkflowMongoAccessor) execution(realData *Workflow, delete bool) (int, error) {
|
||||||
|
if delete {
|
||||||
|
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
|
||||||
|
"workflow_id": realData.UUID,
|
||||||
|
}, wfa.GetType())
|
||||||
|
}
|
||||||
|
if realData.Schedule != nil {
|
||||||
|
accessor := workflow_execution.WorkflowExecutionMongoAccessor{}
|
||||||
|
if realData.Schedule.Start.IsZero() {
|
||||||
|
return 422, errors.New("should get a start date on the scheduler.")
|
||||||
|
}
|
||||||
|
if len(realData.Schedule.Cron) > 0 {
|
||||||
|
if realData.Schedule.End.IsZero() {
|
||||||
|
return 422, errors.New("a cron task should got a end date.")
|
||||||
|
}
|
||||||
|
c, err := cron.Parse(realData.Schedule.Cron)
|
||||||
|
if err != nil {
|
||||||
|
return 422, errors.New("Bad cron message: " + err.Error())
|
||||||
|
}
|
||||||
|
for s := c.Next(realData.Schedule.Start); !s.IsZero() && s.Before(realData.Schedule.End); s = c.Next(s) {
|
||||||
|
obj := &workflow_execution.WorkflowExecution{
|
||||||
|
ExecDate: s,
|
||||||
|
EndDate: realData.Schedule.End,
|
||||||
|
State: workflow_execution.SCHEDULED,
|
||||||
|
WorkflowID: realData.UUID,
|
||||||
|
}
|
||||||
|
accessor.StoreOne(obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
obj := &workflow_execution.WorkflowExecution{
|
||||||
|
ExecDate: realData.Schedule.Start,
|
||||||
|
EndDate: realData.Schedule.End,
|
||||||
|
State: workflow_execution.SCHEDULED,
|
||||||
|
WorkflowID: realData.UUID,
|
||||||
|
}
|
||||||
|
accessor.StoreOne(obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 200, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (wfa *WorkflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
func (wfa *WorkflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
||||||
|
if code, err := wfa.execution(set.(*Workflow), true); err != nil {
|
||||||
|
return nil, code, err
|
||||||
|
}
|
||||||
return wfa.GenericUpdateOne(set, id, wfa)
|
return wfa.GenericUpdateOne(set, id, wfa)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *WorkflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
func (wfa *WorkflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
return wfa.GenericStoreOne(data, wfa)
|
res, code, err := wfa.GenericStoreOne(data, wfa)
|
||||||
|
if err != nil {
|
||||||
|
return nil, code, err
|
||||||
|
}
|
||||||
|
if code, err := wfa.execution(res.(*Workflow), false); err != nil {
|
||||||
|
return nil, code, err
|
||||||
|
}
|
||||||
|
return res, code, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *WorkflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
|
func (wfa *WorkflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
|
@ -3,13 +3,13 @@ package oclib
|
|||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type WorkflowSchedule struct {
|
type WorkflowSchedule struct {
|
||||||
Id string `json:"id"`
|
Id string `json:"id"`
|
||||||
Start time.Time
|
Start time.Time `json:"start" bson:"start validate:"required""`
|
||||||
End time.Time
|
End time.Time `json:"end,omitempty" bson:"end,omitempty"`
|
||||||
Cron string
|
Cron string `json:"cron,omitempty" bson:"cron,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) GetAllDates() (timetable []time.Time){
|
func (ws *WorkflowSchedule) GetAllDates() (timetable []time.Time) {
|
||||||
// Return all the execution time generated by the Cron
|
// Return all the execution time generated by the Cron
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
83
models/workflow_execution/workflow_execution.go
Normal file
83
models/workflow_execution/workflow_execution.go
Normal file
@ -0,0 +1,83 @@
|
|||||||
|
package workflow_execution
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||||
|
"github.com/google/uuid"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ScheduledType int
|
||||||
|
|
||||||
|
const (
|
||||||
|
SCHEDULED ScheduledType = iota
|
||||||
|
STARTED
|
||||||
|
FAILURE
|
||||||
|
SUCCESS
|
||||||
|
)
|
||||||
|
|
||||||
|
var str = [...]string{
|
||||||
|
"scheduled",
|
||||||
|
"started",
|
||||||
|
"failure",
|
||||||
|
"success",
|
||||||
|
}
|
||||||
|
|
||||||
|
func FromInt(i int) string {
|
||||||
|
return str[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d ScheduledType) String() string {
|
||||||
|
return str[d]
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnumIndex - Creating common behavior - give the type a EnumIndex functio
|
||||||
|
func (d ScheduledType) EnumIndex() int {
|
||||||
|
return int(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
type WorkflowExecution struct {
|
||||||
|
UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"`
|
||||||
|
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"`
|
||||||
|
EndDate time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"`
|
||||||
|
State ScheduledType `json:"state,omitempty" bson:"state,omitempty" `
|
||||||
|
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ao *WorkflowExecution) GetID() string {
|
||||||
|
return ao.UUID
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *WorkflowExecution) GenerateID() {
|
||||||
|
r.UUID = uuid.New().String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *WorkflowExecution) GetName() string {
|
||||||
|
return d.UUID + "_" + d.ExecDate.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *WorkflowExecution) GetAccessor() utils.Accessor {
|
||||||
|
data := &WorkflowExecutionMongoAccessor{}
|
||||||
|
data.SetLogger(utils.WORKFLOW)
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dma *WorkflowExecution) Deserialize(j map[string]interface{}) utils.DBObject {
|
||||||
|
b, err := json.Marshal(j)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
json.Unmarshal(b, dma)
|
||||||
|
return dma
|
||||||
|
}
|
||||||
|
|
||||||
|
func (dma *WorkflowExecution) Serialize() map[string]interface{} {
|
||||||
|
var m map[string]interface{}
|
||||||
|
b, err := json.Marshal(dma)
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
json.Unmarshal(b, dma)
|
||||||
|
return m
|
||||||
|
}
|
@ -0,0 +1,52 @@
|
|||||||
|
package workflow_execution
|
||||||
|
|
||||||
|
import (
|
||||||
|
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
|
||||||
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WorkflowExecutionMongoAccessor struct {
|
||||||
|
utils.AbstractAccessor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfa *WorkflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
|
||||||
|
return wfa.GenericDeleteOne(id, wfa)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfa *WorkflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
||||||
|
return wfa.GenericUpdateOne(set, id, wfa)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfa *WorkflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
|
return wfa.GenericStoreOne(data, wfa)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfa *WorkflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
|
return wfa.GenericStoreOne(data, wfa)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfa *WorkflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
|
||||||
|
var workflow WorkflowExecution
|
||||||
|
res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType())
|
||||||
|
if err != nil {
|
||||||
|
wfa.Logger.Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error())
|
||||||
|
return nil, code, err
|
||||||
|
}
|
||||||
|
res_mongo.Decode(&workflow)
|
||||||
|
return &workflow, 200, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wfa WorkflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
|
||||||
|
objs := []utils.ShallowDBObject{}
|
||||||
|
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType())
|
||||||
|
if err != nil {
|
||||||
|
wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error())
|
||||||
|
return nil, code, err
|
||||||
|
}
|
||||||
|
for res_mongo.Next(mongo.MngoCtx) {
|
||||||
|
var obj utils.AbstractObject
|
||||||
|
res_mongo.Decode(&obj)
|
||||||
|
objs = append(objs, &obj)
|
||||||
|
}
|
||||||
|
return objs, 200, nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user