diff --git a/entrypoint.go b/entrypoint.go index e9f3899..30b2b84 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -244,52 +244,17 @@ func NewRequest(collection LibDataEnum, user string, peerID string, groups []str return &Request{collection: collection, user: user, peerID: peerID, groups: groups, caller: caller} } -func ToScheduler(m interface{}) (n *workflow_execution.WorkflowSchedule) { - defer func() { - if r := recover(); r != nil { - return - } - }() - return m.(*workflow_execution.WorkflowSchedule) -} - -func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSchedule) (*workflow_execution.WorkflowSchedule, error) { - ws, _, _, err := scheduler.Schedules(wfID, &tools.APIRequest{ - Caller: r.caller, - Username: r.user, - PeerID: r.peerID, - Groups: r.groups, - }) - if err != nil { - return nil, err - } - return ws, nil -} - -func (r *Request) CheckBooking(wfID string, mode int, start string, end string, durationInS float64, cron string) bool { - ok, _, _, _, _, err := workflow_execution.NewScheduler(mode, start, end, durationInS, cron).GetBuyAndBook(wfID, &tools.APIRequest{ - Caller: r.caller, - Username: r.user, - PeerID: r.peerID, - Groups: r.groups, - }) - if err != nil { - fmt.Println(err) - return false - } - return ok -} - +/* func (r *Request) PaymentTunnel(o *order.Order, scheduler *workflow_execution.WorkflowSchedule) error { - /*return o.Pay(scheduler, &tools.APIRequest{ + return o.Pay(scheduler, &tools.APIRequest{ Caller: r.caller, Username: r.user, PeerID: r.peerID, Groups: r.groups, - })*/ + }) return nil } - +*/ /* * Search will search for the data in the database * @param filters *dbs.Filters diff --git a/models/workflow_execution/tests/workflow_scheduler_test.go b/models/workflow_execution/tests/workflow_scheduler_test.go deleted file mode 100644 index 32e465f..0000000 --- a/models/workflow_execution/tests/workflow_scheduler_test.go +++ /dev/null @@ -1,149 +0,0 @@ -package workflow_execution_test - -import ( - "testing" - "time" - - "cloud.o-forge.io/core/oc-lib/models/common/enum" - "cloud.o-forge.io/core/oc-lib/models/utils" - "cloud.o-forge.io/core/oc-lib/models/workflow" - "cloud.o-forge.io/core/oc-lib/models/workflow_execution" - - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -type MockAccessor struct { - mock.Mock -} - -func (m *MockAccessor) LoadOne(id string) (interface{}, int, error) { - args := m.Called(id) - return args.Get(0), args.Int(1), args.Error(2) -} - -func TestNewScheduler_ValidInput(t *testing.T) { - s := "2025-06-16T15:00:00" - e := "2025-06-16T17:00:00" - dur := 7200.0 - cronStr := "0 0 * * * *" - - sched := workflow_execution.NewScheduler(0, s, e, dur, cronStr) - - assert.NotNil(t, sched) - assert.Equal(t, dur, sched.DurationS) - assert.Equal(t, cronStr, sched.Cron) -} - -func TestNewScheduler_InvalidStart(t *testing.T) { - s := "invalid" - e := "2025-06-16T17:00:00" - dur := 7200.0 - cronStr := "0 0 * * * *" - - sched := workflow_execution.NewScheduler(0, s, e, dur, cronStr) - assert.Nil(t, sched) -} - -func TestNewScheduler_InvalidEnd(t *testing.T) { - s := "2025-06-16T15:00:00" - e := "invalid" - dur := 7200.0 - cronStr := "0 0 * * * *" - - sched := workflow_execution.NewScheduler(0, s, e, dur, cronStr) - assert.NotNil(t, sched) - assert.Nil(t, sched.End) -} - -func TestGetDates_NoCron(t *testing.T) { - start := time.Now() - end := start.Add(2 * time.Hour) - - s := &workflow_execution.WorkflowSchedule{ - Start: start, - End: &end, - } - - schedule, err := s.GetDates() - - assert.NoError(t, err) - assert.Len(t, schedule, 1) - assert.Equal(t, start, schedule[0].Start) - assert.Equal(t, end, *schedule[0].End) -} - -func TestGetDates_InvalidCron(t *testing.T) { - start := time.Now() - end := start.Add(2 * time.Hour) - - s := &workflow_execution.WorkflowSchedule{ - Start: start, - End: &end, - Cron: "bad cron", - } - - _, err := s.GetDates() - assert.Error(t, err) -} - -func TestGetDates_ValidCron(t *testing.T) { - start := time.Now() - end := start.Add(10 * time.Minute) - - s := &workflow_execution.WorkflowSchedule{ - Start: start, - End: &end, - DurationS: 60, - Cron: "0 */2 * * * *", - } - - dates, err := s.GetDates() - assert.NoError(t, err) - assert.Greater(t, len(dates), 0) -} - -func TestGetExecutions_Success(t *testing.T) { - start := time.Now() - end := start.Add(1 * time.Hour) - ws := &workflow_execution.WorkflowSchedule{ - UUID: uuid.New().String(), - Start: start, - End: &end, - } - - wf := &workflow.Workflow{ - AbstractObject: utils.AbstractObject{ - UUID: uuid.New().String(), - Name: "TestWorkflow", - }, - } - - execs, err := ws.GetExecutions(wf, false) - assert.NoError(t, err) - assert.Greater(t, len(execs), 0) - assert.Equal(t, wf.UUID, execs[0].WorkflowID) - assert.Equal(t, ws.UUID, execs[0].ExecutionsID) - assert.Equal(t, enum.DRAFT, execs[0].State) -} - -func TestSchedules_NoRequest(t *testing.T) { - ws := &workflow_execution.WorkflowSchedule{} - - ws, wf, execs, err := ws.Schedules("someID", nil) - assert.Error(t, err) - assert.Nil(t, wf) - assert.Len(t, execs, 0) - assert.Equal(t, ws, ws) -} - -// Additional test stubs to be completed with gomock usage for: -// - CheckBooking -// - BookExecs -// - getBooking -// - Schedules (success path) -// - Planify mocking in CheckBooking -// - Peer interaction in BookExecs -// - Caller deep copy errors in getCallerCopy -// Will be continued... diff --git a/models/workflow_execution/tests/workflow_test.go b/models/workflow_execution/tests/workflow_test.go index 2ca3f9c..bcdfdec 100755 --- a/models/workflow_execution/tests/workflow_test.go +++ b/models/workflow_execution/tests/workflow_test.go @@ -9,8 +9,18 @@ import ( "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) +type MockAccessor struct { + mock.Mock +} + +func (m *MockAccessor) LoadOne(id string) (interface{}, int, error) { + args := m.Called(id) + return args.Get(0), args.Int(1), args.Error(2) +} + func (m *MockAccessor) DeleteOne(id string) (utils.DBObject, int, error) { args := m.Called(id) return nil, args.Int(1), args.Error(2) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go deleted file mode 100755 index 988323b..0000000 --- a/models/workflow_execution/workflow_scheduler.go +++ /dev/null @@ -1,352 +0,0 @@ -package workflow_execution - -import ( - "errors" - "fmt" - "strings" - "sync" - "time" - - "cloud.o-forge.io/core/oc-lib/models/bill" - "cloud.o-forge.io/core/oc-lib/models/booking" - "cloud.o-forge.io/core/oc-lib/models/common/enum" - "cloud.o-forge.io/core/oc-lib/models/common/pricing" - "cloud.o-forge.io/core/oc-lib/models/order" - "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" - "cloud.o-forge.io/core/oc-lib/models/utils" - "cloud.o-forge.io/core/oc-lib/models/workflow" - "cloud.o-forge.io/core/oc-lib/tools" - "github.com/google/uuid" - "github.com/robfig/cron" -) - -/* -* WorkflowSchedule is a struct that contains the scheduling information of a workflow -* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression - */ -// it's a flying object only use in a session time. It's not stored in the database -type WorkflowSchedule struct { - UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow - Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule - WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow - Message string `json:"message,omitempty"` // Message is the message of the schedule - Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule - Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time - End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time - DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule - Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task - - BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible - SelectedInstances workflow.ConfigItem `json:"selected_instances"` - SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"` - SelectedBuyings workflow.ConfigItem `json:"selected_buyings"` - SelectedStrategies workflow.ConfigItem `json:"selected_strategies"` - - SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"` -} - -// TODO PREEMPTION ! -/* -To schedule a preempted, omg. -pour faire ça on doit alors lancé une exécution prioritaire qui passera devant toutes les autres, celon un niveau de priorité. -Preemptible = 7, pour le moment il n'existera que 0 et 7. -Dans le cas d'une préemption l'exécution est immédiable et bloquera tout le monde tant qu'il n'a pas été exécuté. -Une ressource doit pouvoir être preemptible pour être exécutée de la sorte. -Se qui implique si on est sur une ressource par ressource que si un élement n'est pas préemptible, -alors il devra être effectué dés que possible - -Dans le cas dés que possible, la start date est immédiate MAIS ! -ne pourra se lancé que SI il n'existe pas d'exécution se lançant durant la période indicative. ( Ultra complexe ) -*/ - -func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule { - ws := &WorkflowSchedule{ - UUID: uuid.New().String(), - Start: time.Now(), - BookingMode: booking.BookingMode(mode), - DurationS: durationInS, - Cron: cron, - } - s, err := time.Parse("2006-01-02T15:04:05", start) - if err == nil && ws.BookingMode == booking.PLANNED { - ws.Start = s // can apply a defined start other than now, if planned - } - - e, err := time.Parse("2006-01-02T15:04:05", end) - if err == nil { - ws.End = &e - } - return ws -} - -func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) { - if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { - return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined") - } - access := workflow.NewAccessor(request) - res, code, err := access.LoadOne(wfID) - if code != 200 { - return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) - } - wf := res.(*workflow.Workflow) - isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, - ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies, - int(ws.BookingMode), request) - if err != nil { - return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err - } - ws.DurationS = longest - ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." - if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { - ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" - } - execs, err := ws.GetExecutions(wf, isPreemptible) - if err != nil { - return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err - } - purchased := []*purchase_resource.PurchaseResource{} - bookings := []*booking.Booking{} - for _, exec := range execs { - purchased = append(purchased, exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds)...) - bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) - } - - errCh := make(chan error, len(bookings)) - var m sync.Mutex - - for _, b := range bookings { - go getBooking(b, request, errCh, &m) - } - - for i := 0; i < len(bookings); i++ { - if err := <-errCh; err != nil { - return false, wf, execs, purchased, bookings, err - } - } - - return true, wf, execs, purchased, bookings, nil -} - -func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error { - newOrder := &order.Order{ - AbstractObject: utils.AbstractObject{ - Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"), - IsDraft: true, - }, - ExecutionsID: ws.UUID, - Purchases: purchases, - Bookings: bookings, - Status: enum.PENDING, - } - if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil { - if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil { - return err - } - return nil - } else { - return err - } -} - -func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { - m.Lock() - c, err := getCallerCopy(request, errCh) - if err != nil { - errCh <- err - return - } - m.Unlock() - - meth := c.URLS[tools.BOOKING][tools.GET] - meth = strings.ReplaceAll(meth, ":id", b.ResourceID) - meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) - meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) - c.URLS[tools.BOOKING][tools.GET] = meth - _, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) - - if err != nil { - errCh <- fmt.Errorf("error on " + b.DestPeerID + err.Error()) - return - } - - errCh <- nil -} - -func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { - var c tools.HTTPCaller - err := request.Caller.DeepCopy(c) - if err != nil { - errCh <- err - return tools.HTTPCaller{}, nil - } - c.URLS = request.Caller.URLS - return c, err -} - -func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { - if request == nil { - return ws, nil, []*WorkflowExecution{}, errors.New("no request found") - } - c := request.Caller - if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { - return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined") - } - methods := c.URLS[tools.BOOKING] - if _, ok := methods[tools.GET]; !ok { - return ws, nil, []*WorkflowExecution{}, errors.New("no path found") - } - ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request) - ws.WorkflowExecution = executions - if !ok || err != nil { - return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) - } - ws.Workflow = wf - - var errCh = make(chan error, len(bookings)) - var m sync.Mutex - - for _, purchase := range purchases { - go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m) - } - for i := 0; i < len(purchases); i++ { - if err := <-errCh; err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) - } - } - - errCh = make(chan error, len(bookings)) - - for _, booking := range bookings { - go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m) - } - - for i := 0; i < len(bookings); i++ { - if err := <-errCh; err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) - } - } - - if err := ws.GenerateOrder(purchases, bookings, request); err != nil { - return ws, wf, executions, err - } - - fmt.Println("Schedules") - for _, exec := range executions { - err := exec.PurgeDraft(request) - if err != nil { - return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) - } - exec.StoreDraftDefault() - utils.GenericStoreOne(exec, NewAccessor(request)) - } - fmt.Println("Schedules") - return ws, wf, executions, nil -} - -func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { - m.Lock() - c, err := getCallerCopy(request, errCh) - if err != nil { - errCh <- err - return - } - m.Unlock() - if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil { - errCh <- err - return - } else { - data := res["data"].(map[string]interface{}) - purchase.SetID(fmt.Sprintf("%v", data["id"])) - } - errCh <- nil -} - -/* -BOOKING IMPLIED TIME, not of subscription but of execution -so is processing time execution time applied on computes -data can improve the processing time -time should implied a security time border (10sec) if not from the same executions -VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING -*/ - -/* -* getExecutions is a function that returns the executions of a workflow -* it returns an array of workflow_execution.WorkflowExecution - */ -func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*WorkflowExecution, error) { - workflows_executions := []*WorkflowExecution{} - dates, err := ws.GetDates() - if err != nil { - return workflows_executions, err - } - for _, date := range dates { - obj := &WorkflowExecution{ - AbstractObject: utils.AbstractObject{ - UUID: uuid.New().String(), // set the uuid of the execution - Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution - }, - Priority: 1, - ExecutionsID: ws.UUID, - ExecDate: date.Start, // set the execution date - EndDate: date.End, // set the end date - State: enum.DRAFT, // set the state to 1 (scheduled) - WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution - } - if ws.BookingMode != booking.PLANNED { - obj.Priority = 0 - } - if ws.BookingMode == booking.PREEMPTED && isPreemptible { - obj.Priority = 7 - } - workflows_executions = append(workflows_executions, obj) - } - return workflows_executions, nil -} - -func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) { - schedule := []Schedule{} - if len(ws.Cron) > 0 { // if cron is set then end date should be set - if ws.End == nil { - return schedule, errors.New("a cron task should have an end date") - } - if ws.DurationS <= 0 { - ws.DurationS = ws.End.Sub(ws.Start).Seconds() - } - cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it - if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields) - return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw") - } - subCron := strings.Join(cronStr[:6], " ") - // cron should be parsed as ss mm hh dd MM dw t (min 6 fields) - specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser - sched, err := specParser.Parse(subCron) // parse the cron string - if err != nil { - return schedule, errors.New("Bad cron message: " + err.Error()) - } - // loop through the cron schedule to set the executions - for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { - e := s.Add(time.Duration(ws.DurationS) * time.Second) - schedule = append(schedule, Schedule{ - Start: s, - End: &e, - }) - } - } else { // if no cron, set the execution to the start date - schedule = append(schedule, Schedule{ - Start: ws.Start, - End: ws.End, - }) - } - return schedule, nil -} - -type Schedule struct { - Start time.Time - End *time.Time -} - -/* -* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET -* SET PROTECTION BORDER TIME - */