diff --git a/models/common/enum/infrastructure.go b/models/common/enum/infrastructure.go new file mode 100644 index 0000000..911af0e --- /dev/null +++ b/models/common/enum/infrastructure.go @@ -0,0 +1,15 @@ +package enum + +type InfrastructureType int + +const ( + DOCKER InfrastructureType = iota + KUBERNETES + SLURM + HW + CONDOR +) + +func (t InfrastructureType) String() string { + return [...]string{"DOCKER", "KUBERNETES", "SLURM", "HW", "CONDOR"}[t] +} diff --git a/models/common/schedule_type.go b/models/common/enum/schedule.go similarity index 97% rename from models/common/schedule_type.go rename to models/common/enum/schedule.go index 86c7d1a..95ab281 100644 --- a/models/common/schedule_type.go +++ b/models/common/enum/schedule.go @@ -1,4 +1,4 @@ -package common +package enum type ScheduledType int diff --git a/models/common/size.go b/models/common/enum/size.go similarity index 97% rename from models/common/size.go rename to models/common/enum/size.go index c2c4b82..7a7e126 100644 --- a/models/common/size.go +++ b/models/common/enum/size.go @@ -1,4 +1,4 @@ -package common +package enum type StorageSize int diff --git a/models/common/access_configuration.go b/models/common/models/access_configuration.go similarity index 98% rename from models/common/access_configuration.go rename to models/common/models/access_configuration.go index 6df6174..a428854 100644 --- a/models/common/access_configuration.go +++ b/models/common/models/access_configuration.go @@ -1,4 +1,4 @@ -package common +package models type Container struct { Image string `json:"image,omitempty" bson:"image,omitempty"` // Image is the container image TEMPO diff --git a/models/common/devices.go b/models/common/models/devices.go similarity index 76% rename from models/common/devices.go rename to models/common/models/devices.go index 666ba6c..77f2f74 100644 --- a/models/common/devices.go +++ b/models/common/models/devices.go @@ -1,4 +1,4 @@ -package common +package models // CPU is a struct that represents a CPU type CPU struct { @@ -18,17 +18,3 @@ type GPU struct { MemoryGb float64 `bson:"memory,omitempty" json:"memory,omitempty" description:"Units in MB"` Cores map[string]int `bson:"cores,omitempty" json:"cores,omitempty"` } - -type InfrastructureType int - -const ( - DOCKER InfrastructureType = iota - KUBERNETES - SLURM - HW - CONDOR -) - -func (t InfrastructureType) String() string { - return [...]string{"DOCKER", "KUBERNETES", "SLURM", "HW", "CONDOR"}[t] -} diff --git a/models/common/planner.go b/models/common/planner.go new file mode 100644 index 0000000..7db0c75 --- /dev/null +++ b/models/common/planner.go @@ -0,0 +1,42 @@ +package common + +import ( + "time" + + "cloud.o-forge.io/core/oc-lib/models/common/pricing" + "cloud.o-forge.io/core/oc-lib/tools" +) + +func GetPlannerNearestStart(start time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { + near := float64(10000000000) // set a high value + for _, items := range planned { // loop through the planned items + for _, priced := range items { // loop through the priced items + if priced.GetLocationStart() == nil { // if the start is nil, + continue // skip the iteration + } + newS := priced.GetLocationStart() // get the start + if newS.Sub(start).Seconds() < near { // if the difference between the start and the new start is less than the nearest start + near = newS.Sub(start).Seconds() + } + } + } + return near +} + +func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { + if end == nil { + return -1 + } + longestTime := float64(0) + for _, priced := range planned[tools.PROCESSING_RESOURCE] { + if priced.GetLocationEnd() == nil { + continue + } + newS := priced.GetLocationEnd() + if longestTime < newS.Sub(*end).Seconds() { + longestTime = newS.Sub(*end).Seconds() + } + // get the nearest start from start var + } + return longestTime +} diff --git a/models/order/order.go b/models/order/order.go index b52f948..9d04567 100644 --- a/models/order/order.go +++ b/models/order/order.go @@ -115,10 +115,10 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu // set the name of the order resourcesByPeer := map[string][]pricing.PricedItemITF{} // create a map of resources by peer - processings := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsProcessing, request) // get the processing items - datas := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsData, request) // get the data items - storages := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsStorage, request) // get the storage items - workflows := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsWorkflow, request) // get the workflow items + processings := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsProcessing, request) // get the processing items + datas := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsData, request) // get the data items + storages := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsStorage, request) // get the storage items + workflows := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsWorkflow, request) // get the workflow items for _, items := range []map[string]pricing.PricedItemITF{processings, datas, storages, workflows} { for _, item := range items { if _, ok := resourcesByPeer[item.GetCreatorID()]; !ok { diff --git a/models/resources/compute.go b/models/resources/compute.go index 7f47087..242cd8a 100644 --- a/models/resources/compute.go +++ b/models/resources/compute.go @@ -5,7 +5,8 @@ import ( "strings" "time" - "cloud.o-forge.io/core/oc-lib/models/common" + "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" @@ -17,8 +18,8 @@ import ( */ type ComputeResource struct { AbstractIntanciatedResource[*ComputeResourceInstance] - Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture - Infrastructure common.InfrastructureType `json:"infrastructure,omitempty" bson:"infrastructure,omitempty"` + Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture + Infrastructure enum.InfrastructureType `json:"infrastructure,omitempty" bson:"infrastructure,omitempty"` } func (d *ComputeResource) GetAccessor(request *tools.APIRequest) utils.Accessor { @@ -27,6 +28,9 @@ func (d *ComputeResource) GetAccessor(request *tools.APIRequest) utils.Accessor func (abs *ComputeResource) ConvertToPricedResource( t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { + if t != tools.COMPUTE_RESOURCE { + return nil + } p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request) priced := p.(*PricedResource) return &PricedComputeResource{ @@ -37,7 +41,7 @@ func (abs *ComputeResource) ConvertToPricedResource( type ComputeNode struct { Name string `json:"name,omitempty" bson:"name,omitempty"` Quantity int64 `json:"quantity" bson:"quantity" default:"1"` - RAM *common.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM + RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM CPUs map[string]int64 `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model GPUs map[string]int64 `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model } @@ -47,8 +51,8 @@ type ComputeResourceInstance struct { SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"` PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"` AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"` - CPUs map[string]*common.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model - GPUs map[string]*common.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model + CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model + GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"` } diff --git a/models/resources/data.go b/models/resources/data.go index 0fab993..0cc479a 100644 --- a/models/resources/data.go +++ b/models/resources/data.go @@ -43,6 +43,9 @@ func (d *DataResource) GetAccessor(request *tools.APIRequest) utils.Accessor { func (abs *DataResource) ConvertToPricedResource( t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { + if t != tools.DATA_RESOURCE { + return nil + } p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request) priced := p.(*PricedResource) return &PricedDataResource{ diff --git a/models/resources/processing.go b/models/resources/processing.go index a0509e6..374deaa 100644 --- a/models/resources/processing.go +++ b/models/resources/processing.go @@ -3,16 +3,17 @@ package resources import ( "time" - "cloud.o-forge.io/core/oc-lib/models/common" + "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" ) type ProcessingUsage struct { - CPUs map[string]*common.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model - GPUs map[string]*common.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model - RAM *common.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM + CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model + GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model + RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"` // Storage is the storage Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"` @@ -25,13 +26,13 @@ type ProcessingUsage struct { */ type ProcessingResource struct { AbstractIntanciatedResource[*ResourceInstance[*ResourcePartnerShip[*ProcessingResourcePricingProfile]]] - Infrastructure common.InfrastructureType `json:"infrastructure,omitempty" bson:"infrastructure,omitempty"` - IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service - Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing - OpenSource bool `json:"open_source" bson:"open_source" default:"false"` - License string `json:"license,omitempty" bson:"license,omitempty"` - Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"` - Container *common.Container `json:"container,omitempty" bson:"container,omitempty"` // Container is the container + Infrastructure enum.InfrastructureType `json:"infrastructure,omitempty" bson:"infrastructure,omitempty"` + IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service + Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing + OpenSource bool `json:"open_source" bson:"open_source" default:"false"` + License string `json:"license,omitempty" bson:"license,omitempty"` + Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"` + Container *models.Container `json:"container,omitempty" bson:"container,omitempty"` // Container is the container } type PricedProcessingResource struct { diff --git a/models/resources/resource.go b/models/resources/resource.go index 27bbfda..35a3425 100644 --- a/models/resources/resource.go +++ b/models/resources/resource.go @@ -33,10 +33,6 @@ type AbstractResource struct { SelectedInstanceIndex *int `json:"selected_instance_index,omitempty" bson:"selected_instance_index,omitempty"` // SelectedInstance is the selected instance } -func (ao *AbstractResource) GetAccessor(request *tools.APIRequest) utils.Accessor { - return nil -} - func (r *AbstractResource) StoreDraftDefault() { r.IsDraft = true } diff --git a/models/resources/storage.go b/models/resources/storage.go index 7a91bcb..bd46add 100644 --- a/models/resources/storage.go +++ b/models/resources/storage.go @@ -4,7 +4,7 @@ import ( "errors" "time" - "cloud.o-forge.io/core/oc-lib/models/common" + "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" @@ -15,25 +15,37 @@ import ( * it defines the resource storage */ type StorageResource struct { - AbstractIntanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name) - Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage - StorageType common.StorageType `bson:"storage_type,omitempty" json:"storage_type,omitempty"` // Type is the type of the storage - Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage + AbstractIntanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name) + Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage + StorageType enum.StorageType `bson:"storage_type,omitempty" json:"storage_type,omitempty"` // Type is the type of the storage + Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage } func (d *StorageResource) GetAccessor(request *tools.APIRequest) utils.Accessor { return NewAccessor[*StorageResource](tools.STORAGE_RESOURCE, request, func() utils.DBObject { return &StorageResource{} }) // Create a new instance of the accessor } +func (abs *StorageResource) ConvertToPricedResource( + t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { + if t != tools.STORAGE_RESOURCE { + return nil + } + p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request) + priced := p.(*PricedResource) + return &PricedStorageResource{ + PricedResource: *priced, + } +} + type StorageResourceInstance struct { ResourceInstance[*StorageResourcePartnership] - Local bool `bson:"local" json:"local"` - SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"` - SizeType common.StorageSize `bson:"size_type" json:"size_type" default:"0"` // SizeType is the type of the storage size - SizeGB int64 `bson:"size,omitempty" json:"size,omitempty"` // Size is the size of the storage - Encryption bool `bson:"encryption,omitempty" json:"encryption,omitempty"` // Encryption is a flag that indicates if the storage is encrypted - Redundancy string `bson:"redundancy,omitempty" json:"redundancy,omitempty"` // Redundancy is the redundancy of the storage - Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage + Local bool `bson:"local" json:"local"` + SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"` + SizeType enum.StorageSize `bson:"size_type" json:"size_type" default:"0"` // SizeType is the type of the storage size + SizeGB int64 `bson:"size,omitempty" json:"size,omitempty"` // Size is the size of the storage + Encryption bool `bson:"encryption,omitempty" json:"encryption,omitempty"` // Encryption is a flag that indicates if the storage is encrypted + Redundancy string `bson:"redundancy,omitempty" json:"redundancy,omitempty"` // Redundancy is the redundancy of the storage + Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage } func (i *StorageResourceInstance) GetID() string { diff --git a/models/workflow/graph/graph.go b/models/workflow/graph/graph.go index 3ad7bf7..358b67b 100644 --- a/models/workflow/graph/graph.go +++ b/models/workflow/graph/graph.go @@ -14,6 +14,26 @@ type Graph struct { Links []GraphLink `bson:"links" json:"links" default:"{}" validate:"required"` // Links is the list of links between elements in the graph } +func (wf *Graph) IsProcessing(item GraphItem) bool { + return item.Processing != nil +} + +func (wf *Graph) IsCompute(item GraphItem) bool { + return item.Compute != nil +} + +func (wf *Graph) IsData(item GraphItem) bool { + return item.Data != nil +} + +func (wf *Graph) IsStorage(item GraphItem) bool { + return item.Storage != nil +} + +func (wf *Graph) IsWorkflow(item GraphItem) bool { + return item.Workflow != nil +} + func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.ProcessingResource, resource resources.ResourceInterface, f func(GraphItem) resources.ResourceInterface, request *tools.APIRequest) (float64, float64) { nearestStart := float64(10000000000) diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index ca1dc52..ed8a98f 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -2,10 +2,10 @@ package workflow import ( "errors" - "fmt" "time" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" + "cloud.o-forge.io/core/oc-lib/models/common" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" @@ -15,24 +15,23 @@ import ( ) /* -* AbstractWorkflow is a struct that represents a workflow for resource or native workflow -* Warning: there is 2 types of workflows, the resource workflow and the native workflow -* native workflow is the one that you create to schedule an execution -* resource workflow is the one that is created to set our native workflow in catalog +* Workflow is a struct that represents a workflow +* it defines the native workflow */ -type AbstractWorkflow struct { +type Workflow struct { + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) resources.ResourceSet Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set // Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow - Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow + Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow // AbstractWorkflow contains the basic fields of a workflow } func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor { return NewAccessor(request) // Create a new instance of the accessor } -func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) { +func (w *Workflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) { for _, item := range w.Graph.Items { if f(item) { list_datas = append(list_datas, item) @@ -41,18 +40,7 @@ func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (lis return } -func (w *AbstractWorkflow) GetResources(f func(item graph.GraphItem) bool) map[string]resources.ResourceInterface { - list_datas := map[string]resources.ResourceInterface{} - for _, item := range w.Graph.Items { - if f(item) { - _, res := item.GetResource() - list_datas[res.GetID()] = res - } - } - return list_datas -} - -func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF { +func (w *Workflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF { list_datas := map[string]pricing.PricedItemITF{} for _, item := range w.Graph.Items { if f(item) { @@ -64,7 +52,7 @@ func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, requ return list_datas } -func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface { +func (w *Workflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface { storages := []resources.ResourceInterface{} for _, link := range w.Graph.Links { nodeID := link.Destination.ID @@ -85,43 +73,6 @@ func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(it return storages } -func (wf *AbstractWorkflow) IsProcessing(item graph.GraphItem) bool { - return item.Processing != nil -} - -func (wf *AbstractWorkflow) IsCompute(item graph.GraphItem) bool { - return item.Compute != nil -} - -func (wf *AbstractWorkflow) IsData(item graph.GraphItem) bool { - return item.Data != nil -} - -func (wf *AbstractWorkflow) IsStorage(item graph.GraphItem) bool { - return item.Storage != nil -} - -func (wf *AbstractWorkflow) IsWorkflow(item graph.GraphItem) bool { - return item.Workflow != nil -} - -/* -* Workflow is a struct that represents a workflow -* it defines the native workflow - */ -type Workflow struct { - utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) - AbstractWorkflow // AbstractWorkflow contains the basic fields of a workflow -} - -func (w *Workflow) getPricedItem(item graph.GraphItem, request *tools.APIRequest) pricing.PricedItemITF { - dt, res := item.GetResource() - if dt == tools.INVALID { - return nil - } - return res.ConvertToPricedResource(dt, request) -} - func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool { isAuthorized := false if len(ao.Shared) > 0 { @@ -166,123 +117,83 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) { } func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) { - processings := []*resources.ProcessingResource{} priceds := map[tools.DataType][]pricing.PricedItemITF{} - priceds[tools.PROCESSING_RESOURCE] = []pricing.PricedItemITF{} - for _, item := range wf.GetGraphItems(wf.IsProcessing) { - dt, realItem := item.GetResource() - if realItem == nil { - return 0, priceds, nil, errors.New("could not load the processing resource") - } - priced := realItem.ConvertToPricedResource(dt, request) - timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID(), request) - started := start.Add(time.Duration(timeFromStartS) * time.Second) - priced.SetLocationStart(started) - priced.SetLocationEnd(started.Add(time.Duration(priced.GetExplicitDurationInS()))) - processings = append(processings, realItem.(*resources.ProcessingResource)) - priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced) + ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing, + func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) { + return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS() + }, func(started time.Time, duration float64) time.Time { + return started.Add(time.Duration(duration)) + }) + if err != nil { + return 0, priceds, nil, err } - priceds[tools.DATA_RESOURCE] = []pricing.PricedItemITF{} - for _, item := range wf.GetGraphItems(wf.IsData) { - dt, realItem := item.GetResource() - if realItem == nil { - continue - } - priced := realItem.ConvertToPricedResource(dt, request) - priced.SetLocationStart(start) - priced.SetLocationEnd(*end) - priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced) + if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, wf, priceds, request, wf.Graph.IsData, + func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) { + return start, 0 + }, func(started time.Time, duration float64) time.Time { + return *end + }); err != nil { + return 0, priceds, nil, err } - for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} { - for _, item := range wf.GetGraphItems(f) { - dt, r := item.GetResource() - if r == nil { - continue - } - if priceds[dt] == nil { - priceds[dt] = []pricing.PricedItemITF{} - } - priced := r.ConvertToPricedResource(dt, request) - nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, processings, r, - func(i graph.GraphItem) resources.ResourceInterface { + for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage, tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} { + if _, priceds, err = plan[resources.ResourceInterface](k, wf, priceds, request, f, + func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) { + nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) { if f(i) { - _, r := i.GetResource() - return r - } else { - return nil + _, r = i.GetResource() } + return r }, request) - started := start.Add(time.Duration(nearestStart) * time.Second) - priced.SetLocationStart(started) - if longestDuration >= 0 { - priced.SetLocationEnd(started.Add(time.Duration(longestDuration))) - } - priceds[dt] = append(priceds[dt], priced) + return start.Add(time.Duration(nearestStart) * time.Second), longestDuration + }, func(started time.Time, duration float64) time.Time { + return started.Add(time.Duration(duration)) + }); err != nil { + return 0, priceds, nil, err } } - longest := wf.getLongestTime(end, priceds, request) - priceds[tools.WORKFLOW_RESOURCE] = []pricing.PricedItemITF{} - for _, item := range wf.GetGraphItems(wf.IsWorkflow) { - access := NewAccessor(nil) - _, r := item.GetResource() - if r == nil { - return 0, priceds, nil, errors.New("could not load the workflow") - } - priced := r.ConvertToPricedResource(tools.WORKFLOW_RESOURCE, request) - res, code, err := access.LoadOne(r.GetID()) - if code != 200 || err != nil { - return 0, priceds, nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error())) - } - neoLongest := float64(0) - innerWF := res.(*Workflow) - neoLongest, _, innerWF, err = innerWF.Planify(start, end, request) - if neoLongest > longest { - longest = neoLongest - } - started := start.Add(time.Duration(wf.getNearestStart(start, priceds, request)) * time.Second) - priced.SetLocationStart(started) - durationE := time.Duration(longest) - if durationE < 0 { - continue - } - ended := start.Add(durationE * time.Second) - priced.SetLocationEnd(ended) - priceds[tools.WORKFLOW_RESOURCE] = append(priceds[tools.WORKFLOW_RESOURCE], priced) + longest := common.GetPlannerLongestTime(end, priceds, request) + if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, wf, priceds, request, wf.Graph.IsWorkflow, + func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) { + start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second) + longest := float64(-1) + r, code, err := res.GetAccessor(request).LoadOne(res.GetID()) + if code != 200 || err != nil { + return start, longest + } + if neoLongest, _, _, err := r.(*Workflow).Planify(start, end, request); err != nil { + return start, longest + } else if neoLongest > longest { + longest = neoLongest + } + return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second), longest + }, func(start time.Time, longest float64) time.Time { + return start.Add(time.Duration(longest) * time.Second) + }); err != nil { + return 0, priceds, nil, err } return longest, priceds, wf, nil } -func (wf *Workflow) getNearestStart(start time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { - near := float64(10000000000) - for _, items := range priceds { - for _, priced := range items { - if priced.GetLocationStart() == nil { - continue - } - newS := priced.GetLocationStart() - if newS.Sub(start).Seconds() < near { - near = newS.Sub(start).Seconds() - } +func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest, + f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) time.Time) ([]T, map[tools.DataType][]pricing.PricedItemITF, error) { + resources := []T{} + for _, item := range wf.GetGraphItems(f) { + if priceds[dt] == nil { + priceds[dt] = []pricing.PricedItemITF{} } - // get the nearest start from start var + dt, realItem := item.GetResource() + if realItem == nil { + return resources, priceds, errors.New("could not load the processing resource") + } + priced := realItem.ConvertToPricedResource(dt, request) + started, duration := start(realItem, priced) + priced.SetLocationStart(started) + if duration >= 0 { + priced.SetLocationEnd(end(started, duration)) + } + priced.SetLocationEnd(end(started, priced.GetExplicitDurationInS())) + resources = append(resources, realItem.(T)) + priceds[dt] = append(priceds[dt], priced) } - return near -} - -func (wf *Workflow) getLongestTime(end *time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { - if end == nil { - return -1 - } - longestTime := float64(0) - for _, priced := range priceds[tools.PROCESSING_RESOURCE] { - if priced.GetLocationEnd() == nil { - continue - } - newS := priced.GetLocationEnd() - if longestTime < newS.Sub(*end).Seconds() { - longestTime = newS.Sub(*end).Seconds() - } - // get the nearest start from start var - } - return longestTime + return resources, priceds, nil }