289 lines
10 KiB
Go
289 lines
10 KiB
Go
package workflow
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
|
|
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
|
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
|
)
|
|
|
|
/*
|
|
* AbstractWorkflow is a struct that represents a workflow for resource or native workflow
|
|
* Warning: there is 2 types of workflows, the resource workflow and the native workflow
|
|
* native workflow is the one that you create to schedule an execution
|
|
* resource workflow is the one that is created to set our native workflow in catalog
|
|
*/
|
|
type AbstractWorkflow struct {
|
|
resources.ResourceSet
|
|
Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow
|
|
ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set
|
|
// Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
|
|
Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow
|
|
}
|
|
|
|
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
|
return NewAccessor(request) // Create a new instance of the accessor
|
|
}
|
|
|
|
func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) {
|
|
for _, item := range w.Graph.Items {
|
|
if f(item) {
|
|
list_datas = append(list_datas, item)
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (w *AbstractWorkflow) GetResources(f func(item graph.GraphItem) bool) map[string]resources.ResourceInterface {
|
|
list_datas := map[string]resources.ResourceInterface{}
|
|
for _, item := range w.Graph.Items {
|
|
if f(item) {
|
|
_, res := item.GetResource()
|
|
list_datas[res.GetID()] = res
|
|
}
|
|
}
|
|
return list_datas
|
|
}
|
|
|
|
func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF {
|
|
list_datas := map[string]pricing.PricedItemITF{}
|
|
for _, item := range w.Graph.Items {
|
|
if f(item) {
|
|
dt, res := item.GetResource()
|
|
ord := res.ConvertToPricedResource(dt, request)
|
|
list_datas[res.GetID()] = ord
|
|
}
|
|
}
|
|
return list_datas
|
|
}
|
|
|
|
func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface {
|
|
storages := []resources.ResourceInterface{}
|
|
for _, link := range w.Graph.Links {
|
|
nodeID := link.Destination.ID
|
|
var node resources.ResourceInterface
|
|
if g(w.Graph.Items[link.Source.ID]) {
|
|
item := w.Graph.Items[link.Source.ID]
|
|
_, node = item.GetResource()
|
|
}
|
|
if node == nil && g(w.Graph.Items[link.Destination.ID]) { // if the source is not a storage, we consider that the destination is the storage
|
|
nodeID = link.Source.ID
|
|
item := w.Graph.Items[link.Destination.ID] // and the processing is the source
|
|
_, node = item.GetResource() // we are looking for the storage as destination
|
|
}
|
|
if processingID == nodeID && node != nil { // if the storage is linked to the processing
|
|
storages = append(storages, node)
|
|
}
|
|
}
|
|
return storages
|
|
}
|
|
|
|
func (wf *AbstractWorkflow) IsProcessing(item graph.GraphItem) bool {
|
|
return item.Processing != nil
|
|
}
|
|
|
|
func (wf *AbstractWorkflow) IsCompute(item graph.GraphItem) bool {
|
|
return item.Compute != nil
|
|
}
|
|
|
|
func (wf *AbstractWorkflow) IsData(item graph.GraphItem) bool {
|
|
return item.Data != nil
|
|
}
|
|
|
|
func (wf *AbstractWorkflow) IsStorage(item graph.GraphItem) bool {
|
|
return item.Storage != nil
|
|
}
|
|
|
|
func (wf *AbstractWorkflow) IsWorkflow(item graph.GraphItem) bool {
|
|
return item.Workflow != nil
|
|
}
|
|
|
|
/*
|
|
* Workflow is a struct that represents a workflow
|
|
* it defines the native workflow
|
|
*/
|
|
type Workflow struct {
|
|
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
|
AbstractWorkflow // AbstractWorkflow contains the basic fields of a workflow
|
|
}
|
|
|
|
func (w *Workflow) getPricedItem(item graph.GraphItem, request *tools.APIRequest) pricing.PricedItemITF {
|
|
dt, res := item.GetResource()
|
|
if dt == tools.INVALID {
|
|
return nil
|
|
}
|
|
return res.ConvertToPricedResource(dt, request)
|
|
}
|
|
|
|
func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool {
|
|
isAuthorized := false
|
|
if len(ao.Shared) > 0 {
|
|
for _, shared := range ao.Shared {
|
|
shared, code, _ := shallow_collaborative_area.NewAccessor(request).LoadOne(shared)
|
|
if code != 200 || shared == nil {
|
|
isAuthorized = false
|
|
}
|
|
isAuthorized = shared.VerifyAuth(request)
|
|
}
|
|
}
|
|
return ao.AbstractObject.VerifyAuth(request) || isAuthorized
|
|
}
|
|
|
|
/*
|
|
* CheckBooking is a function that checks the booking of the workflow on peers (even ourselves)
|
|
*/
|
|
func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
|
|
// check if
|
|
if wfa.Graph == nil { // no graph no booking
|
|
return false, nil
|
|
}
|
|
accessor := (&resources.ComputeResource{}).GetAccessor(&tools.APIRequest{Caller: caller})
|
|
for _, link := range wfa.Graph.Links {
|
|
if ok, compute_id := link.IsComputeLink(*wfa.Graph); ok { // check if the link is a link between a compute and a resource
|
|
compute, code, _ := accessor.LoadOne(compute_id)
|
|
if code != 200 {
|
|
continue
|
|
}
|
|
// CHECK BOOKING ON PEER, compute could be a remote one
|
|
peerID := compute.(*resources.ComputeResource).CreatorID
|
|
if peerID == "" {
|
|
return false, errors.New("no peer id")
|
|
} // no peer id no booking, we need to know where to book
|
|
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, compute_id, tools.BOOKING, tools.GET, nil, caller)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) {
|
|
processings := []*resources.ProcessingResource{}
|
|
priceds := map[tools.DataType][]pricing.PricedItemITF{}
|
|
priceds[tools.PROCESSING_RESOURCE] = []pricing.PricedItemITF{}
|
|
for _, item := range wf.GetGraphItems(wf.IsProcessing) {
|
|
dt, realItem := item.GetResource()
|
|
if realItem == nil {
|
|
return 0, priceds, nil, errors.New("could not load the processing resource")
|
|
}
|
|
priced := realItem.ConvertToPricedResource(dt, request)
|
|
timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID(), request)
|
|
started := start.Add(time.Duration(timeFromStartS) * time.Second)
|
|
priced.SetLocationStart(started)
|
|
priced.SetLocationEnd(started.Add(time.Duration(priced.GetExplicitDurationInS())))
|
|
processings = append(processings, realItem.(*resources.ProcessingResource))
|
|
priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced)
|
|
}
|
|
priceds[tools.DATA_RESOURCE] = []pricing.PricedItemITF{}
|
|
for _, item := range wf.GetGraphItems(wf.IsData) {
|
|
dt, realItem := item.GetResource()
|
|
if realItem == nil {
|
|
continue
|
|
}
|
|
priced := realItem.ConvertToPricedResource(dt, request)
|
|
priced.SetLocationStart(start)
|
|
priced.SetLocationEnd(*end)
|
|
priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced)
|
|
}
|
|
for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} {
|
|
for _, item := range wf.GetGraphItems(f) {
|
|
dt, r := item.GetResource()
|
|
if r == nil {
|
|
continue
|
|
}
|
|
if priceds[dt] == nil {
|
|
priceds[dt] = []pricing.PricedItemITF{}
|
|
}
|
|
priced := r.ConvertToPricedResource(dt, request)
|
|
nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, processings, r,
|
|
func(i graph.GraphItem) resources.ResourceInterface {
|
|
if f(i) {
|
|
_, r := i.GetResource()
|
|
return r
|
|
} else {
|
|
return nil
|
|
}
|
|
}, request)
|
|
started := start.Add(time.Duration(nearestStart) * time.Second)
|
|
priced.SetLocationStart(started)
|
|
if longestDuration >= 0 {
|
|
priced.SetLocationEnd(started.Add(time.Duration(longestDuration)))
|
|
}
|
|
priceds[dt] = append(priceds[dt], priced)
|
|
}
|
|
}
|
|
longest := wf.getLongestTime(end, priceds, request)
|
|
priceds[tools.WORKFLOW_RESOURCE] = []pricing.PricedItemITF{}
|
|
for _, item := range wf.GetGraphItems(wf.IsWorkflow) {
|
|
access := NewAccessor(nil)
|
|
_, r := item.GetResource()
|
|
if r == nil {
|
|
return 0, priceds, nil, errors.New("could not load the workflow")
|
|
}
|
|
priced := r.ConvertToPricedResource(tools.WORKFLOW_RESOURCE, request)
|
|
res, code, err := access.LoadOne(r.GetID())
|
|
if code != 200 || err != nil {
|
|
return 0, priceds, nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error()))
|
|
}
|
|
neoLongest := float64(0)
|
|
innerWF := res.(*Workflow)
|
|
neoLongest, _, innerWF, err = innerWF.Planify(start, end, request)
|
|
if neoLongest > longest {
|
|
longest = neoLongest
|
|
}
|
|
started := start.Add(time.Duration(wf.getNearestStart(start, priceds, request)) * time.Second)
|
|
priced.SetLocationStart(started)
|
|
durationE := time.Duration(longest)
|
|
if durationE < 0 {
|
|
continue
|
|
}
|
|
ended := start.Add(durationE * time.Second)
|
|
priced.SetLocationEnd(ended)
|
|
priceds[tools.WORKFLOW_RESOURCE] = append(priceds[tools.WORKFLOW_RESOURCE], priced)
|
|
}
|
|
return longest, priceds, wf, nil
|
|
}
|
|
|
|
func (wf *Workflow) getNearestStart(start time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
|
|
near := float64(10000000000)
|
|
for _, items := range priceds {
|
|
for _, priced := range items {
|
|
if priced.GetLocationStart() == nil {
|
|
continue
|
|
}
|
|
newS := priced.GetLocationStart()
|
|
if newS.Sub(start).Seconds() < near {
|
|
near = newS.Sub(start).Seconds()
|
|
}
|
|
}
|
|
// get the nearest start from start var
|
|
}
|
|
return near
|
|
}
|
|
|
|
func (wf *Workflow) getLongestTime(end *time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
|
|
if end == nil {
|
|
return -1
|
|
}
|
|
longestTime := float64(0)
|
|
for _, priced := range priceds[tools.PROCESSING_RESOURCE] {
|
|
if priced.GetLocationEnd() == nil {
|
|
continue
|
|
}
|
|
newS := priced.GetLocationEnd()
|
|
if longestTime < newS.Sub(*end).Seconds() {
|
|
longestTime = newS.Sub(*end).Seconds()
|
|
}
|
|
// get the nearest start from start var
|
|
}
|
|
return longestTime
|
|
}
|