1 Commit

Author: mr
SHA1: 5302ed48b3
Message: WorkflowScheduler Gestionnary
Date: 2026-01-14 15:15:26 +01:00
4 changed files with 418 additions and 43 deletions


@@ -3,6 +3,7 @@ package controllers
import (
"encoding/json"
"fmt"
"oc-scheduler/infrastructure"
"slices"
oclib "cloud.o-forge.io/core/oc-lib"
@@ -11,7 +12,6 @@ import (
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
beego "github.com/beego/beego/v2/server/web"
"github.com/google/uuid"
@@ -38,7 +38,7 @@ func (o *WorkflowSchedulerController) Schedule() {
e := ""
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
wfId := o.Ctx.Input.Param(":id")
var resp *workflow_execution.WorkflowSchedule
var resp *infrastructure.WorkflowSchedule
json.Unmarshal(o.Ctx.Input.CopyBody(100000), &resp)
caller := tools.NewHTTPCaller(map[tools.DataType]map[tools.METHOD]string{ // paths to call other OC services
@@ -49,14 +49,18 @@ func (o *WorkflowSchedulerController) Schedule() {
tools.GET: "/booking/check/:id/:start_date/:end_date",
tools.POST: "/booking/",
},
})
logger.Info().Msg("Booking for " + wfId)
req := oclib.NewRequest(collection, user, peerID, groups, caller)
resp.UUID = uuid.New().String()
sch, err := req.Schedule(wfId, resp)
sch, _, execs, err := resp.Schedules(wfId, &tools.APIRequest{
Username: user,
PeerID: peerID,
Groups: groups,
Caller: caller,
})
if err != nil {
if sch != nil {
for _, w := range sch.WorkflowExecution {
@@ -73,12 +77,11 @@ func (o *WorkflowSchedulerController) Schedule() {
}
logger.Info().Msg("Creating S3 service account if necessary")
execs := sch.WorkflowExecution
for _, exec := range execs {
execId := exec.ExecutionsID
logger.Info().Msg("S3 ServiceAccount for " + execId)
// execId = "6cdaf6e4-5727-480e-ab97-f78853c4e553"
err = createStorageServiceAccount(execId, peerID, wfId, resp, caller, user, groups)
err = createStorageServiceAccount(execId, peerID, wfId, sch, caller, user, groups)
if err != nil {
// if sch != nil {
// for _, w := range sch.WorkflowExecution {
@@ -140,10 +143,9 @@ func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() {
o.ServeJSON()
}
func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *workflow_execution.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error {
func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *infrastructure.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error {
// Retrieve the Workflow in the WorkflowSchedule
wf := loadWorkflow(wfId,peerID)
wf := loadWorkflow(wfId, peerID)
// storageItems := wf.GetGraphItems(wf.Graph.IsStorage)
itemMap := wf.GetItemsByResources()
// mapStorageRessources, err := getItemByRessourceId(wf, storageItems)
@@ -183,7 +185,7 @@ func createStorageServiceAccount(execId string, peerID string, wfId string, wfs
return err
}
// post on computing datacenter /minio/createSAsecret
err = postS3Secret(peerID, *s, caller, execId, wfId,*c, access, secret) // create the secret holding the retrieved access on c's peer
err = postS3Secret(peerID, *s, caller, execId, wfId, *c, access, secret) // create the secret holding the retrieved access on c's peer
if err != nil {
// Add a logger.Info() here
return err
@@ -232,7 +234,7 @@ func postCreateServiceAccount(peerID string, s *resources.StorageResource, calle
}
func loadWorkflow(workflowId string, peerId string) *workflow.Workflow {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{},nil).LoadOne(workflowId)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{}, nil).LoadOne(workflowId)
if res.Code != 200 {
l := oclib.GetLogger()
l.Error().Msg("Error while loading a workflow for creation of the serviceAccount")
@@ -248,13 +250,13 @@ func loadWorkflow(workflowId string, peerId string) *workflow.Workflow {
// }
func getAssociatedComputeRessources(wf workflow.Workflow, storageNodes []string) []string {
storageProcessingLinks := make([]string,0)
for _, id := range storageNodes{
storageProcessingLinks := make([]string, 0)
for _, id := range storageNodes {
processings := getStorageRelatedProcessing(wf, id) // Retrieve all the Processing item linked to one storage node
for _, procId := range processings {
computings := getComputeProcessing(wf, procId)
if !slices.Contains(storageProcessingLinks,computings){
storageProcessingLinks= append(storageProcessingLinks, computings)
if !slices.Contains(storageProcessingLinks, computings) {
storageProcessingLinks = append(storageProcessingLinks, computings)
}
}
}
@@ -269,21 +271,27 @@ func getStorageRelatedProcessing(wf workflow.Workflow, storageId string) (relate
// Only keep the links that are associated to the storage
for _, link := range wf.Graph.Links {
if link.Destination.ID == storageId || link.Source.ID == storageId {
storageLinks = append(storageLinks,link)
storageLinks = append(storageLinks, link)
}
}
for _, link := range storageLinks {
var resourceId string
if link.Source.ID != storageId { resourceId = link.Source.ID } else { resourceId = link.Destination.ID }
if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]){ relatedProcessing = append(relatedProcessing, resourceId) }
if link.Source.ID != storageId {
resourceId = link.Source.ID
} else {
resourceId = link.Destination.ID
}
if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]) {
relatedProcessing = append(relatedProcessing, resourceId)
}
}
return
}
func getComputeProcessing(wf workflow.Workflow, processingId string) (res string) {
computeRel := wf.GetByRelatedProcessing(processingId,wf.Graph.IsCompute)
computeRel := wf.GetByRelatedProcessing(processingId, wf.Graph.IsCompute)
for _, rel := range computeRel {
return rel.Node.GetID()
}
@@ -293,7 +301,7 @@ func getComputeProcessing(wf workflow.Workflow, processingId string) (res string
func getServiceAccountCredentials(peerID string, storageRes resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, computeRes resources.ComputeResource) (string, string, error) {
l := oclib.GetLogger()
fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID )
fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(storageRes.CreatorID)
if res.Code != 200 {
l.Error().Msg("Error while loading a peer for creation of the serviceAccount")
@@ -315,7 +323,7 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes
}
result_code := caller.LastResults["code"].(int)
if !slices.Contains([]int{200,201}, result_code) {
if !slices.Contains([]int{200, 201}, result_code) {
l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + storageRes.Name + " on peer at " + p.Url))
if _, ok := caller.LastResults["body"]; ok {
l.Error().Msg(string(caller.LastResults["body"].([]byte)))
@@ -336,7 +344,6 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes
secret = s.(string)
}
return access, secret, nil
}
@@ -363,7 +370,7 @@ func postS3Secret(peerID string, s resources.StorageResource, caller *tools.HTTP
}
result_code := caller.LastResults["code"].(int)
if !slices.Contains([]int{200,201}, result_code) {
if !slices.Contains([]int{200, 201}, result_code) {
l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name + "to a secret on peer at " + p.Url))
if _, ok := caller.LastResults["body"]; ok {
l.Error().Msg(string(caller.LastResults["body"].([]byte)))

go.mod (2 changed lines)

@@ -5,7 +5,7 @@ go 1.23.0
toolchain go1.24.0
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d
github.com/beego/beego/v2 v2.3.8
github.com/google/uuid v1.6.0
github.com/smartystreets/goconvey v1.7.2

go.sum (6 changed lines)

@@ -26,6 +26,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250805112547-cc939451fd81 h1:539qIasa1Vz+F
cloud.o-forge.io/core/oc-lib v0.0.0-20250805112547-cc939451fd81/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1 h1:53KzZ+1JqRY6J7EVzQpNBmLzNuxb8oHNW3UgqxkYABo=
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260113150431-6d745fe92216 h1:9ab37/TK1JhdOOvYbqq9J9hDbipofBkq0l2GQ6umARY=
cloud.o-forge.io/core/oc-lib v0.0.0-20260113150431-6d745fe92216/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125532-0e378dc19c06 h1:kDTCqxzV8dvLeXPzPWIn4LgFqwgVprrXwNnP+ftA9C0=
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125532-0e378dc19c06/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE=
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc=
github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg=

infrastructure/scheduler.go (new file, 362 lines)

@@ -0,0 +1,362 @@
package infrastructure
import (
"errors"
"fmt"
"strings"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron"
)
/*
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression
*/
// It is a transient object that only lives for the time of a session; it is not stored in the database
type WorkflowSchedule struct {
UUID string `json:"id" validate:"required"` // UUID identifies this schedule; it is reused as the ExecutionsID of the generated executions
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependency of the schedule
WorkflowExecution []*workflow_execution.WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule; it is required and must be earlier than the End time
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule; when set, it must be later than the Start time
DurationS float64 `json:"duration_s" default:"-1"` // DurationS is the expected duration of the schedule in seconds
Cron string `json:"cron,omitempty"` // Cron holds the cron expression, format: ss mm hh dd MM dw task
BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualifies the preemption order of the scheduling; if no payment is allowed with preemption, use When_Possible
SelectedInstances workflow.ConfigItem `json:"selected_instances"`
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"`
}
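For context, the Schedule endpoint in the controller above unmarshals the request body straight into this struct. A minimal sketch of such a payload, with purely illustrative field values (the commit does not show what clients actually send), could look like:

package main

import (
	"encoding/json"
	"fmt"

	"oc-scheduler/infrastructure"
)

func main() {
	// Hypothetical request body; every value below is an assumption for illustration.
	body := []byte(`{
		"id": "00000000-0000-0000-0000-000000000000",
		"start": "2026-02-01T08:00:00Z",
		"end": "2026-02-01T18:00:00Z",
		"cron": "0 0 */2 * * *"
	}`)
	var ws infrastructure.WorkflowSchedule
	if err := json.Unmarshal(body, &ws); err != nil {
		fmt.Println("bad schedule payload:", err)
		return
	}
	fmt.Println(ws.Start, ws.Cron)
}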
// TODO PREEMPTION !
/*
To schedule a preempted execution, omg.
To do this we then have to launch a priority execution that will go ahead of all the others, according to a priority level.
Preemptible = 7; for the moment only 0 and 7 will exist.
In the case of a preemption the execution is immediate and will block everyone else until it has been executed.
A resource must be preemptible in order to be executed this way.
Which implies, if we go resource by resource, that if an element is not preemptible,
then it will have to be carried out as soon as possible.
In the "as soon as possible" case, the start date is immediate BUT!
it can only start IF no execution starts during the indicated period. (Ultra complex)
*/
func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule {
ws := &WorkflowSchedule{
UUID: uuid.New().String(),
Start: time.Now(),
BookingMode: booking.BookingMode(mode),
DurationS: durationInS,
Cron: cron,
}
s, err := time.Parse("2006-01-02T15:04:05", start)
if err == nil && ws.BookingMode == booking.PLANNED {
ws.Start = s // can apply a defined start other than now, if planned
}
e, err := time.Parse("2006-01-02T15:04:05", end)
if err == nil {
ws.End = &e
}
return ws
}
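A hedged usage sketch of NewScheduler follows; no caller of this constructor appears in the commit, so the values and the integer conversion of booking.PLANNED are assumptions:

// Hypothetical illustration only, not part of the commit.
// booking.PLANNED is assumed to have an integer underlying type, as suggested
// by the booking.BookingMode(mode) conversion performed in NewScheduler.
func ExampleNewScheduler() {
	ws := NewScheduler(
		int(booking.PLANNED),  // booking mode
		"2026-02-01T08:00:00", // start, layout 2006-01-02T15:04:05 (only applied when PLANNED)
		"2026-02-01T18:00:00", // end, same layout
		-1,                    // duration in seconds; recomputed from End-Start when a cron is set
		"0 0 */2 * * *",       // cron, format ss mm hh dd MM dw: every two hours
	)
	_ = ws
}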
func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*workflow_execution.WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) {
if request.Caller == nil || request.Caller.URLS == nil || request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined")
}
access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID)
if code != 200 {
return false, nil, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
}
wf := res.(*workflow.Workflow)
isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End,
ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies,
int(ws.BookingMode), request)
if err != nil {
return false, wf, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
}
ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
}
execs, err := ws.GetExecutions(wf, isPreemptible)
if err != nil {
return false, wf, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
}
purchased := []*purchase_resource.PurchaseResource{}
bookings := []*booking.Booking{}
for _, exec := range execs {
purchased = append(purchased, exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds)...)
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
}
errCh := make(chan error, len(bookings))
var m sync.Mutex
for _, b := range bookings {
go getBooking(b, request, errCh, &m)
}
for i := 0; i < len(bookings); i++ {
if err := <-errCh; err != nil {
return false, wf, execs, purchased, bookings, err
}
}
return true, wf, execs, purchased, bookings, nil
}
func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error {
newOrder := &order.Order{
AbstractObject: utils.AbstractObject{
Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"),
IsDraft: true,
},
ExecutionsID: ws.UUID,
Purchases: purchases,
Bookings: bookings,
Status: enum.PENDING,
}
if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil {
if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil {
return err
}
return nil
} else {
return err
}
}
func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
meth := c.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
c.URLS[tools.BOOKING][tools.GET] = meth
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
if err != nil {
errCh <- fmt.Errorf("error on " + b.DestPeerID + err.Error())
return
}
errCh <- nil
}
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) {
if request == nil {
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no request found")
}
c := request.Caller
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no caller defined")
}
methods := c.URLS[tools.BOOKING]
if _, ok := methods[tools.GET]; !ok {
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no path found")
}
ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request)
ws.WorkflowExecution = executions
if !ok || err != nil {
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
}
ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, purchase := range purchases {
go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m)
}
for i := 0; i < len(purchases); i++ {
if err := <-errCh; err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
errCh = make(chan error, len(bookings))
for _, booking := range bookings {
go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m)
}
for i := 0; i < len(bookings); i++ {
if err := <-errCh; err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
if err := ws.GenerateOrder(purchases, bookings, request); err != nil {
return ws, wf, executions, err
}
fmt.Println("Schedules")
for _, exec := range executions {
err := exec.PurgeDraft(request)
if err != nil {
return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
}
exec.StoreDraftDefault()
utils.GenericStoreOne(exec, workflow_execution.NewAccessor(request))
}
fmt.Println("Schedules")
wf.GetAccessor(&tools.APIRequest{Admin: true}).UpdateOne(wf, wf.GetID())
return ws, wf, executions, nil
}
func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
errCh <- err
return
} else {
data := res["data"].(map[string]interface{})
purchase.SetID(fmt.Sprintf("%v", data["id"]))
}
errCh <- nil
}
/*
BOOKING IMPLIES TIME, not subscription time but execution time,
i.e. the processing/execution time applied on the computes.
Data can improve the processing time.
Time should include a safety border (10 sec) between bookings that do not come from the same executions.
VERIFY THAT WE HANDLE THE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
*/
/*
* GetExecutions is a function that returns the executions of a workflow
* it returns a slice of *workflow_execution.WorkflowExecution
*/
func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*workflow_execution.WorkflowExecution, error) {
workflows_executions := []*workflow_execution.WorkflowExecution{}
dates, err := ws.GetDates()
if err != nil {
return workflows_executions, err
}
for _, date := range dates {
obj := &workflow_execution.WorkflowExecution{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
},
Priority: 1,
ExecutionsID: ws.UUID,
ExecDate: date.Start, // set the execution date
EndDate: date.End, // set the end date
State: enum.DRAFT, // set the state to DRAFT
WorkflowID: workflow.GetID(), // set the workflow id dependency of the execution
}
if ws.BookingMode != booking.PLANNED {
obj.Priority = 0
}
if ws.BookingMode == booking.PREEMPTED && isPreemptible {
obj.Priority = 7
}
ws.SelectedStrategies = obj.SelectedStrategies
ws.SelectedPartnerships = obj.SelectedPartnerships
ws.SelectedBuyings = obj.SelectedBuyings
ws.SelectedInstances = obj.SelectedInstances
workflows_executions = append(workflows_executions, obj)
}
return workflows_executions, nil
}
func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) {
schedule := []Schedule{}
if len(ws.Cron) > 0 { // if cron is set then end date should be set
if ws.End == nil {
return schedule, errors.New("a cron task should have an end date")
}
if ws.DurationS <= 0 {
ws.DurationS = ws.End.Sub(ws.Start).Seconds()
}
cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it
if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields)
return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw")
}
subCron := strings.Join(cronStr[:6], " ")
// cron should be parsed as ss mm hh dd MM dw t (min 6 fields)
specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser
sched, err := specParser.Parse(subCron) // parse the cron string
if err != nil {
return schedule, errors.New("Bad cron message: " + err.Error())
}
// loop through the cron schedule to set the executions
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
e := s.Add(time.Duration(ws.DurationS) * time.Second)
schedule = append(schedule, Schedule{
Start: s,
End: &e,
})
}
} else { // if no cron, set the execution to the start date
schedule = append(schedule, Schedule{
Start: ws.Start,
End: ws.End,
})
}
return schedule, nil
}
type Schedule struct {
Start time.Time
End *time.Time
}
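To make the cron handling in GetDates concrete, here is a small standalone sketch (the expression and dates are assumptions, not taken from the commit) that reproduces the same parsing and iteration with robfig/cron:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron"
)

func main() {
	// Same parser configuration as GetDates: 6 fields, ss mm hh dd MM dw.
	parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	sched, err := parser.Parse("0 0 */6 * * *") // every six hours (illustrative)
	if err != nil {
		panic(err)
	}
	start := time.Date(2026, 1, 15, 0, 0, 0, 0, time.UTC)
	end := start.Add(24 * time.Hour)
	duration := 30 * time.Minute // stands in for DurationS
	// Each tick strictly after start and before end becomes one execution window, as in GetDates.
	for s := sched.Next(start); !s.IsZero() && s.Before(end); s = sched.Next(s) {
		fmt.Println(s.Format(time.RFC3339), "->", s.Add(duration).Format(time.RFC3339))
	}
}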
/*
* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET
* SET PROTECTION BORDER TIME
*/