Service + Storage Binded to Compute

This commit is contained in:
mr
2026-04-23 09:24:02 +02:00
parent 538496cd60
commit 9c2663601a
17 changed files with 235 additions and 88 deletions

View File

@@ -57,6 +57,7 @@ func (ri *AbstractLive) Extend(typ ...string) map[string][]tools.DataType {
ext[t] = append(ext[t], tools.COMPUTE_RESOURCE)
ext[t] = append(ext[t], tools.STORAGE_RESOURCE)
ext[t] = append(ext[t], tools.PROCESSING_RESOURCE)
ext[t] = append(ext[t], tools.SERVICE_RESOURCE)
}
}
return ext

View File

@@ -31,6 +31,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &resource.ComputeResource{} },
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.SERVICE_RESOURCE.String(): func() utils.DBObject { return &resource.ServiceResource{} },
tools.NATIVE_TOOL.String(): func() utils.DBObject { return &resource.NativeTool{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },

View File

@@ -7,6 +7,15 @@ import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
)
type PeerPerm int
const (
READ PeerRelation = iota
WRITE
MONITOR
)
type PeerRelation int
@@ -66,12 +75,20 @@ type PeerLocation struct {
Latitude float64 `json:"latitude" bson:"latitude"`
Longitude float64 `json:"longitude" bson:"longitude"`
Granularity int `json:"granularity" bson:"granularity"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
Timezone string `json:"timezone,omitempty" bson:"timezone,omitempty"`
}
// Peer is a struct that represents a peer
type Peer struct {
utils.AbstractObject
PeerPerms []PeerPerm `json:"peer_perms" bson:"peer_perms"`
RelationLastChangeDate time.Time `json:"relation_last_change_date" bson:"relation_last_change_date"`
RelationLastChangeUser string `json:"relation_last_change_user" bson:"relation_last_change_user"`
Verify bool `json:"verify" bson:"verify"`
OrganizationID string `json:"organization_id" bson:"organization_id"`
PeerID string `json:"peer_id" bson:"peer_id" validate:"required"`

View File

@@ -56,13 +56,16 @@ type ComputeNode struct {
type ComputeResourceInstance struct {
ResourceInstance[*ComputeResourcePartnership]
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
Source string `json:"source,omitempty" bson:"source,omitempty"`
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"`
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"`
Nodes []*ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
// AvailableStorages lists storage capabilities activatable on this compute unit (e.g. Minio, local volumes).
// These are shallow StorageResource entries — not independent catalog items — but carry full pricing structure.
AvailableStorages []*StorageResource `json:"available_storages,omitempty" bson:"available_storages,omitempty"`
}
// IsPeerless is always false for compute instances: a compute resource is

View File

@@ -12,6 +12,7 @@ type ResourceSet struct {
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
@@ -19,6 +20,7 @@ type ResourceSet struct {
ComputeResources []*ComputeResource `bson:"-" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"-" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"-" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
}
func (r *ResourceSet) Clear() {
@@ -27,6 +29,7 @@ func (r *ResourceSet) Clear() {
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
}
func (r *ResourceSet) Fill(request *tools.APIRequest) {
@@ -37,6 +40,7 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) {
(&StorageResource{}): r.Storages,
(&ProcessingResource{}): r.Processings,
(&WorkflowResource{}): r.Workflows,
(&ServiceResource{}): r.Services,
} {
for _, id := range v {
d, _, e := k.GetAccessor(request).LoadOne(id)
@@ -52,6 +56,8 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
case *WorkflowResource:
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
case *ServiceResource:
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
}
}
}
@@ -65,4 +71,5 @@ type ItemResource struct {
Compute *ComputeResource `bson:"compute,omitempty" json:"compute,omitempty"`
Workflow *WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"`
NativeTool *NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
Service *ServiceResource `bson:"service,omitempty" json:"service,omitempty"`
}

View File

@@ -29,9 +29,8 @@ type ProcessingUsage struct {
type ProcessingResource struct {
AbstractInstanciatedResource[*ProcessingInstance]
IsEvent bool `json:"is_event,omitempty" bson:"is_event,omitempty"`
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"`
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"`
OpenSource bool `json:"open_source" bson:"open_source" default:"false"`
License string `json:"license,omitempty" bson:"license,omitempty"`
Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"`
@@ -69,7 +68,6 @@ type ProcessingResourcePartnership struct {
type PricedProcessingResource struct {
PricedResource[*ProcessingResourcePricingProfile]
IsService bool
}
func (r *PricedProcessingResource) ensurePricing() {
@@ -102,10 +100,7 @@ func (a *PricedProcessingResource) GetExplicitDurationInS() float64 {
a.BookingConfiguration = &BookingConfiguration{}
}
if a.BookingConfiguration.ExplicitBookingDurationS == 0 {
if a.IsService || a.BookingConfiguration.UsageStart == nil {
if a.IsService {
return -1
}
if a.BookingConfiguration.UsageStart == nil {
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()

View File

@@ -47,6 +47,7 @@ func (ri *PurchaseResource) Extend(typ ...string) map[string][]tools.DataType {
ext[t] = append(ext[t], tools.COMPUTE_RESOURCE)
ext[t] = append(ext[t], tools.STORAGE_RESOURCE)
ext[t] = append(ext[t], tools.PROCESSING_RESOURCE)
ext[t] = append(ext[t], tools.SERVICE_RESOURCE)
}
}
return ext

View File

@@ -511,6 +511,12 @@ func ToResource(
return nil, err
}
return &data, nil
case tools.SERVICE_RESOURCE.EnumIndex():
var data ServiceResource
if err := json.Unmarshal(payload, &data); err != nil {
return nil, err
}
return &data, nil
}
return nil, errors.New("can't found any data resources matching")
}

View File

@@ -18,8 +18,8 @@ type ResourceMongoAccessor[T ResourceInterface] struct {
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest) *ResourceMongoAccessor[T] {
if !slices.Contains([]tools.DataType{
tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE,
tools.PROCESSING_RESOURCE, tools.WORKFLOW_RESOURCE,
tools.DATA_RESOURCE, tools.NATIVE_TOOL,
tools.PROCESSING_RESOURCE, tools.SERVICE_RESOURCE,
tools.WORKFLOW_RESOURCE, tools.DATA_RESOURCE, tools.NATIVE_TOOL,
}, t) {
return nil
}
@@ -36,6 +36,8 @@ func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIReques
return &StorageResource{}
case tools.PROCESSING_RESOURCE:
return &ProcessingResource{}
case tools.SERVICE_RESOURCE:
return &ServiceResource{}
case tools.WORKFLOW_RESOURCE:
return &WorkflowResource{}
case tools.DATA_RESOURCE:

View File

@@ -12,49 +12,88 @@ import (
"github.com/google/uuid"
)
type ServiceUsage struct {
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM
type ServiceMode int
StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"` // Storage is the storage
Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"`
ScalingModel string `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` // ScalingModel is the scaling model
const (
DEPLOYMENT ServiceMode = iota // deploy the service, pay for uptime — duration unbounded
HOSTED // use an existing service, pay per call — duration per request
)
func (m ServiceMode) String() string {
return [...]string{"DEPLOYMENT", "HOSTED"}[m]
}
type ServiceProtocol int
const (
HTTP ServiceProtocol = iota
GRPC
WEBSOCKET
TCP
)
func (p ServiceProtocol) String() string {
return [...]string{"HTTP", "GRPC", "WEBSOCKET", "TCP"}[p]
}
type ServiceUsage struct {
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"`
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"`
RAM *models.RAM `bson:"ram,omitempty" json:"ram,omitempty"`
StorageGb float64 `bson:"storage,omitempty" json:"storage,omitempty"`
Hypothesis string `bson:"hypothesis,omitempty" json:"hypothesis,omitempty"`
ScalingModel string `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"`
}
// ServiceResourceAccess describes how to reach the service once running.
// Populated for HOSTED instances (endpoint already known) and as a template for DEPLOYMENT.
type ServiceResourceAccess struct {
Container *models.Container `json:"container,omitempty" bson:"container,omitempty"`
Protocol ServiceProtocol `json:"protocol" bson:"protocol" default:"0"`
EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"`
HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"`
}
/*
* ServiceResource is a struct that represents a processing resource
* it defines the resource processing
*/
type ServiceResource struct {
AbstractInstanciatedResource[*ServiceInstance]
IsEvent bool `json:"is_event,omitempty" bson:"is_event,omitempty"`
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ServiceUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"`
Usage *ServiceUsage `bson:"usage,omitempty" json:"usage,omitempty"`
OpenSource bool `json:"open_source" bson:"open_source" default:"false"`
License string `json:"license,omitempty" bson:"license,omitempty"`
Maturity string `json:"maturity,omitempty" bson:"maturity,omitempty"`
}
func (r *ServiceResource) GetType() string {
return tools.PROCESSING_RESOURCE.String()
return tools.SERVICE_RESOURCE.String()
}
type ServiceResourceAccess struct {
Container *models.Container `json:"container,omitempty" bson:"container,omitempty"` // Container is the container
func (d *ServiceResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*ServiceResource](tools.SERVICE_RESOURCE, request)
}
func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
if t != tools.SERVICE_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Service")
}
p, err := ConvertToPricedResource[*ServiceResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource[*ServiceResourcePricingProfile])
return &PricedServiceResource{PricedResource: *priced}, nil
}
type ServiceInstance struct {
ResourceInstance[*ResourcePartnerShip[*ServiceResourcePricingProfile]]
Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` // Access is the access
SizeGB int `json:"size_gb,omitempty" bson:"size_gb,omitempty"`
ContentType string `json:"content_type,omitempty" bson:"content_type,omitempty"`
ResourceInstance[*ServiceResourcePartnership]
Mode ServiceMode `json:"mode" bson:"mode" default:"0"`
Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"`
}
func (ri *ServiceInstance) IsPeerless() bool { return false }
func NewServiceInstance(name string, peerID string) ResourceInstanceITF {
return &ServiceInstance{
ResourceInstance: ResourceInstance[*ResourcePartnerShip[*ServiceResourcePricingProfile]]{
ResourceInstance: ResourceInstance[*ServiceResourcePartnership]{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: name,
@@ -67,9 +106,62 @@ type ServiceResourcePartnership struct {
ResourcePartnerShip[*ServiceResourcePricingProfile]
}
// ServiceResourcePricingProfile handles both service modes:
// - DEPLOYMENT: uptime billing via ExploitPricingProfile (pay while service is up)
// - HOSTED: per-call billing via AccessPricingProfile (pay per request)
type ServiceResourcePricingProfile struct {
Mode ServiceMode `json:"mode" bson:"mode"`
UptimePricing *pricing.ExploitPricingProfile[pricing.TimePricingStrategy] `json:"uptime_pricing,omitempty" bson:"uptime_pricing,omitempty"`
AccessPricing *pricing.AccessPricingProfile[pricing.TimePricingStrategy] `json:"access_pricing,omitempty" bson:"access_pricing,omitempty"`
}
func (p *ServiceResourcePricingProfile) ensure() {
if p.UptimePricing == nil {
p.UptimePricing = &pricing.ExploitPricingProfile[pricing.TimePricingStrategy]{}
}
if p.AccessPricing == nil {
p.AccessPricing = &pricing.AccessPricingProfile[pricing.TimePricingStrategy]{}
}
}
func (p *ServiceResourcePricingProfile) IsPurchasable() bool {
p.ensure()
if p.Mode == DEPLOYMENT {
return p.UptimePricing.IsPurchasable()
}
return p.AccessPricing.IsPurchasable()
}
func (p *ServiceResourcePricingProfile) IsBooked() bool {
p.ensure()
if p.Mode == DEPLOYMENT {
return p.UptimePricing.IsBooked()
}
return p.AccessPricing.IsBooked()
}
func (p *ServiceResourcePricingProfile) GetPurchase() pricing.BuyingStrategy {
p.ensure()
if p.Mode == DEPLOYMENT {
return p.UptimePricing.GetPurchase()
}
return p.AccessPricing.GetPurchase()
}
func (p *ServiceResourcePricingProfile) GetOverrideStrategyValue() int {
return -1
}
func (p *ServiceResourcePricingProfile) GetPriceHT(quantity float64, val float64, start time.Time, end time.Time, variations []*pricing.PricingVariation, params ...string) (float64, error) {
p.ensure()
if p.Mode == DEPLOYMENT {
return p.UptimePricing.GetPriceHT(quantity, val, start, end, variations, params...)
}
return p.AccessPricing.GetPriceHT(quantity, val, start, end, variations, params...)
}
type PricedServiceResource struct {
PricedResource[*ServiceResourcePricingProfile]
IsService bool
}
func (r *PricedServiceResource) ensurePricing() {
@@ -88,57 +180,30 @@ func (r *PricedServiceResource) IsBooked() bool {
return r.SelectedPricing.IsBooked()
}
func (r *PricedServiceResource) GetType() tools.DataType {
return tools.SERVICE_RESOURCE
}
func (r *PricedServiceResource) GetPriceHT() (float64, error) {
r.ensurePricing()
return r.PricedResource.GetPriceHT()
}
func (r *PricedServiceResource) GetType() tools.DataType {
return tools.PROCESSING_RESOURCE
}
// GetExplicitDurationInS returns -1 for DEPLOYMENT (unbounded uptime).
// For HOSTED, returns the actual call window duration.
func (a *PricedServiceResource) GetExplicitDurationInS() float64 {
a.ensurePricing()
if a.SelectedPricing.Mode == DEPLOYMENT {
return -1
}
if a.BookingConfiguration == nil {
a.BookingConfiguration = &BookingConfiguration{}
}
if a.BookingConfiguration.ExplicitBookingDurationS == 0 {
if a.IsService || a.BookingConfiguration.UsageStart == nil {
if a.IsService {
return -1
if a.BookingConfiguration.ExplicitBookingDurationS != 0 {
return a.BookingConfiguration.ExplicitBookingDurationS
}
if a.BookingConfiguration.UsageStart == nil || a.BookingConfiguration.UsageEnd == nil {
return (5 * time.Minute).Seconds()
}
return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds()
}
return a.BookingConfiguration.ExplicitBookingDurationS
}
func (d *ServiceResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*ServiceResource](tools.PROCESSING_RESOURCE, request) // Create a new instance of the accessor
}
func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) {
if t != tools.PROCESSING_RESOURCE {
return nil, errors.New("not the proper type expected : cannot convert to priced resource : have " + t.String() + " wait Data")
}
p, err := ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request)
if err != nil {
return nil, err
}
priced := p.(*PricedResource[*DataResourcePricingProfile])
return &PricedDataResource{
PricedResource: *priced,
}, nil
}
type ServiceResourcePricingProfile struct {
pricing.AccessPricingProfile[pricing.TimePricingStrategy] // AccessPricingProfile is the pricing profile of a data it means that we can access the data for an amount of time
}
func (p *ServiceResourcePricingProfile) IsPurchasable() bool {
return p.Pricing.BuyingStrategy == pricing.PERMANENT
}
func (p *ServiceResourcePricingProfile) IsBooked() bool {
return p.Pricing.BuyingStrategy != pricing.PERMANENT
}

View File

@@ -30,13 +30,6 @@ func TestPricedProcessingResource_GetExplicitDurationInS(t *testing.T) {
input PricedProcessingResource
expected float64
}{
{
name: "Service without explicit duration",
input: PricedProcessingResource{
IsService: true,
},
expected: -1,
},
{
name: "Nil start time, non-service",
input: PricedProcessingResource{

View File

@@ -45,6 +45,10 @@ func (wf *Graph) IsProcessing(item GraphItem) bool {
return item.Processing != nil
}
func (wf *Graph) IsService(item GraphItem) bool {
return item.Service != nil
}
func (wf *Graph) IsNativeTool(item GraphItem) bool {
return item.NativeTool != nil
}
@@ -151,6 +155,8 @@ func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterf
return tools.PROCESSING_RESOURCE, item.Processing
} else if item.Storage != nil {
return tools.STORAGE_RESOURCE, item.Storage
} else if item.Service != nil {
return tools.SERVICE_RESOURCE, item.Service
}
}
return tools.INVALID, nil

View File

@@ -27,6 +27,8 @@ func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface)
return tools.STORAGE_RESOURCE, g.Storage
} else if g.NativeTool != nil {
return tools.NATIVE_TOOL, g.NativeTool
} else if g.Service != nil {
return tools.SERVICE_RESOURCE, g.Service
}
return tools.INVALID, nil
}
@@ -37,4 +39,5 @@ func (g *GraphItem) Clear() {
g.Workflow = nil
g.Processing = nil
g.Storage = nil
g.Service = nil
}

View File

@@ -91,6 +91,11 @@ func (d *Workflow) GetResources(dt tools.DataType) []resources.ResourceInterface
itf = append(itf, d)
}
return itf
case tools.SERVICE_RESOURCE:
for _, d := range d.ServiceResources {
itf = append(itf, d)
}
return itf
}
return itf
}
@@ -229,6 +234,7 @@ func (d *Workflow) ExtractFromPlantUML(plantUML multipart.File, request *tools.A
}
d.generateResource(d.GetResources(tools.DATA_RESOURCE), request)
d.generateResource(d.GetResources(tools.PROCESSING_RESOURCE), request)
d.generateResource(d.GetResources(tools.SERVICE_RESOURCE), request)
d.generateResource(d.GetResources(tools.STORAGE_RESOURCE), request)
d.generateResource(d.GetResources(tools.COMPUTE_RESOURCE), request)
d.generateResource(d.GetResources(tools.WORKFLOW_RESOURCE), request)
@@ -627,7 +633,8 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
var err error
// 2. Plan processings first so we can derive the total workflow duration.
// 2. Plan processings and services first so we can derive the total workflow duration.
// Services in DEPLOYMENT mode return duration=-1 (open-ended); HOSTED mode returns a bounded call window.
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
@@ -644,6 +651,24 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, instances ConfigIte
if err != nil {
return false, 0, priceds, nil, err
}
if _, priceds, err = plan[*resources.ServiceResource](tools.SERVICE_RESOURCE, instances, partnerships, buyings, strategies, bookingMode, wf, priceds, request, wf.Graph.IsService,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64, error) {
d, err := wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(),
*instances.Get(res.GetID()), *partnerships.Get(res.GetID()), *buyings.Get(res.GetID()), *strategies.Get(res.GetID()),
bookingMode, request)
if err != nil {
return start, 0, err
}
return start.Add(time.Duration(d) * time.Second), priced.GetExplicitDurationInS(), nil
}, func(started time.Time, duration float64) (*time.Time, error) {
if duration < 0 {
return nil, nil // DEPLOYMENT mode: open-ended
}
s := started.Add(time.Duration(duration) * time.Second)
return &s, nil
}); err != nil {
return false, 0, priceds, nil, err
}
// Total workflow duration used as the booking window for compute/storage.
// Returns -1 if any processing is a service (open-ended).
@@ -793,6 +818,7 @@ func (w *Workflow) GetItemsByResources() map[tools.DataType]map[string][]string
tools.DATA_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsData) },
tools.COMPUTE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsCompute) },
tools.PROCESSING_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsProcessing) },
tools.SERVICE_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsService) },
tools.WORKFLOW_RESOURCE: func() []graph.GraphItem { return w.GetGraphItems(w.Graph.IsWorkflow) },
}

View File

@@ -221,6 +221,8 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest())
case tools.NATIVE_TOOL:
access = resources.NewAccessor[*resources.NativeTool](t, a.GetRequest())
case tools.SERVICE_RESOURCE:
access = resources.NewAccessor[*resources.ServiceResource](t, a.GetRequest())
default:
wf.Graph.Clear(resource.GetID())
}

View File

@@ -17,6 +17,17 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
)
// EmbeddedStorageSelection records which storage capability was activated on a
// compute unit graph node, and which pricing options were selected for it.
// Key in WorkflowExecution.SelectedEmbeddedStorages is the compute graph node ID.
// A nil/absent entry means no storage was activated on that compute unit.
type EmbeddedStorageSelection struct {
StorageIndex int `json:"storage_index" bson:"storage_index"` // index in ComputeResourceInstance.AvailableStorages
PartnershipIndex int `json:"partnership_index" bson:"partnership_index"` // index in the storage's partnerships
BuyingIndex int `json:"buying_index" bson:"buying_index"`
StrategyIndex int `json:"strategy_index" bson:"strategy_index"`
}
// BookingState tracks the reservation and completion status of a single booking
// within a workflow execution.
// - IsBooked: true while the resource is actively reserved (set on WORKFLOW_STARTED_EVENT,
@@ -56,6 +67,11 @@ type WorkflowExecution struct {
SelectedPartnerships workflow.ConfigItem `json:"selected_partnerships"`
SelectedBuyings workflow.ConfigItem `json:"selected_buyings"`
SelectedStrategies workflow.ConfigItem `json:"selected_strategies"`
// SelectedEmbeddedStorages records which storage capability was activated on
// each compute unit graph node (key = compute graph node ID).
// Populated by oc-scheduler, consumed by oc-monitord's argo builder.
SelectedEmbeddedStorages map[string]*EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
}
func (ri *WorkflowExecution) Extend(typ ...string) map[string][]tools.DataType {

View File

@@ -33,6 +33,7 @@ const (
NATIVE_TOOL
EXECUTION_VERIFICATION
ALLOWED_IMAGE
SERVICE_RESOURCE
)
var NOAPI = func() string {
@@ -90,6 +91,7 @@ var InnerDefaultAPI = [...]func() string{
CATALOGAPI,
SCHEDULERAPI,
DATACENTERAPI,
CATALOGAPI,
}
// Bind the standard data name to the data type
@@ -117,6 +119,7 @@ var Str = [...]string{
"native_tool",
"execution_verification",
"allowed_image",
"service_resource",
}
func FromString(comp string) int {
@@ -152,7 +155,7 @@ func DataTypeList() []DataType {
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE,
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE}
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE}
}
type PropalgationMessage struct {