This commit is contained in:
mr
2026-06-22 07:50:01 +02:00
parent 1425a31494
commit 58e97fbe74
10 changed files with 385 additions and 16 deletions
+1
View File
@@ -77,6 +77,7 @@ const (
REFUND = tools.REFUND
DISCOUNT = tools.DISCOUNT
SUBSCRIPTION = tools.SUBSCRIPTION
POLICY = tools.POLICY
)
func FiltersFromFlatMap(flatMap map[string]interface{}, target interface{}) *dbs.Filters {
+2
View File
@@ -11,6 +11,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer/policy"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -58,6 +59,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.SUBSCRIPTION.String(): func() utils.DBObject { return &subscription.Subscription{} },
tools.EXECUTION_VERIFICATION.String(): func() utils.DBObject { return &execution_verification.ExecutionVerification{} },
tools.ALLOWED_IMAGE.String(): func() utils.DBObject { return &allowed_image.AllowedImage{} },
tools.POLICY.String(): func() utils.DBObject { return &policy.Policy{} },
}
// Model returns the model object based on the model type
+10
View File
@@ -141,6 +141,11 @@ type Peer struct {
// Null when the peer has not registered any organization data.
Organization *organization.Organization `json:"organization,omitempty" bson:"organization,omitempty"`
// PolicyID references the Policy document that governs which inbound
// libp2p streams are authorized for this peer.
// When empty, all non-vital streams are denied by default.
PolicyID string `json:"policy_id,omitempty" bson:"policy_id,omitempty"`
// Volatile connectivity state — never persisted to DB (bson:"-").
// Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT.
// Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace).
@@ -157,6 +162,11 @@ func (ri *Peer) Extend(typ ...string) map[string][]tools.DataType {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.PEER)
case "policy":
if _, ok := ext[t]; !ok {
ext[t] = []tools.DataType{}
}
ext[t] = append(ext[t], tools.POLICY)
}
}
return ext
+30
View File
@@ -0,0 +1,30 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// Policy defines which inbound libp2p streams are authorized for a peer.
// Vital streams (planner, considers, minio/admiralty config, source-presign,
// verify, observe, heartbeat) are always allowed regardless of policy.
type Policy struct {
utils.AbstractObject
// Resource CRUD
AllowSearch bool `json:"allow_search" bson:"allow_search"`
AllowCreate bool `json:"allow_create" bson:"allow_create"`
AllowUpdate bool `json:"allow_update" bson:"allow_update"`
AllowDelete bool `json:"allow_delete" bson:"allow_delete"`
// Resource freshness tracking
AllowRegisterWatcher bool `json:"allow_register_watcher" bson:"allow_register_watcher"`
AllowUnregisterWatcher bool `json:"allow_unregister_watcher" bson:"allow_unregister_watcher"`
// Organization partner confirmation
AllowOrgPartnerConfirm bool `json:"allow_org_partner_confirm" bson:"allow_org_partner_confirm"`
}
func (p *Policy) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request)
}
@@ -0,0 +1,31 @@
package policy
import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type policyMongoAccessor struct {
utils.AbstractAccessor[*Policy]
}
func NewAccessor(request *tools.APIRequest) *policyMongoAccessor {
return &policyMongoAccessor{
AbstractAccessor: utils.AbstractAccessor[*Policy]{
Logger: logs.CreateLogger(tools.POLICY.String()),
Request: request,
Type: tools.POLICY,
New: func() *Policy { return &Policy{} },
},
}
}
func (a *policyMongoAccessor) GetObjectFilters(search string) *dbs.Filters {
return &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}
+94 -1
View File
@@ -15,7 +15,8 @@ type ResourceSet struct {
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
// DynamicResources are stored inline — no DB collection, resolved at runtime via SetAllowedInstances.
// Runtime-only resource objects — not persisted. Populated by Fill() from the ID lists above.
// Use WorkspaceResourceSet when full object persistence is needed (workspace fluid catalog).
DynamicResources []*DynamicResource `bson:"-" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"-" json:"storage_resources,omitempty"`
@@ -26,6 +27,98 @@ type ResourceSet struct {
ServiceResources []*ServiceResource `bson:"-" json:"service_resources,omitempty"`
}
// WorkspaceResourceSet mirrors ResourceSet but persists complete resource objects to MongoDB.
// Use this in workspace documents where the workspace acts as a fluid resource catalog.
// The *Resource fields are loaded from bson on read; Fill() skips catalog lookup when they are
// already populated.
type WorkspaceResourceSet struct {
Datas []string `bson:"datas,omitempty" json:"datas,omitempty"`
Storages []string `bson:"storages,omitempty" json:"storages,omitempty"`
Processings []string `bson:"processings,omitempty" json:"processings,omitempty"`
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
NativeTool []string `bson:"native,omitempty" json:"native,omitempty"`
Services []string `bson:"services,omitempty" json:"services,omitempty"`
Dynamics []string `bson:"dynamics,omitempty" json:"dynamics,omitempty"`
DynamicResources []*DynamicResource `bson:"dynamic_resources,omitempty" json:"dynamic_resources,omitempty"`
DataResources []*DataResource `bson:"data_resources,omitempty" json:"data_resources,omitempty"`
StorageResources []*StorageResource `bson:"storage_resources,omitempty" json:"storage_resources,omitempty"`
ProcessingResources []*ProcessingResource `bson:"processing_resources,omitempty" json:"processing_resources,omitempty"`
ComputeResources []*ComputeResource `bson:"compute_resources,omitempty" json:"compute_resources,omitempty"`
WorkflowResources []*WorkflowResource `bson:"workflow_resources,omitempty" json:"workflow_resources,omitempty"`
NativeTools []*NativeTool `bson:"native_tools,omitempty" json:"native_tools,omitempty"`
ServiceResources []*ServiceResource `bson:"service_resources,omitempty" json:"service_resources,omitempty"`
}
func (r *WorkspaceResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
r.ProcessingResources = nil
r.ComputeResources = nil
r.WorkflowResources = nil
r.ServiceResources = nil
r.DynamicResources = nil
r.NativeTools = nil
}
// Fill populates *Resource fields from their ID lists. When a field is already non-nil
// (loaded from the workspace MongoDB document), the catalog lookup is skipped for that type.
func (r *WorkspaceResourceSet) Fill(request *tools.APIRequest) {
if r.DataResources == nil {
for _, id := range r.Datas {
if d, _, e := (&DataResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DataResources = append(r.DataResources, d.(*DataResource))
}
}
}
if r.ComputeResources == nil {
for _, id := range r.Computes {
if d, _, e := (&ComputeResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ComputeResources = append(r.ComputeResources, d.(*ComputeResource))
}
}
}
if r.StorageResources == nil {
for _, id := range r.Storages {
if d, _, e := (&StorageResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.StorageResources = append(r.StorageResources, d.(*StorageResource))
}
}
}
if r.ProcessingResources == nil {
for _, id := range r.Processings {
if d, _, e := (&ProcessingResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ProcessingResources = append(r.ProcessingResources, d.(*ProcessingResource))
}
}
}
if r.WorkflowResources == nil {
for _, id := range r.Workflows {
if d, _, e := (&WorkflowResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.WorkflowResources = append(r.WorkflowResources, d.(*WorkflowResource))
}
}
}
if r.ServiceResources == nil {
for _, id := range r.Services {
if d, _, e := (&ServiceResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.ServiceResources = append(r.ServiceResources, d.(*ServiceResource))
}
}
}
if r.DynamicResources == nil {
for _, id := range r.Dynamics {
if d, _, e := (&DynamicResource{}).GetAccessor(request).LoadOne(id); e == nil {
r.DynamicResources = append(r.DynamicResources, d.(*DynamicResource))
}
}
}
for _, d := range r.DynamicResources {
d.SetAllowedInstances(request)
}
}
func (r *ResourceSet) Clear() {
r.DataResources = nil
r.StorageResources = nil
+2 -2
View File
@@ -160,7 +160,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
if err == nil && len(resource) > 0 { // if the workspace already exists, update it
w := &workspace.Workspace{
Active: active,
ResourceSet: resources.ResourceSet{
WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas,
Processings: workflow.Processings,
Storages: workflow.Storages,
@@ -173,7 +173,7 @@ func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active
a.workspaceAccessor.StoreOne(&workspace.Workspace{
Active: active,
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
ResourceSet: resources.ResourceSet{
WorkspaceResourceSet: resources.WorkspaceResourceSet{
Datas: workflow.Datas,
Processings: workflow.Processings,
Storages: workflow.Storages,
+172 -6
View File
@@ -1,23 +1,45 @@
package workspace
import (
"fmt"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// trustedRelations holds peer relations that yield TrustMap = true for a resource.
var trustedRelations = map[peer.PeerRelation]bool{
peer.PARTNER: true,
peer.MASTER: true,
peer.NANO: true,
peer.ORGANIZATION_MASTER: true,
peer.ORGANIZATION_MEMBER: true,
peer.ORGANIZATION_PARTNER: true,
}
// Workspace is a struct that represents a workspace
type Workspace struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, compute, processing, storage, workflow)
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual
Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active
Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.WorkspaceResourceSet // WorkspaceResourceSet persists both IDs and complete resource objects
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"`
Active bool `json:"active" bson:"active" default:"false"`
Shared string `json:"shared,omitempty" bson:"shared,omitempty"`
// Notifications accumulates strings for auto-modifications (e.g. resource removed after peer blacklist).
// Cleared by the owner via the notifications update endpoint.
Notifications []string `json:"notifications,omitempty" bson:"notifications,omitempty"`
// TrustMap maps resource ID → trust bool based on the creator peer's relation.
// Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean.
TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"`
}
func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
return NewAccessor(request)
}
func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool {
@@ -30,3 +52,147 @@ func (ao *Workspace) VerifyAuth(callName string, request *tools.APIRequest) bool
}
return ao.AbstractObject.VerifyAuth(callName, request)
}
// ComputeTrustAndClean populates TrustMap for all resources embedded in this workspace,
// removes resources whose creator peer is blacklisted, and appends a deletion notification
// for each removal. Returns true when at least one resource was removed (caller should persist).
func (w *Workspace) ComputeTrustAndClean() bool {
w.TrustMap = map[string]bool{}
selfPeer, _ := utils.GetMySelf(peer.NewShallowAccessor())
var selfPeerID string
if selfPeer != nil {
if p, ok := selfPeer.(*peer.Peer); ok {
selfPeerID = p.PeerID
}
}
// Cache peer relations to avoid redundant DB lookups per workspace load.
cache := map[string]peer.PeerRelation{}
relation := func(creatorID string) peer.PeerRelation {
if r, ok := cache[creatorID]; ok {
return r
}
if creatorID == selfPeerID {
cache[creatorID] = peer.SELF
return peer.SELF
}
results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 1)
rel := peer.NONE
if len(results) > 0 {
if p, ok := results[0].(*peer.Peer); ok {
rel = p.Relation
}
}
cache[creatorID] = rel
return rel
}
setTrust := func(id, creatorID string, rel peer.PeerRelation) {
w.TrustMap[id] = (creatorID == selfPeerID) || trustedRelations[rel]
}
changed := false
var keptData []*resources.DataResource
for _, r := range w.DataResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptData = append(keptData, r)
}
}
w.DataResources = keptData
var keptCompute []*resources.ComputeResource
for _, r := range w.ComputeResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptCompute = append(keptCompute, r)
}
}
w.ComputeResources = keptCompute
var keptStorage []*resources.StorageResource
for _, r := range w.StorageResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptStorage = append(keptStorage, r)
}
}
w.StorageResources = keptStorage
var keptProcessing []*resources.ProcessingResource
for _, r := range w.ProcessingResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptProcessing = append(keptProcessing, r)
}
}
w.ProcessingResources = keptProcessing
var keptWorkflow []*resources.WorkflowResource
for _, r := range w.WorkflowResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptWorkflow = append(keptWorkflow, r)
}
}
w.WorkflowResources = keptWorkflow
var keptService []*resources.ServiceResource
for _, r := range w.ServiceResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptService = append(keptService, r)
}
}
w.ServiceResources = keptService
var keptDynamic []*resources.DynamicResource
for _, r := range w.DynamicResources {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptDynamic = append(keptDynamic, r)
}
}
w.DynamicResources = keptDynamic
var keptNative []*resources.NativeTool
for _, r := range w.NativeTools {
if rel := relation(r.GetCreatorID()); rel == peer.BLACKLIST {
w.Notifications = append(w.Notifications, fmt.Sprintf("resource %s (%s) removed: creator peer blacklisted", r.GetName(), r.GetID()))
changed = true
} else {
setTrust(r.GetID(), r.GetCreatorID(), rel)
keptNative = append(keptNative, r)
}
}
w.NativeTools = keptNative
return changed
}
+20 -6
View File
@@ -88,25 +88,39 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
func (a *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne(id, a.New(), func(d utils.DBObject) (utils.DBObject, int, error) {
d.(*Workspace).Fill(a.GetRequest())
return d, 200, nil
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w, 200, nil
}, a)
}
func (a *workspaceMongoAccessor) LoadAll(isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*Workspace](func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit)
}
func (a *workspaceMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool, offset int64, limit int64) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*Workspace](filters, search, (&Workspace{}).GetObjectFilters(search), func(d utils.DBObject) utils.ShallowDBObject {
d.(*Workspace).Fill(a.GetRequest())
return d
w := d.(*Workspace)
w.Fill(a.GetRequest())
a.applyTrustAndClean(w)
return w
}, isDraft, a, offset, limit)
}
// applyTrustAndClean calls ComputeTrustAndClean and, when resources were removed due to
// blacklisted peers, persists the cleaned workspace back to the database.
func (a *workspaceMongoAccessor) applyTrustAndClean(w *Workspace) {
if changed := w.ComputeTrustAndClean(); changed {
utils.GenericUpdateOne(w.Serialize(w), w.GetID(), a)
}
}
/*
This function is used to share the workspace with the peers
*/
+23 -1
View File
@@ -40,6 +40,7 @@ const (
REFUND
DISCOUNT
SUBSCRIPTION
POLICY
)
var NOAPI = func() string {
@@ -104,6 +105,7 @@ var InnerDefaultAPI = [...]func() string{
NOAPI,
NOAPI,
NOAPI,
PEERSAPI,
}
// Bind the standard data name to the data type
@@ -138,6 +140,7 @@ var Str = [...]string{
"refund",
"discount",
"subscription",
"policy",
}
func FromString(comp string) int {
@@ -174,7 +177,7 @@ func DataTypeList() []DataType {
WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY,
ORDER, PURCHASE_RESOURCE,
LIVE_DATACENTER, LIVE_STORAGE, BILL, NATIVE_TOOL, EXECUTION_VERIFICATION, ALLOWED_IMAGE, SERVICE_RESOURCE, DYNAMIC_RESOURCE, LIVE_SERVICE,
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION}
PAYMENT, REFUND, DISCOUNT, SUBSCRIPTION, POLICY}
}
type PropalgationMessage struct {
@@ -216,6 +219,19 @@ const (
// oc-discovery, which in turn notifies our oc-peer via ORG_PARTNER_EVENT to
// finalize the relation.
PB_ORG_PARTNER
// PB_WATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// stored in a workspace. oc-discovery contacts the creator peer to register
// the watching peerID in the creator's watcher cache so it receives future
// CREATE/DELETE events for that resource.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_WATCH_RESOURCE
// PB_UNWATCH_RESOURCE is emitted by oc-workspace when a non-self resource is
// removed from all workspaces. oc-discovery contacts the creator peer to
// deregister the watching peerID from the creator's watcher cache.
// Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_UNWATCH_RESOURCE
)
func GetActionString(ss string) PubSubAction {
@@ -252,6 +268,10 @@ func GetActionString(ss string) PubSubAction {
return PB_SOURCE_PRESIGN
case "org_partner":
return PB_ORG_PARTNER
case "watch_resource":
return PB_WATCH_RESOURCE
case "unwatch_resource":
return PB_UNWATCH_RESOURCE
default:
return NONE
}
@@ -277,6 +297,8 @@ var path = []string{
"propagate", // 15 PB_PROPAGATE
"source_presign", // 16 PB_SOURCE_PRESIGN
"org_partner", // 17 PB_ORG_PARTNER
"watch_resource", // 18 PB_WATCH_RESOURCE
"unwatch_resource", // 19 PB_UNWATCH_RESOURCE
}
func (m PubSubAction) String() string {