workflow scheduler create booking with a booking execution lot id
This commit is contained in:
		| @@ -563,9 +563,9 @@ func (l *LibData) ToRule() *rule.Rule { | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecutions { | func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution { | ||||||
| 	if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION { | 	if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION { | ||||||
| 		return l.Data.(*workflow_execution.WorkflowExecutions) | 		return l.Data.(*workflow_execution.WorkflowExecution) | ||||||
| 	} | 	} | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|   | |||||||
| @@ -15,8 +15,9 @@ import ( | |||||||
|  */ |  */ | ||||||
| type Booking struct { | type Booking struct { | ||||||
| 	utils.AbstractObject                    // AbstractObject contains the basic fields of an object (id, name) | 	utils.AbstractObject                    // AbstractObject contains the basic fields of an object (id, name) | ||||||
| 	DestPeerID           string             `json:"dest_peer_id,omitempty"`                             // DestPeerID is the ID of the destination peer | 	ExecutionsID         string             `json:"executions_id,omitempty" bson:"executions_id,omitempty"` // ExecutionsID is the ID of the executions | ||||||
| 	WorkflowID           string             `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow | 	DestPeerID           string             `json:"dest_peer_id,omitempty"`                                 // DestPeerID is the ID of the destination peer | ||||||
|  | 	WorkflowID           string             `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"`     // WorkflowID is the ID of the workflow | ||||||
| 	ExecutionID          string             `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` | 	ExecutionID          string             `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` | ||||||
| 	State                enum.BookingStatus `json:"state,omitempty" bson:"state,omitempty" validate:"required"`                             // State is the state of the booking | 	State                enum.BookingStatus `json:"state,omitempty" bson:"state,omitempty" validate:"required"`                             // State is the state of the booking | ||||||
| 	ExpectedStartDate    time.Time          `json:"expected_start_date,omitempty" bson:"expected_start_date,omitempty" validate:"required"` // ExpectedStartDate is the expected start date of the booking | 	ExpectedStartDate    time.Time          `json:"expected_start_date,omitempty" bson:"expected_start_date,omitempty" validate:"required"` // ExpectedStartDate is the expected start date of the booking | ||||||
| @@ -93,7 +94,7 @@ func (d *Booking) VerifyAuth(request *tools.APIRequest) bool { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (r *Booking) StoreDraftDefault() { | func (r *Booking) StoreDraftDefault() { | ||||||
| 	r.IsDraft = true | 	r.IsDraft = false | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { | func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { | ||||||
|   | |||||||
| @@ -28,7 +28,7 @@ var models = map[string]func() utils.DBObject{ | |||||||
| 	tools.STORAGE_RESOURCE.String():    func() utils.DBObject { return &resource.StorageResource{} }, | 	tools.STORAGE_RESOURCE.String():    func() utils.DBObject { return &resource.StorageResource{} }, | ||||||
| 	tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} }, | 	tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} }, | ||||||
| 	tools.WORKFLOW.String():            func() utils.DBObject { return &w2.Workflow{} }, | 	tools.WORKFLOW.String():            func() utils.DBObject { return &w2.Workflow{} }, | ||||||
| 	tools.WORKFLOW_EXECUTION.String():  func() utils.DBObject { return &workflow_execution.WorkflowExecutions{} }, | 	tools.WORKFLOW_EXECUTION.String():  func() utils.DBObject { return &workflow_execution.WorkflowExecution{} }, | ||||||
| 	tools.WORKSPACE.String():           func() utils.DBObject { return &w3.Workspace{} }, | 	tools.WORKSPACE.String():           func() utils.DBObject { return &w3.Workspace{} }, | ||||||
| 	tools.PEER.String():                func() utils.DBObject { return &peer.Peer{} }, | 	tools.PEER.String():                func() utils.DBObject { return &peer.Peer{} }, | ||||||
| 	tools.COLLABORATIVE_AREA.String():  func() utils.DBObject { return &collaborative_area.CollaborativeArea{} }, | 	tools.COLLABORATIVE_AREA.String():  func() utils.DBObject { return &collaborative_area.CollaborativeArea{} }, | ||||||
|   | |||||||
| @@ -68,10 +68,10 @@ func (o *Order) Pay(scheduler *workflow_execution.WorkflowSchedule, request *too | |||||||
| 	} else { | 	} else { | ||||||
| 		o.IsDraft = false | 		o.IsDraft = false | ||||||
| 	} | 	} | ||||||
| 	for _, exec := range scheduler.WorkflowExecutions { | 	for _, exec := range scheduler.WorkflowExecution { | ||||||
| 		exec.IsDraft = false | 		exec.IsDraft = false | ||||||
| 		_, code, err := utils.GenericUpdateOne(exec, exec.GetID(), | 		_, code, err := utils.GenericUpdateOne(exec, exec.GetID(), | ||||||
| 			workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecutions{}) | 			workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{}) | ||||||
| 		if code != 200 || err != nil { | 		if code != 200 || err != nil { | ||||||
| 			return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err)) | 			return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err)) | ||||||
| 		} | 		} | ||||||
| @@ -100,7 +100,7 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu | |||||||
| 	o.IsDraft = true | 	o.IsDraft = true | ||||||
| 	o.OrderBy = request.PeerID | 	o.OrderBy = request.PeerID | ||||||
| 	o.WorkflowExecutionIDs = []string{} // create an array of ids | 	o.WorkflowExecutionIDs = []string{} // create an array of ids | ||||||
| 	for _, exec := range scheduler.WorkflowExecutions { | 	for _, exec := range scheduler.WorkflowExecution { | ||||||
| 		o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID()) | 		o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID()) | ||||||
| 	} | 	} | ||||||
| 	// set the name of the order | 	// set the name of the order | ||||||
| @@ -166,12 +166,12 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r | |||||||
| 	if request == nil { | 	if request == nil { | ||||||
| 		return draftedBookings, errors.New("no request found") | 		return draftedBookings, errors.New("no request found") | ||||||
| 	} | 	} | ||||||
| 	for _, exec := range scheduler.WorkflowExecutions { | 	for _, exec := range scheduler.WorkflowExecution { | ||||||
| 		_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request) | 		_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) | 			return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) | ||||||
| 		} | 		} | ||||||
| 		bookings := exec.Book(scheduler.Workflow.UUID, priceds) | 		bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds) | ||||||
| 		for _, booking := range bookings { | 		for _, booking := range bookings { | ||||||
| 			_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", | 			_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", | ||||||
| 				tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) | 				tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) | ||||||
|   | |||||||
| @@ -15,39 +15,40 @@ import ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| /* | /* | ||||||
| * WorkflowExecutions is a struct that represents a list of workflow executions | * WorkflowExecution is a struct that represents a list of workflow executions | ||||||
| * Warning: No user can write (del, post, put) a workflow execution, it is only used by the system | * Warning: No user can write (del, post, put) a workflow execution, it is only used by the system | ||||||
| * workflows generate their own executions | * workflows generate their own executions | ||||||
|  */ |  */ | ||||||
| type WorkflowExecutions struct { | type WorkflowExecution struct { | ||||||
| 	utils.AbstractObject                    // AbstractObject contains the basic fields of an object (id, name) | 	utils.AbstractObject                    // AbstractObject contains the basic fields of an object (id, name) | ||||||
|  | 	ExecutionID          string             `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` | ||||||
| 	ExecDate             time.Time          `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required | 	ExecDate             time.Time          `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required | ||||||
| 	EndDate              *time.Time         `json:"end_date,omitempty" bson:"end_date,omitempty"`                                 // EndDate is the end date of the workflow | 	EndDate              *time.Time         `json:"end_date,omitempty" bson:"end_date,omitempty"`                                 // EndDate is the end date of the workflow | ||||||
| 	State                enum.BookingStatus `json:"state" bson:"state" default:"0"`                                               // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow | 	State                enum.BookingStatus `json:"state" bson:"state" default:"0"`                                               // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow | ||||||
| 	WorkflowID           string             `json:"workflow_id" bson:"workflow_id,omitempty"`                                     // WorkflowID is the ID of the workflow | 	WorkflowID           string             `json:"workflow_id" bson:"workflow_id,omitempty"`                                     // WorkflowID is the ID of the workflow | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *WorkflowExecutions) StoreDraftDefault() { | func (r *WorkflowExecution) StoreDraftDefault() { | ||||||
| 	r.IsDraft = true // TODO: TEMPORARY | 	r.IsDraft = false // TODO: TEMPORARY | ||||||
| 	r.State = enum.DRAFT | 	r.State = enum.SCHEDULED | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { | func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { | ||||||
| 	if r.State != set.(*WorkflowExecutions).State { | 	if r.State != set.(*WorkflowExecution).State { | ||||||
| 		return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated | 		return true, &WorkflowExecution{State: set.(*WorkflowExecution).State} // only state can be updated | ||||||
| 	} | 	} | ||||||
| 	return !r.IsDraft, set // only draft buying can be updated | 	return !r.IsDraft, set // only draft buying can be updated | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *WorkflowExecutions) CanDelete() bool { | func (r *WorkflowExecution) CanDelete() bool { | ||||||
| 	return r.IsDraft // only draft bookings can be deleted | 	return r.IsDraft // only draft bookings can be deleted | ||||||
| } | } | ||||||
|  |  | ||||||
| func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool { | func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool { | ||||||
| 	return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID | 	return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error { | func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error { | ||||||
| 	if ws.EndDate == nil { | 	if ws.EndDate == nil { | ||||||
| 		// if no end... then Book like a savage | 		// if no end... then Book like a savage | ||||||
| 		e := ws.ExecDate.Add(time.Hour) | 		e := ws.ExecDate.Add(time.Hour) | ||||||
| @@ -74,7 +75,7 @@ func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error { | |||||||
| } | } | ||||||
|  |  | ||||||
| // tool to transform the argo status to a state | // tool to transform the argo status to a state | ||||||
| func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions { | func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution { | ||||||
| 	status = strings.ToLower(status) | 	status = strings.ToLower(status) | ||||||
| 	switch status { | 	switch status { | ||||||
| 	case "succeeded": // Succeeded | 	case "succeeded": // Succeeded | ||||||
| @@ -89,29 +90,29 @@ func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecuti | |||||||
| 	return wfa | 	return wfa | ||||||
| } | } | ||||||
|  |  | ||||||
| func (r *WorkflowExecutions) GenerateID() { | func (r *WorkflowExecution) GenerateID() { | ||||||
| 	r.UUID = uuid.New().String() | 	r.UUID = uuid.New().String() | ||||||
| } | } | ||||||
|  |  | ||||||
| func (d *WorkflowExecutions) GetName() string { | func (d *WorkflowExecution) GetName() string { | ||||||
| 	return d.UUID + "_" + d.ExecDate.String() | 	return d.UUID + "_" + d.ExecDate.String() | ||||||
| } | } | ||||||
|  |  | ||||||
| func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor { | func (d *WorkflowExecution) GetAccessor(request *tools.APIRequest) utils.Accessor { | ||||||
| 	return NewAccessor(request) // Create a new instance of the accessor | 	return NewAccessor(request) // Create a new instance of the accessor | ||||||
| } | } | ||||||
|  |  | ||||||
| func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool { | func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool { | ||||||
| 	return true | 	return true | ||||||
| } | } | ||||||
|  |  | ||||||
| func (d *WorkflowExecutions) Book(wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking { | func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking { | ||||||
| 	booking := d.bookEach(wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) | 	booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) | ||||||
| 	booking = append(booking, d.bookEach(wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) | 	booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) | ||||||
| 	return booking | 	return booking | ||||||
| } | } | ||||||
|  |  | ||||||
| func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking { | func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking { | ||||||
| 	items := []*booking.Booking{} | 	items := []*booking.Booking{} | ||||||
| 	for _, priced := range priceds { | 	for _, priced := range priceds { | ||||||
| 		start := d.ExecDate | 		start := d.ExecDate | ||||||
| @@ -120,7 +121,8 @@ func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds [] | |||||||
| 		} | 		} | ||||||
| 		end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) | 		end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) | ||||||
| 		bookingItem := &booking.Booking{ | 		bookingItem := &booking.Booking{ | ||||||
| 			State:             enum.DRAFT, | 			ExecutionsID:      executionsID, | ||||||
|  | 			State:             enum.SCHEDULED, | ||||||
| 			ResourceID:        priced.GetID(), | 			ResourceID:        priced.GetID(), | ||||||
| 			ResourceType:      dt, | 			ResourceType:      dt, | ||||||
| 			DestPeerID:        priced.GetCreatorID(), | 			DestPeerID:        priced.GetCreatorID(), | ||||||
|   | |||||||
| @@ -43,11 +43,11 @@ func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, | |||||||
| } | } | ||||||
|  |  | ||||||
| func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { | func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { | ||||||
| 	if set.(*WorkflowExecutions).State == 0 { | 	if set.(*WorkflowExecution).State == 0 { | ||||||
| 		return nil, 400, errors.New("state is required") | 		return nil, 400, errors.New("state is required") | ||||||
| 	} | 	} | ||||||
| 	realSet := WorkflowExecutions{State: set.(*WorkflowExecutions).State} | 	realSet := WorkflowExecution{State: set.(*WorkflowExecution).State} | ||||||
| 	return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecutions{}) | 	return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecution{}) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { | func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { | ||||||
| @@ -59,13 +59,13 @@ func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.D | |||||||
| } | } | ||||||
|  |  | ||||||
| func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { | func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { | ||||||
| 	return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) { | 	return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) { | ||||||
| 		if d.(*WorkflowExecutions).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { | 		if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { | ||||||
| 			utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) | 			utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) | ||||||
| 			return nil, 404, errors.New("not found") | 			return nil, 404, errors.New("not found") | ||||||
| 		} | 		} | ||||||
| 		if d.(*WorkflowExecutions).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { | 		if d.(*WorkflowExecution).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { | ||||||
| 			d.(*WorkflowExecutions).State = enum.FORGOTTEN | 			d.(*WorkflowExecution).State = enum.FORGOTTEN | ||||||
| 			utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request)) | 			utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request)) | ||||||
| 		} | 		} | ||||||
| 		return d, 200, nil | 		return d, 200, nil | ||||||
| @@ -73,21 +73,21 @@ func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int | |||||||
| } | } | ||||||
|  |  | ||||||
| func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) { | func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) { | ||||||
| 	return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a) | 	return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), isDraft, a) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) { | func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) { | ||||||
| 	return utils.GenericSearch[*WorkflowExecutions](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a) | 	return utils.GenericSearch[*WorkflowExecution](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a) | ||||||
| } | } | ||||||
|  |  | ||||||
| func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject { | func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject { | ||||||
| 	return func(d utils.DBObject) utils.ShallowDBObject { | 	return func(d utils.DBObject) utils.ShallowDBObject { | ||||||
| 		if d.(*WorkflowExecutions).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { | 		if d.(*WorkflowExecution).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { | ||||||
| 			utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) | 			utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| 		if d.(*WorkflowExecutions).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) { | 		if d.(*WorkflowExecution).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { | ||||||
| 			d.(*WorkflowExecutions).State = enum.FORGOTTEN | 			d.(*WorkflowExecution).State = enum.FORGOTTEN | ||||||
| 			utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request)) | 			utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request)) | ||||||
| 			return d | 			return d | ||||||
| 		} | 		} | ||||||
|   | |||||||
| @@ -11,6 +11,7 @@ import ( | |||||||
| 	"cloud.o-forge.io/core/oc-lib/models/utils" | 	"cloud.o-forge.io/core/oc-lib/models/utils" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/models/workflow" | 	"cloud.o-forge.io/core/oc-lib/models/workflow" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/tools" | 	"cloud.o-forge.io/core/oc-lib/tools" | ||||||
|  | 	"github.com/google/uuid" | ||||||
| 	"github.com/robfig/cron" | 	"github.com/robfig/cron" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -20,14 +21,15 @@ import ( | |||||||
|  */ |  */ | ||||||
| // it's a flying object only use in a session time. It's not stored in the database | // it's a flying object only use in a session time. It's not stored in the database | ||||||
| type WorkflowSchedule struct { | type WorkflowSchedule struct { | ||||||
| 	Workflow           *workflow.Workflow    `json:"workflow,omitempty"`                    // Workflow is the workflow dependancy of the schedule | 	UUID              string               `json:"id" validate:"required"`                // ExecutionsID is the list of the executions id of the workflow | ||||||
| 	WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"`         // WorkflowExecutions is the list of executions of the workflow | 	Workflow          *workflow.Workflow   `json:"workflow,omitempty"`                    // Workflow is the workflow dependancy of the schedule | ||||||
| 	Message            string                `json:"message,omitempty"`                     // Message is the message of the schedule | 	WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"`         // WorkflowExecution is the list of executions of the workflow | ||||||
| 	Warning            string                `json:"warning,omitempty"`                     // Warning is the warning message of the schedule | 	Message           string               `json:"message,omitempty"`                     // Message is the message of the schedule | ||||||
| 	Start              time.Time             `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time | 	Warning           string               `json:"warning,omitempty"`                     // Warning is the warning message of the schedule | ||||||
| 	End                *time.Time            `json:"end,omitempty"`                         // End is the end time of the schedule, is required and must be greater than the Start time | 	Start             time.Time            `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time | ||||||
| 	DurationS          float64               `json:"duration_s" default:"-1"`               // End is the end time of the schedule | 	End               *time.Time           `json:"end,omitempty"`                         // End is the end time of the schedule, is required and must be greater than the Start time | ||||||
| 	Cron               string                `json:"cron,omitempty"`                        // here the cron format : ss mm hh dd MM dw task | 	DurationS         float64              `json:"duration_s" default:"-1"`               // End is the end time of the schedule | ||||||
|  | 	Cron              string               `json:"cron,omitempty"`                        // here the cron format : ss mm hh dd MM dw task | ||||||
| } | } | ||||||
|  |  | ||||||
| func NewScheduler(start string, end string, durationInS float64, cron string) *WorkflowSchedule { | func NewScheduler(start string, end string, durationInS float64, cron string) *WorkflowSchedule { | ||||||
| @@ -36,6 +38,7 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W | |||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	ws := &WorkflowSchedule{ | 	ws := &WorkflowSchedule{ | ||||||
|  | 		UUID:      uuid.New().String(), | ||||||
| 		Start:     s, | 		Start:     s, | ||||||
| 		DurationS: durationInS, | 		DurationS: durationInS, | ||||||
| 		Cron:      cron, | 		Cron:      cron, | ||||||
| @@ -47,19 +50,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W | |||||||
| 	return ws | 	return ws | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) { | func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, error) { | ||||||
| 	if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { | 	if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { | ||||||
| 		return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined") | 		return false, nil, []*WorkflowExecution{}, errors.New("no caller defined") | ||||||
| 	} | 	} | ||||||
| 	access := workflow.NewAccessor(request) | 	access := workflow.NewAccessor(request) | ||||||
| 	res, code, err := access.LoadOne(wfID) | 	res, code, err := access.LoadOne(wfID) | ||||||
| 	if code != 200 { | 	if code != 200 { | ||||||
| 		return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error()) | 		return false, nil, []*WorkflowExecution{}, errors.New("could not load the workflow with id: " + err.Error()) | ||||||
| 	} | 	} | ||||||
| 	wf := res.(*workflow.Workflow) | 	wf := res.(*workflow.Workflow) | ||||||
| 	longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) | 	longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return false, wf, []*WorkflowExecutions{}, err | 		return false, wf, []*WorkflowExecution{}, err | ||||||
| 	} | 	} | ||||||
| 	ws.DurationS = longest | 	ws.DurationS = longest | ||||||
| 	ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." | 	ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." | ||||||
| @@ -68,10 +71,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) | |||||||
| 	} | 	} | ||||||
| 	execs, err := ws.getExecutions(wf) | 	execs, err := ws.getExecutions(wf) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return false, wf, []*WorkflowExecutions{}, err | 		return false, wf, []*WorkflowExecution{}, err | ||||||
| 	} | 	} | ||||||
| 	for _, exec := range execs { | 	for _, exec := range execs { | ||||||
| 		bookings := exec.Book(wfID, priceds) | 		bookings := exec.Book(ws.UUID, wfID, priceds) | ||||||
| 		for _, b := range bookings { | 		for _, b := range bookings { | ||||||
| 			meth := request.Caller.URLS[tools.BOOKING][tools.GET] | 			meth := request.Caller.URLS[tools.BOOKING][tools.GET] | ||||||
| 			meth = strings.ReplaceAll(meth, ":id", b.ResourceID) | 			meth = strings.ReplaceAll(meth, ":id", b.ResourceID) | ||||||
| @@ -83,34 +86,42 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) | |||||||
| 				return false, wf, execs, err | 				return false, wf, execs, err | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
|  | 		for _, booking := range bookings { | ||||||
|  | 			_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", | ||||||
|  | 				tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) | ||||||
|  | 			if err != nil { | ||||||
|  | 				return false, wf, execs, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  |  | ||||||
| 	} | 	} | ||||||
| 	return true, wf, execs, nil | 	return true, wf, execs, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecutions, error) { | func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { | ||||||
| 	if request == nil { | 	if request == nil { | ||||||
| 		return ws, nil, []*WorkflowExecutions{}, errors.New("no request found") | 		return ws, nil, []*WorkflowExecution{}, errors.New("no request found") | ||||||
| 	} | 	} | ||||||
| 	c := request.Caller | 	c := request.Caller | ||||||
| 	if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { | 	if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { | ||||||
| 		return ws, nil, []*WorkflowExecutions{}, errors.New("no caller defined") | 		return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined") | ||||||
| 	} | 	} | ||||||
| 	methods := c.URLS[tools.BOOKING] | 	methods := c.URLS[tools.BOOKING] | ||||||
| 	if _, ok := methods[tools.GET]; !ok { | 	if _, ok := methods[tools.GET]; !ok { | ||||||
| 		return ws, nil, []*WorkflowExecutions{}, errors.New("no path found") | 		return ws, nil, []*WorkflowExecution{}, errors.New("no path found") | ||||||
| 	} | 	} | ||||||
| 	ok, wf, executions, err := ws.CheckBooking(wfID, request) | 	ok, wf, executions, err := ws.CheckBooking(wfID, request) | ||||||
| 	if !ok || err != nil { | 	if !ok || err != nil { | ||||||
| 		return ws, nil, []*WorkflowExecutions{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) | 		return ws, nil, []*WorkflowExecution{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	ws.Workflow = wf | 	ws.Workflow = wf | ||||||
| 	ws.WorkflowExecutions = executions | 	ws.WorkflowExecution = executions | ||||||
|  |  | ||||||
| 	for _, exec := range executions { | 	for _, exec := range executions { | ||||||
| 		err := exec.PurgeDraft(request) | 		err := exec.PurgeDraft(request) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			return ws, nil, []*WorkflowExecutions{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) | 			return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) | ||||||
| 		} | 		} | ||||||
| 		exec.GenerateID() | 		exec.GenerateID() | ||||||
| 		exec.StoreDraftDefault() | 		exec.StoreDraftDefault() | ||||||
| @@ -132,14 +143,14 @@ VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING | |||||||
| * getExecutions is a function that returns the executions of a workflow | * getExecutions is a function that returns the executions of a workflow | ||||||
| * it returns an array of workflow_execution.WorkflowExecution | * it returns an array of workflow_execution.WorkflowExecution | ||||||
|  */ |  */ | ||||||
| func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) { | func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecution, error) { | ||||||
| 	workflows_executions := []*WorkflowExecutions{} | 	workflows_executions := []*WorkflowExecution{} | ||||||
| 	dates, err := ws.getDates() | 	dates, err := ws.getDates() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return workflows_executions, err | 		return workflows_executions, err | ||||||
| 	} | 	} | ||||||
| 	for _, date := range dates { | 	for _, date := range dates { | ||||||
| 		obj := &WorkflowExecutions{ | 		obj := &WorkflowExecution{ | ||||||
| 			AbstractObject: utils.AbstractObject{ | 			AbstractObject: utils.AbstractObject{ | ||||||
| 				Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution | 				Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution | ||||||
| 			}, | 			}, | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user