From 21a7ff90104aec8fb92fc3f6c5d40f14cc27632e Mon Sep 17 00:00:00 2001 From: mr Date: Mon, 13 Jan 2025 11:24:07 +0100 Subject: [PATCH] Order Flow Payment Draft --- doc/order_model.puml | 8 +- entrypoint.go | 7 +- models/common/pricing/interfaces.go | 7 +- models/common/pricing/pricing_profile.go | 4 +- models/order/order.go | 20 +- models/resources/compute.go | 23 ++- models/resources/data.go | 23 ++- models/resources/interfaces.go | 11 +- models/resources/models.go | 40 +--- models/resources/priced_resource.go | 90 +++++++++ .../priced_resource/priced_resource.go | 6 - models/resources/processing.go | 10 +- models/resources/resource.go | 184 +++++------------- models/resources/storage.go | 23 ++- models/resources/workflow.go | 98 ++-------- models/utils/abstracts.go | 4 + models/utils/interfaces.go | 1 + models/workflow/graph/graph.go | 90 ++++----- models/workflow/workflow.go | 180 +++++++++++++---- .../workflow_history_mongo_accessor.go | 2 +- .../workflow_execution/workflow_execution.go | 26 ++- .../workflow_execution/workflow_scheduler.go | 69 +------ 22 files changed, 436 insertions(+), 490 deletions(-) create mode 100644 models/resources/priced_resource.go delete mode 100644 models/resources/priced_resource/priced_resource.go diff --git a/doc/order_model.puml b/doc/order_model.puml index 98211b7..22d9491 100644 --- a/doc/order_model.puml +++ b/doc/order_model.puml @@ -27,8 +27,8 @@ class AbstractResource { VerifyAuth(request) bool } -AbstractResource "1 " --* "many " InstanceITF -AbstractCustomizedResource "1 " --* "1 " InstanceITF +AbstractResource "1 " --* "many " ResourceInstanceITF +AbstractCustomizedResource "1 " --* "1 " ResourceInstanceITF AbstractResource ^-- ComputeResource AbstractResource ^-- DataResource @@ -114,14 +114,14 @@ class CustomizedStorageResource { } class CustomizedWorkflowResource {} -interface InstanceITF { +interface ResourceInstanceITF { GetID() string VerifyPartnership() bool // eval if there is one partnership per peer groups in every instance GetPeerGroups() []ResourcePartnerITF, []map[string][]string ClearPeerGroups() } -InstanceITF -- ResourceInstance +ResourceInstanceITF -- ResourceInstance ResourceInstance ^-- ComputeResourceInstance ResourceInstance ^-- StorageResourceInstance ResourceInstance "many " --* "1 " ResourcePartnerITF diff --git a/entrypoint.go b/entrypoint.go index 7b5abe7..8830f01 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -303,7 +303,12 @@ func (r *Request) Schedule(wfID string, start string, end string, durationInS fl } func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool { - ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, r.caller) + ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{ + Caller: r.caller, + Username: r.user, + PeerID: r.peerID, + Groups: r.groups, + }) if err != nil { fmt.Println(err) return false diff --git a/models/common/pricing/interfaces.go b/models/common/pricing/interfaces.go index 539053e..a7605f9 100644 --- a/models/common/pricing/interfaces.go +++ b/models/common/pricing/interfaces.go @@ -9,9 +9,12 @@ import ( type PricedItemITF interface { GetID() string GetType() tools.DataType - IsPurchased(request *tools.APIRequest) bool + IsPurchased() bool GetCreatorID() string GetLocationStart() *time.Time + SetLocationStart(start time.Time) + SetLocationEnd(end time.Time) GetLocationEnd() *time.Time - GetPrice(request *tools.APIRequest) (float64, error) + GetExplicitDurationInS() float64 + GetPrice() (float64, error) } diff --git a/models/common/pricing/pricing_profile.go b/models/common/pricing/pricing_profile.go index bf9c20c..e103404 100644 --- a/models/common/pricing/pricing_profile.go +++ b/models/common/pricing/pricing_profile.go @@ -2,13 +2,11 @@ package pricing import ( "time" - - "cloud.o-forge.io/core/oc-lib/tools" ) type PricingProfileITF interface { GetID() string - GetPrice(quantity float64, val float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) + GetPrice(quantity float64, val float64, start time.Time, end time.Time, params ...string) (float64, error) IsPurchased() bool GetOverrideStrategyValue() int } diff --git a/models/order/order.go b/models/order/order.go index ccd67da..08e539f 100644 --- a/models/order/order.go +++ b/models/order/order.go @@ -115,10 +115,10 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu // set the name of the order resourcesByPeer := map[string][]pricing.PricedItemITF{} // create a map of resources by peer - processings := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsProcessing) // get the processing items - datas := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsData) // get the data items - storages := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsStorage) // get the storage items - workflows := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsWorkflow) // get the workflow items + processings := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsProcessing, request) // get the processing items + datas := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsData, request) // get the data items + storages := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsStorage, request) // get the storage items + workflows := scheduler.Workflow.GetPricedItem(scheduler.Workflow.IsWorkflow, request) // get the workflow items for _, items := range []map[string]pricing.PricedItemITF{processings, datas, storages, workflows} { for _, item := range items { if _, ok := resourcesByPeer[item.GetCreatorID()]; !ok { @@ -134,7 +134,7 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu } peerOrder.GenerateID() for _, resource := range resources { - peerOrder.AddItem(resource, len(scheduler.WorkflowExecutions)) // TODO SPECIALS REF ADDITIONALS NOTES + peerOrder.AddItem(resource, len(resources)) // TODO SPECIALS REF ADDITIONALS NOTES } o.SubOrders[peerOrder.GetID()] = peerOrder } @@ -172,7 +172,11 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r return draftedBookings, errors.New("no request found") } for _, exec := range scheduler.WorkflowExecutions { - bookings := exec.Book(scheduler.Workflow) + _, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request) + if err != nil { + return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) + } + bookings := exec.Book(priceds) for _, booking := range bookings { _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) @@ -261,7 +265,7 @@ func (d *PeerOrder) Pay(request *tools.APIRequest, response chan *PeerOrder, wg d.Status = PAID // TO REMOVE LATER IT'S A MOCK if d.Status == PAID { for _, b := range d.Items { - if !b.Item.IsPurchased(request) { + if !b.Item.IsPurchased() { continue } accessor := purchase_resource.NewAccessor(request) @@ -322,7 +326,7 @@ func (d *PeerItemOrder) GetPrice(request *tools.APIRequest) (float64, error) { } } } - p, err := d.Item.GetPrice(request) + p, err := d.Item.GetPrice() if err != nil { return 0, err } diff --git a/models/resources/compute.go b/models/resources/compute.go index cb18839..e96bc98 100644 --- a/models/resources/compute.go +++ b/models/resources/compute.go @@ -68,7 +68,7 @@ func (p *ComputeResourcePricingProfile) GetOverrideStrategyValue() int { // NOT A PROPER QUANTITY // amountOfData is the number of CPUs, GPUs or RAM dependings on the params -func (p *ComputeResourcePricingProfile) GetPrice(amountOfData float64, explicitDuration float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) { +func (p *ComputeResourcePricingProfile) GetPrice(amountOfData float64, explicitDuration float64, start time.Time, end time.Time, params ...string) (float64, error) { if len(params) < 1 { return 0, errors.New("params must be set") } @@ -108,38 +108,37 @@ func (p *ComputeResourcePricingProfile) GetPrice(amountOfData float64, explicitD return pp, nil } -type CustomizedComputeResource struct { - AbstractCustomizedResource[*ComputeResourceInstance] +type PricedComputeResource struct { + PricedResource CPUsLocated map[string]float64 `json:"cpus_in_use" bson:"cpus_in_use"` // CPUsInUse is the list of CPUs in use GPUsLocated map[string]float64 `json:"gpus_in_use" bson:"gpus_in_use"` // GPUsInUse is the list of GPUs in use RAMLocated float64 `json:"ram_in_use" bson:"ram_in_use"` // RAMInUse is the RAM in use } -func (r *CustomizedComputeResource) GetType() tools.DataType { +func (r *PricedComputeResource) GetType() tools.DataType { return tools.COMPUTE_RESOURCE } -func (r *CustomizedComputeResource) GetPrice(request *tools.APIRequest) (float64, error) { +func (r *PricedComputeResource) GetPrice() (float64, error) { if r.UsageStart == nil || r.UsageEnd == nil { return 0, errors.New("Usage start and end must be set") } - partner := r.GetPartnership(request) - if partner != nil && partner.GetPricing(r.SelectedPricing) != nil { - return 0, errors.New("Pricing strategy not found") + if r.SelectedPricing == nil { + return 0, errors.New("Selected pricing must be set") } - pricing := partner.GetPricing(r.SelectedPricing) + pricing := *r.SelectedPricing price := float64(0) for _, l := range []map[string]float64{r.CPUsLocated, r.GPUsLocated} { for model, amountOfData := range l { - cpus, err := pricing.GetPrice(float64(amountOfData), r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd, request, "cpus", model) + cpus, err := pricing.GetPrice(float64(amountOfData), r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd, "cpus", model) if err != nil { return 0, err } price += cpus } } - ram, err := pricing.GetPrice(r.RAMLocated, r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd, request, "ram") + ram, err := pricing.GetPrice(r.RAMLocated, r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd, "ram") if err != nil { return 0, err } @@ -151,7 +150,7 @@ func (r *CustomizedComputeResource) GetPrice(request *tools.APIRequest) (float64 * FillWithDefaultProcessingUsage fills the order item with the default processing usage * it depends on the processing usage only if nothing is set, during order */ -func (i *CustomizedComputeResource) FillWithDefaultProcessingUsage(usage *ProcessingUsage) { +func (i *PricedComputeResource) FillWithDefaultProcessingUsage(usage *ProcessingUsage) { for _, cpu := range usage.CPUs { if _, ok := i.CPUsLocated[cpu.Model]; !ok { i.CPUsLocated[cpu.Model] = 0 diff --git a/models/resources/data.go b/models/resources/data.go index 8959600..88bdb90 100644 --- a/models/resources/data.go +++ b/models/resources/data.go @@ -93,7 +93,7 @@ func (p *DataResourcePricingProfile) GetOverrideStrategyValue() int { return p.Pricing.OverrideStrategy.GetStrategyValue() } -func (p *DataResourcePricingProfile) GetPrice(amountOfData float64, explicitDuration float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) { +func (p *DataResourcePricingProfile) GetPrice(amountOfData float64, explicitDuration float64, start time.Time, end time.Time, params ...string) (float64, error) { return p.Pricing.GetPrice(amountOfData, explicitDuration, start, &end) } @@ -101,31 +101,30 @@ func (p *DataResourcePricingProfile) IsPurchased() bool { return p.Pricing.BuyingStrategy != pricing.PAY_PER_USE } -type CustomizedDataResource struct { - AbstractCustomizedResource[*ResourceInstance[*DataResourcePartnership]] - StorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"` +type PricedDataResource struct { + PricedResource + UsageStorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"` } -func (r *CustomizedDataResource) GetType() tools.DataType { +func (r *PricedDataResource) GetType() tools.DataType { return tools.DATA_RESOURCE } -func (r *CustomizedDataResource) GetPrice(request *tools.APIRequest) (float64, error) { +func (r *PricedDataResource) GetPrice() (float64, error) { if r.UsageStart == nil || r.UsageEnd == nil { return 0, errors.New("Usage start and end must be set") } - partner := r.GetPartnership(request) - if partner != nil && partner.GetPricing(r.SelectedPricing) != nil { - return 0, errors.New("Pricing strategy not found") + if r.SelectedPricing == nil { + return 0, errors.New("Selected pricing must be set") } - pricing := partner.GetPricing(r.SelectedPricing) + pricing := *r.SelectedPricing var err error amountOfData := float64(1) if pricing.GetOverrideStrategyValue() >= 0 { - amountOfData, err = ToDataResourcePricingStrategy(pricing.GetOverrideStrategyValue()).GetQuantity(r.StorageGB) + amountOfData, err = ToDataResourcePricingStrategy(pricing.GetOverrideStrategyValue()).GetQuantity(r.UsageStorageGB) if err != nil { return 0, err } } - return pricing.GetPrice(amountOfData, r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd, request) + return pricing.GetPrice(amountOfData, r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd) } diff --git a/models/resources/interfaces.go b/models/resources/interfaces.go index 035e2fd..b8c8a24 100644 --- a/models/resources/interfaces.go +++ b/models/resources/interfaces.go @@ -26,22 +26,21 @@ type ShallowResourceInterface interface { type ResourceInterface interface { utils.DBObject Trim() - GetCreatorID() string - VerifyPartnerships() bool - GetPartnership(request *tools.APIRequest) ResourcePartnerITF + ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF SetAllowedInstances(request *tools.APIRequest) SetResourceModel(model *resource_model.ResourceModel) } -type InstanceITF interface { +type ResourceInstanceITF interface { GetID() string - VerifyPartnerships() bool + GetName() string + GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) ClearPeerGroups() } type ResourcePartnerITF interface { - GetPricing(id string) pricing.PricingProfileITF + GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF GetPeerGroups() map[string][]string ClearPeerGroups() } diff --git a/models/resources/models.go b/models/resources/models.go index 47d9bc3..9d0b670 100644 --- a/models/resources/models.go +++ b/models/resources/models.go @@ -1,20 +1,10 @@ package resources import ( - "time" - "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" ) -type ExploitedResourceSet struct { - DataResources []*CustomizedDataResource `bson:"-" json:"data_resources,omitempty"` - StorageResources []*CustomizedStorageResource `bson:"-" json:"storage_resources,omitempty"` - ProcessingResources []*CustomizedProcessingResource `bson:"-" json:"processing_resources,omitempty"` - ComputeResources []*CustomizedComputeResource `bson:"-" json:"compute_resources,omitempty"` - WorkflowResources []*CustomizedWorkflowResource `bson:"-" json:"workflow_resources,omitempty"` -} - type ResourceSet struct { Datas []string `bson:"datas,omitempty" json:"datas,omitempty"` Storages []string `bson:"storages,omitempty" json:"storages,omitempty"` @@ -65,28 +55,10 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) { } } -type ItemExploitedResource struct { - Data *CustomizedDataResource `bson:"data,omitempty" json:"data,omitempty"` - Processing *CustomizedProcessingResource `bson:"processing,omitempty" json:"processing,omitempty"` - Storage *CustomizedStorageResource `bson:"storage,omitempty" json:"storage,omitempty"` - Compute *CustomizedComputeResource `bson:"compute,omitempty" json:"compute,omitempty"` - Workflow *CustomizedWorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"` -} - -func (w *ItemExploitedResource) SetItemEndUsage(end time.Time) { - for _, item := range []ShallowResourceInterface{w.Data, w.Processing, w.Storage, w.Compute, w.Workflow} { - if item != nil { - item.SetItemEndUsage(end) - } - - } -} - -func (w *ItemExploitedResource) SetItemStartUsage(start time.Time) { - for _, item := range []ShallowResourceInterface{w.Data, w.Processing, w.Storage, w.Compute, w.Workflow} { - if item != nil { - item.SetItemStartUsage(start) - } - - } +type ItemResource struct { + Data *DataResource `bson:"data,omitempty" json:"data,omitempty"` + Processing *ProcessingResource `bson:"processing,omitempty" json:"processing,omitempty"` + Storage *StorageResource `bson:"storage,omitempty" json:"storage,omitempty"` + Compute *ComputeResource `bson:"compute,omitempty" json:"compute,omitempty"` + Workflow *WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"` } diff --git a/models/resources/priced_resource.go b/models/resources/priced_resource.go new file mode 100644 index 0000000..b45ad8c --- /dev/null +++ b/models/resources/priced_resource.go @@ -0,0 +1,90 @@ +package resources + +import ( + "errors" + "time" + + "cloud.o-forge.io/core/oc-lib/models/common/pricing" + "cloud.o-forge.io/core/oc-lib/tools" +) + +type PricedResource struct { + Name string `json:"name,omitempty" bson:"name,omitempty"` + Logo string `json:"logo,omitempty" bson:"logo,omitempty"` + InstancesRefs map[string]string `json:"instances_refs,omitempty" bson:"instances_refs,omitempty"` + PricingProfiles map[string][]pricing.PricingProfileITF `json:"pricing_profiles,omitempty" bson:"pricing_profiles,omitempty"` + SelectedPricing *pricing.PricingProfileITF `json:"selected_pricing,omitempty" bson:"selected_pricing,omitempty"` + ExplicitBookingDurationS float64 `json:"explicit_location_duration_s,omitempty" bson:"explicit_location_duration_s,omitempty"` + UsageStart *time.Time `json:"start,omitempty" bson:"start,omitempty"` + UsageEnd *time.Time `json:"end,omitempty" bson:"end,omitempty"` + CreatorID string `json:"peer_id,omitempty" bson:"peer_id,omitempty"` + ResourceID string `json:"resource_id,omitempty" bson:"resource_id,omitempty"` + ResourceType tools.DataType `json:"resource_type,omitempty" bson:"resource_type,omitempty"` +} + +func (abs *PricedResource) GetID() string { + return abs.ResourceID +} + +func (abs *PricedResource) GetType() tools.DataType { + return abs.ResourceType +} + +func (abs *PricedResource) GetCreatorID() string { + return abs.CreatorID +} + +func (abs *PricedResource) SetStartUsage(start time.Time) { + if abs.UsageStart == nil { + abs.UsageStart = &start + } +} + +func (abs *PricedResource) SetEndUsage(end time.Time) { + if abs.UsageEnd == nil { + abs.UsageEnd = &end + } +} + +func (abs *PricedResource) IsPurchased() bool { + if abs.SelectedPricing == nil { + return false + } + return (*abs.SelectedPricing).IsPurchased() +} + +func (abs *PricedResource) GetLocationEnd() *time.Time { + return abs.UsageEnd +} + +func (abs *PricedResource) GetLocationStart() *time.Time { + return abs.UsageStart +} + +func (abs *PricedResource) SetLocationStart(start time.Time) { + abs.UsageStart = &start +} + +func (abs *PricedResource) SetLocationEnd(end time.Time) { + abs.UsageEnd = &end +} + +func (abs *PricedResource) GetExplicitDurationInS() float64 { + if abs.ExplicitBookingDurationS == 0 { + if abs.UsageEnd == nil || abs.UsageStart == nil { + return time.Duration(1 * time.Hour).Seconds() + } + return abs.UsageEnd.Sub(*abs.UsageStart).Seconds() + } + return abs.ExplicitBookingDurationS +} + +func (r *PricedResource) GetPrice() (float64, error) { + if r.UsageStart == nil || r.UsageEnd == nil { + return 0, errors.New("Usage start and end must be set") + } + if r.SelectedPricing == nil { + return 0, errors.New("Selected pricing must be set") + } + return (*r.SelectedPricing).GetPrice(1, 0, *r.UsageStart, *r.UsageEnd) +} diff --git a/models/resources/priced_resource/priced_resource.go b/models/resources/priced_resource/priced_resource.go deleted file mode 100644 index 22fefbb..0000000 --- a/models/resources/priced_resource/priced_resource.go +++ /dev/null @@ -1,6 +0,0 @@ -package priced_resource - -type PricedResource struct { -} - -/*TODO*/ diff --git a/models/resources/processing.go b/models/resources/processing.go index c88f504..e4634ba 100644 --- a/models/resources/processing.go +++ b/models/resources/processing.go @@ -34,16 +34,16 @@ type ProcessingResource struct { Container common.Container `json:"container,omitempty" bson:"container,omitempty"` // Container is the container } -type CustomizedProcessingResource struct { - AbstractCustomizedResource[*ResourceInstance[*ResourcePartnerShip[*ProcessingResourcePricingProfile]]] +type PricedProcessingResource struct { + PricedResource IsService bool } -func (r *CustomizedProcessingResource) GetType() tools.DataType { +func (r *PricedProcessingResource) GetType() tools.DataType { return tools.PROCESSING_RESOURCE } -func (a *CustomizedProcessingResource) GetExplicitDurationInS() float64 { +func (a *PricedProcessingResource) GetExplicitDurationInS() float64 { if a.ExplicitBookingDurationS == 0 { if a.IsService || a.UsageStart == nil { if a.IsService { @@ -68,6 +68,6 @@ func (p *ProcessingResourcePricingProfile) IsPurchased() bool { return p.Pricing.BuyingStrategy != pricing.PAY_PER_USE } -func (p *ProcessingResourcePricingProfile) GetPrice(amountOfData float64, val float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) { +func (p *ProcessingResourcePricingProfile) GetPrice(amountOfData float64, val float64, start time.Time, end time.Time, params ...string) (float64, error) { return p.Pricing.GetPrice(amountOfData, val, start, &end) } diff --git a/models/resources/resource.go b/models/resources/resource.go index b10d687..dfee5e3 100644 --- a/models/resources/resource.go +++ b/models/resources/resource.go @@ -1,9 +1,7 @@ package resources import ( - "errors" "slices" - "time" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/models/common/pricing" @@ -24,7 +22,7 @@ import ( * AbstractResource is a struct that represents a resource * it defines the resource data */ -type abstractResource struct { +type AbsResource struct { utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) Logo string `json:"logo,omitempty" bson:"logo,omitempty" validate:"required"` // Logo is the logo of the resource Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource @@ -34,71 +32,58 @@ type abstractResource struct { UsageRestrictions string `bson:"usage_restrictions,omitempty" json:"usage_restrictions,omitempty"` } -func (r *abstractResource) StoreDraftDefault() { +func (r *AbsResource) StoreDraftDefault() { r.IsDraft = true } - -func (r *AbstractCustomizedResource[T]) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { +func (r *AbsResource) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { if r.IsDraft != set.IsDrafted() && set.IsDrafted() { return true, set // only state can be updated } return r.IsDraft != set.IsDrafted() && set.IsDrafted(), set } -func (r *abstractResource) CanDelete() bool { +func (r *AbsResource) CanDelete() bool { return r.IsDraft // only draft bookings can be deleted } -func (ao *abstractResource) GetAccessor(request *tools.APIRequest) utils.Accessor { +func (ao *AbsResource) GetAccessor(request *tools.APIRequest) utils.Accessor { return nil } -func (ao *abstractResource) GetCreatorID() string { - return ao.CreatorID -} - -func (abs *abstractResource) SetResourceModel(model *resource_model.ResourceModel) { +func (abs *AbsResource) SetResourceModel(model *resource_model.ResourceModel) { abs.ResourceModel = model } -type AbstractResource[T InstanceITF] struct { - abstractResource +type AbstractResource[T ResourceInstanceITF] struct { + AbsResource Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource } +func (abs *AbstractResource[T]) ConvertToPricedResource( + t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { + instances := map[string]string{} + profiles := map[string][]pricing.PricingProfileITF{} + for _, instance := range abs.Instances { + instances[instance.GetID()] = instance.GetName() + profiles[instance.GetID()] = instance.GetPricingsProfiles(request.PeerID, request.Groups) + } + return &PricedResource{ + Name: abs.Name, + Logo: abs.Logo, + ResourceID: abs.UUID, + ResourceType: t, + InstancesRefs: instances, + PricingProfiles: profiles, + CreatorID: abs.CreatorID, + } +} + func (abs *AbstractResource[T]) SetAllowedInstances(request *tools.APIRequest) { abs.Instances = verifyAuthAction[T](abs.Instances, request) } -func (abs *AbstractResource[T]) GetPartnership(request *tools.APIRequest) ResourcePartnerITF { - for _, instance := range abs.Instances { - partners, grps := instance.GetPeerGroups() - for i, p := range grps { - if request == nil { - continue - } - if _, ok := p[request.PeerID]; ok { - return partners[i] - } - } - } - return nil -} - -func (abs *AbstractResource[T]) VerifyPartnerships() bool { - // a peer can be part of only one partnership by instance - // may we need to define partnership in a different DB - for _, instance := range abs.Instances { - if !instance.VerifyPartnerships() { - return false - } - } - return true -} - func (d *AbstractResource[T]) Trim() { if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok { - // TODO clean up the peer groups that are not in the allowed peers group for _, instance := range d.Instances { instance.ClearPeerGroups() } @@ -109,78 +94,7 @@ func (abs *AbstractResource[T]) VerifyAuth(request *tools.APIRequest) bool { return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request) } -type AbstractCustomizedResource[T InstanceITF] struct { - abstractResource - ExplicitBookingDurationS float64 `json:"explicit_location_duration_s,omitempty" bson:"explicit_location_duration_s,omitempty"` - UsageStart *time.Time `json:"start,omitempty" bson:"start,omitempty"` - UsageEnd *time.Time `json:"end,omitempty" bson:"end,omitempty"` - SelectedInstance T `json:"selected_instance,omitempty" bson:"selected_instance,omitempty"` - SelectedPricing string `json:"selected_pricing,omitempty" bson:"selected_pricing,omitempty"` -} - -func (abs *AbstractCustomizedResource[T]) SetStartUsage(start time.Time) { - if abs.UsageStart == nil { - abs.UsageStart = &start - } -} - -func (abs *AbstractCustomizedResource[T]) SetEndUsage(end time.Time) { - if abs.UsageEnd == nil { - abs.UsageEnd = &end - } -} - -func (abs *AbstractCustomizedResource[T]) IsPurchased(request *tools.APIRequest) bool { - return abs.GetPartnership(request).GetPricing(abs.SelectedPricing).IsPurchased() -} - -func (abs *AbstractCustomizedResource[T]) GetLocationEnd() *time.Time { - return abs.UsageEnd -} - -func (abs *AbstractCustomizedResource[T]) GetLocationStart() *time.Time { - return abs.UsageStart -} - -func (abs *AbstractCustomizedResource[T]) GetExplicitDurationInS() float64 { - if abs.ExplicitBookingDurationS == 0 { - if abs.UsageEnd == nil || abs.UsageStart == nil { - return time.Duration(1 * time.Hour).Seconds() - } - return abs.UsageEnd.Sub(*abs.UsageStart).Seconds() - } - return abs.ExplicitBookingDurationS -} - -func (abs *AbstractCustomizedResource[T]) GetPricingID() string { - return abs.SelectedPricing -} - -func (r *AbstractCustomizedResource[T]) GetPrice(request *tools.APIRequest) (float64, error) { - if r.UsageStart == nil || r.UsageEnd == nil { - return 0, errors.New("Usage start and end must be set") - } - partner := r.GetPartnership(request) - if partner != nil && partner.GetPricing(r.SelectedPricing) != nil { - return 0, errors.New("Pricing strategy not found") - } - return partner.GetPricing(r.SelectedPricing).GetPrice(1, 0, *r.UsageStart, *r.UsageEnd, request) -} - -func (abs *AbstractCustomizedResource[T]) GetPartnership(request *tools.APIRequest) ResourcePartnerITF { - partners, grps := abs.SelectedInstance.GetPeerGroups() - for i, p := range grps { - if request == nil { - continue - } - if _, ok := p[request.PeerID]; ok { - return partners[i] - } - } - return nil -} - -func verifyAuthAction[T InstanceITF](baseInstance []T, request *tools.APIRequest) []T { +func verifyAuthAction[T ResourceInstanceITF](baseInstance []T, request *tools.APIRequest) []T { instances := []T{} for _, instance := range baseInstance { _, peerGroups := instance.GetPeerGroups() @@ -205,6 +119,7 @@ func verifyAuthAction[T InstanceITF](baseInstance []T, request *tools.APIRequest type ResourceInstance[T ResourcePartnerITF] struct { UUID string `json:"id,omitempty" bson:"id,omitempty"` + Name string `json:"name,omitempty" bson:"name,omitempty"` Location geopoint.GeoPoint `json:"location,omitempty" bson:"location,omitempty"` Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"` // Url string `json:"url,omitempty" bson:"url,omitempty"` @@ -216,24 +131,16 @@ func (ri *ResourceInstance[T]) GetID() string { return ri.UUID } -func (r *ResourceInstance[T]) VerifyPartnerships() bool { - peersMultiple := map[string]int{} - for _, p := range r.Partnerships { - for k, g := range p.GetPeerGroups() { - for _, v := range g { - if _, ok := peersMultiple[k+"_"+v]; !ok { - peersMultiple[k+"_"+v] = 0 - } - peersMultiple[k+"_"+v]++ - } - } +func (ri *ResourceInstance[T]) GetName() string { + return ri.Name +} + +func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF { + pricings := []pricing.PricingProfileITF{} + for _, p := range ri.Partnerships { + pricings = append(pricings, p.GetPricingsProfiles(peerID, groups)...) } - for _, p := range peersMultiple { - if p > 1 { - return false - } - } - return true + return pricings } func (ri *ResourceInstance[T]) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) { @@ -258,8 +165,19 @@ type ResourcePartnerShip[T pricing.PricingProfileITF] struct { PricingProfiles map[string]T `json:"pricing,omitempty" bson:"pricing,omitempty"` } -func (rp *ResourcePartnerShip[T]) GetPricing(id string) pricing.PricingProfileITF { - return rp.PricingProfiles[id] +func (ri *ResourcePartnerShip[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF { + if ri.PeerGroups[peerID] != nil { + for _, p := range ri.PeerGroups[peerID] { + if slices.Contains(groups, p) { + profiles := []pricing.PricingProfileITF{} + for _, ri := range ri.PricingProfiles { + profiles = append(profiles, ri) + } + return profiles + } + } + } + return []pricing.PricingProfileITF{} } func (rp *ResourcePartnerShip[T]) GetPeerGroups() map[string][]string { diff --git a/models/resources/storage.go b/models/resources/storage.go index 6b629f6..bb96f55 100644 --- a/models/resources/storage.go +++ b/models/resources/storage.go @@ -103,35 +103,34 @@ func (p *StorageResourcePricingProfile) IsPurchased() bool { return p.Pricing.BuyingStrategy != pricing.PAY_PER_USE } -func (p *StorageResourcePricingProfile) GetPrice(amountOfData float64, val float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) { +func (p *StorageResourcePricingProfile) GetPrice(amountOfData float64, val float64, start time.Time, end time.Time, params ...string) (float64, error) { return p.Pricing.GetPrice(amountOfData, val, start, &end) } -type CustomizedStorageResource struct { - AbstractCustomizedResource[*StorageResourceInstance] - StorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"` +type PricedStorageResource struct { + PricedResource + UsageStorageGB float64 `json:"storage_gb,omitempty" bson:"storage_gb,omitempty"` } -func (r *CustomizedStorageResource) GetType() tools.DataType { +func (r *PricedStorageResource) GetType() tools.DataType { return tools.STORAGE_RESOURCE } -func (r *CustomizedStorageResource) GetPrice(request *tools.APIRequest) (float64, error) { +func (r *PricedStorageResource) GetPrice() (float64, error) { if r.UsageStart == nil || r.UsageEnd == nil { return 0, errors.New("Usage start and end must be set") } - partner := r.GetPartnership(request) - if partner != nil && partner.GetPricing(r.SelectedPricing) != nil { - return 0, errors.New("Pricing strategy not found") + if r.SelectedPricing == nil { + return 0, errors.New("Selected pricing must be set") } - pricing := partner.GetPricing(r.SelectedPricing) + pricing := *r.SelectedPricing var err error amountOfData := float64(1) if pricing.GetOverrideStrategyValue() >= 0 { - amountOfData, err = ToStorageResourcePricingStrategy(pricing.GetOverrideStrategyValue()).GetQuantity(r.StorageGB) + amountOfData, err = ToStorageResourcePricingStrategy(pricing.GetOverrideStrategyValue()).GetQuantity(r.UsageStorageGB) if err != nil { return 0, err } } - return pricing.GetPrice(amountOfData, r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd, request) + return pricing.GetPrice(amountOfData, r.ExplicitBookingDurationS, *r.UsageStart, *r.UsageEnd) } diff --git a/models/resources/workflow.go b/models/resources/workflow.go index 8592e41..894e5a3 100644 --- a/models/resources/workflow.go +++ b/models/resources/workflow.go @@ -1,103 +1,41 @@ package resources import ( - "time" - - "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/tools" ) -// COMPLEX SHOULD BE REFACTORED // we don't have any information about the accessor type abstractWorkflowResource struct { - ExploitedResourceSet WorkflowID string `bson:"workflow_id,omitempty" json:"workflow_id,omitempty"` // WorkflowID is the ID of the native workflow } // WorkflowResource is a struct that represents a workflow resource // it defines the resource workflow type WorkflowResource struct { - AbstractResource[*ResourceInstance[*ResourcePartnerShip[*WorkflowResourcePricingProfile]]] + AbsResource abstractWorkflowResource } -type CustomizedWorkflowResource struct { - AbstractCustomizedResource[*ResourceInstance[*ResourcePartnerShip[*WorkflowResourcePricingProfile]]] - abstractWorkflowResource +func (w *WorkflowResource) Trim() { + /*EMPTY AND PROUD TO BE*/ } -func (r *CustomizedWorkflowResource) GetType() tools.DataType { - return tools.WORKFLOW_RESOURCE +func (w *WorkflowResource) SetAllowedInstances(request *tools.APIRequest) { + /*EMPTY AND PROUD TO BE*/ } -func (d *WorkflowResource) GetAccessor(request *tools.APIRequest) utils.Accessor { - return NewAccessor[*WorkflowResource](tools.WORKFLOW_RESOURCE, request, func() utils.DBObject { return &WorkflowResource{} }) // Create a new instance of the accessor -} - -type WorkflowResourcePricingProfile struct { - ID string `json:"id,omitempty" bson:"id,omitempty"` - ExploitedResourceSet -} - -func (p *WorkflowResourcePricingProfile) GetOverrideStrategyValue() int { - return -1 -} - -func (p *WorkflowResourcePricingProfile) GetID() string { - return p.ID -} - -func (p *WorkflowResourcePricingProfile) IsPurchased() bool { - return false -} - -/* -* Missing wich Instance is selected -* - */ - -func (p *WorkflowResourcePricingProfile) GetPrice(amountOfData float64, val float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) { - // load workflow - price := float64(0) - pp, err := getPrice[*CustomizedDataResource](p.DataResources, amountOfData, val, start, end, request, params...) - if err != nil { - return 0, err +func (w *WorkflowResource) ConvertToPricedResource( + t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { + instances := map[string]string{} + profiles := map[string][]pricing.PricingProfileITF{} + return &PricedResource{ + Name: w.Name, + Logo: w.Logo, + ResourceID: w.UUID, + ResourceType: t, + InstancesRefs: instances, + PricingProfiles: profiles, + CreatorID: w.CreatorID, } - price += pp - pp, err = getPrice[*CustomizedStorageResource](p.StorageResources, amountOfData, val, start, end, request, params...) - if err != nil { - return 0, err - } - price += pp - pp, err = getPrice[*CustomizedProcessingResource](p.ProcessingResources, amountOfData, val, start, end, request, params...) - if err != nil { - return 0, err - } - price += pp - pp, err = getPrice[*CustomizedComputeResource](p.ComputeResources, amountOfData, val, start, end, request, params...) - if err != nil { - return 0, err - } - price += pp - pp, err = getPrice[*CustomizedWorkflowResource](p.WorkflowResources, amountOfData, val, start, end, request, params...) - if err != nil { - return 0, err - } - price += pp - return price, nil -} - -func getPrice[T ShallowResourceInterface](arr []T, amountOfData float64, val float64, start time.Time, end time.Time, request *tools.APIRequest, params ...string) (float64, error) { - // load workflow - price := float64(0) - for _, data := range arr { - partner := data.GetPartnership(request) - pricing := partner.GetPricing(data.GetPricingID()) - pp, err := pricing.GetPrice(amountOfData, val, start, end, request) - if err != nil { - return 0, err - } - price += pp - } - return price, nil } diff --git a/models/utils/abstracts.go b/models/utils/abstracts.go index c57baee..5f1c50f 100644 --- a/models/utils/abstracts.go +++ b/models/utils/abstracts.go @@ -70,6 +70,10 @@ func (ao AbstractObject) GetName() string { return ao.Name } +func (ao *AbstractObject) GetCreatorID() string { + return ao.CreatorID +} + func (ao *AbstractObject) UpToDate(user string, create bool) { ao.UpdateDate = time.Now() ao.UpdaterID = user diff --git a/models/utils/interfaces.go b/models/utils/interfaces.go index b82de6d..8cc6c1b 100644 --- a/models/utils/interfaces.go +++ b/models/utils/interfaces.go @@ -20,6 +20,7 @@ type DBObject interface { GenerateID() GetID() string GetName() string + GetCreatorID() string IsDrafted() bool StoreDraftDefault() CanUpdate(set DBObject) (bool, DBObject) diff --git a/models/workflow/graph/graph.go b/models/workflow/graph/graph.go index 9419480..3ad7bf7 100644 --- a/models/workflow/graph/graph.go +++ b/models/workflow/graph/graph.go @@ -3,7 +3,6 @@ package graph import ( "time" - "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -15,7 +14,8 @@ type Graph struct { Links []GraphLink `bson:"links" json:"links" default:"{}" validate:"required"` // Links is the list of links between elements in the graph } -func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.CustomizedProcessingResource, resource resources.ShallowResourceInterface, f func(GraphItem) resources.ShallowResourceInterface) (float64, float64) { +func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.ProcessingResource, resource resources.ResourceInterface, + f func(GraphItem) resources.ResourceInterface, request *tools.APIRequest) (float64, float64) { nearestStart := float64(10000000000) oneIsInfinite := false longestDuration := float64(0) @@ -27,16 +27,17 @@ func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, proce } else if link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the source is the processing and the destination is not a compute source = link.Destination.ID } + priced := processing.ConvertToPricedResource(tools.PROCESSING_RESOURCE, request) if source != "" { - if processing.UsageStart != nil { - near := float64(processing.UsageStart.Sub(start).Seconds()) + if priced.GetLocationStart() != nil { + near := float64(priced.GetLocationStart().Sub(start).Seconds()) if near < nearestStart { nearestStart = near } } - if processing.UsageEnd != nil { - duration := float64(processing.UsageEnd.Sub(*processing.UsageStart).Seconds()) + if priced.GetLocationEnd() != nil { + duration := float64(priced.GetLocationEnd().Sub(*priced.GetLocationStart()).Seconds()) if longestDuration < duration { longestDuration = duration } @@ -52,18 +53,10 @@ func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, proce return nearestStart, longestDuration } -func (g *Graph) SetItemStartUsage(graphItemID string, start time.Time) { - g.Items[graphItemID].SetItemStartUsage(start) -} - -func (g *Graph) SetItemEndUsage(graphItemID string, end time.Time) { - g.Items[graphItemID].SetItemEndUsage(end) -} - /* * GetAverageTimeBeforeStart is a function that returns the average time before the start of a processing */ -func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingID string) float64 { +func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingID string, request *tools.APIRequest) float64 { currents := []float64{} // list of current time for _, link := range g.Links { // for each link var source string // source is the source of the link @@ -75,13 +68,17 @@ func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingI if source == "" { // if source is empty, continue continue } - _, item := g.GetResource(source) // get the resource of the source - current := item.GetExplicitDurationInS() // get the explicit duration of the item - if current < 0 { // if current is negative, its means that duration of a before could be infinite continue + dt, r := g.GetResource(source) // get the resource of the source + if r == nil { // if item is nil, continue + continue + } + priced := r.ConvertToPricedResource(dt, request) + current := priced.GetExplicitDurationInS() // get the explicit duration of the item + if current < 0 { // if current is negative, its means that duration of a before could be infinite continue return current } - current += g.GetAverageTimeProcessingBeforeStart(current, source) // get the average time before start of the source - currents = append(currents, current) // append the current to the currents + current += g.GetAverageTimeProcessingBeforeStart(current, source, request) // get the average time before start of the source + currents = append(currents, current) // append the current to the currents } var max float64 // get the max time to wait dependancies to finish for _, current := range currents { @@ -92,60 +89,45 @@ func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingI return max } -func (g *Graph) GetResource(id string) (string, resources.ShallowResourceInterface) { +func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterface) { if item, ok := g.Items[id]; ok { if item.Data != nil { - return tools.DATA_RESOURCE.String(), item.Data + return tools.DATA_RESOURCE, item.Data } else if item.Compute != nil { - return tools.COMPUTE_RESOURCE.String(), item.Compute + return tools.COMPUTE_RESOURCE, item.Compute } else if item.Workflow != nil { - return tools.WORKFLOW_RESOURCE.String(), item.Workflow + return tools.WORKFLOW_RESOURCE, item.Workflow } else if item.Processing != nil { - return tools.PROCESSING_RESOURCE.String(), item.Processing + return tools.PROCESSING_RESOURCE, item.Processing } else if item.Storage != nil { - return tools.STORAGE_RESOURCE.String(), item.Storage + return tools.STORAGE_RESOURCE, item.Storage } } - return "", nil + return tools.INVALID, nil } // GraphItem is a struct that represents an item in a graph type GraphItem struct { - ID string `bson:"id" json:"id" validate:"required"` // ID is the unique identifier of the item - Width float64 `bson:"width" json:"width" validate:"required"` // Width is the graphical width of the item - Height float64 `bson:"height" json:"height" validate:"required"` // Height is the graphical height of the item - Position Position `bson:"position" json:"position" validate:"required"` // Position is the graphical position of the item - *resources.ItemExploitedResource // ItemResource is the resource of the item affected to the item + ID string `bson:"id" json:"id" validate:"required"` // ID is the unique identifier of the item + Width float64 `bson:"width" json:"width" validate:"required"` // Width is the graphical width of the item + Height float64 `bson:"height" json:"height" validate:"required"` // Height is the graphical height of the item + Position Position `bson:"position" json:"position" validate:"required"` // Position is the graphical position of the item + *resources.ItemResource // ItemResource is the resource of the item affected to the item } -func (g *GraphItem) GetResource() resources.ShallowResourceInterface { +func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface) { if g.Data != nil { - return g.Data + return tools.DATA_RESOURCE, g.Data } else if g.Compute != nil { - return g.Compute + return tools.COMPUTE_RESOURCE, g.Compute } else if g.Workflow != nil { - return g.Workflow + return tools.WORKFLOW_RESOURCE, g.Workflow } else if g.Processing != nil { - return g.Processing + return tools.PROCESSING_RESOURCE, g.Processing } else if g.Storage != nil { - return g.Storage + return tools.STORAGE_RESOURCE, g.Storage } - return nil -} - -func (g *GraphItem) GetPricedItem() pricing.PricedItemITF { - if g.Data != nil { - return g.Data - } else if g.Compute != nil { - return g.Compute - } else if g.Workflow != nil { - return g.Workflow - } else if g.Processing != nil { - return g.Processing - } else if g.Storage != nil { - return g.Storage - } - return nil + return tools.INVALID, nil } // GraphLink is a struct that represents a link between two items in a graph diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 23fb62b..ca1dc52 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -2,6 +2,7 @@ package workflow import ( "errors" + "fmt" "time" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" @@ -40,42 +41,42 @@ func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (lis return } -func (w *AbstractWorkflow) GetResources(f func(item graph.GraphItem) bool) map[string]resources.ShallowResourceInterface { - list_datas := map[string]resources.ShallowResourceInterface{} +func (w *AbstractWorkflow) GetResources(f func(item graph.GraphItem) bool) map[string]resources.ResourceInterface { + list_datas := map[string]resources.ResourceInterface{} for _, item := range w.Graph.Items { if f(item) { - res := item.GetResource() + _, res := item.GetResource() list_datas[res.GetID()] = res } } return list_datas } -func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool) map[string]pricing.PricedItemITF { +func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF { list_datas := map[string]pricing.PricedItemITF{} for _, item := range w.Graph.Items { if f(item) { - res := item.GetResource() - ord := item.GetPricedItem() + dt, res := item.GetResource() + ord := res.ConvertToPricedResource(dt, request) list_datas[res.GetID()] = ord } } return list_datas } -func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ShallowResourceInterface { - storages := []resources.ShallowResourceInterface{} +func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface { + storages := []resources.ResourceInterface{} for _, link := range w.Graph.Links { nodeID := link.Destination.ID - var node resources.ShallowResourceInterface + var node resources.ResourceInterface if g(w.Graph.Items[link.Source.ID]) { item := w.Graph.Items[link.Source.ID] - node = item.GetResource() + _, node = item.GetResource() } if node == nil && g(w.Graph.Items[link.Destination.ID]) { // if the source is not a storage, we consider that the destination is the storage nodeID = link.Source.ID item := w.Graph.Items[link.Destination.ID] // and the processing is the source - node = item.GetResource() // we are looking for the storage as destination + _, node = item.GetResource() // we are looking for the storage as destination } if processingID == nodeID && node != nil { // if the storage is linked to the processing storages = append(storages, node) @@ -113,37 +114,12 @@ type Workflow struct { AbstractWorkflow // AbstractWorkflow contains the basic fields of a workflow } -func (w *Workflow) GetNearestStart(start time.Time) float64 { - near := float64(10000000000) - for _, item := range w.Graph.Items { - if item.GetResource().GetLocationStart() == nil { - continue - } - newS := item.GetResource().GetLocationStart() - if newS.Sub(start).Seconds() < near { - near = newS.Sub(start).Seconds() - } - // get the nearest start from start var +func (w *Workflow) getPricedItem(item graph.GraphItem, request *tools.APIRequest) pricing.PricedItemITF { + dt, res := item.GetResource() + if dt == tools.INVALID { + return nil } - return near -} - -func (w *Workflow) GetLongestTime(end *time.Time) float64 { - if end == nil { - return -1 - } - longestTime := float64(0) - for _, item := range w.GetGraphItems(w.IsProcessing) { - if item.GetResource().GetLocationEnd() == nil { - continue - } - newS := item.GetResource().GetLocationEnd() - if longestTime < newS.Sub(*end).Seconds() { - longestTime = newS.Sub(*end).Seconds() - } - // get the nearest start from start var - } - return longestTime + return res.ConvertToPricedResource(dt, request) } func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool { @@ -188,3 +164,125 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) { } return true, nil } + +func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) { + processings := []*resources.ProcessingResource{} + priceds := map[tools.DataType][]pricing.PricedItemITF{} + priceds[tools.PROCESSING_RESOURCE] = []pricing.PricedItemITF{} + for _, item := range wf.GetGraphItems(wf.IsProcessing) { + dt, realItem := item.GetResource() + if realItem == nil { + return 0, priceds, nil, errors.New("could not load the processing resource") + } + priced := realItem.ConvertToPricedResource(dt, request) + timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID(), request) + started := start.Add(time.Duration(timeFromStartS) * time.Second) + priced.SetLocationStart(started) + priced.SetLocationEnd(started.Add(time.Duration(priced.GetExplicitDurationInS()))) + processings = append(processings, realItem.(*resources.ProcessingResource)) + priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced) + } + priceds[tools.DATA_RESOURCE] = []pricing.PricedItemITF{} + for _, item := range wf.GetGraphItems(wf.IsData) { + dt, realItem := item.GetResource() + if realItem == nil { + continue + } + priced := realItem.ConvertToPricedResource(dt, request) + priced.SetLocationStart(start) + priced.SetLocationEnd(*end) + priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced) + } + for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} { + for _, item := range wf.GetGraphItems(f) { + dt, r := item.GetResource() + if r == nil { + continue + } + if priceds[dt] == nil { + priceds[dt] = []pricing.PricedItemITF{} + } + priced := r.ConvertToPricedResource(dt, request) + nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, processings, r, + func(i graph.GraphItem) resources.ResourceInterface { + if f(i) { + _, r := i.GetResource() + return r + } else { + return nil + } + }, request) + started := start.Add(time.Duration(nearestStart) * time.Second) + priced.SetLocationStart(started) + if longestDuration >= 0 { + priced.SetLocationEnd(started.Add(time.Duration(longestDuration))) + } + priceds[dt] = append(priceds[dt], priced) + } + } + longest := wf.getLongestTime(end, priceds, request) + priceds[tools.WORKFLOW_RESOURCE] = []pricing.PricedItemITF{} + for _, item := range wf.GetGraphItems(wf.IsWorkflow) { + access := NewAccessor(nil) + _, r := item.GetResource() + if r == nil { + return 0, priceds, nil, errors.New("could not load the workflow") + } + priced := r.ConvertToPricedResource(tools.WORKFLOW_RESOURCE, request) + res, code, err := access.LoadOne(r.GetID()) + if code != 200 || err != nil { + return 0, priceds, nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error())) + } + neoLongest := float64(0) + innerWF := res.(*Workflow) + neoLongest, _, innerWF, err = innerWF.Planify(start, end, request) + if neoLongest > longest { + longest = neoLongest + } + started := start.Add(time.Duration(wf.getNearestStart(start, priceds, request)) * time.Second) + priced.SetLocationStart(started) + durationE := time.Duration(longest) + if durationE < 0 { + continue + } + ended := start.Add(durationE * time.Second) + priced.SetLocationEnd(ended) + priceds[tools.WORKFLOW_RESOURCE] = append(priceds[tools.WORKFLOW_RESOURCE], priced) + } + return longest, priceds, wf, nil +} + +func (wf *Workflow) getNearestStart(start time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { + near := float64(10000000000) + for _, items := range priceds { + for _, priced := range items { + if priced.GetLocationStart() == nil { + continue + } + newS := priced.GetLocationStart() + if newS.Sub(start).Seconds() < near { + near = newS.Sub(start).Seconds() + } + } + // get the nearest start from start var + } + return near +} + +func (wf *Workflow) getLongestTime(end *time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { + if end == nil { + return -1 + } + longestTime := float64(0) + for _, priced := range priceds[tools.PROCESSING_RESOURCE] { + if priced.GetLocationEnd() == nil { + continue + } + newS := priced.GetLocationEnd() + if longestTime < newS.Sub(*end).Seconds() { + longestTime = newS.Sub(*end).Seconds() + } + // get the nearest start from start var + } + return longestTime +} diff --git a/models/workflow/workflow_history_mongo_accessor.go b/models/workflow/workflow_history_mongo_accessor.go index b5808cc..eb10c1e 100644 --- a/models/workflow/workflow_history_mongo_accessor.go +++ b/models/workflow/workflow_history_mongo_accessor.go @@ -8,7 +8,7 @@ import ( type WorkflowHistory struct{ Workflow } -func (d *WorkflowHistory) GetAccessor(request tools.APIRequest) utils.Accessor { +func (d *WorkflowHistory) GetAccessor(request *tools.APIRequest) utils.Accessor { return NewAccessorHistory(request) // Create a new instance of the accessor } func (r *WorkflowHistory) GenerateID() { diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index fcc1da5..c026faf 100644 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -7,9 +7,8 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common" - "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/utils" - "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/tools" "github.com/google/uuid" "go.mongodb.org/mongo-driver/bson/primitive" @@ -105,28 +104,25 @@ func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool { return true } -func (d *WorkflowExecutions) Book(wf *workflow.Workflow) []*booking.Booking { - booking := []*booking.Booking{} - for _, p := range wf.ProcessingResources { - booking = append(booking, d.toItemBooking(wf.GetByRelatedProcessing(p.GetID(), wf.IsStorage))...) - booking = append(booking, d.toItemBooking(wf.GetByRelatedProcessing(p.GetID(), wf.IsProcessing))...) - } +func (d *WorkflowExecutions) Book(priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) []*booking.Booking { + booking := d.bookEach(tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE], request) + booking = append(booking, d.bookEach(tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE], request)...) return booking } -func (d *WorkflowExecutions) toItemBooking(ss []resources.ShallowResourceInterface) []*booking.Booking { +func (d *WorkflowExecutions) bookEach(dt tools.DataType, priceds []pricing.PricedItemITF, request *tools.APIRequest) []*booking.Booking { items := []*booking.Booking{} - for _, s := range ss { + for _, priced := range priceds { start := d.ExecDate - if s := s.GetLocationStart(); s != nil { + if s := priced.GetLocationStart(); s != nil { start = *s } - end := start.Add(time.Duration(s.GetExplicitDurationInS()) * time.Second) + end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) bookingItem := &booking.Booking{ State: common.DRAFT, - ResourceID: s.GetID(), - ResourceType: s.GetType(), - DestPeerID: s.GetCreatorID(), + ResourceID: priced.GetID(), + ResourceType: dt, + DestPeerID: priced.GetCreatorID(), ExpectedStartDate: start, ExpectedEndDate: &end, } diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index f793ccd..a262bce 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -8,10 +8,8 @@ import ( "cloud.o-forge.io/core/oc-lib/models/common" "cloud.o-forge.io/core/oc-lib/models/peer" - "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow" - "cloud.o-forge.io/core/oc-lib/models/workflow/graph" "cloud.o-forge.io/core/oc-lib/tools" "github.com/robfig/cron" ) @@ -49,8 +47,8 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W return ws } -func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller) (bool, *workflow.Workflow, []*WorkflowExecutions, error) { - if caller == nil && caller.URLS == nil && caller.URLS[tools.BOOKING] == nil || caller.URLS[tools.BOOKING][tools.POST] == "" { +func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) { + if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.POST] == "" { return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined") } access := workflow.NewAccessor(nil) @@ -59,13 +57,13 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller) return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error()) } wf := res.(*workflow.Workflow) - wf, err = ws.planifyWorkflow(wf) + longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) if err != nil { return false, wf, []*WorkflowExecutions{}, err } - ws.DurationS = wf.GetLongestTime(ws.End) + ws.DurationS = longest ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + "seconds." - if ws.End != nil && ws.Start.Add(time.Duration(wf.GetLongestTime(ws.End))*time.Second).After(*ws.End) { + if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" } execs, err := ws.getExecutions(wf) @@ -73,10 +71,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller) return false, wf, []*WorkflowExecutions{}, err } for _, exec := range execs { - bookings := exec.Book(wf) + bookings := exec.Book(priceds) for _, booking := range bookings { _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", - tools.BOOKING, tools.POSTCHECK, booking.Serialize(booking), caller) + tools.BOOKING, tools.POSTCHECK, booking.Serialize(booking), request.Caller) if err != nil { return false, wf, execs, err } @@ -97,7 +95,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* if _, ok := methods[tools.POST]; !ok { return nil, []*WorkflowExecutions{}, errors.New("no path found") } - ok, wf, executions, err := ws.CheckBooking(wfID, request.Caller) + ok, wf, executions, err := ws.CheckBooking(wfID, request) if !ok || err != nil { return nil, []*WorkflowExecutions{}, errors.New("could not book the workflow" + fmt.Sprintf("%v", err)) } @@ -117,57 +115,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return wf, executions, nil } -func (ws *WorkflowSchedule) planifyWorkflow(wf *workflow.Workflow) (*workflow.Workflow, error) { - processings := []*resources.CustomizedProcessingResource{} - for _, item := range wf.GetGraphItems(wf.IsProcessing) { - realItem := item.GetResource().(*resources.CustomizedProcessingResource) - timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID()) - started := ws.Start.Add(time.Duration(timeFromStartS) * time.Second) - wf.Graph.SetItemStartUsage(item.ID, started) - wf.Graph.SetItemEndUsage(item.ID, started.Add(time.Duration(realItem.ExplicitBookingDurationS))) - processings = append(processings, realItem) - } - for _, item := range wf.GetGraphItems(wf.IsData) { - wf.Graph.SetItemStartUsage(item.ID, ws.Start) - wf.Graph.SetItemEndUsage(item.ID, *ws.End) - } - for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} { - for _, item := range wf.GetGraphItems(f) { - nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ws.Start, processings, item.GetResource(), - func(i graph.GraphItem) resources.ShallowResourceInterface { - if f(i) { - return i.GetResource() - } else { - return nil - } - }) - started := ws.Start.Add(time.Duration(nearestStart) * time.Second) - wf.Graph.SetItemStartUsage(item.ID, started) - if longestDuration >= 0 { - wf.Graph.SetItemEndUsage(item.ID, started.Add(time.Duration(longestDuration))) - } - } - } - for _, item := range wf.GetGraphItems(wf.IsWorkflow) { - access := workflow.NewAccessor(nil) - res, code, err := access.LoadOne(item.GetResource().GetID()) - if code != 200 || err != nil { - return nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error())) - } - innerWF := res.(*workflow.Workflow) - innerWF, err = ws.planifyWorkflow(innerWF) - started := ws.Start.Add(time.Duration(innerWF.GetNearestStart(ws.Start)) * time.Second) - wf.Graph.SetItemStartUsage(item.ID, started) - durationE := time.Duration(innerWF.GetLongestTime(ws.End)) - if durationE < 0 { - continue - } - ended := ws.Start.Add(durationE * time.Second) - wf.Graph.SetItemEndUsage(item.ID, ended) - } - return wf, nil -} - /* BOOKING IMPLIED TIME, not of subscription but of execution so is processing time execution time applied on computes