From 9c2663601a2b4407fac027de470f3e311b6215bd Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 23 Apr 2026 09:24:02 +0200 Subject: [PATCH] Service + Storage Binded to Compute --- models/live/live.go | 1 + models/models.go | 1 + models/peer/peer.go | 17 ++ models/resources/compute.go | 9 +- models/resources/models.go | 7 + models/resources/processing.go | 11 +- .../purchase_resource/purchase_resource.go | 1 + models/resources/resource.go | 6 + models/resources/resource_accessor.go | 6 +- models/resources/service.go | 197 ++++++++++++------ models/resources/tests/processing_test.go | 7 - models/workflow/graph/graph.go | 6 + models/workflow/graph/item.go | 3 + models/workflow/workflow.go | 28 ++- models/workflow/workflow_mongo_accessor.go | 2 + .../workflow_execution/workflow_execution.go | 16 ++ tools/enums.go | 5 +- 17 files changed, 235 insertions(+), 88 deletions(-) diff --git a/models/live/live.go b/models/live/live.go index f778ca3..6514291 100644 --- a/models/live/live.go +++ b/models/live/live.go @@ -57,6 +57,7 @@ func (ri *AbstractLive) Extend(typ ...string) map[string][]tools.DataType { ext[t] = append(ext[t], tools.COMPUTE_RESOURCE) ext[t] = append(ext[t], tools.STORAGE_RESOURCE) ext[t] = append(ext[t], tools.PROCESSING_RESOURCE) + ext[t] = append(ext[t], tools.SERVICE_RESOURCE) } } return ext diff --git a/models/models.go b/models/models.go index b8b4c80..f64a22d 100644 --- a/models/models.go +++ b/models/models.go @@ -31,6 +31,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{ tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &resource.ComputeResource{} }, tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} }, tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} }, + tools.SERVICE_RESOURCE.String(): func() utils.DBObject { return &resource.ServiceResource{} }, tools.NATIVE_TOOL.String(): func() utils.DBObject { return &resource.NativeTool{} }, tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} }, tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} }, diff --git a/models/peer/peer.go b/models/peer/peer.go index b5b2b68..f6d85a1 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -7,6 +7,15 @@ import ( "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" + "github.com/biter777/countries" +) + +type PeerPerm int + +const ( + READ PeerRelation = iota + WRITE + MONITOR ) type PeerRelation int @@ -66,12 +75,20 @@ type PeerLocation struct { Latitude float64 `json:"latitude" bson:"latitude"` Longitude float64 `json:"longitude" bson:"longitude"` Granularity int `json:"granularity" bson:"granularity"` + + Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"` + Timezone string `json:"timezone,omitempty" bson:"timezone,omitempty"` } // Peer is a struct that represents a peer type Peer struct { utils.AbstractObject + PeerPerms []PeerPerm `json:"peer_perms" bson:"peer_perms"` + + RelationLastChangeDate time.Time `json:"relation_last_change_date" bson:"relation_last_change_date"` + RelationLastChangeUser string `json:"relation_last_change_user" bson:"relation_last_change_user"` + Verify bool `json:"verify" bson:"verify"` OrganizationID string `json:"organization_id" bson:"organization_id"` PeerID string `json:"peer_id" bson:"peer_id" validate:"required"` diff --git a/models/resources/compute.go b/models/resources/compute.go index 61d43ad..706ff07 100755 --- a/models/resources/compute.go +++ b/models/resources/compute.go @@ -56,13 +56,16 @@ type ComputeNode struct { type ComputeResourceInstance struct { ResourceInstance[*ComputeResourcePartnership] - Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource + Source string `json:"source,omitempty" bson:"source,omitempty"` SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"` PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"` AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"` - CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model - GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model + CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` + GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"` + // AvailableStorages lists storage capabilities activatable on this compute unit (e.g. Minio, local volumes). + // These are shallow StorageResource entries — not independent catalog items — but carry full pricing structure. + AvailableStorages []*StorageResource `json:"available_storages,omitempty" bson:"available_storages,omitempty"` } // IsPeerless is always false for compute instances: a compute resource is diff --git a/models/resources/models.go b/models/resources/models.go index 5ff7ab7..dfd15da 100755 --- a/models/resources/models.go +++ b/models/resources/models.go @@ -12,6 +12,7 @@ type ResourceSet struct { Computes []string `bson:"computes,omitempty" json:"computes,omitempty"` Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"` NativeTool []string `bson:"native,omitempty" json:"native,omitempty"` + Services []string `bson:"services,omitempty" json:"services,omitempty"` DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"` StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"` @@ -19,6 +20,7 @@ type ResourceSet struct { ComputeResources []*ComputeResource `bson:"-" json:"compute_resources,omitempty"` WorkflowResources []*WorkflowResource `bson:"-" json:"workflow_resources,omitempty"` NativeTools []*NativeTool `bson:"-" json:"native_tools,omitempty"` + ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"` } func (r *ResourceSet) Clear() { @@ -27,6 +29,7 @@ func (r *ResourceSet) Clear() { r.ProcessingResources = nil r.ComputeResources = nil r.WorkflowResources = nil + r.ServiceResources = nil } func (r *ResourceSet) Fill(request *tools.APIRequest) { @@ -37,6 +40,7 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) { (&StorageResource{}): r.Storages, (&ProcessingResource{}): r.Processings, (&WorkflowResource{}): r.Workflows, + (&ServiceResource{}): r.Services, } { for _, id := range v { d, _, e := k.GetAccessor(request).LoadOne(id) @@ -52,6 +56,8 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) { r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource)) case *WorkflowResource: r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource)) + case *ServiceResource: + r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource)) } } } @@ -65,4 +71,5 @@ type ItemResource struct { Compute *ComputeResource `bson:"compute,omitempty" json:"compute,omitempty"` Workflow *WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"` NativeTool *NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"` + Service *ServiceResource `bson:"service,omitempty" json:"service,omitempty"` } diff --git a/models/resources/processing.go b/models/resources/processing.go index 13a3919..e026d54 100755 --- a/models/resources/processing.go +++ b/models/resources/processing.go @@ -29,9 +29,8 @@ type ProcessingUsage struct { type ProcessingResource struct { AbstractInstanciatedResource[*ProcessingInstance] IsEvent bool `json:"is_event,omitempty" bson:"is_event,omitempty"` - Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure - IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service - Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing + Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` + Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` OpenSource bool `json:"open_source" bson:"open_source" default:"false"` License string `json:"license,omitempty" bson:"license,omitempty"` Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"` @@ -69,7 +68,6 @@ type ProcessingResourcePartnership struct { type PricedProcessingResource struct { PricedResource[*ProcessingResourcePricingProfile] - IsService bool } func (r *PricedProcessingResource) ensurePricing() { @@ -102,10 +100,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 { a.BookingConfiguration = &BookingConfiguration{} } if a.BookingConfiguration.ExplicitBookingDurationS == 0 { - if a.IsService || a.BookingConfiguration.UsageStart == nil { - if a.IsService { - return -1 - } + if a.BookingConfiguration.UsageStart == nil { return (5 * time.Minute).Seconds() } return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds() diff --git a/models/resources/purchase_resource/purchase_resource.go b/models/resources/purchase_resource/purchase_resource.go index cb18dda..5e6ebcb 100644 --- a/models/resources/purchase_resource/purchase_resource.go +++ b/models/resources/purchase_resource/purchase_resource.go @@ -47,6 +47,7 @@ func (ri *PurchaseResource) Extend(typ ...string) map[string][]tools.DataType { ext[t] = append(ext[t], tools.COMPUTE_RESOURCE) ext[t] = append(ext[t], tools.STORAGE_RESOURCE) ext[t] = append(ext[t], tools.PROCESSING_RESOURCE) + ext[t] = append(ext[t], tools.SERVICE_RESOURCE) } } return ext diff --git a/models/resources/resource.go b/models/resources/resource.go index 8bf3b19..be7008f 100755 --- a/models/resources/resource.go +++ b/models/resources/resource.go @@ -511,6 +511,12 @@ func ToResource( return nil, err } return &data, nil + case tools.SERVICE_RESOURCE.EnumIndex(): + var data ServiceResource + if err := json.Unmarshal(payload, &data); err != nil { + return nil, err + } + return &data, nil } return nil, errors.New("can't found any data resources matching") } diff --git a/models/resources/resource_accessor.go b/models/resources/resource_accessor.go index 671197a..3aaf69f 100755 --- a/models/resources/resource_accessor.go +++ b/models/resources/resource_accessor.go @@ -18,8 +18,8 @@ type ResourceMongoAccessor[T ResourceInterface] struct { func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] { if !slices.Contains([]tools.DataType{ tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE, - tools.PROCESSING_RESOURCE, tools.WORKFLOW_RESOURCE, - tools.DATA_RESOURCE, tools.NATIVE_TOOL, + tools.PROCESSING_RESOURCE, tools.SERVICE_RESOURCE, + tools.WORKFLOW_RESOURCE, tools.DATA_RESOURCE, tools.NATIVE_TOOL, }, t) { return nil } @@ -36,6 +36,8 @@ func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIReques return &StorageResource{} case tools.PROCESSING_RESOURCE: return &ProcessingResource{} + case tools.SERVICE_RESOURCE: + return &ServiceResource{} case tools.WORKFLOW_RESOURCE: return &WorkflowResource{} case tools.DATA_RESOURCE: diff --git a/models/resources/service.go b/models/resources/service.go index bec6556..28df93c 100755 --- a/models/resources/service.go +++ b/models/resources/service.go @@ -12,49 +12,88 @@ import ( "github.com/google/uuid" ) -type ServiceUsage struct { - CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model - GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model - RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM +type ServiceMode int - StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"` // Storage is the storage - Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"` - ScalingModel string `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` // ScalingModel is the scaling model +const ( + DEPLOYMENT ServiceMode = iota // deploy the service, pay for uptime — duration unbounded + HOSTED // use an existing service, pay per call — duration per request +) + +func (m ServiceMode) String() string { + return [...]string{"DEPLOYMENT", "HOSTED"}[m] +} + +type ServiceProtocol int + +const ( + HTTP ServiceProtocol = iota + GRPC + WEBSOCKET + TCP +) + +func (p ServiceProtocol) String() string { + return [...]string{"HTTP", "GRPC", "WEBSOCKET", "TCP"}[p] +} + +type ServiceUsage struct { + CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` + GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` + RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` + StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"` + Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"` + ScalingModel string `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` +} + +// ServiceResourceAccess describes how to reach the service once running. +// Populated for HOSTED instances (endpoint already known) and as a template for DEPLOYMENT. +type ServiceResourceAccess struct { + Container *models.Container `json:"container,omitempty" bson:"container,omitempty"` + Protocol ServiceProtocol `json:"protocol" bson:"protocol" default:"0"` + EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"` + HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"` } -/* -* ServiceResource is a struct that represents a processing resource -* it defines the resource processing - */ type ServiceResource struct { AbstractInstanciatedResource[*ServiceInstance] - IsEvent bool `json:"is_event,omitempty" bson:"is_event,omitempty"` - Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure - IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service - Usage *ServiceUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing + Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` + Usage *ServiceUsage `bson:"usage,omitempty" json:"usage,omitempty"` OpenSource bool `json:"open_source" bson:"open_source" default:"false"` License string `json:"license,omitempty" bson:"license,omitempty"` Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"` } func (r *ServiceResource) GetType() string { - return tools.PROCESSING_RESOURCE.String() + return tools.SERVICE_RESOURCE.String() } -type ServiceResourceAccess struct { - Container *models.Container `json:"container,omitempty" bson:"container,omitempty"` // Container is the container +func (d *ServiceResource) GetAccessor(request *tools.APIRequest) utils.Accessor { + return NewAccessor[*ServiceResource](tools.SERVICE_RESOURCE, request) +} + +func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) { + if t != tools.SERVICE_RESOURCE { + return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Service") + } + p, err := ConvertToPricedResource[*ServiceResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request) + if err != nil { + return nil, err + } + priced := p.(*PricedResource[*ServiceResourcePricingProfile]) + return &PricedServiceResource{PricedResource: *priced}, nil } type ServiceInstance struct { - ResourceInstance[*ResourcePartnerShip[*ServiceResourcePricingProfile]] - Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access - SizeGB int `json:"size_gb,omitempty" bson:"size_gb,omitempty"` - ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"` + ResourceInstance[*ServiceResourcePartnership] + Mode ServiceMode `json:"mode" bson:"mode" default:"0"` + Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` } +func (ri *ServiceInstance) IsPeerless() bool { return false } + func NewServiceInstance(name string, peerID string) ResourceInstanceITF { return &ServiceInstance{ - ResourceInstance: ResourceInstance[*ResourcePartnerShip[*ServiceResourcePricingProfile]]{ + ResourceInstance: ResourceInstance[*ServiceResourcePartnership]{ AbstractObject: utils.AbstractObject{ UUID: uuid.New().String(), Name: name, @@ -67,9 +106,62 @@ type ServiceResourcePartnership struct { ResourcePartnerShip[*ServiceResourcePricingProfile] } +// ServiceResourcePricingProfile handles both service modes: +// - DEPLOYMENT: uptime billing via ExploitPricingProfile (pay while service is up) +// - HOSTED: per-call billing via AccessPricingProfile (pay per request) +type ServiceResourcePricingProfile struct { + Mode ServiceMode `json:"mode" bson:"mode"` + UptimePricing *pricing.ExploitPricingProfile[pricing.TimePricingStrategy] `json:"uptime_pricing,omitempty" bson:"uptime_pricing,omitempty"` + AccessPricing *pricing.AccessPricingProfile[pricing.TimePricingStrategy] `json:"access_pricing,omitempty" bson:"access_pricing,omitempty"` +} + +func (p *ServiceResourcePricingProfile) ensure() { + if p.UptimePricing == nil { + p.UptimePricing = &pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{} + } + if p.AccessPricing == nil { + p.AccessPricing = &pricing.AccessPricingProfile[pricing.TimePricingStrategy]{} + } +} + +func (p *ServiceResourcePricingProfile) IsPurchasable() bool { + p.ensure() + if p.Mode == DEPLOYMENT { + return p.UptimePricing.IsPurchasable() + } + return p.AccessPricing.IsPurchasable() +} + +func (p *ServiceResourcePricingProfile) IsBooked() bool { + p.ensure() + if p.Mode == DEPLOYMENT { + return p.UptimePricing.IsBooked() + } + return p.AccessPricing.IsBooked() +} + +func (p *ServiceResourcePricingProfile) GetPurchase() pricing.BuyingStrategy { + p.ensure() + if p.Mode == DEPLOYMENT { + return p.UptimePricing.GetPurchase() + } + return p.AccessPricing.GetPurchase() +} + +func (p *ServiceResourcePricingProfile) GetOverrideStrategyValue() int { + return -1 +} + +func (p *ServiceResourcePricingProfile) GetPriceHT(quantity float64, val float64, start time.Time, end time.Time, variations []*pricing.PricingVariation, params ...string) (float64, error) { + p.ensure() + if p.Mode == DEPLOYMENT { + return p.UptimePricing.GetPriceHT(quantity, val, start, end, variations, params...) + } + return p.AccessPricing.GetPriceHT(quantity, val, start, end, variations, params...) +} + type PricedServiceResource struct { PricedResource[*ServiceResourcePricingProfile] - IsService bool } func (r *PricedServiceResource) ensurePricing() { @@ -88,57 +180,30 @@ func (r *PricedServiceResource) IsBooked() bool { return r.SelectedPricing.IsBooked() } +func (r *PricedServiceResource) GetType() tools.DataType { + return tools.SERVICE_RESOURCE +} + func (r *PricedServiceResource) GetPriceHT() (float64, error) { r.ensurePricing() return r.PricedResource.GetPriceHT() } -func (r *PricedServiceResource) GetType() tools.DataType { - return tools.PROCESSING_RESOURCE -} - +// GetExplicitDurationInS returns -1 for DEPLOYMENT (unbounded uptime). +// For HOSTED, returns the actual call window duration. func (a *PricedServiceResource) GetExplicitDurationInS() float64 { + a.ensurePricing() + if a.SelectedPricing.Mode == DEPLOYMENT { + return -1 + } if a.BookingConfiguration == nil { a.BookingConfiguration = &BookingConfiguration{} } - if a.BookingConfiguration.ExplicitBookingDurationS == 0 { - if a.IsService || a.BookingConfiguration.UsageStart == nil { - if a.IsService { - return -1 - } - return (5 * time.Minute).Seconds() - } - return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds() + if a.BookingConfiguration.ExplicitBookingDurationS != 0 { + return a.BookingConfiguration.ExplicitBookingDurationS } - return a.BookingConfiguration.ExplicitBookingDurationS -} - -func (d *ServiceResource) GetAccessor(request *tools.APIRequest) utils.Accessor { - return NewAccessor[*ServiceResource](tools.PROCESSING_RESOURCE, request) // Create a new instance of the accessor -} - -func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) { - if t != tools.PROCESSING_RESOURCE { - return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data") + if a.BookingConfiguration.UsageStart == nil || a.BookingConfiguration.UsageEnd == nil { + return (5 * time.Minute).Seconds() } - p, err := ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request) - if err != nil { - return nil, err - } - priced := p.(*PricedResource[*DataResourcePricingProfile]) - return &PricedDataResource{ - PricedResource: *priced, - }, nil -} - -type ServiceResourcePricingProfile struct { - pricing.AccessPricingProfile[pricing.TimePricingStrategy] // AccessPricingProfile is the pricing profile of a data it means that we can access the data for an amount of time -} - -func (p *ServiceResourcePricingProfile) IsPurchasable() bool { - return p.Pricing.BuyingStrategy == pricing.PERMANENT -} - -func (p *ServiceResourcePricingProfile) IsBooked() bool { - return p.Pricing.BuyingStrategy != pricing.PERMANENT + return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds() } diff --git a/models/resources/tests/processing_test.go b/models/resources/tests/processing_test.go index f81f4c4..aa3304c 100644 --- a/models/resources/tests/processing_test.go +++ b/models/resources/tests/processing_test.go @@ -30,13 +30,6 @@ func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) { input PricedProcessingResource expected float64 }{ - { - name: "Service without explicit duration", - input: PricedProcessingResource{ - IsService: true, - }, - expected: -1, - }, { name: "Nil start time, non-service", input: PricedProcessingResource{ diff --git a/models/workflow/graph/graph.go b/models/workflow/graph/graph.go index c048692..3d563d7 100644 --- a/models/workflow/graph/graph.go +++ b/models/workflow/graph/graph.go @@ -45,6 +45,10 @@ func (wf *Graph) IsProcessing(item GraphItem) bool { return item.Processing != nil } +func (wf *Graph) IsService(item GraphItem) bool { + return item.Service != nil +} + func (wf *Graph) IsNativeTool(item GraphItem) bool { return item.NativeTool != nil } @@ -151,6 +155,8 @@ func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterf return tools.PROCESSING_RESOURCE, item.Processing } else if item.Storage != nil { return tools.STORAGE_RESOURCE, item.Storage + } else if item.Service != nil { + return tools.SERVICE_RESOURCE, item.Service } } return tools.INVALID, nil diff --git a/models/workflow/graph/item.go b/models/workflow/graph/item.go index d84600a..45511c5 100644 --- a/models/workflow/graph/item.go +++ b/models/workflow/graph/item.go @@ -27,6 +27,8 @@ func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface) return tools.STORAGE_RESOURCE, g.Storage } else if g.NativeTool != nil { return tools.NATIVE_TOOL, g.NativeTool + } else if g.Service != nil { + return tools.SERVICE_RESOURCE, g.Service } return tools.INVALID, nil } @@ -37,4 +39,5 @@ func (g *GraphItem) Clear() { g.Workflow = nil g.Processing = nil g.Storage = nil + g.Service = nil } diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 39e3db6..d72e93f 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -91,6 +91,11 @@ func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface itf = append(itf, d) } return itf + case tools.SERVICE_RESOURCE: + for _, d := range d.ServiceResources { + itf = append(itf, d) + } + return itf } return itf } @@ -229,6 +234,7 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A } d.generateResource(d.GetResources(tools.DATA_RESOURCE), request) d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request) + d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request) d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request) d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request) d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request) @@ -627,7 +633,8 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte priceds := map[tools.DataType]map[string]pricing.PricedItemITF{} var err error - // 2. Plan processings first so we can derive the total workflow duration. + // 2. Plan processings and services first so we can derive the total workflow duration. + // Services in DEPLOYMENT mode return duration=-1 (open-ended); HOSTED mode returns a bounded call window. ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing, func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) { d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), @@ -644,6 +651,24 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte if err != nil { return false, 0, priceds, nil, err } + if _, priceds, err = plan[*resources.ServiceResource](tools.SERVICE_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsService, + func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) { + d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), + *instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()), + bookingMode, request) + if err != nil { + return start, 0, err + } + return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil + }, func(started time.Time, duration float64) (*time.Time, error) { + if duration < 0 { + return nil, nil // DEPLOYMENT mode: open-ended + } + s := started.Add(time.Duration(duration) * time.Second) + return &s, nil + }); err != nil { + return false, 0, priceds, nil, err + } // Total workflow duration used as the booking window for compute/storage. // Returns -1 if any processing is a service (open-ended). @@ -793,6 +818,7 @@ func (w *Workflow) GetItemsByResources() map[tools.DataType]map[string][]string tools.DATA_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsData) }, tools.COMPUTE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsCompute) }, tools.PROCESSING_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsProcessing) }, + tools.SERVICE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsService) }, tools.WORKFLOW_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsWorkflow) }, } diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index 3686154..1a14c10 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -221,6 +221,8 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest()) case tools.NATIVE_TOOL: access = resources.NewAccessor[*resources.NativeTool](t, a.GetRequest()) + case tools.SERVICE_RESOURCE: + access = resources.NewAccessor[*resources.ServiceResource](t, a.GetRequest()) default: wf.Graph.Clear(resource.GetID()) } diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 0e5e3e4..6e7615b 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -17,6 +17,17 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +// EmbeddedStorageSelection records which storage capability was activated on a +// compute unit graph node, and which pricing options were selected for it. +// Key in WorkflowExecution.SelectedEmbeddedStorages is the compute graph node ID. +// A nil/absent entry means no storage was activated on that compute unit. +type EmbeddedStorageSelection struct { + StorageIndex int `json:"storage_index" bson:"storage_index"` // index in ComputeResourceInstance.AvailableStorages + PartnershipIndex int `json:"partnership_index" bson:"partnership_index"` // index in the storage's partnerships + BuyingIndex int `json:"buying_index" bson:"buying_index"` + StrategyIndex int `json:"strategy_index" bson:"strategy_index"` +} + // BookingState tracks the reservation and completion status of a single booking // within a workflow execution. // - IsBooked: true while the resource is actively reserved (set on WORKFLOW_STARTED_EVENT, @@ -56,6 +67,11 @@ type WorkflowExecution struct { SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"` SelectedBuyings workflow.ConfigItem `json:"selected_buyings"` SelectedStrategies workflow.ConfigItem `json:"selected_strategies"` + + // SelectedEmbeddedStorages records which storage capability was activated on + // each compute unit graph node (key = compute graph node ID). + // Populated by oc-scheduler, consumed by oc-monitord's argo builder. + SelectedEmbeddedStorages map[string]*EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"` } func (ri *WorkflowExecution) Extend(typ ...string) map[string][]tools.DataType { diff --git a/tools/enums.go b/tools/enums.go index 1d9c021..63bccae 100644 --- a/tools/enums.go +++ b/tools/enums.go @@ -33,6 +33,7 @@ const ( NATIVE_TOOL EXECUTION_VERIFICATION ALLOWED_IMAGE + SERVICE_RESOURCE ) var NOAPI = func() string { @@ -90,6 +91,7 @@ var InnerDefaultAPI = [...]func() string{ CATALOGAPI, SCHEDULERAPI, DATACENTERAPI, + CATALOGAPI, } // Bind the standard data name to the data type @@ -117,6 +119,7 @@ var Str = [...]string{ "native_tool", "execution_verification", "allowed_image", + "service_resource", } func FromString(comp string) int { @@ -152,7 +155,7 @@ func DataTypeList() []DataType { return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE, - LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE} + LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE} } type PropalgationMessage struct {