add event base intelligency

This commit is contained in:
mr
2026-01-13 16:04:31 +01:00
parent c35b06e0bc
commit 6d745fe922
46 changed files with 859 additions and 455 deletions

View File

@@ -9,6 +9,7 @@ import (
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
@@ -20,6 +21,16 @@ import (
"github.com/google/uuid"
)
type ConfigItem map[string]int
func (c ConfigItem) Get(key string) *int {
i := 0
if ins, ok := c[key]; ok {
i = ins
}
return &i
}
/*
* Workflow is a struct that represents a workflow
* it defines the native workflow
@@ -40,6 +51,11 @@ func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface {
itf := []resources.ResourceInterface{}
switch dt {
case tools.NATIVE_TOOL:
for _, d := range d.NativeTools {
itf = append(itf, d)
}
return itf
case tools.DATA_RESOURCE:
for _, d := range d.DataResources {
itf = append(itf, d)
@@ -134,19 +150,25 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
graphVarName[varName] = graphItem
continue
} else if strings.Contains(line, n+"-->") {
err := d.extractLink(line, graphVarName, "-->")
err := d.extractLink(line, graphVarName, "-->", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"<--") {
err := d.extractLink(line, graphVarName, "<--", true)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"--") {
err := d.extractLink(line, graphVarName, "--")
err := d.extractLink(line, graphVarName, "--", false)
if err != nil {
fmt.Println(err)
continue
}
} else if strings.Contains(line, n+"-") {
err := d.extractLink(line, graphVarName, "-")
err := d.extractLink(line, graphVarName, "-", false)
if err != nil {
fmt.Println(err)
continue
@@ -176,7 +198,7 @@ func (d *Workflow) generateResource(datas []resources.ResourceInterface, request
return nil
}
func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.GraphItem, pattern string) error {
func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.GraphItem, pattern string, reverse bool) error {
splitted := strings.Split(line, pattern)
if len(splitted) < 2 {
return errors.New("links elements not found")
@@ -199,6 +221,11 @@ func (d *Workflow) extractLink(line string, graphVarName map[string]*graph.Graph
Y: 0,
},
}
if reverse {
tmp := link.Destination
link.Destination = link.Source
link.Source = tmp
}
splittedComments := strings.Split(line, "'")
if len(splittedComments) <= 1 {
return errors.New("Can't deserialize Object, there's no commentary")
@@ -256,10 +283,19 @@ func (d *Workflow) getNewGraphItem(dataName string, graphItem *graph.GraphItem,
d.Datas = append(d.Datas, resource.GetID())
d.DataResources = append(d.DataResources, resource.(*resources.DataResource))
graphItem.Data = resource.(*resources.DataResource)
case "Processing", "Event":
case "Processing":
d.Processings = append(d.Processings, resource.GetID())
d.ProcessingResources = append(d.ProcessingResources, resource.(*resources.ProcessingResource))
graphItem.Processing = resource.(*resources.ProcessingResource)
case "Event":
access := resources.NewAccessor[*resources.NativeTool](tools.NATIVE_TOOL, &tools.APIRequest{
Admin: true,
}, func() utils.DBObject { return &resources.NativeTool{} })
t, _, err := access.Search(nil, "WORKFLOW_EVENT", false)
if err == nil && len(t) > 0 {
d.NativeTool = append(d.NativeTool, t[0].GetID())
graphItem.NativeTool = t[0].(*resources.NativeTool)
}
case "Storage":
d.Storages = append(d.Storages, resource.GetID())
d.StorageResources = append(d.StorageResources, resource.(*resources.StorageResource))
@@ -312,6 +348,12 @@ func (w *Workflow) IsDependancy(id string) []Deps {
return dependancyOfIDs
}
func (w *Workflow) GetFirstItems() []graph.GraphItem {
return w.GetGraphItems(func(item graph.GraphItem) bool {
return len(w.GetDependencies(w.GetID())) == 0
})
}
func (w *Workflow) GetDependencies(id string) (dependencies []Deps) {
for _, link := range w.Graph.Links {
if _, ok := w.Graph.Items[link.Source.ID]; !ok {
@@ -336,16 +378,26 @@ func (w *Workflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas
}
func (w *Workflow) GetPricedItem(
f func(item graph.GraphItem) bool, request *tools.APIRequest, buyingStrategy int, pricingStrategy int) map[string]pricing.PricedItemITF {
f func(item graph.GraphItem) bool, request *tools.APIRequest,
instance int,
partnership int,
buying int,
strategy int,
bookingMode int,
buyingStrategy int,
pricingStrategy int) (map[string]pricing.PricedItemITF, error) {
list_datas := map[string]pricing.PricedItemITF{}
for _, item := range w.Graph.Items {
if f(item) {
dt, res := item.GetResource()
ord := res.ConvertToPricedResource(dt, request)
ord, err := res.ConvertToPricedResource(dt, &instance, &partnership, &buying, &strategy, &bookingMode, request)
if err != nil {
return list_datas, err
}
list_datas[res.GetID()] = ord
}
}
return list_datas
return list_datas, nil
}
type Related struct {
@@ -378,7 +430,7 @@ func (w *Workflow) GetByRelatedProcessing(processingID string, g func(item graph
return related
}
func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool {
func (ao *Workflow) VerifyAuth(callName string, request *tools.APIRequest) bool {
isAuthorized := false
if len(ao.Shared) > 0 {
for _, shared := range ao.Shared {
@@ -386,11 +438,11 @@ func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool {
if code != 200 || shared == nil {
isAuthorized = false
} else {
isAuthorized = shared.VerifyAuth(request)
isAuthorized = shared.VerifyAuth(callName, request)
}
}
}
return ao.AbstractObject.VerifyAuth(request) || isAuthorized
return ao.AbstractObject.VerifyAuth(callName, request) || isAuthorized
}
/*
@@ -422,65 +474,106 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
return true, nil
}
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, request *tools.APIRequest) (bool, float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
}, func(started time.Time, duration float64) *time.Time {
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
*instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()),
bookingMode, request)
if err != nil {
return start, 0, err
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
return &s
return &s, nil
})
if err != nil {
return 0, priceds, nil, err
return false, 0, priceds, nil, err
}
if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, wf, priceds, request,
wf.Graph.IsData, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
return start, 0
}, func(started time.Time, duration float64) *time.Time {
return end
if _, priceds, err = plan[resources.ResourceInterface](tools.NATIVE_TOOL, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
wf.Graph.IsNativeTool, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
return start, 0, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
return end, nil
}); err != nil {
return 0, priceds, nil, err
return false, 0, priceds, nil, err
}
for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage, tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
if _, priceds, err = plan[resources.ResourceInterface](k, wf, priceds, request,
f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
if _, priceds, err = plan[resources.ResourceInterface](tools.DATA_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
wf.Graph.IsData, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
return start, 0, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
return end, nil
}); err != nil {
return false, 0, priceds, nil, err
}
for k, f := range map[tools.DataType]func(graph.GraphItem) bool{tools.STORAGE_RESOURCE: wf.Graph.IsStorage,
tools.COMPUTE_RESOURCE: wf.Graph.IsCompute} {
if _, priceds, err = plan[resources.ResourceInterface](k, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request,
f, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
nearestStart, longestDuration, err := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, ps, res, func(i graph.GraphItem) (r resources.ResourceInterface) {
if f(i) {
_, r = i.GetResource()
}
return r
}, request)
return start.Add(time.Duration(nearestStart) * time.Second), longestDuration
}, func(started time.Time, duration float64) *time.Time {
}, *instances.Get(res.GetID()), *partnerships.Get(res.GetID()),
*buyings.Get(res.GetID()), *strategies.Get(res.GetID()), bookingMode, request)
if err != nil {
return start, longestDuration, err
}
return start.Add(time.Duration(nearestStart) * time.Second), longestDuration, nil
}, func(started time.Time, duration float64) (*time.Time, error) {
s := started.Add(time.Duration(duration))
return &s
return &s, nil
}); err != nil {
return 0, priceds, nil, err
return false, 0, priceds, nil, err
}
}
longest := common.GetPlannerLongestTime(end, priceds, request)
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, wf, priceds, request, wf.Graph.IsWorkflow,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
if _, priceds, err = plan[resources.ResourceInterface](tools.WORKFLOW_RESOURCE, instances, partnerships, buyings, strategies,
bookingMode, wf, priceds, request, wf.Graph.IsWorkflow,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
start := start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second)
longest := float64(-1)
r, code, err := res.GetAccessor(request).LoadOne(res.GetID())
if code != 200 || err != nil {
return start, longest
return start, longest, err
}
if neoLongest, _, _, err := r.(*Workflow).Planify(start, end, request); err != nil {
return start, longest
_, neoLongest, priceds2, _, err := r.(*Workflow).Planify(start, end, instances, partnerships, buyings, strategies, bookingMode, request)
// should ... import priced
if err != nil {
return start, longest, err
} else if neoLongest > longest {
longest = neoLongest
}
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second), longest
}, func(start time.Time, longest float64) *time.Time {
for k, v := range priceds2 {
if priceds[k] == nil {
priceds[k] = map[string]pricing.PricedItemITF{}
}
for k2, v2 := range v {
if priceds[k][k2] != nil {
v2.AddQuantity(priceds[k][k2].GetQuantity())
}
}
}
return start.Add(time.Duration(common.GetPlannerNearestStart(start, priceds, request)) * time.Second), longest, nil
}, func(start time.Time, longest float64) (*time.Time, error) {
s := start.Add(time.Duration(longest) * time.Second)
return &s
return &s, nil
}); err != nil {
return 0, priceds, nil, err
return false, 0, priceds, nil, err
}
return longest, priceds, wf, nil
isPreemptible := true
for _, first := range wf.GetFirstItems() {
_, res := first.GetResource()
if res.GetBookingModes()[booking.PREEMPTED] == nil {
isPreemptible = false
break
}
}
return isPreemptible, longest, priceds, wf, nil
}
// Returns a map of DataType (processing,computing,data,storage,worfklow) where each resource (identified by its UUID)
@@ -511,8 +604,10 @@ func (w *Workflow) GetItemsByResources() map[tools.DataType]map[string][]string
}
func plan[T resources.ResourceInterface](
dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) {
dt tools.DataType, instances ConfigItem, partnerships ConfigItem, buyings ConfigItem, strategies ConfigItem, bookingMode int, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
f func(graph.GraphItem) bool,
start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64, error),
end func(time.Time, float64) (*time.Time, error)) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) {
resources := []T{}
for _, item := range wf.GetGraphItems(f) {
if priceds[dt] == nil {
@@ -522,23 +617,34 @@ func plan[T resources.ResourceInterface](
if realItem == nil {
return resources, priceds, errors.New("could not load the processing resource")
}
priced := realItem.ConvertToPricedResource(dt, request)
priced, err := realItem.ConvertToPricedResource(dt, instances.Get(realItem.GetID()),
partnerships.Get(realItem.GetID()), buyings.Get(realItem.GetID()), strategies.Get(realItem.GetID()), &bookingMode, request)
if err != nil {
return resources, priceds, err
}
// Should be commented once the Pricing selection feature has been implemented, related to the commit d35ad440fa77763ec7f49ab34a85e47e75581b61
// if priced.SelectPricing() == nil {
// return resources, priceds, errors.New("no pricings are selected... can't proceed")
// }
started, duration := start(realItem, priced)
started, duration, err := start(realItem, priced)
if err != nil {
return resources, priceds, err
}
priced.SetLocationStart(started)
if duration >= 0 {
if e := end(started, duration); e != nil {
if e, err := end(started, duration); err == nil && e != nil {
priced.SetLocationEnd(*e)
}
}
if e := end(started, priced.GetExplicitDurationInS()); e != nil {
if e, err := end(started, priced.GetExplicitDurationInS()); err != nil && e != nil {
priced.SetLocationEnd(*e)
}
resources = append(resources, realItem.(T))
if priceds[dt][item.ID] != nil {
priced.AddQuantity(priceds[dt][item.ID].GetQuantity())
}
priceds[dt][item.ID] = priced
}
return resources, priceds, nil
}