light modification
This commit is contained in:
		@@ -2,10 +2,10 @@ package workflow
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/common"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/common/pricing"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/peer"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/resources"
 | 
			
		||||
@@ -15,24 +15,23 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
* AbstractWorkflow is a struct that represents a workflow for resource or native workflow
 | 
			
		||||
* Warning: there is 2 types of workflows, the resource workflow and the native workflow
 | 
			
		||||
* native workflow is the one that you create to schedule an execution
 | 
			
		||||
* resource workflow is the one that is created to set our native workflow in catalog
 | 
			
		||||
* Workflow is a struct that represents a workflow
 | 
			
		||||
* it defines the native workflow
 | 
			
		||||
 */
 | 
			
		||||
type AbstractWorkflow struct {
 | 
			
		||||
type Workflow struct {
 | 
			
		||||
	utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
 | 
			
		||||
	resources.ResourceSet
 | 
			
		||||
	Graph          *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow
 | 
			
		||||
	ScheduleActive bool         `json:"schedule_active" bson:"schedule_active"` // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set
 | 
			
		||||
	// Schedule       *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
 | 
			
		||||
	Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow
 | 
			
		||||
	Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow     // AbstractWorkflow contains the basic fields of a workflow
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
 | 
			
		||||
	return NewAccessor(request) // Create a new instance of the accessor
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) {
 | 
			
		||||
func (w *Workflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) {
 | 
			
		||||
	for _, item := range w.Graph.Items {
 | 
			
		||||
		if f(item) {
 | 
			
		||||
			list_datas = append(list_datas, item)
 | 
			
		||||
@@ -41,18 +40,7 @@ func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (lis
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *AbstractWorkflow) GetResources(f func(item graph.GraphItem) bool) map[string]resources.ResourceInterface {
 | 
			
		||||
	list_datas := map[string]resources.ResourceInterface{}
 | 
			
		||||
	for _, item := range w.Graph.Items {
 | 
			
		||||
		if f(item) {
 | 
			
		||||
			_, res := item.GetResource()
 | 
			
		||||
			list_datas[res.GetID()] = res
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return list_datas
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF {
 | 
			
		||||
func (w *Workflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF {
 | 
			
		||||
	list_datas := map[string]pricing.PricedItemITF{}
 | 
			
		||||
	for _, item := range w.Graph.Items {
 | 
			
		||||
		if f(item) {
 | 
			
		||||
@@ -64,7 +52,7 @@ func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, requ
 | 
			
		||||
	return list_datas
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface {
 | 
			
		||||
func (w *Workflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface {
 | 
			
		||||
	storages := []resources.ResourceInterface{}
 | 
			
		||||
	for _, link := range w.Graph.Links {
 | 
			
		||||
		nodeID := link.Destination.ID
 | 
			
		||||
@@ -85,43 +73,6 @@ func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(it
 | 
			
		||||
	return storages
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *AbstractWorkflow) IsProcessing(item graph.GraphItem) bool {
 | 
			
		||||
	return item.Processing != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *AbstractWorkflow) IsCompute(item graph.GraphItem) bool {
 | 
			
		||||
	return item.Compute != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *AbstractWorkflow) IsData(item graph.GraphItem) bool {
 | 
			
		||||
	return item.Data != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *AbstractWorkflow) IsStorage(item graph.GraphItem) bool {
 | 
			
		||||
	return item.Storage != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *AbstractWorkflow) IsWorkflow(item graph.GraphItem) bool {
 | 
			
		||||
	return item.Workflow != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
* Workflow is a struct that represents a workflow
 | 
			
		||||
* it defines the native workflow
 | 
			
		||||
 */
 | 
			
		||||
type Workflow struct {
 | 
			
		||||
	utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
 | 
			
		||||
	AbstractWorkflow     // AbstractWorkflow contains the basic fields of a workflow
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (w *Workflow) getPricedItem(item graph.GraphItem, request *tools.APIRequest) pricing.PricedItemITF {
 | 
			
		||||
	dt, res := item.GetResource()
 | 
			
		||||
	if dt == tools.INVALID {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	return res.ConvertToPricedResource(dt, request)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool {
 | 
			
		||||
	isAuthorized := false
 | 
			
		||||
	if len(ao.Shared) > 0 {
 | 
			
		||||
@@ -166,123 +117,83 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) {
 | 
			
		||||
	processings := []*resources.ProcessingResource{}
 | 
			
		||||
	priceds := map[tools.DataType][]pricing.PricedItemITF{}
 | 
			
		||||
	priceds[tools.PROCESSING_RESOURCE] = []pricing.PricedItemITF{}
 | 
			
		||||
	for _, item := range wf.GetGraphItems(wf.IsProcessing) {
 | 
			
		||||
		dt, realItem := item.GetResource()
 | 
			
		||||
		if realItem == nil {
 | 
			
		||||
			return 0, priceds, nil, errors.New("could not load the processing resource")
 | 
			
		||||
		}
 | 
			
		||||
		priced := realItem.ConvertToPricedResource(dt, request)
 | 
			
		||||
		timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID(), request)
 | 
			
		||||
		started := start.Add(time.Duration(timeFromStartS) * time.Second)
 | 
			
		||||
		priced.SetLocationStart(started)
 | 
			
		||||
		priced.SetLocationEnd(started.Add(time.Duration(priced.GetExplicitDurationInS())))
 | 
			
		||||
		processings = append(processings, realItem.(*resources.ProcessingResource))
 | 
			
		||||
		priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced)
 | 
			
		||||
	ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
 | 
			
		||||
		func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
 | 
			
		||||
			return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
 | 
			
		||||
		}, func(started time.Time, duration float64) time.Time {
 | 
			
		||||
			return started.Add(time.Duration(duration))
 | 
			
		||||
		})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return 0, priceds, nil, err
 | 
			
		||||
	}
 | 
			
		||||
	priceds[tools.DATA_RESOURCE] = []pricing.PricedItemITF{}
 | 
			
		||||
	for _, item := range wf.GetGraphItems(wf.IsData) {
 | 
			
		||||
		dt, realItem := item.GetResource()
 | 
			
		||||
		if realItem == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		priced := realItem.ConvertToPricedResource(dt, request)
 | 
			
		||||
		priced.SetLocationStart(start)
 | 
			
		||||
		priced.SetLocationEnd(*end)
 | 
			
		||||
		priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced)
 | 
			
		||||
	if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, wf, priceds, request, wf.Graph.IsData,
 | 
			
		||||
		func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
 | 
			
		||||
			return start, 0
 | 
			
		||||
		}, func(started time.Time, duration float64) time.Time {
 | 
			
		||||
			return *end
 | 
			
		||||
		}); err != nil {
 | 
			
		||||
		return 0, priceds, nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} {
 | 
			
		||||
		for _, item := range wf.GetGraphItems(f) {
 | 
			
		||||
			dt, r := item.GetResource()
 | 
			
		||||
			if r == nil {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if priceds[dt] == nil {
 | 
			
		||||
				priceds[dt] = []pricing.PricedItemITF{}
 | 
			
		||||
			}
 | 
			
		||||
			priced := r.ConvertToPricedResource(dt, request)
 | 
			
		||||
			nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, processings, r,
 | 
			
		||||
				func(i graph.GraphItem) resources.ResourceInterface {
 | 
			
		||||
	for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage, tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
 | 
			
		||||
		if _, priceds, err = plan[resources.ResourceInterface](k, wf, priceds, request, f,
 | 
			
		||||
			func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
 | 
			
		||||
				nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
 | 
			
		||||
					if f(i) {
 | 
			
		||||
						_, r := i.GetResource()
 | 
			
		||||
						return r
 | 
			
		||||
					} else {
 | 
			
		||||
						return nil
 | 
			
		||||
						_, r = i.GetResource()
 | 
			
		||||
					}
 | 
			
		||||
					return r
 | 
			
		||||
				}, request)
 | 
			
		||||
			started := start.Add(time.Duration(nearestStart) * time.Second)
 | 
			
		||||
			priced.SetLocationStart(started)
 | 
			
		||||
			if longestDuration >= 0 {
 | 
			
		||||
				priced.SetLocationEnd(started.Add(time.Duration(longestDuration)))
 | 
			
		||||
			}
 | 
			
		||||
			priceds[dt] = append(priceds[dt], priced)
 | 
			
		||||
				return start.Add(time.Duration(nearestStart) * time.Second), longestDuration
 | 
			
		||||
			}, func(started time.Time, duration float64) time.Time {
 | 
			
		||||
				return started.Add(time.Duration(duration))
 | 
			
		||||
			}); err != nil {
 | 
			
		||||
			return 0, priceds, nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	longest := wf.getLongestTime(end, priceds, request)
 | 
			
		||||
	priceds[tools.WORKFLOW_RESOURCE] = []pricing.PricedItemITF{}
 | 
			
		||||
	for _, item := range wf.GetGraphItems(wf.IsWorkflow) {
 | 
			
		||||
		access := NewAccessor(nil)
 | 
			
		||||
		_, r := item.GetResource()
 | 
			
		||||
		if r == nil {
 | 
			
		||||
			return 0, priceds, nil, errors.New("could not load the workflow")
 | 
			
		||||
		}
 | 
			
		||||
		priced := r.ConvertToPricedResource(tools.WORKFLOW_RESOURCE, request)
 | 
			
		||||
		res, code, err := access.LoadOne(r.GetID())
 | 
			
		||||
		if code != 200 || err != nil {
 | 
			
		||||
			return 0, priceds, nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error()))
 | 
			
		||||
		}
 | 
			
		||||
		neoLongest := float64(0)
 | 
			
		||||
		innerWF := res.(*Workflow)
 | 
			
		||||
		neoLongest, _, innerWF, err = innerWF.Planify(start, end, request)
 | 
			
		||||
		if neoLongest > longest {
 | 
			
		||||
			longest = neoLongest
 | 
			
		||||
		}
 | 
			
		||||
		started := start.Add(time.Duration(wf.getNearestStart(start, priceds, request)) * time.Second)
 | 
			
		||||
		priced.SetLocationStart(started)
 | 
			
		||||
		durationE := time.Duration(longest)
 | 
			
		||||
		if durationE < 0 {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		ended := start.Add(durationE * time.Second)
 | 
			
		||||
		priced.SetLocationEnd(ended)
 | 
			
		||||
		priceds[tools.WORKFLOW_RESOURCE] = append(priceds[tools.WORKFLOW_RESOURCE], priced)
 | 
			
		||||
	longest := common.GetPlannerLongestTime(end, priceds, request)
 | 
			
		||||
	if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, wf, priceds, request, wf.Graph.IsWorkflow,
 | 
			
		||||
		func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
 | 
			
		||||
			start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second)
 | 
			
		||||
			longest := float64(-1)
 | 
			
		||||
			r, code, err := res.GetAccessor(request).LoadOne(res.GetID())
 | 
			
		||||
			if code != 200 || err != nil {
 | 
			
		||||
				return start, longest
 | 
			
		||||
			}
 | 
			
		||||
			if neoLongest, _, _, err := r.(*Workflow).Planify(start, end, request); err != nil {
 | 
			
		||||
				return start, longest
 | 
			
		||||
			} else if neoLongest > longest {
 | 
			
		||||
				longest = neoLongest
 | 
			
		||||
			}
 | 
			
		||||
			return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second), longest
 | 
			
		||||
		}, func(start time.Time, longest float64) time.Time {
 | 
			
		||||
			return start.Add(time.Duration(longest) * time.Second)
 | 
			
		||||
		}); err != nil {
 | 
			
		||||
		return 0, priceds, nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return longest, priceds, wf, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *Workflow) getNearestStart(start time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
 | 
			
		||||
	near := float64(10000000000)
 | 
			
		||||
	for _, items := range priceds {
 | 
			
		||||
		for _, priced := range items {
 | 
			
		||||
			if priced.GetLocationStart() == nil {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			newS := priced.GetLocationStart()
 | 
			
		||||
			if newS.Sub(start).Seconds() < near {
 | 
			
		||||
				near = newS.Sub(start).Seconds()
 | 
			
		||||
			}
 | 
			
		||||
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest,
 | 
			
		||||
	f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) time.Time) ([]T, map[tools.DataType][]pricing.PricedItemITF, error) {
 | 
			
		||||
	resources := []T{}
 | 
			
		||||
	for _, item := range wf.GetGraphItems(f) {
 | 
			
		||||
		if priceds[dt] == nil {
 | 
			
		||||
			priceds[dt] = []pricing.PricedItemITF{}
 | 
			
		||||
		}
 | 
			
		||||
		// get the nearest start from start var
 | 
			
		||||
		dt, realItem := item.GetResource()
 | 
			
		||||
		if realItem == nil {
 | 
			
		||||
			return resources, priceds, errors.New("could not load the processing resource")
 | 
			
		||||
		}
 | 
			
		||||
		priced := realItem.ConvertToPricedResource(dt, request)
 | 
			
		||||
		started, duration := start(realItem, priced)
 | 
			
		||||
		priced.SetLocationStart(started)
 | 
			
		||||
		if duration >= 0 {
 | 
			
		||||
			priced.SetLocationEnd(end(started, duration))
 | 
			
		||||
		}
 | 
			
		||||
		priced.SetLocationEnd(end(started, priced.GetExplicitDurationInS()))
 | 
			
		||||
		resources = append(resources, realItem.(T))
 | 
			
		||||
		priceds[dt] = append(priceds[dt], priced)
 | 
			
		||||
	}
 | 
			
		||||
	return near
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (wf *Workflow) getLongestTime(end *time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
 | 
			
		||||
	if end == nil {
 | 
			
		||||
		return -1
 | 
			
		||||
	}
 | 
			
		||||
	longestTime := float64(0)
 | 
			
		||||
	for _, priced := range priceds[tools.PROCESSING_RESOURCE] {
 | 
			
		||||
		if priced.GetLocationEnd() == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		newS := priced.GetLocationEnd()
 | 
			
		||||
		if longestTime < newS.Sub(*end).Seconds() {
 | 
			
		||||
			longestTime = newS.Sub(*end).Seconds()
 | 
			
		||||
		}
 | 
			
		||||
		// get the nearest start from start var
 | 
			
		||||
	}
 | 
			
		||||
	return longestTime
 | 
			
		||||
	return resources, priceds, nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user