From 019b590b4f442348b69c2d3772ed97ec54795862 Mon Sep 17 00:00:00 2001 From: mr Date: Tue, 11 Feb 2025 11:26:02 +0100 Subject: [PATCH] workflow scheduler create booking with a booking execution lot id --- entrypoint.go | 4 +- models/booking/booking.go | 7 ++- models/models.go | 2 +- models/order/order.go | 10 ++-- .../workflow_execution/workflow_execution.go | 44 +++++++------- .../workflow_execution_mongo_accessor.go | 24 ++++---- .../workflow_execution/workflow_scheduler.go | 59 +++++++++++-------- 7 files changed, 82 insertions(+), 68 deletions(-) diff --git a/entrypoint.go b/entrypoint.go index 65def07..674847d 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -563,9 +563,9 @@ func (l *LibData) ToRule() *rule.Rule { return nil } -func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecutions { +func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution { if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION { - return l.Data.(*workflow_execution.WorkflowExecutions) + return l.Data.(*workflow_execution.WorkflowExecution) } return nil } diff --git a/models/booking/booking.go b/models/booking/booking.go index 53f3ea7..2023376 100644 --- a/models/booking/booking.go +++ b/models/booking/booking.go @@ -15,8 +15,9 @@ import ( */ type Booking struct { utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) - DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer - WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow + ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty"` // ExecutionsID is the ID of the executions + DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer + WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` State enum.BookingStatus `json:"state,omitempty" bson:"state,omitempty" validate:"required"` // State is the state of the booking ExpectedStartDate time.Time `json:"expected_start_date,omitempty" bson:"expected_start_date,omitempty" validate:"required"` // ExpectedStartDate is the expected start date of the booking @@ -93,7 +94,7 @@ func (d *Booking) VerifyAuth(request *tools.APIRequest) bool { } func (r *Booking) StoreDraftDefault() { - r.IsDraft = true + r.IsDraft = false } func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { diff --git a/models/models.go b/models/models.go index d99ccd4..a52574a 100644 --- a/models/models.go +++ b/models/models.go @@ -28,7 +28,7 @@ var models = map[string]func() utils.DBObject{ tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} }, tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} }, tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} }, - tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecutions{} }, + tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} }, tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} }, tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} }, tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} }, diff --git a/models/order/order.go b/models/order/order.go index 81ce701..63389bc 100644 --- a/models/order/order.go +++ b/models/order/order.go @@ -68,10 +68,10 @@ func (o *Order) Pay(scheduler *workflow_execution.WorkflowSchedule, request *too } else { o.IsDraft = false } - for _, exec := range scheduler.WorkflowExecutions { + for _, exec := range scheduler.WorkflowExecution { exec.IsDraft = false _, code, err := utils.GenericUpdateOne(exec, exec.GetID(), - workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecutions{}) + workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{}) if code != 200 || err != nil { return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err)) } @@ -100,7 +100,7 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu o.IsDraft = true o.OrderBy = request.PeerID o.WorkflowExecutionIDs = []string{} // create an array of ids - for _, exec := range scheduler.WorkflowExecutions { + for _, exec := range scheduler.WorkflowExecution { o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID()) } // set the name of the order @@ -166,12 +166,12 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r if request == nil { return draftedBookings, errors.New("no request found") } - for _, exec := range scheduler.WorkflowExecutions { + for _, exec := range scheduler.WorkflowExecution { _, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request) if err != nil { return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) } - bookings := exec.Book(scheduler.Workflow.UUID, priceds) + bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds) for _, booking := range bookings { _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 41726c3..797a11a 100644 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -15,39 +15,40 @@ import ( ) /* -* WorkflowExecutions is a struct that represents a list of workflow executions +* WorkflowExecution is a struct that represents a list of workflow executions * Warning: No user can write (del, post, put) a workflow execution, it is only used by the system * workflows generate their own executions */ -type WorkflowExecutions struct { +type WorkflowExecution struct { utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow } -func (r *WorkflowExecutions) StoreDraftDefault() { - r.IsDraft = true // TODO: TEMPORARY - r.State = enum.DRAFT +func (r *WorkflowExecution) StoreDraftDefault() { + r.IsDraft = false // TODO: TEMPORARY + r.State = enum.SCHEDULED } -func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { - if r.State != set.(*WorkflowExecutions).State { - return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated +func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { + if r.State != set.(*WorkflowExecution).State { + return true, &WorkflowExecution{State: set.(*WorkflowExecution).State} // only state can be updated } return !r.IsDraft, set // only draft buying can be updated } -func (r *WorkflowExecutions) CanDelete() bool { +func (r *WorkflowExecution) CanDelete() bool { return r.IsDraft // only draft bookings can be deleted } -func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool { +func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool { return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID } -func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error { +func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error { if ws.EndDate == nil { // if no end... then Book like a savage e := ws.ExecDate.Add(time.Hour) @@ -74,7 +75,7 @@ func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error { } // tool to transform the argo status to a state -func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions { +func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution { status = strings.ToLower(status) switch status { case "succeeded": // Succeeded @@ -89,29 +90,29 @@ func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecuti return wfa } -func (r *WorkflowExecutions) GenerateID() { +func (r *WorkflowExecution) GenerateID() { r.UUID = uuid.New().String() } -func (d *WorkflowExecutions) GetName() string { +func (d *WorkflowExecution) GetName() string { return d.UUID + "_" + d.ExecDate.String() } -func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor { +func (d *WorkflowExecution) GetAccessor(request *tools.APIRequest) utils.Accessor { return NewAccessor(request) // Create a new instance of the accessor } -func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool { +func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool { return true } -func (d *WorkflowExecutions) Book(wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking { - booking := d.bookEach(wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) - booking = append(booking, d.bookEach(wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) +func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking { + booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) + booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) return booking } -func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking { +func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking { items := []*booking.Booking{} for _, priced := range priceds { start := d.ExecDate @@ -120,7 +121,8 @@ func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds [] } end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) bookingItem := &booking.Booking{ - State: enum.DRAFT, + ExecutionsID: executionsID, + State: enum.SCHEDULED, ResourceID: priced.GetID(), ResourceType: dt, DestPeerID: priced.GetCreatorID(), diff --git a/models/workflow_execution/workflow_execution_mongo_accessor.go b/models/workflow_execution/workflow_execution_mongo_accessor.go index 84aa407..af649d0 100644 --- a/models/workflow_execution/workflow_execution_mongo_accessor.go +++ b/models/workflow_execution/workflow_execution_mongo_accessor.go @@ -43,11 +43,11 @@ func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, } func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { - if set.(*WorkflowExecutions).State == 0 { + if set.(*WorkflowExecution).State == 0 { return nil, 400, errors.New("state is required") } - realSet := WorkflowExecutions{State: set.(*WorkflowExecutions).State} - return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecutions{}) + realSet := WorkflowExecution{State: set.(*WorkflowExecution).State} + return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecution{}) } func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { @@ -59,13 +59,13 @@ func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.D } func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { - return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) { - if d.(*WorkflowExecutions).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { + return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) { + if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) return nil, 404, errors.New("not found") } - if d.(*WorkflowExecutions).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { - d.(*WorkflowExecutions).State = enum.FORGOTTEN + if d.(*WorkflowExecution).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { + d.(*WorkflowExecution).State = enum.FORGOTTEN utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request)) } return d, 200, nil @@ -73,21 +73,21 @@ func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int } func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) { - return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a) + return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), isDraft, a) } func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) { - return utils.GenericSearch[*WorkflowExecutions](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a) + return utils.GenericSearch[*WorkflowExecution](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a) } func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject { return func(d utils.DBObject) utils.ShallowDBObject { - if d.(*WorkflowExecutions).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { + if d.(*WorkflowExecution).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) return nil } - if d.(*WorkflowExecutions).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { - d.(*WorkflowExecutions).State = enum.FORGOTTEN + if d.(*WorkflowExecution).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { + d.(*WorkflowExecution).State = enum.FORGOTTEN utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request)) return d } diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 852b2b4..f92909c 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -11,6 +11,7 @@ import ( "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/tools" + "github.com/google/uuid" "github.com/robfig/cron" ) @@ -20,14 +21,15 @@ import ( */ // it's a flying object only use in a session time. It's not stored in the database type WorkflowSchedule struct { - Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule - WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"` // WorkflowExecutions is the list of executions of the workflow - Message string `json:"message,omitempty"` // Message is the message of the schedule - Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule - Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time - End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time - DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule - Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task + UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow + Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule + WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow + Message string `json:"message,omitempty"` // Message is the message of the schedule + Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule + Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time + End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time + DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule + Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task } func NewScheduler(start string, end string, durationInS float64, cron string) *WorkflowSchedule { @@ -36,6 +38,7 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W return nil } ws := &WorkflowSchedule{ + UUID: uuid.New().String(), Start: s, DurationS: durationInS, Cron: cron, @@ -47,19 +50,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W return ws } -func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) { +func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, error) { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { - return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined") + return false, nil, []*WorkflowExecution{}, errors.New("no caller defined") } access := workflow.NewAccessor(request) res, code, err := access.LoadOne(wfID) if code != 200 { - return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error()) + return false, nil, []*WorkflowExecution{}, errors.New("could not load the workflow with id: " + err.Error()) } wf := res.(*workflow.Workflow) longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) if err != nil { - return false, wf, []*WorkflowExecutions{}, err + return false, wf, []*WorkflowExecution{}, err } ws.DurationS = longest ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." @@ -68,10 +71,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) } execs, err := ws.getExecutions(wf) if err != nil { - return false, wf, []*WorkflowExecutions{}, err + return false, wf, []*WorkflowExecution{}, err } for _, exec := range execs { - bookings := exec.Book(wfID, priceds) + bookings := exec.Book(ws.UUID, wfID, priceds) for _, b := range bookings { meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) @@ -83,34 +86,42 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) return false, wf, execs, err } } + for _, booking := range bookings { + _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", + tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) + if err != nil { + return false, wf, execs, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + } + } + } return true, wf, execs, nil } -func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecutions, error) { +func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { if request == nil { - return ws, nil, []*WorkflowExecutions{}, errors.New("no request found") + return ws, nil, []*WorkflowExecution{}, errors.New("no request found") } c := request.Caller if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { - return ws, nil, []*WorkflowExecutions{}, errors.New("no caller defined") + return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined") } methods := c.URLS[tools.BOOKING] if _, ok := methods[tools.GET]; !ok { - return ws, nil, []*WorkflowExecutions{}, errors.New("no path found") + return ws, nil, []*WorkflowExecution{}, errors.New("no path found") } ok, wf, executions, err := ws.CheckBooking(wfID, request) if !ok || err != nil { - return ws, nil, []*WorkflowExecutions{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) + return ws, nil, []*WorkflowExecution{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) } ws.Workflow = wf - ws.WorkflowExecutions = executions + ws.WorkflowExecution = executions for _, exec := range executions { err := exec.PurgeDraft(request) if err != nil { - return ws, nil, []*WorkflowExecutions{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) + return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) } exec.GenerateID() exec.StoreDraftDefault() @@ -132,14 +143,14 @@ VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING * getExecutions is a function that returns the executions of a workflow * it returns an array of workflow_execution.WorkflowExecution */ -func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) { - workflows_executions := []*WorkflowExecutions{} +func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecution, error) { + workflows_executions := []*WorkflowExecution{} dates, err := ws.getDates() if err != nil { return workflows_executions, err } for _, date := range dates { - obj := &WorkflowExecutions{ + obj := &WorkflowExecution{ AbstractObject: utils.AbstractObject{ Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution },