massive draft for payment process (UNCOMPLETE)

This commit is contained in:
mr
2024-12-12 16:25:47 +01:00
parent fbbce7817b
commit 02d1e93c78
55 changed files with 3018 additions and 1177 deletions

View File

@@ -1,124 +1,136 @@
package workflow_execution
import (
"encoding/json"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// ScheduledType - Enum for the different states of a workflow execution
type ScheduledType int
const (
SCHEDULED ScheduledType = iota + 1
STARTED
FAILURE
SUCCESS
FORGOTTEN
)
var str = [...]string{
"scheduled",
"started",
"failure",
"success",
"forgotten",
}
func FromInt(i int) string {
return str[i]
}
func (d ScheduledType) String() string {
return str[d]
}
// EnumIndex - Creating common behavior-give the type a EnumIndex functio
func (d ScheduledType) EnumIndex() int {
return int(d)
}
/*
* WorkflowExecutions is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
* workflows generate their own executions
*/
type WorkflowExecutions struct {
WorkflowID string `json:"workflow_id" bson:"workflow_id"`
ResourceID string `json:"resource_id" bson:"resource_id"`
Executions []*WorkflowExecution `json:"executions" bson:"executions"`
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State common.ScheduledType `json:"state" bson:"state" default:"0"` // State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
}
// New - Creates a new instance of the WorkflowExecutions from a map
func (dma *WorkflowExecutions) Deserialize(j map[string]interface{}) *WorkflowExecutions {
b, err := json.Marshal(j)
if err != nil {
return nil
func (r *WorkflowExecutions) StoreDraftDefault() {
r.IsDraft = true
}
func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if r.State != set.(*WorkflowExecutions).State {
return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated
}
json.Unmarshal(b, dma)
return dma
return r.IsDraft, set // only draft buying can be updated
}
// Serialize - Returns the WorkflowExecutions as a map
func (dma *WorkflowExecutions) Serialize() map[string]interface{} {
var m map[string]interface{}
b, err := json.Marshal(dma)
if err != nil {
return nil
func (r *WorkflowExecutions) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
}
func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool {
return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID
}
func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error {
if ws.EndDate == nil {
// if no end... then Book like a savage
e := ws.ExecDate.Add(time.Hour)
ws.EndDate = &e
}
json.Unmarshal(b, &m)
return m
}
/*
* WorkflowExecution is a struct that represents a workflow execution
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
* workflows generate their own executions
*/
type WorkflowExecution struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ExecDate *time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State ScheduledType `json:"state" bson:"state" default:"0"` // State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
}
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
return wfa.ExecDate.Equal(*we.ExecDate) && wfa.WorkflowID == we.WorkflowID
accessor := ws.GetAccessor(request)
res, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date
"state": {{Operator: dbs.EQUAL.String(), Value: common.DRAFT.EnumIndex()}},
"workflow_id": {{Operator: dbs.EQUAL.String(), Value: ws.WorkflowID}},
"execution_date": {
{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*ws.EndDate)},
{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(ws.ExecDate)},
},
},
}, "", ws.IsDraft)
if code != 200 || err != nil {
return err
}
for _, r := range res {
accessor.DeleteOne(r.GetID())
}
return nil
}
// tool to transform the argo status to a state
func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution {
func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions {
status = strings.ToLower(status)
switch status {
case "succeeded": // Succeeded
wfa.State = SUCCESS
wfa.State = common.SUCCESS
case "pending": // Pending
wfa.State = SCHEDULED
wfa.State = common.SCHEDULED
case "running": // Running
wfa.State = STARTED
wfa.State = common.STARTED
default: // Failed
wfa.State = FAILURE
wfa.State = common.FAILURE
}
return wfa
}
func (r *WorkflowExecution) GenerateID() {
func (r *WorkflowExecutions) GenerateID() {
r.UUID = uuid.New().String()
}
func (d *WorkflowExecution) GetName() string {
func (d *WorkflowExecutions) GetName() string {
return d.UUID + "_" + d.ExecDate.String()
}
func (d *WorkflowExecution) GetAccessor(username string, peerID string, groups []string, caller *tools.HTTPCaller) utils.Accessor {
return New(tools.WORKFLOW_EXECUTION, username, peerID, groups, caller) // Create a new instance of the accessor
func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewShallowAccessor(request) // Create a new instance of the accessor
}
func (d *WorkflowExecution) VerifyAuth(username string, peerID string, groups []string) bool {
func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool {
return true
}
func (d *WorkflowExecutions) ToBookings(wf *workflow.Workflow) []*booking.Booking {
booking := []*booking.Booking{}
for _, p := range wf.ProcessingResources {
booking = append(booking, d.toItemBooking(wf.GetByRelatedProcessing(p.GetID(), wf.IsStorage))...)
booking = append(booking, d.toItemBooking(wf.GetByRelatedProcessing(p.GetID(), wf.IsProcessing))...)
}
return booking
}
func (d *WorkflowExecutions) toItemBooking(ss []resources.ShallowResourceInterface) []*booking.Booking {
items := []*booking.Booking{}
for _, s := range ss {
start := d.ExecDate
if s := s.GetLocationStart(); s != nil {
start = *s
}
end := start.Add(time.Duration(s.GetExplicitDurationInS()) * time.Second)
bookingItem := &booking.Booking{
State: common.DRAFT,
ResourceID: s.GetID(),
ResourceType: s.GetType(),
DestPeerID: s.GetCreatorID(),
ExpectedStartDate: start,
ExpectedEndDate: &end,
}
items = append(items, bookingItem)
}
return items
}

View File

@@ -1,10 +1,12 @@
package workflow_execution
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -13,57 +15,62 @@ type workflowExecutionMongoAccessor struct {
utils.AbstractAccessor
}
func New(t tools.DataType, username string, peerID string, groups []string, caller *tools.HTTPCaller) *workflowExecutionMongoAccessor {
func NewAccessor(request *tools.APIRequest) *workflowExecutionMongoAccessor {
return &workflowExecutionMongoAccessor{
utils.AbstractAccessor{
Logger: logs.CreateLogger(t.String()), // Create a logger with the data type
Caller: caller,
PeerID: peerID,
User: username, // Set the caller
Groups: groups, // Set the caller
Type: t,
Logger: logs.CreateLogger(tools.WORKFLOW_EXECUTION.String()), // Create a logger with the data type
Request: request,
Type: tools.WORKFLOW_EXECUTION,
},
}
}
func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
return utils.GenericDeleteOne(id, wfa)
return nil, 404, errors.New("not implemented")
}
func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
return utils.GenericUpdateOne(set, id, wfa, &WorkflowExecution{})
return nil, 404, errors.New("not implemented")
}
func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericStoreOne(data, wfa)
return nil, 404, errors.New("not implemented")
}
func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericStoreOne(data, wfa)
return nil, 404, errors.New("not implemented")
}
func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) {
if d.(*WorkflowExecution).State == SCHEDULED && time.Now().UTC().After(*d.(*WorkflowExecution).ExecDate) {
d.(*WorkflowExecution).State = FORGOTTEN
return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) {
if d.(*WorkflowExecutions).State == common.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
utils.GenericDeleteOne(d.GetID(), a)
return nil, 404, errors.New("Not found")
}
if d.(*WorkflowExecutions).State == common.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecutions).State = common.FORGOTTEN
utils.GenericRawUpdateOne(d, id, a)
}
return d, 200, nil
}, a)
}
func (a *workflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), a)
func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a)
}
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*WorkflowExecution](filters, search, (&WorkflowExecution{}).GetObjectFilters(search), a.getExec(), a)
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*WorkflowExecutions](filters, search, (&WorkflowExecutions{}).GetObjectFilters(search), a.getExec(), isDraft, a)
}
func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
if d.(*WorkflowExecution).State == SCHEDULED && time.Now().UTC().After(*d.(*WorkflowExecution).ExecDate) {
d.(*WorkflowExecution).State = FORGOTTEN
if d.(*WorkflowExecutions).State == common.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
utils.GenericDeleteOne(d.GetID(), a)
return nil
}
if d.(*WorkflowExecutions).State == common.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecutions).State = common.FORGOTTEN
utils.GenericRawUpdateOne(d, d.GetID(), a)
}
return d

View File

@@ -0,0 +1,249 @@
package workflow_execution
import (
"errors"
"fmt"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/robfig/cron"
)
type Schedule struct {
Start time.Time
End *time.Time
}
/*
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression
*/
// it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct {
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"` // WorkflowExecutions is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time
DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule
Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task
}
func NewScheduler(start string, end string, durationInS float64, cron string) *WorkflowSchedule {
s, err := time.Parse("2006-01-02T15:04:05", start)
if err != nil {
return nil
}
ws := &WorkflowSchedule{
Start: s,
DurationS: durationInS,
Cron: cron,
}
e, err := time.Parse("2006-01-02T15:04:05", end)
if err == nil {
ws.End = &e
}
return ws
}
func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
if caller == nil && caller.URLS == nil && caller.URLS[tools.BOOKING] == nil || caller.URLS[tools.BOOKING][tools.POST] == "" {
return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
}
access := workflow.NewAccessor(nil)
res, code, err := access.LoadOne(wfID)
if code != 200 {
return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error())
}
wf := res.(*workflow.Workflow)
wf, err = ws.planifyWorkflow(wf)
if err != nil {
return false, wf, []*WorkflowExecutions{}, err
}
ws.DurationS = wf.GetLongestTime(ws.End)
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + "seconds."
if ws.End != nil && ws.Start.Add(time.Duration(wf.GetLongestTime(ws.End))*time.Second).After(*ws.End) {
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
}
execs, err := ws.getExecutions(wf)
if err != nil {
return false, wf, []*WorkflowExecutions{}, err
}
for _, exec := range execs {
bookings := exec.ToBookings(wf)
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POSTCHECK, booking.Serialize(booking), caller)
if err != nil {
return false, wf, execs, err
}
}
}
return true, wf, execs, nil
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*workflow.Workflow, []*WorkflowExecutions, error) {
if request == nil {
return nil, []*WorkflowExecutions{}, errors.New("no request found")
}
c := request.Caller
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
return nil, []*WorkflowExecutions{}, errors.New("no caller defined")
}
methods := c.URLS[tools.BOOKING]
if _, ok := methods[tools.POST]; !ok {
return nil, []*WorkflowExecutions{}, errors.New("no path found")
}
ok, wf, executions, err := ws.CheckBooking(wfID, request.Caller)
if !ok || err != nil {
return nil, []*WorkflowExecutions{}, errors.New("could not book the workflow" + fmt.Sprintf("%v", err))
}
ws.Workflow = wf
ws.WorkflowExecutions = executions
for _, exec := range executions {
err := exec.PurgeDraft(request)
if err != nil {
return nil, []*WorkflowExecutions{}, errors.New("could not book the workflow" + fmt.Sprintf("%v", err))
}
exec.GenerateID()
// Should DELETE the previous execution2
utils.GenericStoreOne(exec, NewAccessor(request))
}
return wf, executions, nil
}
func (ws *WorkflowSchedule) planifyWorkflow(wf *workflow.Workflow) (*workflow.Workflow, error) {
processings := []*resources.CustomizedProcessingResource{}
for _, item := range wf.GetGraphItems(wf.IsProcessing) {
realItem := item.GetResource().(*resources.CustomizedProcessingResource)
timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID())
started := ws.Start.Add(time.Duration(timeFromStartS) * time.Second)
wf.Graph.SetItemStartUsage(item.ID, started)
wf.Graph.SetItemEndUsage(item.ID, started.Add(time.Duration(realItem.ExplicitBookingDurationS)))
processings = append(processings, realItem)
}
for _, item := range wf.GetGraphItems(wf.IsData) {
wf.Graph.SetItemStartUsage(item.ID, ws.Start)
wf.Graph.SetItemEndUsage(item.ID, *ws.End)
}
for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} {
for _, item := range wf.GetGraphItems(f) {
nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ws.Start, processings, item.GetResource(),
func(i graph.GraphItem) resources.ShallowResourceInterface {
if f(i) {
return i.GetResource()
} else {
return nil
}
})
started := ws.Start.Add(time.Duration(nearestStart) * time.Second)
wf.Graph.SetItemStartUsage(item.ID, started)
if longestDuration >= 0 {
wf.Graph.SetItemEndUsage(item.ID, started.Add(time.Duration(longestDuration)))
}
}
}
for _, item := range wf.GetGraphItems(wf.IsWorkflow) {
access := workflow.NewAccessor(nil)
res, code, err := access.LoadOne(item.GetResource().GetID())
if code != 200 || err != nil {
return nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error()))
}
innerWF := res.(*workflow.Workflow)
innerWF, err = ws.planifyWorkflow(innerWF)
started := ws.Start.Add(time.Duration(innerWF.GetNearestStart(ws.Start)) * time.Second)
wf.Graph.SetItemStartUsage(item.ID, started)
durationE := time.Duration(innerWF.GetLongestTime(ws.End))
if durationE < 0 {
continue
}
ended := ws.Start.Add(durationE * time.Second)
wf.Graph.SetItemEndUsage(item.ID, ended)
}
return wf, nil
}
/*
BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes
data can improve the processing time
time should implied a security time border (10sec) if not from the same executions
VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
*/
/*
* getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution.WorkflowExecution
*/
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) {
workflows_executions := []*WorkflowExecutions{}
dates, err := ws.getDates()
if err != nil {
return workflows_executions, err
}
for _, date := range dates {
obj := &WorkflowExecutions{
AbstractObject: utils.AbstractObject{
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
},
ExecDate: date.Start, // set the execution date
EndDate: date.End, // set the end date
State: common.DRAFT, // set the state to 1 (scheduled)
WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution
}
workflows_executions = append(workflows_executions, obj)
}
return workflows_executions, nil
}
func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
schedule := []Schedule{}
if len(ws.Cron) > 0 { // if cron is set then end date should be set
if ws.End == nil {
return schedule, errors.New("a cron task should have an end date.")
}
if ws.DurationS <= 0 {
ws.DurationS = ws.End.Sub(ws.Start).Seconds()
}
cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it
if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields)
return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw")
}
subCron := strings.Join(cronStr[:6], " ")
// cron should be parsed as ss mm hh dd MM dw t (min 6 fields)
specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser
sched, err := specParser.Parse(subCron) // parse the cron string
if err != nil {
return schedule, errors.New("Bad cron message: " + err.Error())
}
// loop through the cron schedule to set the executions
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
e := s.Add(time.Duration(ws.DurationS) * time.Second)
schedule = append(schedule, Schedule{
Start: s,
End: &e,
})
}
} else { // if no cron, set the execution to the start date
schedule = append(schedule, Schedule{
Start: ws.Start,
End: ws.End,
})
}
return schedule, nil
}
/*
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
* SET PROTECTION BORDER TIME
*/