Compare commits
18 Commits
a0a53f0477
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 3a66b42c01 | |||
| 58e97fbe74 | |||
| 1425a31494 | |||
| 6ee169f444 | |||
| 5be3c0a10a | |||
| d9723e6431 | |||
| c726361deb | |||
| d19ff1f8b2 | |||
| 69244163b4 | |||
| 842364d145 | |||
| 9ab374b720 | |||
| aa2bca48ef | |||
| 322ea38bb4 | |||
| c1490a7746 | |||
| 49f60d9416 | |||
| 548ed84b13 | |||
| 178cd48314 | |||
| b31df8cfed |
@@ -77,6 +77,7 @@ const (
|
||||
REFUND = tools.REFUND
|
||||
DISCOUNT = tools.DISCOUNT
|
||||
SUBSCRIPTION = tools.SUBSCRIPTION
|
||||
POLICY = tools.POLICY
|
||||
)
|
||||
|
||||
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
|
||||
|
||||
@@ -10,14 +10,13 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
|
||||
/*
|
||||
* Booking is a struct that represents a booking
|
||||
*/
|
||||
type Booking struct {
|
||||
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
||||
|
||||
FromNano string `json:"from_nano,omitempty" bson:"priced_item,omitempty"`
|
||||
FromNano string `json:"from_nano,omitempty" bson:"from_nano,omitempty"`
|
||||
PricedItem map[string]interface{} `json:"priced_item,omitempty" bson:"priced_item,omitempty"` // We need to add the validate:"required" tag once the pricing feature is implemented, removed to avoid handling the error
|
||||
|
||||
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
|
||||
@@ -147,5 +146,5 @@ func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
|
||||
}
|
||||
|
||||
func (r *Booking) CanDelete() bool {
|
||||
return r.IsDraft // only draft bookings can be deleted
|
||||
return true // only draft bookings can be deleted
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
|
||||
"cloud.o-forge.io/core/oc-lib/models/live"
|
||||
"cloud.o-forge.io/core/oc-lib/models/order"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer/policy"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
@@ -58,6 +59,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
|
||||
tools.SUBSCRIPTION.String(): func() utils.DBObject { return &subscription.Subscription{} },
|
||||
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
|
||||
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
|
||||
tools.POLICY.String(): func() utils.DBObject { return &policy.Policy{} },
|
||||
}
|
||||
|
||||
// Model returns the model object based on the model type
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
package organization
|
||||
|
||||
// Organization holds descriptive data about a peer's organization.
|
||||
// It is optional — a peer without an organization has a nil Organization field.
|
||||
type Organization struct {
|
||||
Name string `json:"name,omitempty" bson:"name,omitempty"`
|
||||
Description string `json:"description,omitempty" bson:"description,omitempty"`
|
||||
Website string `json:"website,omitempty" bson:"website,omitempty"`
|
||||
Sector string `json:"sector,omitempty" bson:"sector,omitempty"`
|
||||
Country string `json:"country,omitempty" bson:"country,omitempty"`
|
||||
}
|
||||
+31
-1
@@ -5,6 +5,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/organization"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
"github.com/biter777/countries"
|
||||
@@ -30,9 +31,19 @@ const (
|
||||
NANO
|
||||
PENDING_NANO
|
||||
PENDING_MASTER
|
||||
ORGANIZATION_MASTER
|
||||
ORGANIZATION_MEMBER
|
||||
ORGANIZATION_PARTNER
|
||||
ORGANIZATION_MASTER_PENDING
|
||||
ORGANIZATION_MEMBER_PENDING
|
||||
)
|
||||
|
||||
var path = []string{"known", "self", "partner", "blacklist", "pending_partner", "master", "nano", "pending_nano", "pending_master"}
|
||||
var path = []string{
|
||||
"known", "self", "partner", "blacklist", "pending_partner",
|
||||
"master", "nano", "pending_nano", "pending_master",
|
||||
"organization_master", "organization_member", "organization_partner",
|
||||
"organization_master_pending", "organization_member_pending",
|
||||
}
|
||||
|
||||
func GetRelationPath(str string) int {
|
||||
for i, p := range path {
|
||||
@@ -121,6 +132,20 @@ type Peer struct {
|
||||
// When oc-discovery fails to reach a NANO, it routes the booking to MasterID instead.
|
||||
MasterID string `json:"master_id,omitempty" bson:"master_id,omitempty"`
|
||||
|
||||
// OrganizationMasterID is the MongoDB _id of the peer acting as this node's
|
||||
// organization master. Set automatically when an ORGANIZATION_MASTER relation
|
||||
// is validated (equivalent of MasterID for the Nano/Master hierarchy).
|
||||
OrganizationMasterID string `json:"organization_master_id,omitempty" bson:"organization_master_id,omitempty"`
|
||||
|
||||
// Organization holds optional descriptive data about the peer's organization.
|
||||
// Null when the peer has not registered any organization data.
|
||||
Organization *organization.Organization `json:"organization,omitempty" bson:"organization,omitempty"`
|
||||
|
||||
// PolicyID references the Policy document that governs which inbound
|
||||
// libp2p streams are authorized for this peer.
|
||||
// When empty, all non-vital streams are denied by default.
|
||||
PolicyID string `json:"policy_id,omitempty" bson:"policy_id,omitempty"`
|
||||
|
||||
// Volatile connectivity state — never persisted to DB (bson:"-").
|
||||
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
|
||||
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
|
||||
@@ -137,6 +162,11 @@ func (ri *Peer) Extend(typ ...string) map[string][]tools.DataType {
|
||||
ext[t] = []tools.DataType{}
|
||||
}
|
||||
ext[t] = append(ext[t], tools.PEER)
|
||||
case "policy":
|
||||
if _, ok := ext[t]; !ok {
|
||||
ext[t] = []tools.DataType{}
|
||||
}
|
||||
ext[t] = append(ext[t], tools.POLICY)
|
||||
}
|
||||
}
|
||||
return ext
|
||||
|
||||
@@ -0,0 +1,30 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
// Policy defines which inbound libp2p streams are authorized for a peer.
|
||||
// Vital streams (planner, considers, minio/admiralty config, source-presign,
|
||||
// verify, observe, heartbeat) are always allowed regardless of policy.
|
||||
type Policy struct {
|
||||
utils.AbstractObject
|
||||
|
||||
// Resource CRUD
|
||||
AllowSearch bool `json:"allow_search" bson:"allow_search"`
|
||||
AllowCreate bool `json:"allow_create" bson:"allow_create"`
|
||||
AllowUpdate bool `json:"allow_update" bson:"allow_update"`
|
||||
AllowDelete bool `json:"allow_delete" bson:"allow_delete"`
|
||||
|
||||
// Resource freshness tracking
|
||||
AllowRegisterWatcher bool `json:"allow_register_watcher" bson:"allow_register_watcher"`
|
||||
AllowUnregisterWatcher bool `json:"allow_unregister_watcher" bson:"allow_unregister_watcher"`
|
||||
|
||||
// Organization partner confirmation
|
||||
AllowOrgPartnerConfirm bool `json:"allow_org_partner_confirm" bson:"allow_org_partner_confirm"`
|
||||
}
|
||||
|
||||
func (p *Policy) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
||||
return NewAccessor(request)
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
package policy
|
||||
|
||||
import (
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
type policyMongoAccessor struct {
|
||||
utils.AbstractAccessor[*Policy]
|
||||
}
|
||||
|
||||
func NewAccessor(request *tools.APIRequest) *policyMongoAccessor {
|
||||
return &policyMongoAccessor{
|
||||
AbstractAccessor: utils.AbstractAccessor[*Policy]{
|
||||
Logger: logs.CreateLogger(tools.POLICY.String()),
|
||||
Request: request,
|
||||
Type: tools.POLICY,
|
||||
New: func() *Policy { return &Policy{} },
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (a *policyMongoAccessor) GetObjectFilters(search string) *dbs.Filters {
|
||||
return &dbs.Filters{
|
||||
Or: map[string][]dbs.Filter{
|
||||
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,14 @@
|
||||
package resources
|
||||
|
||||
// Consent represents a consent request attached to a resource.
|
||||
// ConsentString is the question displayed to the user.
|
||||
// Optional, when true, means the user may decline without blocking scheduling.
|
||||
// A nil Optional is treated as required (false).
|
||||
type Consent struct {
|
||||
ConsentString string `json:"consent_string" bson:"consent_string"`
|
||||
Optional *bool `json:"optional,omitempty" bson:"optional,omitempty"`
|
||||
}
|
||||
|
||||
func (c Consent) IsOptional() bool {
|
||||
return c.Optional != nil && *c.Optional
|
||||
}
|
||||
+225
-19
@@ -21,7 +21,7 @@ import (
|
||||
type DynamicResource struct {
|
||||
AbstractResource
|
||||
Type tools.DataType `bson:"type,omitempty" json:"type,omitempty"`
|
||||
Filters map[string]interface{} `bson:"filters,omitempty" json:"filters,omitempty"`
|
||||
Filters dbs.Filters `bson:"filters,omitempty" json:"filters,omitempty"`
|
||||
SortRules map[string]string `bson:"rules,omitempty" json:"rules,omitempty"`
|
||||
|
||||
PeerIds map[int]string `bson:"peer_ids,omitempty" json:"peer_ids,omitempty"`
|
||||
@@ -37,45 +37,249 @@ type DynamicResource struct {
|
||||
WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"`
|
||||
}
|
||||
|
||||
// WorkspaceCandidatesProvider can be set by the workspace package to supply
|
||||
// contextual workspace resources for a given DataType and request without
|
||||
// creating a circular import (workspace → resources → workspace).
|
||||
// When set, SetAllowedInstances uses workspace-scoped resources instead of
|
||||
// the full catalog for requests that carry a username.
|
||||
var WorkspaceCandidatesProvider func(dt tools.DataType, request *tools.APIRequest) []ResourceInterface
|
||||
|
||||
func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
|
||||
d.Instances = []ResourceInstanceITF{}
|
||||
for k, v := range map[tools.DataType]ResourceInterface{
|
||||
tools.COMPUTE_RESOURCE: &ComputeResource{},
|
||||
tools.DATA_RESOURCE: &DataResource{},
|
||||
tools.STORAGE_RESOURCE: &StorageResource{},
|
||||
tools.PROCESSING_RESOURCE: &ProcessingResource{},
|
||||
tools.WORKFLOW_RESOURCE: &WorkflowResource{}} {
|
||||
if d.Type != k {
|
||||
continue
|
||||
if WorkspaceCandidatesProvider != nil {
|
||||
candidates := WorkspaceCandidatesProvider(d.Type, request)
|
||||
return d.SetAllowedInstancesFromSet(candidates, request, instance_id...)
|
||||
}
|
||||
access := NewAccessor[*DynamicResource](k, request)
|
||||
a, _, _ := access.Search(dbs.FiltersFromFlatMap(d.Filters, v), "", false, 0, 100000)
|
||||
fmt.Println(a, dbs.FiltersFromFlatMap(d.Filters, v), d.Filters, v)
|
||||
d.Instances = []ResourceInstanceITF{}
|
||||
d.sortAndResetInstances()
|
||||
return d.Instances
|
||||
}
|
||||
|
||||
// SetAllowedInstancesFromSet fills d.Instances from a pre-loaded workspace resource set
|
||||
// instead of querying the catalog. Filters are applied in-memory against the candidates.
|
||||
// Called by WorkspaceResourceSet.Fill so dynamic resources only see workspace-scoped resources.
|
||||
func (d *DynamicResource) SetAllowedInstancesFromSet(candidates []ResourceInterface, request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
|
||||
d.Instances = []ResourceInstanceITF{}
|
||||
d.PeerIds = map[int]string{}
|
||||
d.ResourceIds = map[int]string{}
|
||||
for _, res := range a {
|
||||
for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) {
|
||||
for _, res := range candidates {
|
||||
if !d.matchesFilters(res) {
|
||||
continue
|
||||
}
|
||||
for _, i := range res.SetAllowedInstances(request, instance_id...) {
|
||||
d.PeerIds[len(d.Instances)] = res.GetCreatorID()
|
||||
d.ResourceIds[len(d.Instances)] = res.GetID()
|
||||
d.Instances = append(d.Instances, i)
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
d.sortAndResetInstances()
|
||||
return d.Instances
|
||||
}
|
||||
|
||||
func (d *DynamicResource) sortAndResetInstances() {
|
||||
if d.SortRules != nil {
|
||||
sorted := make([]ResourceInstanceITF, len(d.Instances))
|
||||
copy(sorted, d.Instances)
|
||||
slices.SortStableFunc(sorted, func(a, b ResourceInstanceITF) int {
|
||||
d.SortRules["partnerships"] = "%v not contains 2"
|
||||
return d.compareByRules(a, b, d.SortRules)
|
||||
})
|
||||
d.Instances = sorted
|
||||
}
|
||||
d.WatchedDynamicResource = []string{}
|
||||
return d.Instances
|
||||
}
|
||||
|
||||
// matchesFilters applies d.Filters in-memory against a serialized resource.
|
||||
// Keys in d.Filters are JSON tag names; Serialize returns JSON tag names — no bson conversion needed.
|
||||
func (d *DynamicResource) matchesFilters(res ResourceInterface) bool {
|
||||
if len(d.Filters.And) == 0 && len(d.Filters.Or) == 0 {
|
||||
return true
|
||||
}
|
||||
m := res.Serialize(res)
|
||||
for field, fs := range d.Filters.And {
|
||||
vals := nestedVals(m, strings.Split(field, "."))
|
||||
for _, f := range fs {
|
||||
if !anyMatchesOp(vals, f) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(d.Filters.Or) > 0 {
|
||||
matched := false
|
||||
for field, fs := range d.Filters.Or {
|
||||
vals := nestedVals(m, strings.Split(field, "."))
|
||||
for _, f := range fs {
|
||||
if anyMatchesOp(vals, f) {
|
||||
matched = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if matched {
|
||||
break
|
||||
}
|
||||
}
|
||||
if !matched {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// nestedVals navigates a dot-path into m and collects all leaf values.
|
||||
// Arrays at any level are expanded: each element is recursed into.
|
||||
func nestedVals(m map[string]interface{}, path []string) []interface{} {
|
||||
if len(path) == 0 || m == nil {
|
||||
return nil
|
||||
}
|
||||
val, ok := m[path[0]]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
if len(path) == 1 {
|
||||
if arr, ok := val.([]interface{}); ok {
|
||||
return arr
|
||||
}
|
||||
return []interface{}{val}
|
||||
}
|
||||
rest := path[1:]
|
||||
switch v := val.(type) {
|
||||
case map[string]interface{}:
|
||||
return nestedVals(v, rest)
|
||||
case []interface{}:
|
||||
var out []interface{}
|
||||
for _, elem := range v {
|
||||
if em, ok := elem.(map[string]interface{}); ok {
|
||||
out = append(out, nestedVals(em, rest)...)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// anyMatchesOp returns true if at least one value in vals satisfies filter f.
|
||||
func anyMatchesOp(vals []interface{}, f dbs.Filter) bool {
|
||||
if f.Operator == dbs.EXISTS.String() {
|
||||
exists := len(vals) > 0 && vals[0] != nil
|
||||
want := true
|
||||
if b, ok := f.Value.(bool); ok {
|
||||
want = b
|
||||
}
|
||||
return exists == want
|
||||
}
|
||||
if f.Operator == dbs.IN.String() {
|
||||
list, ok := f.Value.([]interface{})
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
for _, v := range vals {
|
||||
sv := fmt.Sprintf("%v", v)
|
||||
for _, item := range list {
|
||||
if sv == fmt.Sprintf("%v", item) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
for _, v := range vals {
|
||||
if opMatches(v, f) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func opMatches(val interface{}, f dbs.Filter) bool {
|
||||
switch f.Operator {
|
||||
case dbs.EQUAL.String():
|
||||
return fmt.Sprintf("%v", val) == fmt.Sprintf("%v", f.Value)
|
||||
case dbs.NOT.String():
|
||||
return fmt.Sprintf("%v", val) != fmt.Sprintf("%v", f.Value)
|
||||
case dbs.LIKE.String():
|
||||
return strings.Contains(strings.ToLower(fmt.Sprintf("%v", val)), strings.ToLower(fmt.Sprintf("%v", f.Value)))
|
||||
case dbs.GT.String(), dbs.GTE.String(), dbs.LT.String(), dbs.LTE.String():
|
||||
return numericCmp(val, f.Value, f.Operator)
|
||||
case dbs.ELEMMATCH.String():
|
||||
arr, ok := val.([]interface{})
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
sub, ok := f.Value.(map[string]interface{})
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
for _, elem := range arr {
|
||||
em, ok := elem.(map[string]interface{})
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
allOk := true
|
||||
for k, sv := range sub {
|
||||
if fmt.Sprintf("%v", em[k]) != fmt.Sprintf("%v", sv) {
|
||||
allOk = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if allOk {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func numericCmp(a, b interface{}, op string) bool {
|
||||
fa, aOk := toFloat64(a)
|
||||
fb, bOk := toFloat64(b)
|
||||
if !aOk || !bOk {
|
||||
sa, sb := fmt.Sprintf("%v", a), fmt.Sprintf("%v", b)
|
||||
switch op {
|
||||
case dbs.GT.String():
|
||||
return sa > sb
|
||||
case dbs.GTE.String():
|
||||
return sa >= sb
|
||||
case dbs.LT.String():
|
||||
return sa < sb
|
||||
case dbs.LTE.String():
|
||||
return sa <= sb
|
||||
}
|
||||
return false
|
||||
}
|
||||
switch op {
|
||||
case dbs.GT.String():
|
||||
return fa > fb
|
||||
case dbs.GTE.String():
|
||||
return fa >= fb
|
||||
case dbs.LT.String():
|
||||
return fa < fb
|
||||
case dbs.LTE.String():
|
||||
return fa <= fb
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func toFloat64(v interface{}) (float64, bool) {
|
||||
switch n := v.(type) {
|
||||
case float64:
|
||||
return n, true
|
||||
case float32:
|
||||
return float64(n), true
|
||||
case int:
|
||||
return float64(n), true
|
||||
case int32:
|
||||
return float64(n), true
|
||||
case int64:
|
||||
return float64(n), true
|
||||
}
|
||||
return 0, false
|
||||
}
|
||||
|
||||
|
||||
func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) {
|
||||
d.Instances = append(d.Instances, instance)
|
||||
}
|
||||
@@ -92,13 +296,15 @@ func (d *DynamicResource) GetSelectedInstance(index *int) ResourceInstanceITF {
|
||||
d.SelectedIndex = i
|
||||
|
||||
for i := range inst.GetPartnerships() {
|
||||
fmt.Println(inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy), d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy)
|
||||
if inst.GetProfile(d.PeerIds[i], &i, &d.SelectedBuyingStrategy, &d.SelectedPricingStrategy) != nil {
|
||||
d.SelectedPartnershipIndex = &i
|
||||
break
|
||||
}
|
||||
}
|
||||
if d.SelectedPartnershipIndex == nil {
|
||||
continue
|
||||
i := 0
|
||||
d.SelectedPartnershipIndex = &i
|
||||
}
|
||||
return inst
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ type ResourceInterface interface {
|
||||
GetEnv() []models.Param
|
||||
GetInputs() []models.Param
|
||||
GetOutputs() []models.Param
|
||||
GetExploitationAuthorizations() []ExploitationAuthorization
|
||||
GetConsents() []Consent
|
||||
}
|
||||
|
||||
type ResourceInstanceITF interface {
|
||||
|
||||
+125
-1
@@ -15,7 +15,8 @@ type ResourceSet struct {
|
||||
Services []string `bson:"services,omitempty" json:"services,omitempty"`
|
||||
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
|
||||
|
||||
// DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances.
|
||||
// Runtime-only resource objects — not persisted. Populated by Fill() from the ID lists above.
|
||||
// Use WorkspaceResourceSet when full object persistence is needed (workspace fluid catalog).
|
||||
DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"`
|
||||
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
|
||||
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
|
||||
@@ -26,6 +27,129 @@ type ResourceSet struct {
|
||||
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
|
||||
}
|
||||
|
||||
// WorkspaceResourceSet mirrors ResourceSet but persists complete resource objects to MongoDB.
|
||||
// Use this in workspace documents where the workspace acts as a fluid resource catalog.
|
||||
// The *Resource fields are loaded from bson on read; Fill() skips catalog lookup when they are
|
||||
// already populated.
|
||||
type WorkspaceResourceSet struct {
|
||||
Datas []string `bson:"datas,omitempty" json:"datas,omitempty"`
|
||||
Storages []string `bson:"storages,omitempty" json:"storages,omitempty"`
|
||||
Processings []string `bson:"processings,omitempty" json:"processings,omitempty"`
|
||||
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
|
||||
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
|
||||
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
|
||||
Services []string `bson:"services,omitempty" json:"services,omitempty"`
|
||||
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
|
||||
|
||||
DynamicResources []*DynamicResource `bson:"dynamic_resources,omitempty" json:"dynamic_resources,omitempty"`
|
||||
DataResources []*DataResource `bson:"data_resources,omitempty" json:"data_resources,omitempty"`
|
||||
StorageResources []*StorageResource `bson:"storage_resources,omitempty" json:"storage_resources,omitempty"`
|
||||
ProcessingResources []*ProcessingResource `bson:"processing_resources,omitempty" json:"processing_resources,omitempty"`
|
||||
ComputeResources []*ComputeResource `bson:"compute_resources,omitempty" json:"compute_resources,omitempty"`
|
||||
WorkflowResources []*WorkflowResource `bson:"workflow_resources,omitempty" json:"workflow_resources,omitempty"`
|
||||
NativeTools []*NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
|
||||
ServiceResources []*ServiceResource `bson:"service_resources,omitempty" json:"service_resources,omitempty"`
|
||||
}
|
||||
|
||||
func (r *WorkspaceResourceSet) Clear() {
|
||||
r.DataResources = nil
|
||||
r.StorageResources = nil
|
||||
r.ProcessingResources = nil
|
||||
r.ComputeResources = nil
|
||||
r.WorkflowResources = nil
|
||||
r.ServiceResources = nil
|
||||
r.DynamicResources = nil
|
||||
r.NativeTools = nil
|
||||
}
|
||||
|
||||
// Fill populates *Resource fields from their ID lists. When a field is already non-nil
|
||||
// (loaded from the workspace MongoDB document), the catalog lookup is skipped for that type.
|
||||
func (r *WorkspaceResourceSet) Fill(request *tools.APIRequest) {
|
||||
if r.DataResources == nil {
|
||||
for _, id := range r.Datas {
|
||||
if d, _, e := (&DataResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.DataResources = append(r.DataResources, d.(*DataResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.ComputeResources == nil {
|
||||
for _, id := range r.Computes {
|
||||
if d, _, e := (&ComputeResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.ComputeResources = append(r.ComputeResources, d.(*ComputeResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.StorageResources == nil {
|
||||
for _, id := range r.Storages {
|
||||
if d, _, e := (&StorageResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.StorageResources = append(r.StorageResources, d.(*StorageResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.ProcessingResources == nil {
|
||||
for _, id := range r.Processings {
|
||||
if d, _, e := (&ProcessingResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.WorkflowResources == nil {
|
||||
for _, id := range r.Workflows {
|
||||
if d, _, e := (&WorkflowResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.ServiceResources == nil {
|
||||
for _, id := range r.Services {
|
||||
if d, _, e := (&ServiceResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
if r.DynamicResources == nil {
|
||||
for _, id := range r.Dynamics {
|
||||
if d, _, e := (&DynamicResource{}).GetAccessor(request).LoadOne(id); e == nil {
|
||||
r.DynamicResources = append(r.DynamicResources, d.(*DynamicResource))
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, d := range r.DynamicResources {
|
||||
var candidates []ResourceInterface
|
||||
switch d.Type {
|
||||
case tools.COMPUTE_RESOURCE:
|
||||
for _, c := range r.ComputeResources {
|
||||
candidates = append(candidates, c)
|
||||
}
|
||||
case tools.DATA_RESOURCE:
|
||||
for _, c := range r.DataResources {
|
||||
candidates = append(candidates, c)
|
||||
}
|
||||
case tools.STORAGE_RESOURCE:
|
||||
for _, c := range r.StorageResources {
|
||||
candidates = append(candidates, c)
|
||||
}
|
||||
case tools.PROCESSING_RESOURCE:
|
||||
for _, c := range r.ProcessingResources {
|
||||
candidates = append(candidates, c)
|
||||
}
|
||||
case tools.WORKFLOW_RESOURCE:
|
||||
for _, c := range r.WorkflowResources {
|
||||
candidates = append(candidates, c)
|
||||
}
|
||||
case tools.SERVICE_RESOURCE:
|
||||
for _, c := range r.ServiceResources {
|
||||
candidates = append(candidates, c)
|
||||
}
|
||||
}
|
||||
if len(candidates) > 0 {
|
||||
d.SetAllowedInstancesFromSet(candidates, request)
|
||||
} else {
|
||||
d.SetAllowedInstances(request)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ResourceSet) Clear() {
|
||||
r.DataResources = nil
|
||||
r.StorageResources = nil
|
||||
|
||||
@@ -50,6 +50,10 @@ type AbstractResource struct {
|
||||
// NOT in a separate collection.
|
||||
// Visibility-filtered per requesting peer before any response is sent.
|
||||
ExploitationAuthorizations []ExploitationAuthorization `json:"exploitation_authorizations,omitempty" bson:"exploitation_authorizations,omitempty"`
|
||||
|
||||
// Consents lists the consent questions the user must acknowledge before
|
||||
// scheduling this resource. Consents with Optional=true may be skipped.
|
||||
Consents []Consent `json:"consents,omitempty" bson:"consents,omitempty"`
|
||||
}
|
||||
|
||||
func (ri *AbstractResource) Extend(typ ...string) map[string][]tools.DataType {
|
||||
@@ -100,6 +104,11 @@ func (r *AbstractResource) GetExploitationAuthorizations() []ExploitationAuthori
|
||||
return r.ExploitationAuthorizations
|
||||
}
|
||||
|
||||
// GetConsents returns the consent questions declared by this resource.
|
||||
func (r *AbstractResource) GetConsents() []Consent {
|
||||
return r.Consents
|
||||
}
|
||||
|
||||
// FilterExploitationAuthorizations removes AEs that are not visible to peerID.
|
||||
// Must be called before serializing the resource for a consumer peer.
|
||||
// The resource owner (CreatorID) always sees all AEs unfiltered.
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
package resources
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"slices"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/config"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/models"
|
||||
@@ -110,6 +112,40 @@ func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, er
|
||||
return data, code, err
|
||||
}
|
||||
|
||||
var workspaceResourceTypes = []tools.DataType{
|
||||
tools.COMPUTE_RESOURCE,
|
||||
tools.DATA_RESOURCE,
|
||||
tools.PROCESSING_RESOURCE,
|
||||
tools.STORAGE_RESOURCE,
|
||||
tools.WORKFLOW_RESOURCE,
|
||||
tools.SERVICE_RESOURCE,
|
||||
}
|
||||
|
||||
func emitResourceNATS(method tools.NATSMethod, dt tools.DataType, payload []byte) {
|
||||
if !slices.Contains(workspaceResourceTypes, dt) {
|
||||
return
|
||||
}
|
||||
tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
|
||||
FromApp: config.GetAppName(),
|
||||
Datatype: dt,
|
||||
Method: int(method),
|
||||
Payload: payload,
|
||||
})
|
||||
}
|
||||
|
||||
func (dca *ResourceMongoAccessor[T]) DeleteOne(id string) (utils.DBObject, int, error) {
|
||||
data, code, err := dca.AbstractAccessor.LoadOne(id)
|
||||
if err != nil {
|
||||
return data, code, err
|
||||
}
|
||||
res, code, err := dca.AbstractAccessor.DeleteOne(id)
|
||||
if err == nil && data != nil {
|
||||
b, _ := json.Marshal(data)
|
||||
go emitResourceNATS(tools.REMOVE_RESOURCE, dca.GetType(), b)
|
||||
}
|
||||
return res, code, err
|
||||
}
|
||||
|
||||
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
|
||||
if dca.GetType() == tools.COMPUTE_RESOURCE {
|
||||
delete(set, "architecture")
|
||||
@@ -193,9 +229,119 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
|
||||
"resources_id": idsToUpdate,
|
||||
}, i)
|
||||
}
|
||||
if err == nil && res != nil {
|
||||
b, _ := json.Marshal(res)
|
||||
go emitResourceNATS(tools.CREATE_RESOURCE, dca.GetType(), b)
|
||||
}
|
||||
return res, code, err
|
||||
}
|
||||
|
||||
// PurgedResourcePayload holds a silently-deleted resource's type and serialized payload.
|
||||
type PurgedResourcePayload struct {
|
||||
DT tools.DataType
|
||||
Payload []byte
|
||||
}
|
||||
|
||||
// purgeByType searches and silently deletes all resources of type T created by creatorID.
|
||||
// Uses AbstractAccessor.DeleteOne directly to bypass the NATS-emitting override.
|
||||
func purgeByType[T ResourceInterface](dt tools.DataType, creatorID string) []PurgedResourcePayload {
|
||||
a := NewAccessor[T](dt, nil)
|
||||
if a == nil {
|
||||
return nil
|
||||
}
|
||||
res, _, _ := a.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"creator_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
|
||||
},
|
||||
}, "", false, 0, 10000)
|
||||
var result []PurgedResourcePayload
|
||||
for _, item := range res {
|
||||
b, err := json.Marshal(item)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
a.AbstractAccessor.DeleteOne(item.GetID())
|
||||
result = append(result, PurgedResourcePayload{DT: dt, Payload: b})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// PurgeCreatorResources deletes all catalog resources created by creatorPeerID from
|
||||
// the DB without emitting NATS. Used for non-blacklist peer privilege downgrades where
|
||||
// workspace state should be left untouched.
|
||||
func PurgeCreatorResources(creatorPeerID string) []PurgedResourcePayload {
|
||||
var result []PurgedResourcePayload
|
||||
result = append(result, purgeByType[*ComputeResource](tools.COMPUTE_RESOURCE, creatorPeerID)...)
|
||||
result = append(result, purgeByType[*DataResource](tools.DATA_RESOURCE, creatorPeerID)...)
|
||||
result = append(result, purgeByType[*ProcessingResource](tools.PROCESSING_RESOURCE, creatorPeerID)...)
|
||||
result = append(result, purgeByType[*StorageResource](tools.STORAGE_RESOURCE, creatorPeerID)...)
|
||||
result = append(result, purgeByType[*WorkflowResource](tools.WORKFLOW_RESOURCE, creatorPeerID)...)
|
||||
result = append(result, purgeByType[*ServiceResource](tools.SERVICE_RESOURCE, creatorPeerID)...)
|
||||
return result
|
||||
}
|
||||
|
||||
// FilterMapFromResourcePayload deserializes a resource payload by DataType, zeros out
|
||||
// the AbstractInstanciatedResource (and its AbstractResource / Instances sub-fields),
|
||||
// then marshals back to get only the concrete type's own JSON fields.
|
||||
// Returns nil for WORKFLOW_RESOURCE and unknown types.
|
||||
// JSON keys only — not BSON paths.
|
||||
func FilterMapFromResourcePayload(dt tools.DataType, payload []byte) map[string]interface{} {
|
||||
var m map[string]interface{}
|
||||
switch dt {
|
||||
case tools.COMPUTE_RESOURCE:
|
||||
var r ComputeResource
|
||||
if json.Unmarshal(payload, &r) != nil {
|
||||
return nil
|
||||
}
|
||||
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ComputeResourceInstance]{}
|
||||
b, _ := json.Marshal(r)
|
||||
json.Unmarshal(b, &m)
|
||||
case tools.DATA_RESOURCE:
|
||||
var r DataResource
|
||||
if json.Unmarshal(payload, &r) != nil {
|
||||
return nil
|
||||
}
|
||||
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*DataInstance]{}
|
||||
b, _ := json.Marshal(r)
|
||||
json.Unmarshal(b, &m)
|
||||
case tools.PROCESSING_RESOURCE:
|
||||
var r ProcessingResource
|
||||
if json.Unmarshal(payload, &r) != nil {
|
||||
return nil
|
||||
}
|
||||
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ProcessingInstance]{}
|
||||
b, _ := json.Marshal(r)
|
||||
json.Unmarshal(b, &m)
|
||||
case tools.STORAGE_RESOURCE:
|
||||
var r StorageResource
|
||||
if json.Unmarshal(payload, &r) != nil {
|
||||
return nil
|
||||
}
|
||||
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*StorageResourceInstance]{}
|
||||
b, _ := json.Marshal(r)
|
||||
json.Unmarshal(b, &m)
|
||||
case tools.SERVICE_RESOURCE:
|
||||
var r ServiceResource
|
||||
if json.Unmarshal(payload, &r) != nil {
|
||||
return nil
|
||||
}
|
||||
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ServiceInstance]{}
|
||||
b, _ := json.Marshal(r)
|
||||
json.Unmarshal(b, &m)
|
||||
case tools.WORKFLOW_RESOURCE:
|
||||
var r WorkflowResource
|
||||
if json.Unmarshal(payload, &r) != nil {
|
||||
return nil
|
||||
}
|
||||
r.AbstractResource = AbstractResource{}
|
||||
b, _ := json.Marshal(r)
|
||||
json.Unmarshal(b, &m)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||
return dca.StoreOne(data)
|
||||
}
|
||||
|
||||
@@ -150,7 +150,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
|
||||
ao.UpdateDate = time.Now()
|
||||
ao.UpdaterID = peer
|
||||
ao.UserUpdaterID = user
|
||||
if create && ao.CreatorID != "" {
|
||||
if create && ao.CreatorID == "" {
|
||||
ao.CreationDate = time.Now()
|
||||
ao.CreatorID = peer
|
||||
ao.UserCreatorID = user
|
||||
|
||||
+82
-3
@@ -129,9 +129,7 @@ func ModelGenericUpdateOne(change map[string]interface{}, id string, a Accessor)
|
||||
}
|
||||
|
||||
loaded := r.Serialize(r) // get the loaded object
|
||||
for k, v := range change { // apply the changes, with a flatten method
|
||||
loaded[k] = v
|
||||
}
|
||||
deepMerge(loaded, change)
|
||||
newObj := a.NewObj()
|
||||
b, err = json.Marshal(loaded)
|
||||
if err != nil {
|
||||
@@ -255,6 +253,87 @@ func IsMySelf(peerID string, wfa Accessor) (bool, string) {
|
||||
return peerID == pp.GetID(), pp.GetID()
|
||||
}
|
||||
|
||||
// deepMerge overlays patch values onto base, preserving base values for keys
|
||||
// absent from patch, nil patch values, and empty strings when base is non-empty.
|
||||
// This prevents partial frontend payloads from silently erasing server-managed
|
||||
// fields (source, env, country, owners, creator_id, creation_date, …).
|
||||
func deepMerge(base, patch map[string]interface{}) {
|
||||
for k, pv := range patch {
|
||||
bv := base[k]
|
||||
switch pvTyped := pv.(type) {
|
||||
case map[string]interface{}:
|
||||
if bvMap, ok := bv.(map[string]interface{}); ok {
|
||||
deepMerge(bvMap, pvTyped)
|
||||
} else {
|
||||
base[k] = pv
|
||||
}
|
||||
case []interface{}:
|
||||
if bvSlice, ok := bv.([]interface{}); ok {
|
||||
base[k] = mergeSlices(bvSlice, pvTyped)
|
||||
} else {
|
||||
base[k] = pv
|
||||
}
|
||||
case string:
|
||||
// Don't overwrite a non-empty base value with an empty string.
|
||||
if pvTyped != "" {
|
||||
base[k] = pv
|
||||
}
|
||||
default:
|
||||
if pv != nil {
|
||||
base[k] = pv
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// mergeSlices merges two slices element-wise.
|
||||
// For slices of maps it matches elements by their "id" field when available;
|
||||
// falls back to positional matching. An empty patch slice leaves base intact.
|
||||
func mergeSlices(base, patch []interface{}) []interface{} {
|
||||
if len(patch) == 0 {
|
||||
return base
|
||||
}
|
||||
for _, e := range patch {
|
||||
if _, ok := e.(map[string]interface{}); !ok {
|
||||
return patch // non-map elements: replace wholesale
|
||||
}
|
||||
}
|
||||
baseByID := map[string]map[string]interface{}{}
|
||||
for _, e := range base {
|
||||
if em, ok := e.(map[string]interface{}); ok {
|
||||
if id, ok := em["id"].(string); ok && id != "" {
|
||||
baseByID[id] = em
|
||||
}
|
||||
}
|
||||
}
|
||||
result := make([]interface{}, 0, len(patch))
|
||||
for i, pe := range patch {
|
||||
pm, _ := pe.(map[string]interface{})
|
||||
if pm == nil {
|
||||
result = append(result, pe)
|
||||
continue
|
||||
}
|
||||
var baseElem map[string]interface{}
|
||||
if id, ok := pm["id"].(string); ok && id != "" {
|
||||
baseElem = baseByID[id]
|
||||
}
|
||||
if baseElem == nil && i < len(base) {
|
||||
baseElem, _ = base[i].(map[string]interface{})
|
||||
}
|
||||
if baseElem != nil {
|
||||
merged := make(map[string]interface{}, len(baseElem))
|
||||
for k, v := range baseElem {
|
||||
merged[k] = v
|
||||
}
|
||||
deepMerge(merged, pm)
|
||||
result = append(result, merged)
|
||||
} else {
|
||||
result = append(result, pe)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func GenerateNodeID() (string, error) {
|
||||
folderStatic := "/var/lib/opencloud-node"
|
||||
if _, err := os.Stat(folderStatic); err == nil {
|
||||
|
||||
+221
-9
@@ -54,7 +54,11 @@ type Workflow struct {
|
||||
Outputs map[string][]models.Param `json:"outputs" bson:"outputs"`
|
||||
Args map[string][]string `json:"args" bson:"args"`
|
||||
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
|
||||
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty"`
|
||||
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
|
||||
|
||||
// StaleMap maps resource ID → stale bool. Populated at GET time from the
|
||||
// verify campaign results stored in oc-workflow's stale cache. Not persisted.
|
||||
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
|
||||
}
|
||||
|
||||
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
||||
@@ -1064,8 +1068,16 @@ const (
|
||||
ViolationInvertedArrow ViolationType = "inverted_arrow"
|
||||
ViolationIsolatedProcessing ViolationType = "isolated_processing"
|
||||
ViolationStorageNotLinkedToProcessing ViolationType = "storage_not_linked_to_processing"
|
||||
ViolationDynamicNotConfigured ViolationType = "dynamic_not_configured"
|
||||
ViolationAnchorOnNonStorage ViolationType = "anchor_on_non_storage"
|
||||
)
|
||||
|
||||
// ocAnchorAccess is the magic value used by oc-monitord to inject storage
|
||||
// access credentials into a workflow step at runtime. It must only appear as
|
||||
// an output of a storage (or dynamic-storage) element.
|
||||
const ocAnchorAccess = "§oc:access§"
|
||||
|
||||
|
||||
// IntegrityViolation describes a single structural or semantic problem
|
||||
// found in the workflow graph.
|
||||
type IntegrityViolation struct {
|
||||
@@ -1103,6 +1115,8 @@ func (w *Workflow) ValidateIntegrity() []IntegrityViolation {
|
||||
violations = append(violations, w.detectInvertedArrows()...)
|
||||
violations = append(violations, w.detectIsolatedProcessings()...)
|
||||
violations = append(violations, w.detectOrphanedStorages()...)
|
||||
violations = append(violations, w.validateDynamicFilters()...)
|
||||
violations = append(violations, w.validateAnchorOutputs()...)
|
||||
return violations
|
||||
}
|
||||
|
||||
@@ -1180,14 +1194,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
|
||||
var name string
|
||||
switch {
|
||||
case w.Graph.IsProcessing(item) && item.Processing != nil:
|
||||
// IsService processings are long-running services and don't need a Compute booking.
|
||||
if item.Processing.IsService {
|
||||
continue
|
||||
}
|
||||
needsCompute = true
|
||||
name = item.Processing.GetName()
|
||||
case w.Graph.IsService(item) && item.Service != nil:
|
||||
// HOSTED services use an existing endpoint — no Compute booking needed.
|
||||
inst := item.Service.GetSelectedInstance(nil)
|
||||
if inst != nil {
|
||||
if si, ok := inst.(*resources.ServiceInstance); ok && si.Mode == resources.HOSTED {
|
||||
@@ -1196,6 +1208,9 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
|
||||
}
|
||||
needsCompute = true
|
||||
name = item.Service.GetName()
|
||||
case w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.PROCESSING_RESOURCE:
|
||||
needsCompute = true
|
||||
name = w.itemName(id)
|
||||
}
|
||||
if !needsCompute {
|
||||
continue
|
||||
@@ -1210,7 +1225,12 @@ func (w *Workflow) validateComputeLinks() []IntegrityViolation {
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
if other, ok := w.Graph.Items[otherID]; ok && w.Graph.IsCompute(other) {
|
||||
other, ok := w.Graph.Items[otherID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
// A concrete compute OR a dynamic node typed as compute satisfies the requirement.
|
||||
if w.Graph.IsCompute(other) || (w.Graph.IsDynamic(other) && other.Dynamic.Type == tools.COMPUTE_RESOURCE) {
|
||||
hasCompute = true
|
||||
break
|
||||
}
|
||||
@@ -1301,12 +1321,41 @@ func (w *Workflow) detectCycles() []IntegrityViolation {
|
||||
// validateDataStorageLinks checks that every Data item with a non-empty Source
|
||||
// has at least one Storage linked — the builder needs this to inject the
|
||||
// download step (curl or NATS/Minio protocol).
|
||||
// isStorageEquivalent returns true for items that can satisfy a Data node's
|
||||
// storage requirement: concrete storage, dynamic-storage, or a compute that
|
||||
// has at least one embedded storage available on one of its instances.
|
||||
func (w *Workflow) isStorageEquivalent(item graph.GraphItem) bool {
|
||||
if w.Graph.IsStorage(item) {
|
||||
return true
|
||||
}
|
||||
if w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE {
|
||||
return true
|
||||
}
|
||||
if w.Graph.IsCompute(item) && item.Compute != nil {
|
||||
for _, inst := range item.Compute.Instances {
|
||||
if len(inst.AvailableStorages) > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (w *Workflow) validateDataStorageLinks() []IntegrityViolation {
|
||||
var violations []IntegrityViolation
|
||||
dataStorageLinks := w.Graph.GetDataStorageLinks()
|
||||
// Build the set of data item IDs that have a valid storage-equivalent link.
|
||||
linkedStorage := map[string]struct{}{}
|
||||
for _, dsl := range dataStorageLinks {
|
||||
linkedStorage[dsl.DataItemID] = struct{}{}
|
||||
for _, link := range w.Graph.Links {
|
||||
srcItem, srcOk := w.Graph.Items[link.Source.ID]
|
||||
dstItem, dstOk := w.Graph.Items[link.Destination.ID]
|
||||
if !srcOk || !dstOk {
|
||||
continue
|
||||
}
|
||||
if w.Graph.IsData(srcItem) && w.isStorageEquivalent(dstItem) {
|
||||
linkedStorage[link.Source.ID] = struct{}{}
|
||||
} else if w.isStorageEquivalent(srcItem) && w.Graph.IsData(dstItem) {
|
||||
linkedStorage[link.Destination.ID] = struct{}{}
|
||||
}
|
||||
}
|
||||
for id, item := range w.Graph.Items {
|
||||
if !w.Graph.IsData(item) || item.Data == nil {
|
||||
@@ -1492,12 +1541,162 @@ func (w *Workflow) detectIsolatedProcessings() []IntegrityViolation {
|
||||
return violations
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// AE validation helpers — centralised so both oc-scheduler and oc-schedulerd
|
||||
// share the same logic without code duplication.
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// BuildResourceIDSet constructs the per-type resource-ID map and the flat
|
||||
// coupling-membership set used by ValidateWorkflowAE.
|
||||
//
|
||||
// selectedEmbeddedStorages and selectedInstances come from the scheduling
|
||||
// request (WorkflowSchedule) or from the WorkflowExecution at launch time.
|
||||
// Embedded storages are NOT stored in Workflow.Storages (they are inside
|
||||
// ComputeResourceInstance.AvailableStorages), so they must be resolved here
|
||||
// to make them visible to the AE coupling check.
|
||||
func (w *Workflow) BuildResourceIDSet(
|
||||
selectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection,
|
||||
selectedInstances ConfigItem,
|
||||
) (map[tools.DataType][]string, map[string]struct{}) {
|
||||
resourcesByType := map[tools.DataType][]string{
|
||||
tools.DATA_RESOURCE: w.Datas,
|
||||
tools.PROCESSING_RESOURCE: w.Processings,
|
||||
tools.STORAGE_RESOURCE: append([]string{}, w.Storages...),
|
||||
tools.COMPUTE_RESOURCE: w.Computes,
|
||||
tools.WORKFLOW_RESOURCE: w.Workflows,
|
||||
tools.SERVICE_RESOURCE: w.Services,
|
||||
}
|
||||
idSet := map[string]struct{}{}
|
||||
for _, ids := range resourcesByType {
|
||||
for _, id := range ids {
|
||||
idSet[id] = struct{}{}
|
||||
}
|
||||
}
|
||||
for graphItemID, sel := range selectedEmbeddedStorages {
|
||||
if sel == nil {
|
||||
continue
|
||||
}
|
||||
c, ok := w.Graph.Items[graphItemID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
_, computeRes := c.GetResource()
|
||||
computeResource, ok := computeRes.(*resources.ComputeResource)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
computeIdx := 0
|
||||
if d := selectedInstances.Get(computeResource.GetID()); d != nil {
|
||||
computeIdx = *d
|
||||
}
|
||||
if computeIdx >= len(computeResource.Instances) {
|
||||
continue
|
||||
}
|
||||
computeInst := computeResource.Instances[computeIdx]
|
||||
if sel.StorageIndex >= len(computeInst.AvailableStorages) {
|
||||
continue
|
||||
}
|
||||
storageID := computeInst.AvailableStorages[sel.StorageIndex].GetID()
|
||||
if storageID == "" {
|
||||
continue
|
||||
}
|
||||
idSet[storageID] = struct{}{}
|
||||
resourcesByType[tools.STORAGE_RESOURCE] = append(resourcesByType[tools.STORAGE_RESOURCE], storageID)
|
||||
}
|
||||
return resourcesByType, idSet
|
||||
}
|
||||
|
||||
// ValidateWorkflowAE checks the ExploitationAuthorizations of every resource
|
||||
// referenced in resourcesByType against the coupling/peer/workflow constraints.
|
||||
//
|
||||
// loadResource is injected by the caller to avoid a circular import
|
||||
// (oc-lib/models/resources → oclib → oc-lib/models → resources).
|
||||
// A nil return from loadResource means "resource not found — skip".
|
||||
func (w *Workflow) ValidateWorkflowAE(
|
||||
workflowID, consumerPeerID string,
|
||||
resourcesByType map[tools.DataType][]string,
|
||||
idSet map[string]struct{},
|
||||
loadResource func(tools.DataType, string) resources.ResourceInterface,
|
||||
) []resources.AEViolation {
|
||||
now := time.Now().UTC()
|
||||
var violations []resources.AEViolation
|
||||
for dt, ids := range resourcesByType {
|
||||
for _, id := range ids {
|
||||
res := loadResource(dt, id)
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
for _, ae := range res.GetExploitationAuthorizations() {
|
||||
violations = append(violations, ae.CheckAE(id, workflowID, consumerPeerID, idSet, now)...)
|
||||
}
|
||||
}
|
||||
}
|
||||
return violations
|
||||
}
|
||||
|
||||
// detectOrphanedStorages warns when a storage node is not linked to any
|
||||
// processing node — it contributes no data flow to the workflow.
|
||||
// validateDynamicFilters mirrors oc-front's dynamic-not-configured check:
|
||||
// a dynamic node with no filters cannot be resolved by the scheduler.
|
||||
func (w *Workflow) validateDynamicFilters() []IntegrityViolation {
|
||||
var violations []IntegrityViolation
|
||||
for id, item := range w.Graph.Items {
|
||||
if !w.Graph.IsDynamic(item) {
|
||||
continue
|
||||
}
|
||||
f := item.Dynamic.Filters
|
||||
if len(f.And) == 0 && len(f.Or) == 0 {
|
||||
violations = append(violations, IntegrityViolation{
|
||||
Severity: SeverityError,
|
||||
Type: ViolationDynamicNotConfigured,
|
||||
ItemIDs: []string{id},
|
||||
Message: fmt.Sprintf(
|
||||
`"%s" is a dynamic %s with no filters — at least one filter is required for the scheduler to resolve it`,
|
||||
w.itemName(id), item.Dynamic.Type,
|
||||
),
|
||||
})
|
||||
}
|
||||
}
|
||||
return violations
|
||||
}
|
||||
|
||||
// validateAnchorOutputs mirrors oc-front's anchor-on-non-storage check:
|
||||
// the §oc:access§ magic value must only appear as an output of a storage,
|
||||
// dynamic-storage, or compute node that has an active embedded storage selected.
|
||||
func (w *Workflow) validateAnchorOutputs() []IntegrityViolation {
|
||||
var violations []IntegrityViolation
|
||||
for id, item := range w.Graph.Items {
|
||||
if w.Graph.IsStorage(item) || (w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
|
||||
continue
|
||||
}
|
||||
if w.Graph.IsCompute(item) {
|
||||
if _, ok := w.SelectedEmbeddedStorages[id]; ok {
|
||||
continue
|
||||
}
|
||||
}
|
||||
for _, p := range w.Outputs[id] {
|
||||
if p.Value == ocAnchorAccess {
|
||||
violations = append(violations, IntegrityViolation{
|
||||
Severity: SeverityError,
|
||||
Type: ViolationAnchorOnNonStorage,
|
||||
ItemIDs: []string{id},
|
||||
Message: fmt.Sprintf(
|
||||
`"%s" has an access anchor (%s) as output — access anchors may only be outputs of storage elements`,
|
||||
w.itemName(id), ocAnchorAccess,
|
||||
),
|
||||
})
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return violations
|
||||
}
|
||||
|
||||
func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
|
||||
var violations []IntegrityViolation
|
||||
for id, item := range w.Graph.Items {
|
||||
if !w.Graph.IsStorage(item) {
|
||||
// Check both concrete storage and dynamic-storage nodes.
|
||||
if !w.Graph.IsStorage(item) && !(w.Graph.IsDynamic(item) && item.Dynamic.Type == tools.STORAGE_RESOURCE) {
|
||||
continue
|
||||
}
|
||||
linkedTopics := map[string]struct{}{}
|
||||
@@ -1510,7 +1709,10 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
|
||||
} else {
|
||||
continue
|
||||
}
|
||||
if other, ok := w.Graph.Items[otherID]; ok {
|
||||
other, ok := w.Graph.Items[otherID]
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
switch {
|
||||
case w.Graph.IsProcessing(other):
|
||||
linkedTopics["processing"] = struct{}{}
|
||||
@@ -1520,6 +1722,16 @@ func (w *Workflow) detectOrphanedStorages() []IntegrityViolation {
|
||||
linkedTopics["data"] = struct{}{}
|
||||
case w.Graph.IsService(other):
|
||||
linkedTopics["service"] = struct{}{}
|
||||
case w.Graph.IsDynamic(other):
|
||||
switch other.Dynamic.Type {
|
||||
case tools.PROCESSING_RESOURCE:
|
||||
linkedTopics["processing"] = struct{}{}
|
||||
case tools.COMPUTE_RESOURCE:
|
||||
linkedTopics["compute"] = struct{}{}
|
||||
case tools.DATA_RESOURCE:
|
||||
linkedTopics["data"] = struct{}{}
|
||||
case tools.SERVICE_RESOURCE:
|
||||
linkedTopics["service"] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -160,7 +160,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
|
||||
if err == nil && len(resource) > 0 { // if the workspace already exists, update it
|
||||
w := &workspace.Workspace{
|
||||
Active: active,
|
||||
ResourceSet: resources.ResourceSet{
|
||||
WorkspaceResourceSet: resources.WorkspaceResourceSet{
|
||||
Datas: workflow.Datas,
|
||||
Processings: workflow.Processings,
|
||||
Storages: workflow.Storages,
|
||||
@@ -173,7 +173,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
|
||||
a.workspaceAccessor.StoreOne(&workspace.Workspace{
|
||||
Active: active,
|
||||
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
|
||||
ResourceSet: resources.ResourceSet{
|
||||
WorkspaceResourceSet: resources.WorkspaceResourceSet{
|
||||
Datas: workflow.Datas,
|
||||
Processings: workflow.Processings,
|
||||
Storages: workflow.Storages,
|
||||
|
||||
@@ -48,6 +48,10 @@ type WorkflowExecution struct {
|
||||
BookingsState map[string]BookingState `json:"bookings_state" bson:"bookings_state,omitempty"` // booking_id → reservation+completion status
|
||||
PurchasesState map[string]bool `json:"purchases_state" bson:"purchases_state,omitempty"` // purchase_id → confirmed
|
||||
|
||||
// ResourceConsents records which consent strings the user acknowledged per resource
|
||||
// (resource_id → list of acknowledged ConsentString values) at scheduling time.
|
||||
ResourceConsents map[string][]string `json:"resource_consents,omitempty" bson:"resource_consents,omitempty"`
|
||||
|
||||
// Graph is a lightweight, real-time summary of the workflow execution graph.
|
||||
// Keyed by workflow graph item ID; updated by oc-scheduler on each step-done event.
|
||||
// Consumed by oc-front to render the live execution panel via websocket updates.
|
||||
@@ -97,7 +101,7 @@ func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject)
|
||||
}
|
||||
|
||||
func (r *WorkflowExecution) CanDelete() bool {
|
||||
return r.IsDraft // only draft bookings can be deleted
|
||||
return true // only draft bookings can be deleted
|
||||
}
|
||||
|
||||
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
|
||||
|
||||
@@ -1,23 +1,49 @@
|
||||
package workspace
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
// trustedRelations holds peer relations that yield TrustMap = true for a resource.
|
||||
var trustedRelations = map[peer.PeerRelation]bool{
|
||||
peer.PARTNER: true,
|
||||
peer.MASTER: true,
|
||||
peer.NANO: true,
|
||||
peer.ORGANIZATION_MASTER: true,
|
||||
peer.ORGANIZATION_MEMBER: true,
|
||||
peer.ORGANIZATION_PARTNER: true,
|
||||
}
|
||||
|
||||
// Workspace is a struct that represents a workspace
|
||||
type Workspace struct {
|
||||
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
||||
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, compute, processing, storage, workflow)
|
||||
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual
|
||||
Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active
|
||||
Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace
|
||||
resources.WorkspaceResourceSet // WorkspaceResourceSet persists both IDs and complete resource objects
|
||||
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"`
|
||||
Active bool `json:"active" bson:"active" default:"false"`
|
||||
Shared string `json:"shared,omitempty" bson:"shared,omitempty"`
|
||||
|
||||
// Notifications accumulates strings for auto-modifications (e.g. resource removed after peer blacklist).
|
||||
// Cleared by the owner via the notifications update endpoint.
|
||||
Notifications []string `json:"notifications,omitempty" bson:"notifications,omitempty"`
|
||||
|
||||
// TrustMap maps resource ID → trust bool based on the creator peer's relation.
|
||||
// Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean.
|
||||
TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"`
|
||||
|
||||
// StaleMap maps resource ID → stale bool. Populated at GET time from the
|
||||
// verify campaign results stored in oc-workspace's stale cache. Not persisted.
|
||||
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
|
||||
}
|
||||
|
||||
func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
||||
return NewAccessor(request) // Create a new instance of the accessor
|
||||
return NewAccessor(request)
|
||||
}
|
||||
|
||||
func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool {
|
||||
@@ -30,3 +56,147 @@ func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool
|
||||
}
|
||||
return ao.AbstractObject.VerifyAuth(callName, request)
|
||||
}
|
||||
|
||||
// ComputeTrustAndClean populates TrustMap for all resources embedded in this workspace,
|
||||
// removes resources whose creator peer is blacklisted, and appends a deletion notification
|
||||
// for each removal. Returns true when at least one resource was removed (caller should persist).
|
||||
func (w *Workspace) ComputeTrustAndClean() bool {
|
||||
w.TrustMap = map[string]bool{}
|
||||
|
||||
selfPeer, _ := utils.GetMySelf(peer.NewShallowAccessor())
|
||||
var selfPeerID string
|
||||
if selfPeer != nil {
|
||||
if p, ok := selfPeer.(*peer.Peer); ok {
|
||||
selfPeerID = p.PeerID
|
||||
}
|
||||
}
|
||||
|
||||
// Cache peer relations to avoid redundant DB lookups per workspace load.
|
||||
cache := map[string]peer.PeerRelation{}
|
||||
relation := func(creatorID string) peer.PeerRelation {
|
||||
if r, ok := cache[creatorID]; ok {
|
||||
return r
|
||||
}
|
||||
if creatorID == selfPeerID {
|
||||
cache[creatorID] = peer.SELF
|
||||
return peer.SELF
|
||||
}
|
||||
results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
|
||||
},
|
||||
}, "", false, 0, 1)
|
||||
rel := peer.NONE
|
||||
if len(results) > 0 {
|
||||
if p, ok := results[0].(*peer.Peer); ok {
|
||||
rel = p.Relation
|
||||
}
|
||||
}
|
||||
cache[creatorID] = rel
|
||||
return rel
|
||||
}
|
||||
|
||||
setTrust := func(id, creatorID string, rel peer.PeerRelation) {
|
||||
w.TrustMap[id] = (creatorID == selfPeerID) || trustedRelations[rel]
|
||||
}
|
||||
|
||||
changed := false
|
||||
|
||||
var keptData []*resources.DataResource
|
||||
for _, r := range w.DataResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptData = append(keptData, r)
|
||||
}
|
||||
}
|
||||
w.DataResources = keptData
|
||||
|
||||
var keptCompute []*resources.ComputeResource
|
||||
for _, r := range w.ComputeResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptCompute = append(keptCompute, r)
|
||||
}
|
||||
}
|
||||
w.ComputeResources = keptCompute
|
||||
|
||||
var keptStorage []*resources.StorageResource
|
||||
for _, r := range w.StorageResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptStorage = append(keptStorage, r)
|
||||
}
|
||||
}
|
||||
w.StorageResources = keptStorage
|
||||
|
||||
var keptProcessing []*resources.ProcessingResource
|
||||
for _, r := range w.ProcessingResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptProcessing = append(keptProcessing, r)
|
||||
}
|
||||
}
|
||||
w.ProcessingResources = keptProcessing
|
||||
|
||||
var keptWorkflow []*resources.WorkflowResource
|
||||
for _, r := range w.WorkflowResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptWorkflow = append(keptWorkflow, r)
|
||||
}
|
||||
}
|
||||
w.WorkflowResources = keptWorkflow
|
||||
|
||||
var keptService []*resources.ServiceResource
|
||||
for _, r := range w.ServiceResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptService = append(keptService, r)
|
||||
}
|
||||
}
|
||||
w.ServiceResources = keptService
|
||||
|
||||
var keptDynamic []*resources.DynamicResource
|
||||
for _, r := range w.DynamicResources {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptDynamic = append(keptDynamic, r)
|
||||
}
|
||||
}
|
||||
w.DynamicResources = keptDynamic
|
||||
|
||||
var keptNative []*resources.NativeTool
|
||||
for _, r := range w.NativeTools {
|
||||
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
|
||||
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
|
||||
changed = true
|
||||
} else {
|
||||
setTrust(r.GetID(), r.GetCreatorID(), rel)
|
||||
keptNative = append(keptNative, r)
|
||||
}
|
||||
}
|
||||
w.NativeTools = keptNative
|
||||
|
||||
return changed
|
||||
}
|
||||
|
||||
@@ -7,10 +7,60 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
func init() {
|
||||
resources.WorkspaceCandidatesProvider = func(dt tools.DataType, request *tools.APIRequest) []resources.ResourceInterface {
|
||||
res, _, _ := NewAccessor(request).Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"user_creator_id": {{Operator: dbs.EQUAL.String(), Value: request.Username}},
|
||||
"active": {{Operator: dbs.EQUAL.String(), Value: true}},
|
||||
},
|
||||
}, "", false, 0, 1)
|
||||
if len(res) == 0 {
|
||||
return []resources.ResourceInterface{}
|
||||
}
|
||||
ws, ok := res[0].(*Workspace)
|
||||
if !ok {
|
||||
return []resources.ResourceInterface{}
|
||||
}
|
||||
// ws.Fill was already called by Search — typed slices are populated.
|
||||
// Return an empty non-nil slice when the workspace exists but has no
|
||||
// resources of the requested type: caller must not fall back to catalog.
|
||||
out := []resources.ResourceInterface{}
|
||||
switch dt {
|
||||
case tools.COMPUTE_RESOURCE:
|
||||
for _, c := range ws.ComputeResources {
|
||||
out = append(out, c)
|
||||
}
|
||||
case tools.DATA_RESOURCE:
|
||||
for _, c := range ws.DataResources {
|
||||
out = append(out, c)
|
||||
}
|
||||
case tools.STORAGE_RESOURCE:
|
||||
for _, c := range ws.StorageResources {
|
||||
out = append(out, c)
|
||||
}
|
||||
case tools.PROCESSING_RESOURCE:
|
||||
for _, c := range ws.ProcessingResources {
|
||||
out = append(out, c)
|
||||
}
|
||||
case tools.WORKFLOW_RESOURCE:
|
||||
for _, c := range ws.WorkflowResources {
|
||||
out = append(out, c)
|
||||
}
|
||||
case tools.SERVICE_RESOURCE:
|
||||
for _, c := range ws.ServiceResources {
|
||||
out = append(out, c)
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
}
|
||||
|
||||
// Workspace is a struct that represents a workspace
|
||||
type workspaceMongoAccessor struct {
|
||||
utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller)
|
||||
@@ -88,25 +138,39 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
|
||||
|
||||
func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
|
||||
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
|
||||
d.(*Workspace).Fill(a.GetRequest())
|
||||
return d, 200, nil
|
||||
w := d.(*Workspace)
|
||||
w.Fill(a.GetRequest())
|
||||
a.applyTrustAndClean(w)
|
||||
return w, 200, nil
|
||||
}, a)
|
||||
}
|
||||
|
||||
func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
|
||||
return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject {
|
||||
d.(*Workspace).Fill(a.GetRequest())
|
||||
return d
|
||||
w := d.(*Workspace)
|
||||
w.Fill(a.GetRequest())
|
||||
a.applyTrustAndClean(w)
|
||||
return w
|
||||
}, isDraft, a, offset, limit)
|
||||
}
|
||||
|
||||
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
|
||||
return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject {
|
||||
d.(*Workspace).Fill(a.GetRequest())
|
||||
return d
|
||||
w := d.(*Workspace)
|
||||
w.Fill(a.GetRequest())
|
||||
a.applyTrustAndClean(w)
|
||||
return w
|
||||
}, isDraft, a, offset, limit)
|
||||
}
|
||||
|
||||
// applyTrustAndClean calls ComputeTrustAndClean and, when resources were removed due to
|
||||
// blacklisted peers, persists the cleaned workspace back to the database.
|
||||
func (a *workspaceMongoAccessor) applyTrustAndClean(w *Workspace) {
|
||||
if changed := w.ComputeTrustAndClean(); changed {
|
||||
utils.GenericUpdateOne(w.Serialize(w), w.GetID(), a)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
This function is used to share the workspace with the peers
|
||||
*/
|
||||
|
||||
+50
-1
@@ -40,6 +40,7 @@ const (
|
||||
REFUND
|
||||
DISCOUNT
|
||||
SUBSCRIPTION
|
||||
POLICY
|
||||
)
|
||||
|
||||
var NOAPI = func() string {
|
||||
@@ -104,6 +105,7 @@ var InnerDefaultAPI = [...]func() string{
|
||||
NOAPI,
|
||||
NOAPI,
|
||||
NOAPI,
|
||||
PEERSAPI,
|
||||
}
|
||||
|
||||
// Bind the standard data name to the data type
|
||||
@@ -138,6 +140,7 @@ var Str = [...]string{
|
||||
"refund",
|
||||
"discount",
|
||||
"subscription",
|
||||
"policy",
|
||||
}
|
||||
|
||||
func FromString(comp string) int {
|
||||
@@ -174,7 +177,7 @@ func DataTypeList() []DataType {
|
||||
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
|
||||
ORDER, PURCHASE_RESOURCE,
|
||||
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE,
|
||||
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION}
|
||||
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION, POLICY}
|
||||
}
|
||||
|
||||
type PropalgationMessage struct {
|
||||
@@ -208,6 +211,40 @@ const (
|
||||
// for a private source resource (isReachable=false, Phase 4).
|
||||
// oc-discovery routes it to the resource owner peer via ProtocolSourcePresignResource.
|
||||
PB_SOURCE_PRESIGN
|
||||
|
||||
// PB_ORG_PARTNER is propagated via PB_PROPAGATE through oc-discovery to the
|
||||
// organization master's oc-discovery, which notifies its oc-peer via
|
||||
// ORG_PARTNER_EVENT. The master's oc-peer confirms or rejects by emitting a
|
||||
// PROPALGATION_EVENT back, which oc-discovery routes to the originating
|
||||
// oc-discovery, which in turn notifies our oc-peer via ORG_PARTNER_EVENT to
|
||||
// finalize the relation.
|
||||
PB_ORG_PARTNER
|
||||
|
||||
// PB_WATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
|
||||
// stored in a workspace. oc-discovery contacts the creator peer to register
|
||||
// the watching peerID in the creator's watcher cache so it receives future
|
||||
// CREATE/DELETE events for that resource.
|
||||
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
|
||||
PB_WATCH_RESOURCE
|
||||
|
||||
// PB_UNWATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
|
||||
// removed from all workspaces. oc-discovery contacts the creator peer to
|
||||
// deregister the watching peerID from the creator's watcher cache.
|
||||
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
|
||||
PB_UNWATCH_RESOURCE
|
||||
|
||||
// PB_BOOKING_SYNC is emitted by master every 24 h to each known NANO.
|
||||
// Payload: {"peer_id": nano.PeerID, "booking_sync_ids": ["id1", "id2", ...]}
|
||||
// Nano compares the list against its own confirmed bookings and calls
|
||||
// SendBookingToMaster for any it has that master is missing.
|
||||
PB_BOOKING_SYNC
|
||||
|
||||
// PB_VERIFY_RESOURCE is emitted by oc-workspace or oc-workflow on workspace
|
||||
// activation / workflow opening to verify that an embedded non-self resource
|
||||
// is still current. oc-discovery forwards the request to the creator peer via
|
||||
// ProtocolVerifyResource; the result comes back as a VERIFY_RESOURCE NATS event.
|
||||
// Payload: { "creator_peer_id": "…", "data_type": N, "resource_payload": {…} }
|
||||
PB_VERIFY_RESOURCE
|
||||
)
|
||||
|
||||
func GetActionString(ss string) PubSubAction {
|
||||
@@ -242,6 +279,14 @@ func GetActionString(ss string) PubSubAction {
|
||||
return PB_PROPAGATE
|
||||
case "source_presign":
|
||||
return PB_SOURCE_PRESIGN
|
||||
case "org_partner":
|
||||
return PB_ORG_PARTNER
|
||||
case "watch_resource":
|
||||
return PB_WATCH_RESOURCE
|
||||
case "unwatch_resource":
|
||||
return PB_UNWATCH_RESOURCE
|
||||
case "booking_sync":
|
||||
return PB_BOOKING_SYNC
|
||||
default:
|
||||
return NONE
|
||||
}
|
||||
@@ -266,6 +311,10 @@ var path = []string{
|
||||
"observe_close", // 14 PB_OBSERVE_CLOSE
|
||||
"propagate", // 15 PB_PROPAGATE
|
||||
"source_presign", // 16 PB_SOURCE_PRESIGN
|
||||
"org_partner", // 17 PB_ORG_PARTNER
|
||||
"watch_resource", // 18 PB_WATCH_RESOURCE
|
||||
"unwatch_resource", // 19 PB_UNWATCH_RESOURCE
|
||||
"booking_sync", // 20 PB_BOOKING_SYNC
|
||||
}
|
||||
|
||||
func (m PubSubAction) String() string {
|
||||
|
||||
+11
-2
@@ -32,7 +32,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
|
||||
"considers event", "admiralty config event", "minio config event", "pvc config event",
|
||||
"workflow started event", "workflow step done event", "workflow done event",
|
||||
"peer behavior event", "peer observe response event", "peer observe event",
|
||||
"source presign event",
|
||||
"source presign event", "org partner event", "verify resource event",
|
||||
}
|
||||
|
||||
const (
|
||||
@@ -85,6 +85,15 @@ const (
|
||||
// oc-datacenter listens to it to generate a pre-signed Minio URL and reply
|
||||
// via PB_CONSIDERS (Phase 4 — isReachable=false).
|
||||
SOURCE_PRESIGN_EVENT
|
||||
|
||||
// ORG_PARTNER_EVENT is emitted by a peer to its OrganizationMaster to ask:
|
||||
// "is peer X one of your members?". The master replies via the same channel.
|
||||
ORG_PARTNER_EVENT
|
||||
|
||||
// VERIFY_RESOURCE is emitted by oc-discovery when it receives a verify response
|
||||
// from a remote peer via ProtocolVerifyResource. oc-workspace and oc-workflow
|
||||
// listen to this event to update their stale caches.
|
||||
VERIFY_RESOURCE
|
||||
)
|
||||
|
||||
func (n NATSMethod) String() string {
|
||||
@@ -98,7 +107,7 @@ func NameToMethod(name string) NATSMethod {
|
||||
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
|
||||
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
|
||||
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT,
|
||||
SOURCE_PRESIGN_EVENT} {
|
||||
SOURCE_PRESIGN_EVENT, ORG_PARTNER_EVENT, VERIFY_RESOURCE} {
|
||||
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
|
||||
return v
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user