set up
This commit is contained in:
@@ -55,7 +55,7 @@ func (wf *Graph) IsWorkflow(item GraphItem) bool {
|
||||
}
|
||||
|
||||
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.ProcessingResource, resource resources.ResourceInterface,
|
||||
f func(GraphItem) resources.ResourceInterface, request *tools.APIRequest) (float64, float64) {
|
||||
f func(GraphItem) resources.ResourceInterface, request *tools.APIRequest, buyingStrategy int, pricingStrategy int) (float64, float64) {
|
||||
nearestStart := float64(10000000000)
|
||||
oneIsInfinite := false
|
||||
longestDuration := float64(0)
|
||||
@@ -67,7 +67,7 @@ func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, proce
|
||||
} else if link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the source is the processing and the destination is not a compute
|
||||
source = link.Destination.ID
|
||||
}
|
||||
priced := processing.ConvertToPricedResource(tools.PROCESSING_RESOURCE, request)
|
||||
priced := processing.ConvertToPricedResource(tools.PROCESSING_RESOURCE, request, buyingStrategy, pricingStrategy)
|
||||
if source != "" {
|
||||
if priced.GetLocationStart() != nil {
|
||||
near := float64(priced.GetLocationStart().Sub(start).Seconds())
|
||||
@@ -96,7 +96,7 @@ func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, proce
|
||||
/*
|
||||
* GetAverageTimeBeforeStart is a function that returns the average time before the start of a processing
|
||||
*/
|
||||
func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingID string, request *tools.APIRequest) float64 {
|
||||
func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingID string, request *tools.APIRequest, buyingStrategy int, pricingStrategy int) float64 {
|
||||
currents := []float64{} // list of current time
|
||||
for _, link := range g.Links { // for each link
|
||||
var source string // source is the source of the link
|
||||
@@ -112,13 +112,13 @@ func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingI
|
||||
if r == nil { // if item is nil, continue
|
||||
continue
|
||||
}
|
||||
priced := r.ConvertToPricedResource(dt, request)
|
||||
priced := r.ConvertToPricedResource(dt, request, buyingStrategy, pricingStrategy)
|
||||
current := priced.GetExplicitDurationInS() // get the explicit duration of the item
|
||||
if current < 0 { // if current is negative, its means that duration of a before could be infinite continue
|
||||
return current
|
||||
}
|
||||
current += g.GetAverageTimeProcessingBeforeStart(current, source, request) // get the average time before start of the source
|
||||
currents = append(currents, current) // append the current to the currents
|
||||
current += g.GetAverageTimeProcessingBeforeStart(current, source, request, buyingStrategy, pricingStrategy) // get the average time before start of the source
|
||||
currents = append(currents, current) // append the current to the currents
|
||||
}
|
||||
var max float64 // get the max time to wait dependancies to finish
|
||||
for _, current := range currents {
|
||||
|
||||
@@ -77,12 +77,13 @@ func (w *Workflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas
|
||||
return
|
||||
}
|
||||
|
||||
func (w *Workflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF {
|
||||
func (w *Workflow) GetPricedItem(
|
||||
f func(item graph.GraphItem) bool, request *tools.APIRequest, buyingStrategy int, pricingStrategy int) map[string]pricing.PricedItemITF {
|
||||
list_datas := map[string]pricing.PricedItemITF{}
|
||||
for _, item := range w.Graph.Items {
|
||||
if f(item) {
|
||||
dt, res := item.GetResource()
|
||||
ord := res.ConvertToPricedResource(dt, request)
|
||||
ord := res.ConvertToPricedResource(dt, request, buyingStrategy, pricingStrategy)
|
||||
list_datas[res.GetID()] = ord
|
||||
}
|
||||
}
|
||||
@@ -165,11 +166,12 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
|
||||
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest, buyingStrategy int, pricingStrategy int) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
|
||||
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
|
||||
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
|
||||
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
|
||||
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request,
|
||||
buyingStrategy, pricingStrategy,
|
||||
wf.Graph.IsProcessing, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request, buyingStrategy, pricingStrategy)) * time.Second), priced.GetExplicitDurationInS()
|
||||
}, func(started time.Time, duration float64) *time.Time {
|
||||
s := started.Add(time.Duration(duration))
|
||||
return &s
|
||||
@@ -177,8 +179,8 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
|
||||
if err != nil {
|
||||
return 0, priceds, nil, err
|
||||
}
|
||||
if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, wf, priceds, request, wf.Graph.IsData,
|
||||
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, wf, priceds, request, buyingStrategy, pricingStrategy,
|
||||
wf.Graph.IsData, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
return start, 0
|
||||
}, func(started time.Time, duration float64) *time.Time {
|
||||
return end
|
||||
@@ -186,14 +188,14 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
|
||||
return 0, priceds, nil, err
|
||||
}
|
||||
for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage, tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
|
||||
if _, priceds, err = plan[resources.ResourceInterface](k, wf, priceds, request, f,
|
||||
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
if _, priceds, err = plan[resources.ResourceInterface](k, wf, priceds, request, buyingStrategy, pricingStrategy,
|
||||
f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
|
||||
if f(i) {
|
||||
_, r = i.GetResource()
|
||||
}
|
||||
return r
|
||||
}, request)
|
||||
}, request, buyingStrategy, pricingStrategy)
|
||||
return start.Add(time.Duration(nearestStart) * time.Second), longestDuration
|
||||
}, func(started time.Time, duration float64) *time.Time {
|
||||
s := started.Add(time.Duration(duration))
|
||||
@@ -203,7 +205,8 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
|
||||
}
|
||||
}
|
||||
longest := common.GetPlannerLongestTime(end, priceds, request)
|
||||
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, wf, priceds, request, wf.Graph.IsWorkflow,
|
||||
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, wf, priceds, request,
|
||||
buyingStrategy, pricingStrategy, wf.Graph.IsWorkflow,
|
||||
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second)
|
||||
longest := float64(-1)
|
||||
@@ -211,7 +214,7 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
|
||||
if code != 200 || err != nil {
|
||||
return start, longest
|
||||
}
|
||||
if neoLongest, _, _, err := r.(*Workflow).Planify(start, end, request); err != nil {
|
||||
if neoLongest, _, _, err := r.(*Workflow).Planify(start, end, request, buyingStrategy, pricingStrategy); err != nil {
|
||||
return start, longest
|
||||
} else if neoLongest > longest {
|
||||
longest = neoLongest
|
||||
@@ -226,7 +229,9 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
|
||||
return longest, priceds, wf, nil
|
||||
}
|
||||
|
||||
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
|
||||
func plan[T resources.ResourceInterface](
|
||||
dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
|
||||
buyingStrategy int, pricingStrategy int,
|
||||
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) {
|
||||
resources := []T{}
|
||||
for _, item := range wf.GetGraphItems(f) {
|
||||
@@ -237,7 +242,7 @@ func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priced
|
||||
if realItem == nil {
|
||||
return resources, priceds, errors.New("could not load the processing resource")
|
||||
}
|
||||
priced := realItem.ConvertToPricedResource(dt, request)
|
||||
priced := realItem.ConvertToPricedResource(dt, request, buyingStrategy, pricingStrategy)
|
||||
started, duration := start(realItem, priced)
|
||||
priced.SetLocationStart(started)
|
||||
if duration >= 0 {
|
||||
|
||||
Reference in New Issue
Block a user