package infrastructure import ( "encoding/json" "fmt" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" workspaceModel "cloud.o-forge.io/core/oc-lib/models/workspace" "cloud.o-forge.io/core/oc-lib/tools" ) const appName = "oc-workspace" // ListenNATS subscribes to resource lifecycle events emitted by remote peers. // On CREATE_RESOURCE: updates the resource in any workspace that already contains it. // On REMOVE_RESOURCE: removes the resource and adds a notification to affected workspaces. // oc-workspace handles ALL peer types except blacklisted ones (oc-catalog handles // nano/member/master/orga-master peers for its own catalog collections). func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { if resp.FromApp == appName { return } handleResourceUpdate(resp) }, tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) { if resp.FromApp == appName { return } handleResourceDelete(resp) }, }) } func handleResourceUpdate(resp tools.NATSResponse) { var raw map[string]interface{} if err := json.Unmarshal(resp.Payload, &raw); err != nil { return } id, _ := raw["id"].(string) if id == "" { return } creatorID, _ := raw["creator_id"].(string) if isBlacklisted(creatorID) { return } workspaces := findWorkspacesWithResource(id, resp.Datatype) for _, w := range workspaces { updateResourceInWorkspace(w, resp.Datatype, resp.Payload) } if len(workspaces) > 0 { EmitWatchResource(id, creatorID, resp.Datatype) } } // EmitWatchResource tells oc-discovery to register our peer as a watcher for // a non-self resource, so we receive future CREATE/DELETE events for it. 
func EmitWatchResource(resourceID, creatorID string, dt tools.DataType) {
	emitWatcherEvent(resourceID, creatorID, dt, tools.PB_WATCH_RESOURCE)
}

// EmitUnwatchResource tells oc-discovery to deregister our peer as a watcher
// for a resource that was removed from all local workspaces.
func EmitUnwatchResource(resourceID, creatorID string, dt tools.DataType) {
	emitWatcherEvent(resourceID, creatorID, dt, tools.PB_UNWATCH_RESOURCE)
}

// emitWatcherEvent publishes a PROPALGATION_EVENT carrying a watch/unwatch
// request for resourceID, addressed to the peer identified by creatorID.
//
// Nothing is published when:
//   - our own peer cannot be resolved,
//   - creatorID is empty or is our own peer ID,
//   - no peer record with that peer_id is found locally, or
//   - the message cannot be encoded.
func emitWatcherEvent(resourceID, creatorID string, dt tools.DataType, action tools.PubSubAction) {
	self, err := oclib.GetMySelf()
	if err != nil || self == nil || creatorID == "" || creatorID == self.PeerID {
		return
	}

	// Only the result list is inspected; the other return values are
	// discarded. NOTE(review): this presumes a failed lookup yields an empty
	// result list - confirm against the accessor.
	results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{
		And: map[string][]dbs.Filter{
			"peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}},
		},
	}, "", false, 0, 1)
	if len(results) == 0 {
		return
	}
	p, ok := results[0].(*peer.Peer)
	if !ok || p.PeerID == "" {
		return
	}

	watchPayload, err := json.Marshal(struct {
		CreatorPeerID string `json:"creator_peer_id"`
		ResourceID    string `json:"resource_id"`
	}{CreatorPeerID: p.PeerID, ResourceID: resourceID})
	if err != nil {
		// Never publish an event whose payload could not be encoded.
		return
	}

	msg, err := json.Marshal(tools.PropalgationMessage{
		DataType: int(dt),
		Action:   action,
		Payload:  watchPayload,
	})
	if err != nil {
		return
	}

	tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp: appName,
		Method:  int(tools.PROPALGATION_EVENT),
		Payload: msg,
	})
}

// handleResourceDelete removes the resource named in a REMOVE_RESOURCE
// payload from every local workspace that embeds it.
//
// The payload is ignored when it is not a JSON object or has no string "id".
// The optional "name" field is only used in the notification text.
//
// NOTE(review): unlike handleResourceUpdate, this path does not consult
// isBlacklisted - confirm that is intentional.
func handleResourceDelete(resp tools.NATSResponse) {
	var raw map[string]any
	if err := json.Unmarshal(resp.Payload, &raw); err != nil {
		return
	}
	id, _ := raw["id"].(string)
	if id == "" {
		return
	}
	name, _ := raw["name"].(string)
	for _, w := range findWorkspacesWithResource(id, resp.Datatype) {
		removeResourceFromWorkspace(w, resp.Datatype, id, name)
	}
}

// findWorkspacesWithResource queries MongoDB for workspaces that embed a resource with the given ID.
func findWorkspacesWithResource(resourceID string, dt tools.DataType) []*workspaceModel.Workspace { path := embeddedIDPath(dt) if path == "" { return nil } results := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).Search( &dbs.Filters{ And: map[string][]dbs.Filter{ path: {{Operator: dbs.EQUAL.String(), Value: resourceID}}, }, }, "", false, 0, 0, ) var ws []*workspaceModel.Workspace for _, r := range results.Data { if w, ok := r.(*workspaceModel.Workspace); ok { ws = append(ws, w) } } return ws } // embeddedIDPath returns the MongoDB dot-notation path to the resource ID within the typed // embedded array of a workspace document. AbstractResource embeds AbstractObject, so the // BSON path follows the struct nesting: .abstractresource.abstractobject.id. func embeddedIDPath(dt tools.DataType) string { const idSuffix = ".abstractresource.abstractobject.id" switch dt { case tools.DATA_RESOURCE: return "data_resources" + idSuffix case tools.COMPUTE_RESOURCE: return "compute_resources" + idSuffix case tools.STORAGE_RESOURCE: return "storage_resources" + idSuffix case tools.PROCESSING_RESOURCE: return "processing_resources" + idSuffix case tools.WORKFLOW_RESOURCE: return "workflow_resources" + idSuffix case tools.SERVICE_RESOURCE: return "service_resources" + idSuffix case tools.DYNAMIC_RESOURCE: return "dynamic_resources" + idSuffix case tools.NATIVE_TOOL: return "native_tools" + idSuffix default: return "" } } func updateResourceInWorkspace(w *workspaceModel.Workspace, dt tools.DataType, payload []byte) { updated := false switch dt { case tools.DATA_RESOURCE: var r resources.DataResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.DataResources { if e.GetID() == r.GetID() { w.DataResources[i] = &r updated = true break } } case tools.COMPUTE_RESOURCE: var r resources.ComputeResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.ComputeResources { if e.GetID() == r.GetID() { 
w.ComputeResources[i] = &r updated = true break } } case tools.STORAGE_RESOURCE: var r resources.StorageResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.StorageResources { if e.GetID() == r.GetID() { w.StorageResources[i] = &r updated = true break } } case tools.PROCESSING_RESOURCE: var r resources.ProcessingResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.ProcessingResources { if e.GetID() == r.GetID() { w.ProcessingResources[i] = &r updated = true break } } case tools.WORKFLOW_RESOURCE: var r resources.WorkflowResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.WorkflowResources { if e.GetID() == r.GetID() { w.WorkflowResources[i] = &r updated = true break } } case tools.SERVICE_RESOURCE: var r resources.ServiceResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.ServiceResources { if e.GetID() == r.GetID() { w.ServiceResources[i] = &r updated = true break } } case tools.DYNAMIC_RESOURCE: var r resources.DynamicResource if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.DynamicResources { if e.GetID() == r.GetID() { w.DynamicResources[i] = &r updated = true break } } case tools.NATIVE_TOOL: var r resources.NativeTool if err := json.Unmarshal(payload, &r); err != nil { return } for i, e := range w.NativeTools { if e.GetID() == r.GetID() { w.NativeTools[i] = &r updated = true break } } } if updated { oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).UpdateOne(w.Serialize(w), w.GetID()) } } func removeResourceFromWorkspace(w *workspaceModel.Workspace, dt tools.DataType, resourceID, name string) { removed := false switch dt { case tools.DATA_RESOURCE: var kept []*resources.DataResource for _, r := range w.DataResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.DataResources = kept w.Datas = removeID(w.Datas, 
resourceID) } case tools.COMPUTE_RESOURCE: var kept []*resources.ComputeResource for _, r := range w.ComputeResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.ComputeResources = kept w.Computes = removeID(w.Computes, resourceID) } case tools.STORAGE_RESOURCE: var kept []*resources.StorageResource for _, r := range w.StorageResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.StorageResources = kept w.Storages = removeID(w.Storages, resourceID) } case tools.PROCESSING_RESOURCE: var kept []*resources.ProcessingResource for _, r := range w.ProcessingResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.ProcessingResources = kept w.Processings = removeID(w.Processings, resourceID) } case tools.WORKFLOW_RESOURCE: var kept []*resources.WorkflowResource for _, r := range w.WorkflowResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.WorkflowResources = kept w.Workflows = removeID(w.Workflows, resourceID) } case tools.SERVICE_RESOURCE: var kept []*resources.ServiceResource for _, r := range w.ServiceResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.ServiceResources = kept w.Services = removeID(w.Services, resourceID) } case tools.DYNAMIC_RESOURCE: var kept []*resources.DynamicResource for _, r := range w.DynamicResources { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.DynamicResources = kept w.Dynamics = removeID(w.Dynamics, resourceID) } case tools.NATIVE_TOOL: var kept []*resources.NativeTool for _, r := range w.NativeTools { if r.GetID() == resourceID { removed = true } else { kept = append(kept, r) } } if removed { w.NativeTools = kept w.NativeTool = removeID(w.NativeTool, resourceID) } } if removed { w.Notifications = append(w.Notifications, 
fmt.Sprintf("resource %s (%s) removed: deleted by its creator peer", name, resourceID)) oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).UpdateOne(w.Serialize(w), w.GetID()) } } func isBlacklisted(peerID string) bool { if peerID == "" { return false } results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{ And: map[string][]dbs.Filter{ "peer_id": {{Operator: dbs.EQUAL.String(), Value: peerID}}, }, }, "", false, 0, 1) if len(results) > 0 { if p, ok := results[0].(*peer.Peer); ok { return p.Relation == peer.BLACKLIST } } return false } func removeID(ids []string, id string) []string { var out []string for _, v := range ids { if v != id { out = append(out, v) } } return out }