delete scheduler
This commit is contained in:
@@ -244,52 +244,17 @@ func NewRequest(collection LibDataEnum, user string, peerID string, groups []str
|
|||||||
return &Request{collection: collection, user: user, peerID: peerID, groups: groups, caller: caller}
|
return &Request{collection: collection, user: user, peerID: peerID, groups: groups, caller: caller}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ToScheduler(m interface{}) (n *workflow_execution.WorkflowSchedule) {
|
/*
|
||||||
defer func() {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return m.(*workflow_execution.WorkflowSchedule)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSchedule) (*workflow_execution.WorkflowSchedule, error) {
|
|
||||||
ws, _, _, err := scheduler.Schedules(wfID, &tools.APIRequest{
|
|
||||||
Caller: r.caller,
|
|
||||||
Username: r.user,
|
|
||||||
PeerID: r.peerID,
|
|
||||||
Groups: r.groups,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return ws, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Request) CheckBooking(wfID string, mode int, start string, end string, durationInS float64, cron string) bool {
|
|
||||||
ok, _, _, _, _, err := workflow_execution.NewScheduler(mode, start, end, durationInS, cron).GetBuyAndBook(wfID, &tools.APIRequest{
|
|
||||||
Caller: r.caller,
|
|
||||||
Username: r.user,
|
|
||||||
PeerID: r.peerID,
|
|
||||||
Groups: r.groups,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println(err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return ok
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Request) PaymentTunnel(o *order.Order, scheduler *workflow_execution.WorkflowSchedule) error {
|
func (r *Request) PaymentTunnel(o *order.Order, scheduler *workflow_execution.WorkflowSchedule) error {
|
||||||
/*return o.Pay(scheduler, &tools.APIRequest{
|
return o.Pay(scheduler, &tools.APIRequest{
|
||||||
Caller: r.caller,
|
Caller: r.caller,
|
||||||
Username: r.user,
|
Username: r.user,
|
||||||
PeerID: r.peerID,
|
PeerID: r.peerID,
|
||||||
Groups: r.groups,
|
Groups: r.groups,
|
||||||
})*/
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
/*
|
/*
|
||||||
* Search will search for the data in the database
|
* Search will search for the data in the database
|
||||||
* @param filters *dbs.Filters
|
* @param filters *dbs.Filters
|
||||||
|
|||||||
@@ -1,149 +0,0 @@
|
|||||||
package workflow_execution_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
|
||||||
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/mock"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MockAccessor struct {
|
|
||||||
mock.Mock
|
|
||||||
}
|
|
||||||
|
|
||||||
func (m *MockAccessor) LoadOne(id string) (interface{}, int, error) {
|
|
||||||
args := m.Called(id)
|
|
||||||
return args.Get(0), args.Int(1), args.Error(2)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNewScheduler_ValidInput(t *testing.T) {
|
|
||||||
s := "2025-06-16T15:00:00"
|
|
||||||
e := "2025-06-16T17:00:00"
|
|
||||||
dur := 7200.0
|
|
||||||
cronStr := "0 0 * * * *"
|
|
||||||
|
|
||||||
sched := workflow_execution.NewScheduler(0, s, e, dur, cronStr)
|
|
||||||
|
|
||||||
assert.NotNil(t, sched)
|
|
||||||
assert.Equal(t, dur, sched.DurationS)
|
|
||||||
assert.Equal(t, cronStr, sched.Cron)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNewScheduler_InvalidStart(t *testing.T) {
|
|
||||||
s := "invalid"
|
|
||||||
e := "2025-06-16T17:00:00"
|
|
||||||
dur := 7200.0
|
|
||||||
cronStr := "0 0 * * * *"
|
|
||||||
|
|
||||||
sched := workflow_execution.NewScheduler(0, s, e, dur, cronStr)
|
|
||||||
assert.Nil(t, sched)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestNewScheduler_InvalidEnd(t *testing.T) {
|
|
||||||
s := "2025-06-16T15:00:00"
|
|
||||||
e := "invalid"
|
|
||||||
dur := 7200.0
|
|
||||||
cronStr := "0 0 * * * *"
|
|
||||||
|
|
||||||
sched := workflow_execution.NewScheduler(0, s, e, dur, cronStr)
|
|
||||||
assert.NotNil(t, sched)
|
|
||||||
assert.Nil(t, sched.End)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetDates_NoCron(t *testing.T) {
|
|
||||||
start := time.Now()
|
|
||||||
end := start.Add(2 * time.Hour)
|
|
||||||
|
|
||||||
s := &workflow_execution.WorkflowSchedule{
|
|
||||||
Start: start,
|
|
||||||
End: &end,
|
|
||||||
}
|
|
||||||
|
|
||||||
schedule, err := s.GetDates()
|
|
||||||
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Len(t, schedule, 1)
|
|
||||||
assert.Equal(t, start, schedule[0].Start)
|
|
||||||
assert.Equal(t, end, *schedule[0].End)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetDates_InvalidCron(t *testing.T) {
|
|
||||||
start := time.Now()
|
|
||||||
end := start.Add(2 * time.Hour)
|
|
||||||
|
|
||||||
s := &workflow_execution.WorkflowSchedule{
|
|
||||||
Start: start,
|
|
||||||
End: &end,
|
|
||||||
Cron: "bad cron",
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := s.GetDates()
|
|
||||||
assert.Error(t, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetDates_ValidCron(t *testing.T) {
|
|
||||||
start := time.Now()
|
|
||||||
end := start.Add(10 * time.Minute)
|
|
||||||
|
|
||||||
s := &workflow_execution.WorkflowSchedule{
|
|
||||||
Start: start,
|
|
||||||
End: &end,
|
|
||||||
DurationS: 60,
|
|
||||||
Cron: "0 */2 * * * *",
|
|
||||||
}
|
|
||||||
|
|
||||||
dates, err := s.GetDates()
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Greater(t, len(dates), 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestGetExecutions_Success(t *testing.T) {
|
|
||||||
start := time.Now()
|
|
||||||
end := start.Add(1 * time.Hour)
|
|
||||||
ws := &workflow_execution.WorkflowSchedule{
|
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Start: start,
|
|
||||||
End: &end,
|
|
||||||
}
|
|
||||||
|
|
||||||
wf := &workflow.Workflow{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Name: "TestWorkflow",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
execs, err := ws.GetExecutions(wf, false)
|
|
||||||
assert.NoError(t, err)
|
|
||||||
assert.Greater(t, len(execs), 0)
|
|
||||||
assert.Equal(t, wf.UUID, execs[0].WorkflowID)
|
|
||||||
assert.Equal(t, ws.UUID, execs[0].ExecutionsID)
|
|
||||||
assert.Equal(t, enum.DRAFT, execs[0].State)
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSchedules_NoRequest(t *testing.T) {
|
|
||||||
ws := &workflow_execution.WorkflowSchedule{}
|
|
||||||
|
|
||||||
ws, wf, execs, err := ws.Schedules("someID", nil)
|
|
||||||
assert.Error(t, err)
|
|
||||||
assert.Nil(t, wf)
|
|
||||||
assert.Len(t, execs, 0)
|
|
||||||
assert.Equal(t, ws, ws)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Additional test stubs to be completed with gomock usage for:
|
|
||||||
// - CheckBooking
|
|
||||||
// - BookExecs
|
|
||||||
// - getBooking
|
|
||||||
// - Schedules (success path)
|
|
||||||
// - Planify mocking in CheckBooking
|
|
||||||
// - Peer interaction in BookExecs
|
|
||||||
// - Caller deep copy errors in getCallerCopy
|
|
||||||
// Will be continued...
|
|
||||||
@@ -9,8 +9,18 @@ import (
|
|||||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type MockAccessor struct {
|
||||||
|
mock.Mock
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MockAccessor) LoadOne(id string) (interface{}, int, error) {
|
||||||
|
args := m.Called(id)
|
||||||
|
return args.Get(0), args.Int(1), args.Error(2)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
|
func (m *MockAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
|
||||||
args := m.Called(id)
|
args := m.Called(id)
|
||||||
return nil, args.Int(1), args.Error(2)
|
return nil, args.Int(1), args.Error(2)
|
||||||
|
|||||||
@@ -1,352 +0,0 @@
|
|||||||
package workflow_execution
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/bill"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/booking"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/order"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/robfig/cron"
|
|
||||||
)
|
|
||||||
|
|
||||||
/*
|
|
||||||
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
|
|
||||||
* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression
|
|
||||||
*/
|
|
||||||
// it's a flying object only use in a session time. It's not stored in the database
|
|
||||||
type WorkflowSchedule struct {
|
|
||||||
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
|
|
||||||
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
|
|
||||||
WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow
|
|
||||||
Message string `json:"message,omitempty"` // Message is the message of the schedule
|
|
||||||
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
|
|
||||||
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
|
|
||||||
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time
|
|
||||||
DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule
|
|
||||||
Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task
|
|
||||||
|
|
||||||
BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible
|
|
||||||
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
|
|
||||||
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
|
|
||||||
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
|
|
||||||
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
|
|
||||||
|
|
||||||
SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO PREEMPTION !
|
|
||||||
/*
|
|
||||||
To schedule a preempted, omg.
|
|
||||||
pour faire ça on doit alors lancé une exécution prioritaire qui passera devant toutes les autres, celon un niveau de priorité.
|
|
||||||
Preemptible = 7, pour le moment il n'existera que 0 et 7.
|
|
||||||
Dans le cas d'une préemption l'exécution est immédiable et bloquera tout le monde tant qu'il n'a pas été exécuté.
|
|
||||||
Une ressource doit pouvoir être preemptible pour être exécutée de la sorte.
|
|
||||||
Se qui implique si on est sur une ressource par ressource que si un élement n'est pas préemptible,
|
|
||||||
alors il devra être effectué dés que possible
|
|
||||||
|
|
||||||
Dans le cas dés que possible, la start date est immédiate MAIS !
|
|
||||||
ne pourra se lancé que SI il n'existe pas d'exécution se lançant durant la période indicative. ( Ultra complexe )
|
|
||||||
*/
|
|
||||||
|
|
||||||
func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule {
|
|
||||||
ws := &WorkflowSchedule{
|
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Start: time.Now(),
|
|
||||||
BookingMode: booking.BookingMode(mode),
|
|
||||||
DurationS: durationInS,
|
|
||||||
Cron: cron,
|
|
||||||
}
|
|
||||||
s, err := time.Parse("2006-01-02T15:04:05", start)
|
|
||||||
if err == nil && ws.BookingMode == booking.PLANNED {
|
|
||||||
ws.Start = s // can apply a defined start other than now, if planned
|
|
||||||
}
|
|
||||||
|
|
||||||
e, err := time.Parse("2006-01-02T15:04:05", end)
|
|
||||||
if err == nil {
|
|
||||||
ws.End = &e
|
|
||||||
}
|
|
||||||
return ws
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) {
|
|
||||||
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
|
|
||||||
return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined")
|
|
||||||
}
|
|
||||||
access := workflow.NewAccessor(request)
|
|
||||||
res, code, err := access.LoadOne(wfID)
|
|
||||||
if code != 200 {
|
|
||||||
return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
|
|
||||||
}
|
|
||||||
wf := res.(*workflow.Workflow)
|
|
||||||
isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End,
|
|
||||||
ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies,
|
|
||||||
int(ws.BookingMode), request)
|
|
||||||
if err != nil {
|
|
||||||
return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
|
|
||||||
}
|
|
||||||
ws.DurationS = longest
|
|
||||||
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
|
|
||||||
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
|
|
||||||
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
|
|
||||||
}
|
|
||||||
execs, err := ws.GetExecutions(wf, isPreemptible)
|
|
||||||
if err != nil {
|
|
||||||
return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
|
|
||||||
}
|
|
||||||
purchased := []*purchase_resource.PurchaseResource{}
|
|
||||||
bookings := []*booking.Booking{}
|
|
||||||
for _, exec := range execs {
|
|
||||||
purchased = append(purchased, exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds)...)
|
|
||||||
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
|
|
||||||
}
|
|
||||||
|
|
||||||
errCh := make(chan error, len(bookings))
|
|
||||||
var m sync.Mutex
|
|
||||||
|
|
||||||
for _, b := range bookings {
|
|
||||||
go getBooking(b, request, errCh, &m)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(bookings); i++ {
|
|
||||||
if err := <-errCh; err != nil {
|
|
||||||
return false, wf, execs, purchased, bookings, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, wf, execs, purchased, bookings, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error {
|
|
||||||
newOrder := &order.Order{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
|
||||||
Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"),
|
|
||||||
IsDraft: true,
|
|
||||||
},
|
|
||||||
ExecutionsID: ws.UUID,
|
|
||||||
Purchases: purchases,
|
|
||||||
Bookings: bookings,
|
|
||||||
Status: enum.PENDING,
|
|
||||||
}
|
|
||||||
if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil {
|
|
||||||
if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
|
|
||||||
m.Lock()
|
|
||||||
c, err := getCallerCopy(request, errCh)
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.Unlock()
|
|
||||||
|
|
||||||
meth := c.URLS[tools.BOOKING][tools.GET]
|
|
||||||
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
|
|
||||||
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
|
|
||||||
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
|
|
||||||
c.URLS[tools.BOOKING][tools.GET] = meth
|
|
||||||
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
errCh <- fmt.Errorf("error on " + b.DestPeerID + err.Error())
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
errCh <- nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
|
|
||||||
var c tools.HTTPCaller
|
|
||||||
err := request.Caller.DeepCopy(c)
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
return tools.HTTPCaller{}, nil
|
|
||||||
}
|
|
||||||
c.URLS = request.Caller.URLS
|
|
||||||
return c, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
|
|
||||||
if request == nil {
|
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
|
|
||||||
}
|
|
||||||
c := request.Caller
|
|
||||||
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
|
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined")
|
|
||||||
}
|
|
||||||
methods := c.URLS[tools.BOOKING]
|
|
||||||
if _, ok := methods[tools.GET]; !ok {
|
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("no path found")
|
|
||||||
}
|
|
||||||
ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request)
|
|
||||||
ws.WorkflowExecution = executions
|
|
||||||
if !ok || err != nil {
|
|
||||||
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
|
|
||||||
}
|
|
||||||
ws.Workflow = wf
|
|
||||||
|
|
||||||
var errCh = make(chan error, len(bookings))
|
|
||||||
var m sync.Mutex
|
|
||||||
|
|
||||||
for _, purchase := range purchases {
|
|
||||||
go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m)
|
|
||||||
}
|
|
||||||
for i := 0; i < len(purchases); i++ {
|
|
||||||
if err := <-errCh; err != nil {
|
|
||||||
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
errCh = make(chan error, len(bookings))
|
|
||||||
|
|
||||||
for _, booking := range bookings {
|
|
||||||
go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < len(bookings); i++ {
|
|
||||||
if err := <-errCh; err != nil {
|
|
||||||
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ws.GenerateOrder(purchases, bookings, request); err != nil {
|
|
||||||
return ws, wf, executions, err
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Schedules")
|
|
||||||
for _, exec := range executions {
|
|
||||||
err := exec.PurgeDraft(request)
|
|
||||||
if err != nil {
|
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
|
|
||||||
}
|
|
||||||
exec.StoreDraftDefault()
|
|
||||||
utils.GenericStoreOne(exec, NewAccessor(request))
|
|
||||||
}
|
|
||||||
fmt.Println("Schedules")
|
|
||||||
return ws, wf, executions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
|
|
||||||
m.Lock()
|
|
||||||
c, err := getCallerCopy(request, errCh)
|
|
||||||
if err != nil {
|
|
||||||
errCh <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.Unlock()
|
|
||||||
if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
|
|
||||||
errCh <- err
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
data := res["data"].(map[string]interface{})
|
|
||||||
purchase.SetID(fmt.Sprintf("%v", data["id"]))
|
|
||||||
}
|
|
||||||
errCh <- nil
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
BOOKING IMPLIED TIME, not of subscription but of execution
|
|
||||||
so is processing time execution time applied on computes
|
|
||||||
data can improve the processing time
|
|
||||||
time should implied a security time border (10sec) if not from the same executions
|
|
||||||
VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
|
|
||||||
*/
|
|
||||||
|
|
||||||
/*
|
|
||||||
* getExecutions is a function that returns the executions of a workflow
|
|
||||||
* it returns an array of workflow_execution.WorkflowExecution
|
|
||||||
*/
|
|
||||||
func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*WorkflowExecution, error) {
|
|
||||||
workflows_executions := []*WorkflowExecution{}
|
|
||||||
dates, err := ws.GetDates()
|
|
||||||
if err != nil {
|
|
||||||
return workflows_executions, err
|
|
||||||
}
|
|
||||||
for _, date := range dates {
|
|
||||||
obj := &WorkflowExecution{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
|
||||||
UUID: uuid.New().String(), // set the uuid of the execution
|
|
||||||
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
|
|
||||||
},
|
|
||||||
Priority: 1,
|
|
||||||
ExecutionsID: ws.UUID,
|
|
||||||
ExecDate: date.Start, // set the execution date
|
|
||||||
EndDate: date.End, // set the end date
|
|
||||||
State: enum.DRAFT, // set the state to 1 (scheduled)
|
|
||||||
WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution
|
|
||||||
}
|
|
||||||
if ws.BookingMode != booking.PLANNED {
|
|
||||||
obj.Priority = 0
|
|
||||||
}
|
|
||||||
if ws.BookingMode == booking.PREEMPTED && isPreemptible {
|
|
||||||
obj.Priority = 7
|
|
||||||
}
|
|
||||||
workflows_executions = append(workflows_executions, obj)
|
|
||||||
}
|
|
||||||
return workflows_executions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) {
|
|
||||||
schedule := []Schedule{}
|
|
||||||
if len(ws.Cron) > 0 { // if cron is set then end date should be set
|
|
||||||
if ws.End == nil {
|
|
||||||
return schedule, errors.New("a cron task should have an end date")
|
|
||||||
}
|
|
||||||
if ws.DurationS <= 0 {
|
|
||||||
ws.DurationS = ws.End.Sub(ws.Start).Seconds()
|
|
||||||
}
|
|
||||||
cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it
|
|
||||||
if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields)
|
|
||||||
return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw")
|
|
||||||
}
|
|
||||||
subCron := strings.Join(cronStr[:6], " ")
|
|
||||||
// cron should be parsed as ss mm hh dd MM dw t (min 6 fields)
|
|
||||||
specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser
|
|
||||||
sched, err := specParser.Parse(subCron) // parse the cron string
|
|
||||||
if err != nil {
|
|
||||||
return schedule, errors.New("Bad cron message: " + err.Error())
|
|
||||||
}
|
|
||||||
// loop through the cron schedule to set the executions
|
|
||||||
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
|
|
||||||
e := s.Add(time.Duration(ws.DurationS) * time.Second)
|
|
||||||
schedule = append(schedule, Schedule{
|
|
||||||
Start: s,
|
|
||||||
End: &e,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
} else { // if no cron, set the execution to the start date
|
|
||||||
schedule = append(schedule, Schedule{
|
|
||||||
Start: ws.Start,
|
|
||||||
End: ws.End,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return schedule, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type Schedule struct {
|
|
||||||
Start time.Time
|
|
||||||
End *time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
|
|
||||||
* SET PROTECTION BORDER TIME
|
|
||||||
*/
|
|
||||||
Reference in New Issue
Block a user