diff --git a/models/booking/booking.go b/models/booking/booking.go new file mode 100644 index 0000000..f27f633 --- /dev/null +++ b/models/booking/booking.go @@ -0,0 +1,83 @@ +package booking + +import ( + "encoding/json" + "time" + + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "github.com/google/uuid" + "go.mongodb.org/mongo-driver/bson/primitive" +) + +type Booking struct { + workflow_execution.WorkflowExecution + PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` + DatacenterResourceID string `json:"datacenter_resource_id,omitempty" bson:"datacenter_resource_id,omitempty" validate:"required"` +} + +func (wfa *Booking) CheckBooking(start time.Time, end *time.Time) (bool, error) { + // check if + if end == nil { + // if no end... then Book like a savage + return true, nil + } + e := *end + accessor := wfa.GetAccessor() + res, code, err := accessor.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "workflowexecution.state": {{Operator: dbs.EQUAL.String(), Value: workflow_execution.SCHEDULED.EnumIndex()}}, + "workflowexecution.execution_date": { + {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(e)}, + {Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(start)}, + }, + }, + }, "") + if code != 200 { + return false, err + } + return len(res) == 0, nil +} + +func (wfa *Booking) ArgoStatusToState(status string) *Booking { + wfa.WorkflowExecution.ArgoStatusToState(status) + return wfa +} + +func (ao *Booking) GetID() string { + return ao.UUID +} + +func (r *Booking) GenerateID() { + r.UUID = uuid.New().String() +} + +func (d *Booking) GetName() string { + return d.UUID + "_" + d.ExecDate.String() +} + +func (d *Booking) GetAccessor() utils.Accessor { + data := New() + data.SetLogger(utils.BOOKING) + return data +} + +func (dma *Booking) Deserialize(j map[string]interface{}) utils.DBObject { + b, err := json.Marshal(j) + if err != nil { + return nil + } + json.Unmarshal(b, dma) + return dma +} + +func (dma *Booking) Serialize() map[string]interface{} { + var m map[string]interface{} + b, err := json.Marshal(dma) + if err != nil { + return nil + } + json.Unmarshal(b, &m) + return m +} diff --git a/models/booking/booking_execution_mongo_accessor.go b/models/booking/booking_execution_mongo_accessor.go new file mode 100644 index 0000000..891d450 --- /dev/null +++ b/models/booking/booking_execution_mongo_accessor.go @@ -0,0 +1,83 @@ +package booking + +import ( + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/utils" +) + +type bookingExecutionMongoAccessor struct { + utils.AbstractAccessor +} + +func New() *bookingExecutionMongoAccessor { + return &bookingExecutionMongoAccessor{} +} + +func (wfa *bookingExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { + return wfa.GenericDeleteOne(id, wfa) +} + +func (wfa *bookingExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { + return wfa.GenericUpdateOne(set, id, wfa, &Booking{}) +} + +func (wfa *bookingExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data, wfa) +} + +func (wfa *bookingExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data, wfa) +} + +func (wfa *bookingExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { + var workflow Booking + res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error()) + return nil, code, err + } + res_mongo.Decode(&workflow) + return &workflow, 200, nil +} + +func (wfa bookingExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error()) + return nil, code, err + } + var results []Booking + if err = res_mongo.All(mongo.MngoCtx, &results); err != nil { + return nil, 404, err + } + for _, r := range results { + objs = append(objs, &r.AbstractObject) + } + return objs, 200, nil +} + +func (wfa *bookingExecutionMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { + filters = &dbs.Filters{ + Or: map[string][]dbs.Filter{ + "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, + }, + } + } + res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not store to db. Error: " + err.Error()) + return nil, code, err + } + var results []Booking + if err = res_mongo.All(mongo.MngoCtx, &results); err != nil { + return nil, 404, err + } + for _, r := range results { + objs = append(objs, &r) + } + return objs, 200, nil +} diff --git a/models/models.go b/models/models.go index 69f9dc2..38680a2 100644 --- a/models/models.go +++ b/models/models.go @@ -3,6 +3,7 @@ package models import ( "cloud.o-forge.io/core/oc-lib/logs" + "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resource_model" d "cloud.o-forge.io/core/oc-lib/models/resources/data" @@ -31,6 +32,7 @@ var models = map[string]func() utils.DBObject{ utils.PEER.String(): func() utils.DBObject { return &peer.Peer{} }, utils.SHARED_WORKSPACE.String(): func() utils.DBObject { return &shared_workspace.SharedWorkspace{} }, utils.RULE.String(): func() utils.DBObject { return &rule.Rule{} }, + utils.BOOKING.String(): func() utils.DBObject { return &booking.Booking{} }, } func Model(model int) utils.DBObject { diff --git a/models/utils/enums.go b/models/utils/enums.go index e3b5749..9dce549 100644 --- a/models/utils/enums.go +++ b/models/utils/enums.go @@ -16,6 +16,7 @@ const ( PEER SHARED_WORKSPACE RULE + BOOKING ) var Str = [...]string{ @@ -32,6 +33,7 @@ var Str = [...]string{ "peer", "shared_workspace", "rule", + "booking", } func FromInt(i int) string { diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index b99d814..15603f3 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -4,9 +4,12 @@ import ( "encoding/json" "slices" + "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "go.mongodb.org/mongo-driver/bson/primitive" ) type AbstractWorkflow struct { @@ -16,11 +19,14 @@ type AbstractWorkflow struct { Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` } -func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) bool { - if slices.Contains(w.Datacenters, link.Destination.ID) || slices.Contains(w.Datacenters, link.Source.ID) { - return true +func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) { + if slices.Contains(w.Datacenters, link.Source.ID) { + return true, link.Source.ID } - return false + if slices.Contains(w.Datacenters, link.Destination.ID) { + return true, link.Destination.ID + } + return false, "" } type Workflow struct { @@ -28,35 +34,34 @@ type Workflow struct { AbstractWorkflow } -func (d *Workflow) GetName() string { - return d.Name +func (wfa *Workflow) CheckBooking() (bool, error) { + // check if + if wfa.Schedule == nil || wfa.Schedule.Start == nil { + return false, nil + } + if wfa.Schedule.End == nil { + // if no end... then Book like a savage + return true, nil + } + e := *wfa.Schedule.End + accessor := wfa.GetAccessor() + res, code, err := accessor.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "workflowexecution.state": {{Operator: dbs.EQUAL.String(), Value: workflow_execution.SCHEDULED.EnumIndex()}}, + "workflowexecution.execution_date": { + {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(e)}, + {Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(*wfa.Schedule.Start)}, + }, + }, + }, "") + if code != 200 { + return false, err + } + return len(res) == 0, nil } -func (d *Workflow) CheckBooking() bool { - return true - /*if d.Schedule != nil && d.Schedule.Start != nil { - sd := primitive.NewDateTimeFromTime(d.Schedule.Start.Add(time.Minute * -1)) - var f dbs.Filters - if d.Schedule.End == nil { - ed := primitive.NewDateTimeFromTime(d.Schedule.Start.Add(time.Minute * 10)) - f = dbs.Filters{ - And: map[string][]dbs.Filter{ - "execution_date": {{Operator: "gte", Value: sd}, {Operator: "lte", Value: ed}}, - }, - } - } else { - ed := primitive.NewDateTimeFromTime(d.Schedule.End.Add(time.Minute * 1)) - f = dbs.Filters{ - And: map[string][]dbs.Filter{ - "execution_date": {{Operator: "gte", Value: sd}}, - "end_date": {{Operator: "lte", Value: ed}}, - }, - } - } - res, _, _ := (&workflow_execution.WorkflowExecution{}).GetAccessor().Search(&f, "") - return len(res) == 0 - } - return true*/ +func (d *Workflow) GetName() string { + return d.Name } func (d *Workflow) GetAccessor() utils.Accessor { diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index bc17c55..ced28d0 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -6,7 +6,9 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/models/resources/datacenter" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workspace" @@ -78,6 +80,60 @@ func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err return wfa.GenericDeleteOne(id, wfa) } +func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) []*booking.Booking { + books := []*booking.Booking{} + if realData.Schedule == nil { + return books + } + res, _, _ := wfa.LoadOne(id) + r := res.(*Workflow) + g := r.Graph + if realData.Graph != nil { + g = realData.Graph + } + if g != nil && g.Links != nil && len(g.Links) > 0 { + bookAccessor := (&booking.Booking{}).GetAccessor() + accessor := (&datacenter.DatacenterResource{}).GetAccessor() + for _, link := range g.Links { + if ok, dc_id := realData.isDCLink(link); ok { + _, code, _ := accessor.LoadOne(dc_id) + if code != 200 { + continue + } + // CHECK BOOKING + // dc.(*datacenter.DatacenterResource).SourceUrl should get source url... but it's not implemented + res, code, _ := bookAccessor.Search(&dbs.Filters{And: map[string][]dbs.Filter{ + "peer_id": {{Operator: dbs.EQUAL.String(), Value: "my_peer"}}, // peer is always the same for the moment + "datacenter_resource_id": {{Operator: dbs.EQUAL.String(), Value: dc_id}}, + }}, "") + if code != 200 { + continue + } + for _, b := range res { + bookAccessor.DeleteOne(b.GetID()) + } + for _, exec := range execs { + if ok, err := (&booking.Booking{}).CheckBooking(*exec.ExecDate, exec.EndDate); !ok { + if err != nil { + return books + } + return books + } + b, code, _ := bookAccessor.StoreOne(&booking.Booking{ + PeerID: "my_peer", + DatacenterResourceID: dc_id, + WorkflowExecution: *exec, + }) + if code == 200 { + books = append(books, b.(*booking.Booking)) + } + } + } + } + } + return books +} + func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) { if realData.Schedule == nil { return 200, nil @@ -87,8 +143,15 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet if r.Schedule != nil && r.Schedule.Start == realData.Schedule.Start && r.Schedule.End == realData.Schedule.End && r.Schedule.Cron == realData.Schedule.Cron { return 200, nil } - if !realData.CheckBooking() { - return 409, errors.New("the booking is already taken.") + accessor := (&workflow_execution.WorkflowExecution{}).GetAccessor() + execs, err := wfa.getExecutions(id, realData) + for _, exec := range execs { + if ok, err := (&booking.Booking{}).CheckBooking(*exec.ExecDate, exec.EndDate); !ok { + if err != nil { + return 500, err + } + return 409, errors.New("the booking from " + exec.ExecDate.String() + " is already taken.") + } } if delete { mongo.MONGOService.DeleteMultiple(map[string]interface{}{ @@ -96,8 +159,6 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet "state": 1, }, utils.WORKFLOW_EXECUTION.String()) } - accessor := (&workflow_execution.WorkflowExecution{}).GetAccessor() - execs, err := wfa.getExecutions(id, realData) if err == nil && len(execs) > 0 { for _, obj := range execs { _, code, err := accessor.StoreOne(obj) @@ -108,6 +169,7 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet } else { return 422, err } + wfa.book(id, realData, execs) return 200, nil }