From f048b420d74d832fa2147dbf442e8bb76fcf15f4 Mon Sep 17 00:00:00 2001 From: mr Date: Mon, 27 Apr 2026 11:16:50 +0200 Subject: [PATCH] Addon --- models/booking/planner/planner.go | 103 +++++- models/common/planner.go | 16 +- models/live/live_mongo_accessor.go | 2 + models/live/live_service.go | 42 +++ models/models.go | 1 + models/resources/dynamic.go | 298 ++++++++++++++++++ models/resources/interfaces.go | 1 + models/resources/models.go | 7 + models/resources/resource.go | 12 + models/resources/service.go | 7 +- models/workflow/graph/graph.go | 4 + models/workflow/graph/item.go | 3 + .../workflow_execution/workflow_execution.go | 10 +- tools/enums.go | 8 +- 14 files changed, 484 insertions(+), 30 deletions(-) create mode 100644 models/live/live_service.go create mode 100755 models/resources/dynamic.go diff --git a/models/booking/planner/planner.go b/models/booking/planner/planner.go index 47adb5c..0ea29c2 100644 --- a/models/booking/planner/planner.go +++ b/models/booking/planner/planner.go @@ -8,16 +8,18 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" + "cloud.o-forge.io/core/oc-lib/models/live" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/tools" ) // InstanceCapacity holds the maximum available resources of a single resource instance. type InstanceCapacity struct { - CPUCores map[string]float64 `json:"cpu_cores,omitempty"` // model -> total cores - GPUMemGB map[string]float64 `json:"gpu_mem_gb,omitempty"` // model -> total memory GB - RAMGB float64 `json:"ram_gb,omitempty"` // total RAM GB - StorageGB float64 `json:"storage_gb,omitempty"` // total storage GB + CPUCores map[string]float64 `json:"cpu_cores,omitempty"` // model -> total cores + GPUMemGB map[string]float64 `json:"gpu_mem_gb,omitempty"` // model -> total memory GB + RAMGB float64 `json:"ram_gb,omitempty"` // total RAM GB + StorageGB float64 `json:"storage_gb,omitempty"` // total storage GB + MaxConcurrent float64 `json:"max_concurrent,omitempty"` // HOSTED service: max simultaneous callers } // ResourceRequest describes the resource amounts needed for a prospective booking. @@ -47,11 +49,14 @@ type PlannerITF interface { } // Planner is a volatile (non-persisted) object that organises bookings by resource. -// Only ComputeResource and StorageResource bookings appear in the schedule. +// ComputeResource, StorageResource and HOSTED ServiceResource bookings appear in the schedule. +// BlockedResources marks resources for which no matching Live was found at generation time: +// any availability check against a blocked resource returns false immediately. type Planner struct { - GeneratedAt time.Time `json:"generated_at"` - Schedule map[string][]*PlannerSlot `json:"schedule"` // resource_id -> slots - Capacities map[string]map[string]*InstanceCapacity `json:"capacities"` // resource_id -> instance_id -> max capacity + GeneratedAt time.Time `json:"generated_at"` + Schedule map[string][]*PlannerSlot `json:"schedule"` // resource_id -> slots + Capacities map[string]map[string]*InstanceCapacity `json:"capacities"` // resource_id -> instance_id -> max capacity + BlockedResources map[string]bool `json:"blocked_resources,omitempty"` // resource_id -> no Live found } // Generate builds a full Planner from all active bookings. @@ -86,9 +91,10 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { bookings := append(confirmed, drafts...) p := &Planner{ - GeneratedAt: time.Now(), - Schedule: map[string][]*PlannerSlot{}, - Capacities: map[string]map[string]*InstanceCapacity{}, + GeneratedAt: time.Now(), + Schedule: map[string][]*PlannerSlot{}, + Capacities: map[string]map[string]*InstanceCapacity{}, + BlockedResources: map[string]bool{}, } for _, b := range bookings { @@ -100,8 +106,10 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { continue } - // Only compute and storage resources are eligible - if bk.ResourceType != tools.COMPUTE_RESOURCE && bk.ResourceType != tools.STORAGE_RESOURCE { + // Eligible resource types: compute, storage, and HOSTED services. + if bk.ResourceType != tools.COMPUTE_RESOURCE && + bk.ResourceType != tools.STORAGE_RESOURCE && + bk.ResourceType != tools.SERVICE_RESOURCE { continue } @@ -111,7 +119,11 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { end = &e } - instanceID, usage, cap := extractSlotData(bk, request) + instanceID, usage, cap, blocked := extractSlotData(bk, request) + if blocked { + p.BlockedResources[bk.ResourceID] = true + continue + } if instanceID == "" { instanceID = bk.InstanceID } @@ -151,6 +163,9 @@ func generate(request *tools.APIRequest, shallow bool) (*Planner, error) { // Slots targeting other instances are ignored. // If no capacity is known for this instance (never booked), it is fully available. func (p *Planner) Check(resourceID string, instanceID string, req *ResourceRequest, start time.Time, end *time.Time) bool { + if p.BlockedResources[resourceID] { + return false // no Live found at generation time — cannot book + } if end == nil { e := start.Add(5 * time.Minute) end = &e @@ -265,6 +280,11 @@ func toPercentages(req *ResourceRequest, cap *InstanceCapacity) map[string]float pct["storage"] = (*req.StorageGB / cap.StorageGB) * 100.0 } + // HOSTED service: each booking consumes one call slot. + if cap.MaxConcurrent > 0 { + pct["calls"] = (1.0 / cap.MaxConcurrent) * 100.0 + } + return pct } @@ -272,9 +292,11 @@ func toPercentages(req *ResourceRequest, cap *InstanceCapacity) map[string]float // Internal helpers // --------------------------------------------------------------------------- -// extractSlotData parses the booking's PricedItem, loads the corresponding resource, -// and returns the instance ID, usage percentages, and instance capacity in a single pass. -func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity) { +// extractSlotData parses the booking's PricedItem, loads the corresponding Live resource +// as the authoritative capacity source, and returns the instance ID, usage percentages, +// capacity, and whether a matching Live was found. +// blocked=true means no Live exists for this resource; the resource must not be scheduled. +func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity, blocked bool) { usage = map[string]float64{} if len(bk.PricedItem) == 0 { return @@ -289,6 +311,8 @@ func extractSlotData(bk *booking.Booking, request *tools.APIRequest) (instanceID instanceID, usage, cap = extractComputeSlot(b, bk.ResourceID, request) case tools.STORAGE_RESOURCE: instanceID, usage, cap = extractStorageSlot(b, bk.ResourceID, request) + case tools.SERVICE_RESOURCE: + instanceID, usage, cap, blocked = extractServiceSlot(b, bk.ResourceID, request) } return } @@ -381,6 +405,51 @@ func extractStorageSlot(pricedJSON []byte, resourceID string, request *tools.API return } +// extractServiceSlot extracts the instance ID, usage, and capacity for a HOSTED service booking. +// The LiveService is the authoritative source for MaxConcurrent — the ServiceResource is not trusted. +// If no LiveService references this resourceID, blocked=true signals the resource cannot be scheduled. +func extractServiceSlot(pricedJSON []byte, resourceID string, request *tools.APIRequest) (instanceID string, usage map[string]float64, cap *InstanceCapacity, blocked bool) { + usage = map[string]float64{} + + var priced resources.PricedServiceResource + if err := json.Unmarshal(pricedJSON, &priced); err != nil { + blocked = true + return + } + + // LiveService is the authoritative capacity source — look it up by resources_id. + liveResults, _, err := (&live.LiveService{}).GetAccessor(request).Search( + &dbs.Filters{ + And: map[string][]dbs.Filter{ + "resources_id": {{Operator: dbs.EQUAL.String(), Value: resourceID}}, + }, + }, "*", false, 0, 1) + if err != nil || len(liveResults) == 0 { + blocked = true // no Live → cannot schedule + return + } + ls := liveResults[0].(*live.LiveService) + if ls.MaxConcurrent <= 0 { + blocked = true + return + } + + // Instance ID: use the first instance referenced by the priced item. + instanceID = priced.GetID() + if instanceID == "" { + instanceID = resourceID // fallback: treat the resource itself as the instance key + } + + maxC := float64(ls.MaxConcurrent) + cap = &InstanceCapacity{ + CPUCores: map[string]float64{}, + GPUMemGB: map[string]float64{}, + MaxConcurrent: maxC, + } + usage["calls"] = (1.0 / maxC) * 100.0 + return +} + // findComputeInstance returns the instance referenced by the priced item's InstancesRefs, // falling back to the first available instance. func findComputeInstance(compute *resources.ComputeResource, refs map[string]string) *resources.ComputeResourceInstance { diff --git a/models/common/planner.go b/models/common/planner.go index 8b8cdf1..a4c08f1 100755 --- a/models/common/planner.go +++ b/models/common/planner.go @@ -27,16 +27,18 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[stri return near } -// GetPlannerLongestTime returns the sum of all processing durations (conservative estimate). -// Returns -1 if any processing is a service (open-ended). +// GetPlannerLongestTime returns the sum of all processing+service durations. +// Returns -1 if any item is open-ended (no deadline). func GetPlannerLongestTime(planned map[tools.DataType]map[string]pricing.PricedItemITF) float64 { longestTime := float64(0) - for _, priced := range planned[tools.PROCESSING_RESOURCE] { - d := priced.GetExplicitDurationInS() - if d < 0 { - return -1 // service present: booking is open-ended + for _, dt := range []tools.DataType{tools.PROCESSING_RESOURCE, tools.SERVICE_RESOURCE} { + for _, priced := range planned[dt] { + d := priced.GetExplicitDurationInS() + if d < 0 { + return -1 + } + longestTime += d } - longestTime += d } return longestTime } diff --git a/models/live/live_mongo_accessor.go b/models/live/live_mongo_accessor.go index e4c54a9..56e4a21 100644 --- a/models/live/live_mongo_accessor.go +++ b/models/live/live_mongo_accessor.go @@ -27,6 +27,8 @@ func NewAccessor[T LiveInterface](t tools.DataType, request *tools.APIRequest) * return &LiveDatacenter{} case tools.LIVE_STORAGE: return &LiveStorage{} + case tools.LIVE_SERVICE: + return &LiveService{} } return &LiveDatacenter{} }, diff --git a/models/live/live_service.go b/models/live/live_service.go new file mode 100644 index 0000000..be770c7 --- /dev/null +++ b/models/live/live_service.go @@ -0,0 +1,42 @@ +package live + +import ( + "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/tools" +) + +// LiveService is the authoritative description of a hosted service run by the peer. +// MaxConcurrent is the only capacity dimension that matters for scheduling: +// it caps the number of simultaneous callers the service can accept. +// All other service metadata (endpoint, protocol) is live-verified here +// rather than trusted from the ServiceResource, which may be stale. +type LiveService struct { + AbstractLive + MaxConcurrent int `json:"max_concurrent" bson:"max_concurrent"` + Protocol resources.ServiceProtocol `json:"protocol" bson:"protocol" default:"0"` + EndpointPattern string `json:"endpoint_pattern,omitempty" bson:"endpoint_pattern,omitempty"` + HealthCheckPath string `json:"health_check_path,omitempty" bson:"health_check_path,omitempty"` +} + +func (d *LiveService) GetAccessor(request *tools.APIRequest) utils.Accessor { + return NewAccessor[*LiveService](tools.LIVE_SERVICE, request) +} + +func (d *LiveService) GetResourceAccessor(request *tools.APIRequest) utils.Accessor { + return resources.NewAccessor[*resources.ServiceResource](tools.SERVICE_RESOURCE, request) +} + +func (d *LiveService) GetResource() resources.ResourceInterface { + return &resources.ServiceResource{} +} + +func (d *LiveService) GetResourceInstance() resources.ResourceInstanceITF { + return &resources.ServiceInstance{} +} + +func (d *LiveService) SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface { + r := res.(*resources.ServiceResource) + r.AddInstances(i) + return r +} diff --git a/models/models.go b/models/models.go index f64a22d..75905ee 100644 --- a/models/models.go +++ b/models/models.go @@ -46,6 +46,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{ tools.PURCHASE_RESOURCE.String(): func() utils.DBObject { return &purchase_resource.PurchaseResource{} }, tools.LIVE_DATACENTER.String(): func() utils.DBObject { return &live.LiveDatacenter{} }, tools.LIVE_STORAGE.String(): func() utils.DBObject { return &live.LiveStorage{} }, + tools.LIVE_SERVICE.String(): func() utils.DBObject { return &live.LiveService{} }, tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} }, tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} }, tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} }, diff --git a/models/resources/dynamic.go b/models/resources/dynamic.go new file mode 100755 index 0000000..7acdbcb --- /dev/null +++ b/models/resources/dynamic.go @@ -0,0 +1,298 @@ +package resources + +import ( + "encoding/json" + "errors" + "fmt" + "reflect" + "slices" + "strings" + + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/common/pricing" + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/tools" +) + +/* +* DynamicResource is a struct that represents a data resource +* it defines the resource data + */ +type DynamicResource struct { + AbstractResource + Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"` + Filters map[string]interface{} `bson:"filters,omitempty" json:"filters,omitempty"` + SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"` + + PeerIds map[int]string `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"` + ResourceIds map[int]string `bson:"resource_ids,omitempty" json:"resource_ids,omitempty"` + + SelectedIndex int `bson:"selected_index,omitempty" json:"selected_index,omitempty"` + SelectedPartnershipIndex *int `bson:"selected_partnership_index,omitempty" json:"selected_partnership_index,omitempty"` + + SelectedBuyingStrategy int `bson:"selected_buying_strategy" json:"selected_buying_strategy,omitempty"` + SelectedPricingStrategy int `bson:"selected_pricing_strategy" json:"selected_pricing_strategy,omitempty"` + + Instances []ResourceInstanceITF `bson:"instances,omitempty" json:"instances,omitempty"` + WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"` +} + +func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor { + return nil +} + +func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF { + d.Instances = []ResourceInstanceITF{} + for k, v := range map[tools.DataType]ResourceInterface{ + tools.COMPUTE_RESOURCE: &ComputeResource{}, + tools.DATA_RESOURCE: &DataResource{}, + tools.STORAGE_RESOURCE: &StorageResource{}, + tools.PROCESSING_RESOURCE: &ProcessingResource{}, + tools.WORKFLOW_RESOURCE: &WorkflowResource{}} { + if d.Type != k { + continue + } + access := NewAccessor[*DynamicResource](k, request) + a, _, _ := access.Search(dbs.FiltersFromFlatMap(d.Filters, v), "", false, 0, 100000) + d.PeerIds = map[int]string{} + d.ResourceIds = map[int]string{} + for _, res := range a { + for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) { + d.PeerIds[len(d.Instances)] = res.GetCreatorID() + d.ResourceIds[len(d.Instances)] = res.GetID() + d.Instances = append(d.Instances, i) + } + } + break + } + sorted := make([]ResourceInstanceITF, len(d.Instances)) + copy(sorted, d.Instances) + slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int { + d.SortRules["partnerships"] = "%v not contains 2" + return d.compareByRules(a, b, d.SortRules) + }) + d.WatchedDynamicResource = []string{} + return d.Instances +} + +func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) { + d.Instances = append(d.Instances, instance) +} + +func (d *DynamicResource) GetSelectedInstance(index *int) ResourceInstanceITF { + if len(d.Instances) == 0 { + return nil + } + for i, inst := range d.Instances { + if slices.Contains(d.WatchedDynamicResource, inst.GetID()) { + continue + } + d.WatchedDynamicResource = append(d.WatchedDynamicResource, inst.GetID()) + d.SelectedIndex = i + + for i := range inst.GetPartnerships() { + if inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil { + d.SelectedPartnershipIndex = &i + break + } + } + if d.SelectedPartnershipIndex == nil { + continue + } + return inst + } + return nil +} + +// compareByRules orders instances so those satisfying more sort rules come first. +// When both satisfy a rule, the one with the lower first-attribute value wins (ASC strict). +// Key format: "attrA" for single-%s rules, "attrA,attrB" for two-%s rules. +func (ri *DynamicResource) compareByRules(a, b ResourceInstanceITF, rules map[string]string) int { + ma := a.Serialize(a) + mb := b.Serialize(b) + for attrs, rule := range rules { + attrPaths := strings.Split(attrs, ",") + + aOk, aFirst := ri.ruleMatchesAny(rule, attrPaths, ma) + bOk, bFirst := ri.ruleMatchesAny(rule, attrPaths, mb) + + if aOk && !bOk { + return -1 + } + if !aOk && bOk { + return 1 + } + if aOk && bOk { + if aFirst < bFirst { + return -1 + } + if aFirst > bFirst { + return 1 + } + } + } + + return 0 +} + +// ruleMatchesAny checks if any value (or combination for 2-%s rules) satisfies rule. +// Arrays at any path level are iterated. Returns (matched, firstMatchingValue). +func (ri *DynamicResource) ruleMatchesAny(rule string, attrPaths []string, m map[string]interface{}) (bool, string) { + placeholders := strings.Count(rule, "%s") + if placeholders == 0 { + return false, "" + } + valsA := ri.getVals(strings.Split(strings.TrimSpace(attrPaths[0]), "."), m) + if placeholders == 1 { + for _, v := range valsA { + if ri.byRules(rule, v) { + return true, fmt.Sprintf("%v", v) + } + } + return false, "" + } + if len(attrPaths) < 2 { + return false, "" + } + valsB := ri.getVals(strings.Split(strings.TrimSpace(attrPaths[1]), "."), m) + for _, a := range valsA { + for _, b := range valsB { + if ri.byRules(rule, a, b) { + return true, fmt.Sprintf("%v", a) + } + } + } + return false, "" +} + +// getVals navigates attrs into m, collecting all leaf values. +// At each level it detects whether the value is a dict (map) or an array and acts accordingly: +// - array of maps → recurse into each element with the remaining path +// - array of scalars (leaf) → collect all as strings +// - map → recurse with the remaining path +func (ri *DynamicResource) getVals(attrs []string, m map[string]interface{}) []interface{} { + if len(attrs) == 0 { + return nil + } + attr := attrs[0] + if attr == "" || m[attr] == nil { + return nil + } + b, err := json.Marshal(m[attr]) + if err != nil { + return nil + } + // Leaf level: detect array vs scalar. + if len(attrs) == 1 { + var arr []interface{} + if err := json.Unmarshal(b, &arr); err == nil { + results := []interface{}{} + for _, v := range arr { + results = append(results, fmt.Sprintf("%v", v)) + } + return results + } + return []interface{}{m[attr]} + } + // Intermediate level: detect array of maps vs single map. + var arrMaps []map[string]interface{} + if err := json.Unmarshal(b, &arrMaps); err == nil { + results := []interface{}{} + for _, item := range arrMaps { + results = append(results, ri.getVals(attrs[1:], item)...) + } + return results + } + nm := map[string]interface{}{} + if err := json.Unmarshal(b, &nm); err != nil { + return nil + } + return ri.getVals(attrs[1:], nm) +} + +func (ri *DynamicResource) byRules(rule string, vals ...interface{}) bool { + if len(vals) == 0 { + return false + } + formatted := fmt.Sprintf(rule, vals...) + // hm hm + switch { + case strings.Contains(rule, "not contains"): + a := strings.Split(formatted, " not contains ") + if reflect.TypeOf(vals[0]).Kind() == reflect.Map { + return vals[0].(map[string]interface{})[fmt.Sprintf("%v", a[1])] != nil + } + return strings.Contains(a[0], a[1]) + case strings.Contains(rule, "contains"): + a := strings.Split(formatted, " contains ") + if reflect.TypeOf(vals[0]).Kind() == reflect.Map { + return vals[0].(map[string]interface{})[fmt.Sprintf("%v", a[1])] != nil + } + return strings.Contains(a[0], a[1]) + case strings.Contains(rule, "<="): + a := strings.Split(formatted, " <= ") + return len(a) > 1 && a[0] <= a[1] + case strings.Contains(rule, ">="): + a := strings.Split(formatted, " >= ") + return len(a) > 1 && a[0] >= a[1] + case strings.Contains(rule, "<>"), strings.Contains(rule, "not like"): + if strings.Contains(rule, "<>") { + a := strings.Split(formatted, " <> ") + return len(a) > 1 && !strings.Contains(a[0], a[1]) && !strings.Contains(a[1], a[0]) + } + a := strings.Split(formatted, " not like ") + return len(a) > 1 && !strings.Contains(a[0], a[1]) && !strings.Contains(a[1], a[0]) + case strings.Contains(rule, "<"): + a := strings.Split(formatted, " < ") + return len(a) > 1 && a[0] < a[1] + case strings.Contains(rule, ">"): + a := strings.Split(formatted, " > ") + return len(a) > 1 && a[0] > a[1] + case strings.Contains(rule, "=="): + a := strings.Split(formatted, " == ") + return len(a) > 1 && a[0] == a[1] + case strings.Contains(rule, "!="): + a := strings.Split(formatted, " != ") + return len(a) > 1 && a[0] != a[1] + case strings.Contains(rule, "like"): + a := strings.Split(formatted, " like ") + return len(a) > 1 && (strings.Contains(a[0], a[1]) || strings.Contains(a[1], a[0])) + } + return false +} + +func (r *DynamicResource) GetType() string { + return tools.DYNAMIC_RESOURCE.String() +} + +func (abs *DynamicResource) ConvertToPricedResource(t tools.DataType, selectedInstance *int, selectedPartnership *int, selectedBuyingStrategy *int, selectedStrategy *int, selectedBookingModeIndex *int, request *tools.APIRequest) (pricing.PricedItemITF, error) { + var p pricing.PricedItemITF + var err error + for _, v := range []tools.DataType{ + tools.COMPUTE_RESOURCE, + tools.DATA_RESOURCE, + tools.STORAGE_RESOURCE, + tools.PROCESSING_RESOURCE, + tools.WORKFLOW_RESOURCE, + } { + switch v { + case tools.COMPUTE_RESOURCE: + if p, err = ConvertToPricedResource[*ComputeResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil { + return p.(*PricedResource[*ProcessingResourcePricingProfile]), nil + } + case tools.DATA_RESOURCE: + if p, err = ConvertToPricedResource[*DataResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil { + return p.(*PricedResource[*DataResourcePricingProfile]), nil + } + case tools.STORAGE_RESOURCE: + if p, err = ConvertToPricedResource[*StorageResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil { + return p.(*PricedResource[*StorageResourcePricingProfile]), nil + } + case tools.PROCESSING_RESOURCE: + if p, err = ConvertToPricedResource[*ProcessingResourcePricingProfile](t, selectedInstance, selectedPartnership, selectedBuyingStrategy, selectedStrategy, selectedBookingModeIndex, abs, request); err == nil { + return p.(*PricedResource[*ProcessingResourcePricingProfile]), nil + } + } + } + return nil, errors.New("can't convert priced resource") +} diff --git a/models/resources/interfaces.go b/models/resources/interfaces.go index f840a5c..51085b3 100755 --- a/models/resources/interfaces.go +++ b/models/resources/interfaces.go @@ -42,6 +42,7 @@ type ResourceInstanceITF interface { GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) ClearPeerGroups() + GetPartnerships() []ResourcePartnerITF GetAverageDurationS() float64 UpdateAverageDuration(actualS float64) } diff --git a/models/resources/models.go b/models/resources/models.go index dfd15da..d012dd0 100755 --- a/models/resources/models.go +++ b/models/resources/models.go @@ -14,6 +14,9 @@ type ResourceSet struct { NativeTool []string `bson:"native,omitempty" json:"native,omitempty"` Services []string `bson:"services,omitempty" json:"services,omitempty"` + // DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances. + DynamicResources []*DynamicResource `bson:"dynamic_resources,omitempty" json:"dynamic_resources,omitempty"` + DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"` StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"` ProcessingResources []*ProcessingResource `bson:"-" json:"processing_resources,omitempty"` @@ -62,6 +65,9 @@ func (r *ResourceSet) Fill(request *tools.APIRequest) { } } } + for _, d := range r.DynamicResources { + d.SetAllowedInstances(request) + } } type ItemResource struct { @@ -72,4 +78,5 @@ type ItemResource struct { Workflow *WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"` NativeTool *NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"` Service *ServiceResource `bson:"service,omitempty" json:"service,omitempty"` + Dynamic *DynamicResource `bson:"dynamic,omitempty" json:"dynamic,omitempty"` } diff --git a/models/resources/resource.go b/models/resources/resource.go index be7008f..34c2c32 100755 --- a/models/resources/resource.go +++ b/models/resources/resource.go @@ -19,6 +19,10 @@ import ( "github.com/google/uuid" ) +func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters { + return dbs.FiltersFromFlatMap(flatMap, target) +} + // AbstractResource is the struct containing all of the attributes commons to all ressources type AbstractResource struct { utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) @@ -381,6 +385,14 @@ func (ri *ResourceInstance[T]) UpdateAverageDuration(actualS float64) { ri.AverageDurationSamples++ } +func (ri *ResourceInstance[T]) GetPartnerships() []ResourcePartnerITF { + rt := []ResourcePartnerITF{} + for _, p := range ri.Partnerships { + rt = append(rt, p) + } + return rt +} + type ResourcePartnerShip[T pricing.PricingProfileITF] struct { Namespace string `json:"namespace" bson:"namespace" default:"default-namespace"` PeerGroups map[string][]string `json:"peer_groups,omitempty" bson:"peer_groups,omitempty"` diff --git a/models/resources/service.go b/models/resources/service.go index 28df93c..e5d61d6 100755 --- a/models/resources/service.go +++ b/models/resources/service.go @@ -85,8 +85,9 @@ func (abs *ServiceResource) ConvertToPricedResource(t tools.DataType, selectedIn type ServiceInstance struct { ResourceInstance[*ServiceResourcePartnership] - Mode ServiceMode `json:"mode" bson:"mode" default:"0"` - Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` + Mode ServiceMode `json:"mode" bson:"mode" default:"0"` + Access *ServiceResourceAccess `json:"access,omitempty" bson:"access,omitempty"` + MaxConcurrent int `json:"max_concurrent,omitempty" bson:"max_concurrent,omitempty"` } func (ri *ServiceInstance) IsPeerless() bool { return false } @@ -203,7 +204,7 @@ func (a *PricedServiceResource) GetExplicitDurationInS() float64 { return a.BookingConfiguration.ExplicitBookingDurationS } if a.BookingConfiguration.UsageStart == nil || a.BookingConfiguration.UsageEnd == nil { - return (5 * time.Minute).Seconds() + return -1 // no deadline specified: open-ended } return a.BookingConfiguration.UsageEnd.Sub(*a.BookingConfiguration.UsageStart).Seconds() } diff --git a/models/workflow/graph/graph.go b/models/workflow/graph/graph.go index 3d563d7..b43fcb9 100644 --- a/models/workflow/graph/graph.go +++ b/models/workflow/graph/graph.go @@ -69,6 +69,10 @@ func (wf *Graph) IsWorkflow(item GraphItem) bool { return item.Workflow != nil } +func (wf *Graph) IsDynamic(item GraphItem) bool { + return item.Dynamic != nil +} + func (g *Graph) GetAverageTimeRelatedToProcessingActivity(processings []*resources.ProcessingResource, resource resources.ResourceInterface, f func(GraphItem) resources.ResourceInterface, instance int, partnership int, buying int, strategy int, bookingMode int, request *tools.APIRequest) (float64, float64, error) { oneIsInfinite := false diff --git a/models/workflow/graph/item.go b/models/workflow/graph/item.go index 45511c5..1ec5491 100644 --- a/models/workflow/graph/item.go +++ b/models/workflow/graph/item.go @@ -29,6 +29,8 @@ func (g *GraphItem) GetResource() (tools.DataType, resources.ResourceInterface) return tools.NATIVE_TOOL, g.NativeTool } else if g.Service != nil { return tools.SERVICE_RESOURCE, g.Service + } else if g.Dynamic != nil { + return tools.DYNAMIC_RESOURCE, g.Dynamic } return tools.INVALID, nil } @@ -40,4 +42,5 @@ func (g *GraphItem) Clear() { g.Processing = nil g.Storage = nil g.Service = nil + g.Dynamic = nil } diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 6e7615b..62c53f1 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -176,6 +176,7 @@ use of a datacenter or storage can't be buy for permanent access. func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*purchase_resource.PurchaseResource { purchases := d.buyEach(bs, executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE]) purchases = append(purchases, d.buyEach(bs, executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...) + purchases = append(purchases, d.buyEach(bs, executionsID, wfID, tools.SERVICE_RESOURCE, priceds[tools.SERVICE_RESOURCE])...) d.PurchasesState = map[string]bool{} for _, p := range purchases { d.PurchasesState[p.GetID()] = false @@ -205,7 +206,11 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str var m map[string]interface{} b, _ := json.Marshal(priced) json.Unmarshal(b, &m) - end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) + var endDate *time.Time + if durS := priced.GetExplicitDurationInS(); durS > 0 { + e := start.Add(time.Duration(durS) * time.Second) + endDate = &e + } bookingItem := &purchase_resource.PurchaseResource{ AbstractObject: utils.AbstractObject{ UUID: uuid.New().String(), @@ -219,7 +224,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str ResourceID: priced.GetID(), InstanceID: priced.GetInstanceID(), ResourceType: dt, - EndDate: &end, + EndDate: endDate, } items = append(items, bookingItem) d.PeerBuyByGraph[priced.GetCreatorID()][itemID] = append( @@ -231,6 +236,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking { booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) + booking = append(booking, d.bookEach(executionsID, wfID, tools.SERVICE_RESOURCE, priceds[tools.SERVICE_RESOURCE])...) booking = append(booking, d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...) booking = append(booking, d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...) for _, p := range booking { diff --git a/tools/enums.go b/tools/enums.go index 63bccae..0b07fdc 100644 --- a/tools/enums.go +++ b/tools/enums.go @@ -34,6 +34,8 @@ const ( EXECUTION_VERIFICATION ALLOWED_IMAGE SERVICE_RESOURCE + DYNAMIC_RESOURCE + LIVE_SERVICE ) var NOAPI = func() string { @@ -92,6 +94,8 @@ var InnerDefaultAPI = [...]func() string{ SCHEDULERAPI, DATACENTERAPI, CATALOGAPI, + CATALOGAPI, + DATACENTERAPI, } // Bind the standard data name to the data type @@ -120,6 +124,8 @@ var Str = [...]string{ "execution_verification", "allowed_image", "service_resource", + "dynamic_resource", + "live_service", } func FromString(comp string) int { @@ -155,7 +161,7 @@ func DataTypeList() []DataType { return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE, - LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE} + LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE} } type PropalgationMessage struct {