From 5302ed48b3be3a7abe65c217c837ee34b48d788a Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 14 Jan 2026 15:15:26 +0100 Subject: [PATCH] WorkflowScheduler Gestionnary --- controllers/workflow_sheduler.go | 91 ++++---- go.mod | 2 +- go.sum | 6 + infrastructure/scheduler.go | 362 +++++++++++++++++++++++++++++++ 4 files changed, 418 insertions(+), 43 deletions(-) create mode 100644 infrastructure/scheduler.go diff --git a/controllers/workflow_sheduler.go b/controllers/workflow_sheduler.go index e95d95c..0b6f875 100644 --- a/controllers/workflow_sheduler.go +++ b/controllers/workflow_sheduler.go @@ -3,6 +3,7 @@ package controllers import ( "encoding/json" "fmt" + "oc-scheduler/infrastructure" "slices" oclib "cloud.o-forge.io/core/oc-lib" @@ -11,7 +12,6 @@ import ( "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow/graph" - "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" beego "github.com/beego/beego/v2/server/web" "github.com/google/uuid" @@ -38,9 +38,9 @@ func (o *WorkflowSchedulerController) Schedule() { e := "" user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) wfId := o.Ctx.Input.Param(":id") - var resp *workflow_execution.WorkflowSchedule + var resp *infrastructure.WorkflowSchedule json.Unmarshal(o.Ctx.Input.CopyBody(100000), &resp) - + caller := tools.NewHTTPCaller(map[tools.DataType]map[tools.METHOD]string{ // paths to call other OC services tools.PEER: { tools.POST: "/status/", @@ -49,14 +49,18 @@ func (o *WorkflowSchedulerController) Schedule() { tools.GET: "/booking/check/:id/:start_date/:end_date", tools.POST: "/booking/", }, - }) logger.Info().Msg("Booking for " + wfId) req := oclib.NewRequest(collection, user, peerID, groups, caller) resp.UUID = uuid.New().String() - sch, err := req.Schedule(wfId, resp) + sch, _, execs, err := resp.Schedules(wfId, &tools.APIRequest{ + Username: user, + PeerID: peerID, + Groups: groups, + Caller: caller, + }) if err != nil { if sch != nil { for _, w := range sch.WorkflowExecution { @@ -73,12 +77,11 @@ func (o *WorkflowSchedulerController) Schedule() { } logger.Info().Msg("Creating S3 service account if necessary") - execs := sch.WorkflowExecution for _, exec := range execs { execId := exec.ExecutionsID logger.Info().Msg("S3 ServiceAccount for " + execId) // execId = "6cdaf6e4-5727-480e-ab97-f78853c4e553" - err = createStorageServiceAccount(execId, peerID, wfId, resp, caller, user, groups) + err = createStorageServiceAccount(execId, peerID, wfId, sch, caller, user, groups) if err != nil { // if sch != nil { // for _, w := range sch.WorkflowExecution { @@ -92,9 +95,9 @@ func (o *WorkflowSchedulerController) Schedule() { } o.ServeJSON() return - } + } } - + o.Data["json"] = map[string]interface{}{ "data": sch.WorkflowExecution, "code": code, @@ -140,10 +143,9 @@ func (o *WorkflowSchedulerController) SearchScheduledDraftOrder() { o.ServeJSON() } - -func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *workflow_execution.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error { +func createStorageServiceAccount(execId string, peerID string, wfId string, wfs *infrastructure.WorkflowSchedule, caller *tools.HTTPCaller, user string, groups []string) error { // Retrieve the Workflow in the WorkflowSchedule - wf := loadWorkflow(wfId,peerID) + wf := loadWorkflow(wfId, peerID) // storageItems := wf.GetGraphItems(wf.Graph.IsStorage) itemMap := wf.GetItemsByResources() // mapStorageRessources, err := getItemByRessourceId(wf, storageItems) @@ -157,17 +159,17 @@ func createStorageServiceAccount(execId string, peerID string, wfId string, wfs if s.StorageType == enum.S3 { // DEV MULTI PEER MINIO CREDENTIAL CREATION - + // retrieve all the processing linked to a compute using the storage : processing -- compute -- storage - // In this case we need to retrieve the Item ID(s) for each storage to be able to evaluate links with other items - associatedComputingResources := getAssociatedComputeRessources(*wf, itemMap[tools.STORAGE_RESOURCE][id]) + // In this case we need to retrieve the Item ID(s) for each storage to be able to evaluate links with other items + associatedComputingResources := getAssociatedComputeRessources(*wf, itemMap[tools.STORAGE_RESOURCE][id]) for _, computeId := range associatedComputingResources { c, err := oclib.LoadOneComputing(computeId, user, peerID, groups) if err != nil { return err } - + if c.CreatorID == s.CreatorID { // post on datacenter /minio/createServiceAccount err := postCreateServiceAccount(peerID, s, caller, execId, wfId) @@ -183,7 +185,7 @@ func createStorageServiceAccount(execId string, peerID string, wfId string, wfs return err } // post on computing datacenter /minio/createSAsecret - err = postS3Secret(peerID, *s, caller, execId, wfId,*c, access, secret) // create the secret holding the retrieved access on c's peer + err = postS3Secret(peerID, *s, caller, execId, wfId, *c, access, secret) // create the secret holding the retrieved access on c's peer if err != nil { // Add a logger.Info() here return err @@ -232,29 +234,29 @@ func postCreateServiceAccount(peerID string, s *resources.StorageResource, calle } func loadWorkflow(workflowId string, peerId string) *workflow.Workflow { - res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{},nil).LoadOne(workflowId) + res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerId, []string{}, nil).LoadOne(workflowId) if res.Code != 200 { l := oclib.GetLogger() l.Error().Msg("Error while loading a workflow for creation of the serviceAccount") - return nil + return nil } return res.ToWorkflow() -} +} // func getItemByRessourceId(storages string) (map[string][]string, error) { // var storagesMap map[string][]string // } func getAssociatedComputeRessources(wf workflow.Workflow, storageNodes []string) []string { - storageProcessingLinks := make([]string,0) - for _, id := range storageNodes{ - processings := getStorageRelatedProcessing(wf, id) // Retrieve all the Processing item linked to one storage node + storageProcessingLinks := make([]string, 0) + for _, id := range storageNodes { + processings := getStorageRelatedProcessing(wf, id) // Retrieve all the Processing item linked to one storage node for _, procId := range processings { computings := getComputeProcessing(wf, procId) - if !slices.Contains(storageProcessingLinks,computings){ - storageProcessingLinks= append(storageProcessingLinks, computings) + if !slices.Contains(storageProcessingLinks, computings) { + storageProcessingLinks = append(storageProcessingLinks, computings) } } } @@ -263,37 +265,43 @@ func getAssociatedComputeRessources(wf workflow.Workflow, storageNodes []string) } // returns a list of processing item's Id that use the Storage -// theses item Id can be used to instantiate the resource +// theses item Id can be used to instantiate the resource func getStorageRelatedProcessing(wf workflow.Workflow, storageId string) (relatedProcessing []string) { var storageLinks []graph.GraphLink // Only keep the links that are associated to the storage for _, link := range wf.Graph.Links { if link.Destination.ID == storageId || link.Source.ID == storageId { - storageLinks = append(storageLinks,link) + storageLinks = append(storageLinks, link) } } for _, link := range storageLinks { var resourceId string - if link.Source.ID != storageId { resourceId = link.Source.ID } else { resourceId = link.Destination.ID } - if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]){ relatedProcessing = append(relatedProcessing, resourceId) } + if link.Source.ID != storageId { + resourceId = link.Source.ID + } else { + resourceId = link.Destination.ID + } + if wf.Graph.IsProcessing(wf.Graph.Items[resourceId]) { + relatedProcessing = append(relatedProcessing, resourceId) + } } return } func getComputeProcessing(wf workflow.Workflow, processingId string) (res string) { - computeRel := wf.GetByRelatedProcessing(processingId,wf.Graph.IsCompute) + computeRel := wf.GetByRelatedProcessing(processingId, wf.Graph.IsCompute) for _, rel := range computeRel { return rel.Node.GetID() } - + return "" } func getServiceAccountCredentials(peerID string, storageRes resources.StorageResource, caller *tools.HTTPCaller, execId string, wfId string, computeRes resources.ComputeResource) (string, string, error) { l := oclib.GetLogger() - fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID ) + fmt.Println("Getting a service account for" + computeRes.CreatorID + " on S3 " + storageRes.Name + " on peer " + storageRes.CreatorID) res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", peerID, []string{}, nil).LoadOne(storageRes.CreatorID) if res.Code != 200 { l.Error().Msg("Error while loading a peer for creation of the serviceAccount") @@ -313,9 +321,9 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes l.Error().Msg(err.Error()) return "", "", err } - + result_code := caller.LastResults["code"].(int) - if !slices.Contains([]int{200,201}, result_code) { + if !slices.Contains([]int{200, 201}, result_code) { l.Error().Msg(fmt.Sprint("Error when trying to create a serviceAccount on storage " + storageRes.Name + " on peer at " + p.Url)) if _, ok := caller.LastResults["body"]; ok { l.Error().Msg(string(caller.LastResults["body"].([]byte))) @@ -327,7 +335,7 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes if a, ok := resp["access"]; !ok { return "", "", fmt.Errorf("Error in the response returned when creating a S3 serviceAccount on " + storageRes.Name + " on peer " + p.UUID) } else { - access = a.(string) + access = a.(string) } if s, ok := resp["secret"]; !ok { @@ -336,7 +344,6 @@ func getServiceAccountCredentials(peerID string, storageRes resources.StorageRes secret = s.(string) } - return access, secret, nil } @@ -359,17 +366,17 @@ func postS3Secret(peerID string, s resources.StorageResource, caller *tools.HTTP if err != nil { l.Error().Msg("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials in namespace " + execId) l.Error().Msg(err.Error()) - return fmt.Errorf("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials" + " : " + err.Error()) + return fmt.Errorf("Error when executing on peer at " + p.Url + " when creating a secret holding s3 credentials" + " : " + err.Error()) } - + result_code := caller.LastResults["code"].(int) - if !slices.Contains([]int{200,201}, result_code) { - l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name + "to a secret on peer at " + p.Url)) + if !slices.Contains([]int{200, 201}, result_code) { + l.Error().Msg(fmt.Sprint("Error when trying to post the credential to " + s.Name + "to a secret on peer at " + p.Url)) if _, ok := caller.LastResults["body"]; ok { l.Error().Msg(string(caller.LastResults["body"].([]byte))) - return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte))) + return fmt.Errorf(string(caller.LastResults["body"].(map[string]interface{})["error"].([]byte))) } } return nil -} \ No newline at end of file +} diff --git a/go.mod b/go.mod index 7c19f6f..3233e6f 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.23.0 toolchain go1.24.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1 + cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d github.com/beego/beego/v2 v2.3.8 github.com/google/uuid v1.6.0 github.com/smartystreets/goconvey v1.7.2 diff --git a/go.sum b/go.sum index 6965ed1..e303e7f 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250805112547-cc939451fd81 h1:539qIasa1Vz+F cloud.o-forge.io/core/oc-lib v0.0.0-20250805112547-cc939451fd81/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1 h1:53KzZ+1JqRY6J7EVzQpNBmLzNuxb8oHNW3UgqxkYABo= cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260113150431-6d745fe92216 h1:9ab37/TK1JhdOOvYbqq9J9hDbipofBkq0l2GQ6umARY= +cloud.o-forge.io/core/oc-lib v0.0.0-20260113150431-6d745fe92216/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260114125532-0e378dc19c06 h1:kDTCqxzV8dvLeXPzPWIn4LgFqwgVprrXwNnP+ftA9C0= +cloud.o-forge.io/core/oc-lib v0.0.0-20260114125532-0e378dc19c06/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE= +cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc= github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg= diff --git a/infrastructure/scheduler.go b/infrastructure/scheduler.go new file mode 100644 index 0000000..53639fb --- /dev/null +++ b/infrastructure/scheduler.go @@ -0,0 +1,362 @@ +package infrastructure + +import ( + "errors" + "fmt" + "strings" + "sync" + "time" + + "cloud.o-forge.io/core/oc-lib/models/bill" + "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/common/pricing" + "cloud.o-forge.io/core/oc-lib/models/order" + "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/workflow" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/google/uuid" + "github.com/robfig/cron" +) + +/* +* WorkflowSchedule is a struct that contains the scheduling information of a workflow +* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression + */ +// it's a flying object only use in a session time. It's not stored in the database +type WorkflowSchedule struct { + UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow + Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule + WorkflowExecution []*workflow_execution.WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow + Message string `json:"message,omitempty"` // Message is the message of the schedule + Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule + Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time + End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time + DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule + Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task + + BookingMode booking.BookingMode `json:"booking_mode,omitempty"` // BookingMode qualify the preemption order of the scheduling. if no payment allowed with preemption set up When_Possible + SelectedInstances workflow.ConfigItem `json:"selected_instances"` + SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"` + SelectedBuyings workflow.ConfigItem `json:"selected_buyings"` + SelectedStrategies workflow.ConfigItem `json:"selected_strategies"` + + SelectedBillingStrategy pricing.BillingStrategy `json:"selected_billing_strategy"` +} + +// TODO PREEMPTION ! +/* +To schedule a preempted, omg. +pour faire ça on doit alors lancé une exécution prioritaire qui passera devant toutes les autres, celon un niveau de priorité. +Preemptible = 7, pour le moment il n'existera que 0 et 7. +Dans le cas d'une préemption l'exécution est immédiable et bloquera tout le monde tant qu'il n'a pas été exécuté. +Une ressource doit pouvoir être preemptible pour être exécutée de la sorte. +Se qui implique si on est sur une ressource par ressource que si un élement n'est pas préemptible, +alors il devra être effectué dés que possible + +Dans le cas dés que possible, la start date est immédiate MAIS ! +ne pourra se lancé que SI il n'existe pas d'exécution se lançant durant la période indicative. ( Ultra complexe ) +*/ + +func NewScheduler(mode int, start string, end string, durationInS float64, cron string) *WorkflowSchedule { + ws := &WorkflowSchedule{ + UUID: uuid.New().String(), + Start: time.Now(), + BookingMode: booking.BookingMode(mode), + DurationS: durationInS, + Cron: cron, + } + s, err := time.Parse("2006-01-02T15:04:05", start) + if err == nil && ws.BookingMode == booking.PLANNED { + ws.Start = s // can apply a defined start other than now, if planned + } + + e, err := time.Parse("2006-01-02T15:04:05", end) + if err == nil { + ws.End = &e + } + return ws +} + +func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*workflow_execution.WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) { + if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { + return false, nil, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined") + } + access := workflow.NewAccessor(request) + res, code, err := access.LoadOne(wfID) + if code != 200 { + return false, nil, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) + } + wf := res.(*workflow.Workflow) + isPreemptible, longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, + ws.SelectedInstances, ws.SelectedPartnerships, ws.SelectedBuyings, ws.SelectedStrategies, + int(ws.BookingMode), request) + if err != nil { + return false, wf, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err + } + ws.DurationS = longest + ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." + if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { + ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" + } + execs, err := ws.GetExecutions(wf, isPreemptible) + if err != nil { + return false, wf, []*workflow_execution.WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err + } + purchased := []*purchase_resource.PurchaseResource{} + bookings := []*booking.Booking{} + for _, exec := range execs { + purchased = append(purchased, exec.Buy(ws.SelectedBillingStrategy, ws.UUID, wfID, priceds)...) + bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) + } + + errCh := make(chan error, len(bookings)) + var m sync.Mutex + + for _, b := range bookings { + go getBooking(b, request, errCh, &m) + } + + for i := 0; i < len(bookings); i++ { + if err := <-errCh; err != nil { + return false, wf, execs, purchased, bookings, err + } + } + + return true, wf, execs, purchased, bookings, nil +} + +func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error { + newOrder := &order.Order{ + AbstractObject: utils.AbstractObject{ + Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"), + IsDraft: true, + }, + ExecutionsID: ws.UUID, + Purchases: purchases, + Bookings: bookings, + Status: enum.PENDING, + } + if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil { + if _, err := bill.DraftFirstBill(res.(*order.Order), request); err != nil { + return err + } + return nil + } else { + return err + } +} + +func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { + m.Lock() + c, err := getCallerCopy(request, errCh) + if err != nil { + errCh <- err + return + } + m.Unlock() + + meth := c.URLS[tools.BOOKING][tools.GET] + meth = strings.ReplaceAll(meth, ":id", b.ResourceID) + meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) + meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) + c.URLS[tools.BOOKING][tools.GET] = meth + _, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) + + if err != nil { + errCh <- fmt.Errorf("error on " + b.DestPeerID + err.Error()) + return + } + + errCh <- nil +} + +func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { + var c tools.HTTPCaller + err := request.Caller.DeepCopy(c) + if err != nil { + errCh <- err + return tools.HTTPCaller{}, nil + } + c.URLS = request.Caller.URLS + return c, err +} + +func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*workflow_execution.WorkflowExecution, error) { + if request == nil { + return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no request found") + } + c := request.Caller + if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { + return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no caller defined") + } + methods := c.URLS[tools.BOOKING] + if _, ok := methods[tools.GET]; !ok { + return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("no path found") + } + ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request) + ws.WorkflowExecution = executions + if !ok || err != nil { + return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) + } + ws.Workflow = wf + + var errCh = make(chan error, len(bookings)) + var m sync.Mutex + + for _, purchase := range purchases { + go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m) + } + for i := 0; i < len(purchases); i++ { + if err := <-errCh; err != nil { + return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + } + } + + errCh = make(chan error, len(bookings)) + + for _, booking := range bookings { + go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m) + } + + for i := 0; i < len(bookings); i++ { + if err := <-errCh; err != nil { + return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + } + } + + if err := ws.GenerateOrder(purchases, bookings, request); err != nil { + return ws, wf, executions, err + } + + fmt.Println("Schedules") + for _, exec := range executions { + err := exec.PurgeDraft(request) + if err != nil { + return ws, nil, []*workflow_execution.WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) + } + exec.StoreDraftDefault() + utils.GenericStoreOne(exec, workflow_execution.NewAccessor(request)) + } + fmt.Println("Schedules") + + wf.GetAccessor(&tools.APIRequest{Admin: true}).UpdateOne(wf, wf.GetID()) + + return ws, wf, executions, nil +} + +func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { + m.Lock() + c, err := getCallerCopy(request, errCh) + if err != nil { + errCh <- err + return + } + m.Unlock() + if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil { + errCh <- err + return + } else { + data := res["data"].(map[string]interface{}) + purchase.SetID(fmt.Sprintf("%v", data["id"])) + } + errCh <- nil +} + +/* +BOOKING IMPLIED TIME, not of subscription but of execution +so is processing time execution time applied on computes +data can improve the processing time +time should implied a security time border (10sec) if not from the same executions +VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING +*/ + +/* +* getExecutions is a function that returns the executions of a workflow +* it returns an array of workflow_execution.WorkflowExecution + */ +func (ws *WorkflowSchedule) GetExecutions(workflow *workflow.Workflow, isPreemptible bool) ([]*workflow_execution.WorkflowExecution, error) { + workflows_executions := []*workflow_execution.WorkflowExecution{} + dates, err := ws.GetDates() + if err != nil { + return workflows_executions, err + } + for _, date := range dates { + obj := &workflow_execution.WorkflowExecution{ + AbstractObject: utils.AbstractObject{ + UUID: uuid.New().String(), // set the uuid of the execution + Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution + }, + Priority: 1, + ExecutionsID: ws.UUID, + ExecDate: date.Start, // set the execution date + EndDate: date.End, // set the end date + State: enum.DRAFT, // set the state to 1 (scheduled) + WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution + } + if ws.BookingMode != booking.PLANNED { + obj.Priority = 0 + } + if ws.BookingMode == booking.PREEMPTED && isPreemptible { + obj.Priority = 7 + } + + ws.SelectedStrategies = obj.SelectedStrategies + ws.SelectedPartnerships = obj.SelectedPartnerships + ws.SelectedBuyings = obj.SelectedBuyings + ws.SelectedInstances = obj.SelectedInstances + + workflows_executions = append(workflows_executions, obj) + } + return workflows_executions, nil +} + +func (ws *WorkflowSchedule) GetDates() ([]Schedule, error) { + schedule := []Schedule{} + if len(ws.Cron) > 0 { // if cron is set then end date should be set + if ws.End == nil { + return schedule, errors.New("a cron task should have an end date") + } + if ws.DurationS <= 0 { + ws.DurationS = ws.End.Sub(ws.Start).Seconds() + } + cronStr := strings.Split(ws.Cron, " ") // split the cron string to treat it + if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields) + return schedule, errors.New("Bad cron message: (" + ws.Cron + "). Should be at least ss mm hh dd MM dw") + } + subCron := strings.Join(cronStr[:6], " ") + // cron should be parsed as ss mm hh dd MM dw t (min 6 fields) + specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser + sched, err := specParser.Parse(subCron) // parse the cron string + if err != nil { + return schedule, errors.New("Bad cron message: " + err.Error()) + } + // loop through the cron schedule to set the executions + for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { + e := s.Add(time.Duration(ws.DurationS) * time.Second) + schedule = append(schedule, Schedule{ + Start: s, + End: &e, + }) + } + } else { // if no cron, set the execution to the start date + schedule = append(schedule, Schedule{ + Start: ws.Start, + End: ws.End, + }) + } + return schedule, nil +} + +type Schedule struct { + Start time.Time + End *time.Time +} + +/* +* TODO : LARGEST GRAIN PLANIFYING THE WORKFLOW WHEN OPTION IS SET +* SET PROTECTION BORDER TIME + */