This commit is contained in:
mr
2026-06-23 09:40:33 +02:00
parent 58e97fbe74
commit 3a66b42c01
8 changed files with 476 additions and 38 deletions
+213 -31
View File
@@ -37,37 +37,49 @@ type DynamicResource struct {
WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"` WatchedDynamicResource []string `bson:"watched_dynamic_resource,omitempty" json:"watched_dynamic_resource,omitempty"`
} }
// WorkspaceCandidatesProvider can be set by the workspace package to supply
// contextual workspace resources for a given DataType and request without
// creating a circular import (workspace → resources → workspace).
// When set, SetAllowedInstances uses workspace-scoped resources instead of
// the full catalog for requests that carry a username.
var WorkspaceCandidatesProvider func(dt tools.DataType, request *tools.APIRequest) []ResourceInterface
func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *DynamicResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
return nil return nil
} }
func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF { func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{} if WorkspaceCandidatesProvider != nil {
for k, v := range map[tools.DataType]ResourceInterface{ candidates := WorkspaceCandidatesProvider(d.Type, request)
tools.COMPUTE_RESOURCE: &ComputeResource{}, return d.SetAllowedInstancesFromSet(candidates, request, instance_id...)
tools.DATA_RESOURCE: &DataResource{},
tools.STORAGE_RESOURCE: &StorageResource{},
tools.PROCESSING_RESOURCE: &ProcessingResource{},
tools.WORKFLOW_RESOURCE: &WorkflowResource{}} {
if d.Type != k {
continue
} }
access := d.getResAccessor(k, request) d.Instances = []ResourceInstanceITF{}
m := map[string]interface{}{} d.sortAndResetInstances()
b, _ := json.Marshal(d.Filters) return d.Instances
json.Unmarshal(b, &m) }
a, _, _ := access.Search(dbs.FiltersFromFlatMap(m, v), "", false, 0, 100000)
// SetAllowedInstancesFromSet fills d.Instances from a pre-loaded workspace resource set
// instead of querying the catalog. Filters are applied in-memory against the candidates.
// Called by WorkspaceResourceSet.Fill so dynamic resources only see workspace-scoped resources.
func (d *DynamicResource) SetAllowedInstancesFromSet(candidates []ResourceInterface, request *tools.APIRequest, instance_id ...string) []ResourceInstanceITF {
d.Instances = []ResourceInstanceITF{}
d.PeerIds = map[int]string{} d.PeerIds = map[int]string{}
d.ResourceIds = map[int]string{} d.ResourceIds = map[int]string{}
for _, res := range a { for _, res := range candidates {
for _, i := range res.(ResourceInterface).SetAllowedInstances(request, instance_id...) { if !d.matchesFilters(res) {
continue
}
for _, i := range res.SetAllowedInstances(request, instance_id...) {
d.PeerIds[len(d.Instances)] = res.GetCreatorID() d.PeerIds[len(d.Instances)] = res.GetCreatorID()
d.ResourceIds[len(d.Instances)] = res.GetID() d.ResourceIds[len(d.Instances)] = res.GetID()
d.Instances = append(d.Instances, i) d.Instances = append(d.Instances, i)
} }
} }
break d.sortAndResetInstances()
return d.Instances
} }
func (d *DynamicResource) sortAndResetInstances() {
if d.SortRules != nil { if d.SortRules != nil {
sorted := make([]ResourceInstanceITF, len(d.Instances)) sorted := make([]ResourceInstanceITF, len(d.Instances))
copy(sorted, d.Instances) copy(sorted, d.Instances)
@@ -77,27 +89,197 @@ func (d *DynamicResource) SetAllowedInstances(request *tools.APIRequest, instanc
}) })
d.Instances = sorted d.Instances = sorted
} }
d.WatchedDynamicResource = []string{} d.WatchedDynamicResource = []string{}
return d.Instances
} }
func (d *DynamicResource) getResAccessor(dt tools.DataType, request *tools.APIRequest) utils.Accessor { // matchesFilters applies d.Filters in-memory against a serialized resource.
switch dt { // Keys in d.Filters are JSON tag names; Serialize returns JSON tag names — no bson conversion needed.
case tools.COMPUTE_RESOURCE: func (d *DynamicResource) matchesFilters(res ResourceInterface) bool {
return NewAccessor[*ComputeResource](dt, request) if len(d.Filters.And) == 0 && len(d.Filters.Or) == 0 {
case tools.PROCESSING_RESOURCE: return true
return NewAccessor[*ProcessingResource](dt, request) }
case tools.DATA_RESOURCE: m := res.Serialize(res)
return NewAccessor[*DataResource](dt, request) for field, fs := range d.Filters.And {
case tools.STORAGE_RESOURCE: vals := nestedVals(m, strings.Split(field, "."))
return NewAccessor[*ServiceResource](dt, request) for _, f := range fs {
case tools.WORKFLOW_RESOURCE: if !anyMatchesOp(vals, f) {
return NewAccessor[*WorkflowResource](dt, request) return false
}
}
}
if len(d.Filters.Or) > 0 {
matched := false
for field, fs := range d.Filters.Or {
vals := nestedVals(m, strings.Split(field, "."))
for _, f := range fs {
if anyMatchesOp(vals, f) {
matched = true
break
}
}
if matched {
break
}
}
if !matched {
return false
}
}
return true
}
// nestedVals navigates a dot-path into m and collects all leaf values.
// Arrays at any level are expanded: each element is recursed into.
func nestedVals(m map[string]interface{}, path []string) []interface{} {
if len(path) == 0 || m == nil {
return nil
}
val, ok := m[path[0]]
if !ok {
return nil
}
if len(path) == 1 {
if arr, ok := val.([]interface{}); ok {
return arr
}
return []interface{}{val}
}
rest := path[1:]
switch v := val.(type) {
case map[string]interface{}:
return nestedVals(v, rest)
case []interface{}:
var out []interface{}
for _, elem := range v {
if em, ok := elem.(map[string]interface{}); ok {
out = append(out, nestedVals(em, rest)...)
}
}
return out
} }
return nil return nil
} }
// anyMatchesOp returns true if at least one value in vals satisfies filter f.
func anyMatchesOp(vals []interface{}, f dbs.Filter) bool {
if f.Operator == dbs.EXISTS.String() {
exists := len(vals) > 0 && vals[0] != nil
want := true
if b, ok := f.Value.(bool); ok {
want = b
}
return exists == want
}
if f.Operator == dbs.IN.String() {
list, ok := f.Value.([]interface{})
if !ok {
return false
}
for _, v := range vals {
sv := fmt.Sprintf("%v", v)
for _, item := range list {
if sv == fmt.Sprintf("%v", item) {
return true
}
}
}
return false
}
for _, v := range vals {
if opMatches(v, f) {
return true
}
}
return false
}
func opMatches(val interface{}, f dbs.Filter) bool {
switch f.Operator {
case dbs.EQUAL.String():
return fmt.Sprintf("%v", val) == fmt.Sprintf("%v", f.Value)
case dbs.NOT.String():
return fmt.Sprintf("%v", val) != fmt.Sprintf("%v", f.Value)
case dbs.LIKE.String():
return strings.Contains(strings.ToLower(fmt.Sprintf("%v", val)), strings.ToLower(fmt.Sprintf("%v", f.Value)))
case dbs.GT.String(), dbs.GTE.String(), dbs.LT.String(), dbs.LTE.String():
return numericCmp(val, f.Value, f.Operator)
case dbs.ELEMMATCH.String():
arr, ok := val.([]interface{})
if !ok {
return false
}
sub, ok := f.Value.(map[string]interface{})
if !ok {
return false
}
for _, elem := range arr {
em, ok := elem.(map[string]interface{})
if !ok {
continue
}
allOk := true
for k, sv := range sub {
if fmt.Sprintf("%v", em[k]) != fmt.Sprintf("%v", sv) {
allOk = false
break
}
}
if allOk {
return true
}
}
return false
}
return false
}
func numericCmp(a, b interface{}, op string) bool {
fa, aOk := toFloat64(a)
fb, bOk := toFloat64(b)
if !aOk || !bOk {
sa, sb := fmt.Sprintf("%v", a), fmt.Sprintf("%v", b)
switch op {
case dbs.GT.String():
return sa > sb
case dbs.GTE.String():
return sa >= sb
case dbs.LT.String():
return sa < sb
case dbs.LTE.String():
return sa <= sb
}
return false
}
switch op {
case dbs.GT.String():
return fa > fb
case dbs.GTE.String():
return fa >= fb
case dbs.LT.String():
return fa < fb
case dbs.LTE.String():
return fa <= fb
}
return false
}
func toFloat64(v interface{}) (float64, bool) {
switch n := v.(type) {
case float64:
return n, true
case float32:
return float64(n), true
case int:
return float64(n), true
case int32:
return float64(n), true
case int64:
return float64(n), true
}
return 0, false
}
func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) { func (d *DynamicResource) AddInstances(instance ResourceInstanceITF) {
d.Instances = append(d.Instances, instance) d.Instances = append(d.Instances, instance)
} }
+31
View File
@@ -115,9 +115,40 @@ func (r *WorkspaceResourceSet) Fill(request *tools.APIRequest) {
} }
} }
for _, d := range r.DynamicResources { for _, d := range r.DynamicResources {
var candidates []ResourceInterface
switch d.Type {
case tools.COMPUTE_RESOURCE:
for _, c := range r.ComputeResources {
candidates = append(candidates, c)
}
case tools.DATA_RESOURCE:
for _, c := range r.DataResources {
candidates = append(candidates, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range r.StorageResources {
candidates = append(candidates, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range r.ProcessingResources {
candidates = append(candidates, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range r.WorkflowResources {
candidates = append(candidates, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range r.ServiceResources {
candidates = append(candidates, c)
}
}
if len(candidates) > 0 {
d.SetAllowedInstancesFromSet(candidates, request)
} else {
d.SetAllowedInstances(request) d.SetAllowedInstances(request)
} }
} }
}
func (r *ResourceSet) Clear() { func (r *ResourceSet) Clear() {
r.DataResources = nil r.DataResources = nil
+146
View File
@@ -1,10 +1,12 @@
package resources package resources
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"slices" "slices"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/common/models"
@@ -110,6 +112,40 @@ func (dca *ResourceMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, er
return data, code, err return data, code, err
} }
var workspaceResourceTypes = []tools.DataType{
tools.COMPUTE_RESOURCE,
tools.DATA_RESOURCE,
tools.PROCESSING_RESOURCE,
tools.STORAGE_RESOURCE,
tools.WORKFLOW_RESOURCE,
tools.SERVICE_RESOURCE,
}
func emitResourceNATS(method tools.NATSMethod, dt tools.DataType, payload []byte) {
if !slices.Contains(workspaceResourceTypes, dt) {
return
}
tools.NewNATSCaller().SetNATSPub(method, tools.NATSResponse{
FromApp: config.GetAppName(),
Datatype: dt,
Method: int(method),
Payload: payload,
})
}
func (dca *ResourceMongoAccessor[T]) DeleteOne(id string) (utils.DBObject, int, error) {
data, code, err := dca.AbstractAccessor.LoadOne(id)
if err != nil {
return data, code, err
}
res, code, err := dca.AbstractAccessor.DeleteOne(id)
if err == nil && data != nil {
b, _ := json.Marshal(data)
go emitResourceNATS(tools.REMOVE_RESOURCE, dca.GetType(), b)
}
return res, code, err
}
func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) { func (dca *ResourceMongoAccessor[T]) UpdateOne(set map[string]interface{}, id string) (utils.DBObject, int, error) {
if dca.GetType() == tools.COMPUTE_RESOURCE { if dca.GetType() == tools.COMPUTE_RESOURCE {
delete(set, "architecture") delete(set, "architecture")
@@ -193,9 +229,119 @@ func (dca *ResourceMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObje
"resources_id": idsToUpdate, "resources_id": idsToUpdate,
}, i) }, i)
} }
if err == nil && res != nil {
b, _ := json.Marshal(res)
go emitResourceNATS(tools.CREATE_RESOURCE, dca.GetType(), b)
}
return res, code, err return res, code, err
} }
// PurgedResourcePayload holds a silently-deleted resource's type and serialized payload.
type PurgedResourcePayload struct {
DT tools.DataType
Payload []byte
}
// purgeByType searches and silently deletes all resources of type T created by creatorID.
// Uses AbstractAccessor.DeleteOne directly to bypass the NATS-emitting override.
func purgeByType[T ResourceInterface](dt tools.DataType, creatorID string) []PurgedResourcePayload {
a := NewAccessor[T](dt, nil)
if a == nil {
return nil
}
res, _, _ := a.Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"creator_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
},
}, "", false, 0, 10000)
var result []PurgedResourcePayload
for _, item := range res {
b, err := json.Marshal(item)
if err != nil {
continue
}
a.AbstractAccessor.DeleteOne(item.GetID())
result = append(result, PurgedResourcePayload{DT: dt, Payload: b})
}
return result
}
// PurgeCreatorResources deletes all catalog resources created by creatorPeerID from
// the DB without emitting NATS. Used for non-blacklist peer privilege downgrades where
// workspace state should be left untouched.
func PurgeCreatorResources(creatorPeerID string) []PurgedResourcePayload {
var result []PurgedResourcePayload
result = append(result, purgeByType[*ComputeResource](tools.COMPUTE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*DataResource](tools.DATA_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ProcessingResource](tools.PROCESSING_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*StorageResource](tools.STORAGE_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*WorkflowResource](tools.WORKFLOW_RESOURCE, creatorPeerID)...)
result = append(result, purgeByType[*ServiceResource](tools.SERVICE_RESOURCE, creatorPeerID)...)
return result
}
// FilterMapFromResourcePayload deserializes a resource payload by DataType, zeros out
// the AbstractInstanciatedResource (and its AbstractResource / Instances sub-fields),
// then marshals back to get only the concrete type's own JSON fields.
// Returns nil for WORKFLOW_RESOURCE and unknown types.
// JSON keys only — not BSON paths.
func FilterMapFromResourcePayload(dt tools.DataType, payload []byte) map[string]interface{} {
var m map[string]interface{}
switch dt {
case tools.COMPUTE_RESOURCE:
var r ComputeResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ComputeResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.DATA_RESOURCE:
var r DataResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*DataInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.PROCESSING_RESOURCE:
var r ProcessingResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ProcessingInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.STORAGE_RESOURCE:
var r StorageResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*StorageResourceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.SERVICE_RESOURCE:
var r ServiceResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractInstanciatedResource = AbstractInstanciatedResource[*ServiceInstance]{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
case tools.WORKFLOW_RESOURCE:
var r WorkflowResource
if json.Unmarshal(payload, &r) != nil {
return nil
}
r.AbstractResource = AbstractResource{}
b, _ := json.Marshal(r)
json.Unmarshal(b, &m)
default:
return nil
}
return m
}
func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { func (dca *ResourceMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return dca.StoreOne(data) return dca.StoreOne(data)
} }
+4
View File
@@ -55,6 +55,10 @@ type Workflow struct {
Args map[string][]string `json:"args" bson:"args"` Args map[string][]string `json:"args" bson:"args"`
Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution Exposes map[string][]models.Expose `bson:"exposes" json:"exposes"` // Expose is the execution
SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"` SelectedEmbeddedStorages map[string]*resources.EmbeddedStorageSelection `json:"selected_embedded_storages,omitempty" bson:"selected_embedded_storages,omitempty"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workflow's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
} }
func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor {
+4
View File
@@ -36,6 +36,10 @@ type Workspace struct {
// TrustMap maps resource ID → trust bool based on the creator peer's relation. // TrustMap maps resource ID → trust bool based on the creator peer's relation.
// Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean. // Not persisted (bson:"-") — recomputed on every load by ComputeTrustAndClean.
TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"` TrustMap map[string]bool `json:"trust_map,omitempty" bson:"-"`
// StaleMap maps resource ID → stale bool. Populated at GET time from the
// verify campaign results stored in oc-workspace's stale cache. Not persisted.
StaleMap map[string]bool `json:"stale_map,omitempty" bson:"-"`
} }
func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *Workspace) GetAccessor(request *tools.APIRequest) utils.Accessor {
@@ -7,10 +7,60 @@ import (
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
) )
func init() {
resources.WorkspaceCandidatesProvider = func(dt tools.DataType, request *tools.APIRequest) []resources.ResourceInterface {
res, _, _ := NewAccessor(request).Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"user_creator_id": {{Operator: dbs.EQUAL.String(), Value: request.Username}},
"active": {{Operator: dbs.EQUAL.String(), Value: true}},
},
}, "", false, 0, 1)
if len(res) == 0 {
return []resources.ResourceInterface{}
}
ws, ok := res[0].(*Workspace)
if !ok {
return []resources.ResourceInterface{}
}
// ws.Fill was already called by Search — typed slices are populated.
// Return an empty non-nil slice when the workspace exists but has no
// resources of the requested type: caller must not fall back to catalog.
out := []resources.ResourceInterface{}
switch dt {
case tools.COMPUTE_RESOURCE:
for _, c := range ws.ComputeResources {
out = append(out, c)
}
case tools.DATA_RESOURCE:
for _, c := range ws.DataResources {
out = append(out, c)
}
case tools.STORAGE_RESOURCE:
for _, c := range ws.StorageResources {
out = append(out, c)
}
case tools.PROCESSING_RESOURCE:
for _, c := range ws.ProcessingResources {
out = append(out, c)
}
case tools.WORKFLOW_RESOURCE:
for _, c := range ws.WorkflowResources {
out = append(out, c)
}
case tools.SERVICE_RESOURCE:
for _, c := range ws.ServiceResources {
out = append(out, c)
}
}
return out
}
}
// Workspace is a struct that represents a workspace // Workspace is a struct that represents a workspace
type workspaceMongoAccessor struct { type workspaceMongoAccessor struct {
utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller) utils.AbstractAccessor[*Workspace] // AbstractAccessor contains the basic fields of an accessor (model, caller)
+16
View File
@@ -232,6 +232,19 @@ const (
// deregister the watching peerID from the creator's watcher cache. // deregister the watching peerID from the creator's watcher cache.
// Payload: { "creator_peer_id": "...", "resource_id": "..." } // Payload: { "creator_peer_id": "...", "resource_id": "..." }
PB_UNWATCH_RESOURCE PB_UNWATCH_RESOURCE
// PB_BOOKING_SYNC is emitted by master every 24 h to each known NANO.
// Payload: {"peer_id": nano.PeerID, "booking_sync_ids": ["id1", "id2", ...]}
// Nano compares the list against its own confirmed bookings and calls
// SendBookingToMaster for any it has that master is missing.
PB_BOOKING_SYNC
// PB_VERIFY_RESOURCE is emitted by oc-workspace or oc-workflow on workspace
// activation / workflow opening to verify that an embedded non-self resource
// is still current. oc-discovery forwards the request to the creator peer via
// ProtocolVerifyResource; the result comes back as a VERIFY_RESOURCE NATS event.
// Payload: { "creator_peer_id": "…", "data_type": N, "resource_payload": {…} }
PB_VERIFY_RESOURCE
) )
func GetActionString(ss string) PubSubAction { func GetActionString(ss string) PubSubAction {
@@ -272,6 +285,8 @@ func GetActionString(ss string) PubSubAction {
return PB_WATCH_RESOURCE return PB_WATCH_RESOURCE
case "unwatch_resource": case "unwatch_resource":
return PB_UNWATCH_RESOURCE return PB_UNWATCH_RESOURCE
case "booking_sync":
return PB_BOOKING_SYNC
default: default:
return NONE return NONE
} }
@@ -299,6 +314,7 @@ var path = []string{
"org_partner", // 17 PB_ORG_PARTNER "org_partner", // 17 PB_ORG_PARTNER
"watch_resource", // 18 PB_WATCH_RESOURCE "watch_resource", // 18 PB_WATCH_RESOURCE
"unwatch_resource", // 19 PB_UNWATCH_RESOURCE "unwatch_resource", // 19 PB_UNWATCH_RESOURCE
"booking_sync", // 20 PB_BOOKING_SYNC
} }
func (m PubSubAction) String() string { func (m PubSubAction) String() string {
+7 -2
View File
@@ -32,7 +32,7 @@ var meths = []string{"remove execution", "create execution", "planner execution"
"considers event", "admiralty config event", "minio config event", "pvc config event", "considers event", "admiralty config event", "minio config event", "pvc config event",
"workflow started event", "workflow step done event", "workflow done event", "workflow started event", "workflow step done event", "workflow done event",
"peer behavior event", "peer observe response event", "peer observe event", "peer behavior event", "peer observe response event", "peer observe event",
"source presign event", "org partner event", "source presign event", "org partner event", "verify resource event",
} }
const ( const (
@@ -89,6 +89,11 @@ const (
// ORG_PARTNER_EVENT is emitted by a peer to its OrganizationMaster to ask: // ORG_PARTNER_EVENT is emitted by a peer to its OrganizationMaster to ask:
// "is peer X one of your members?". The master replies via the same channel. // "is peer X one of your members?". The master replies via the same channel.
ORG_PARTNER_EVENT ORG_PARTNER_EVENT
// VERIFY_RESOURCE is emitted by oc-discovery when it receives a verify response
// from a remote peer via ProtocolVerifyResource. oc-workspace and oc-workflow
// listen to this event to update their stale caches.
VERIFY_RESOURCE
) )
func (n NATSMethod) String() string { func (n NATSMethod) String() string {
@@ -102,7 +107,7 @@ func NameToMethod(name string) NATSMethod {
CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT, CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT,
WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT, WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT,
PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT, PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT,
SOURCE_PRESIGN_EVENT, ORG_PARTNER_EVENT} { SOURCE_PRESIGN_EVENT, ORG_PARTNER_EVENT, VERIFY_RESOURCE} {
if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) {
return v return v
} }