diff --git a/Dockerfile b/Dockerfile index f47f1a7..e77dc48 100755 --- a/Dockerfile +++ b/Dockerfile @@ -3,6 +3,8 @@ FROM golang:alpine AS deps WORKDIR /app COPY go.mod go.sum ./ RUN sed -i '/replace/d' go.mod +RUN apk add --no-cache git +RUN git config --global url."https://cloud.o-forge.io/".insteadOf "http://cloud.o-forge.io/" RUN go mod download #---------------------------------------------------------------------------------------------- diff --git a/controllers/workspace.go b/controllers/workspace.go index 077c083..be0d5d4 100755 --- a/controllers/workspace.go +++ b/controllers/workspace.go @@ -6,7 +6,9 @@ import ( "strconv" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/workspace" "cloud.o-forge.io/core/oc-lib/tools" + "oc-workspace/infrastructure" beego "github.com/beego/beego/v2/server/web" ) @@ -53,18 +55,101 @@ func (o *WorkspaceController) Search() { // @Success 200 {workspace} models.workspace // @router /:id [put] func (o *WorkspaceController) Put() { - //user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) - // store and return Id or post with UUID - caller := tools.NewHTTPCaller(paths) // generate a http caller to send to peer shared workspace + caller := tools.NewHTTPCaller(paths) caller.Disabled = oclib.IsQueryParamsEquals(o.Ctx.Input, "is_remote", true) var res map[string]interface{} id := o.Ctx.Input.Param(":id") - json.Unmarshal(o.Ctx.Input.CopyBody(10000), &res) - // o.Data["json"] = oclib.NewRequest(oclib.LibDataEnum(oclib.WORKSPACE), user, peerID, groups, caller).UpdateOne(res, id) + json.Unmarshal(o.Ctx.Input.CopyBody(2000000), &res) + + // Snapshot resource IDs before the update to detect additions and removals. + oldResources := loadWorkspaceResourceMap(id) + o.Data["json"] = oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), caller).UpdateOne(res, id) + + // Emit watch/unwatch events for resources that changed in this workspace. + go diffAndEmitWatchers(oldResources, extractResourceMap(res)) + o.ServeJSON() } +// resourceEntry holds the resource ID and its creator's libp2p PeerID. +type resourceEntry struct { + creatorPeerID string + dataType tools.DataType +} + +// loadWorkspaceResourceMap returns the current resourceID→entry map for a workspace. +func loadWorkspaceResourceMap(id string) map[string]resourceEntry { + result := map[string]resourceEntry{} + data := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).LoadOne(id) + if data.Data == nil { + return result + } + w, ok := data.Data.(*workspace.Workspace) + if !ok { + return result + } + for _, r := range w.DataResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.DATA_RESOURCE} } + for _, r := range w.ComputeResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.COMPUTE_RESOURCE} } + for _, r := range w.StorageResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.STORAGE_RESOURCE} } + for _, r := range w.ProcessingResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.PROCESSING_RESOURCE} } + for _, r := range w.WorkflowResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.WORKFLOW_RESOURCE} } + for _, r := range w.ServiceResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.SERVICE_RESOURCE} } + for _, r := range w.DynamicResources { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.DYNAMIC_RESOURCE} } + for _, r := range w.NativeTools { result[r.GetID()] = resourceEntry{creatorPeerID: r.GetCreatorID(), dataType: tools.NATIVE_TOOL} } + return result +} + +// extractResourceMap parses the PUT body to build a resourceID→entry map. +// creator_id is at the top level of each resource object (from Flutter toJSON()). +var resourceArrayKeys = map[string]tools.DataType{ + "data_resources": tools.DATA_RESOURCE, + "compute_resources": tools.COMPUTE_RESOURCE, + "storage_resources": tools.STORAGE_RESOURCE, + "processing_resources": tools.PROCESSING_RESOURCE, + "workflow_resources": tools.WORKFLOW_RESOURCE, + "service_resources": tools.SERVICE_RESOURCE, + "dynamic_resources": tools.DYNAMIC_RESOURCE, + "native_tools": tools.NATIVE_TOOL, +} + +func extractResourceMap(body map[string]interface{}) map[string]resourceEntry { + result := map[string]resourceEntry{} + for key, dt := range resourceArrayKeys { + arr, ok := body[key].([]interface{}) + if !ok { + continue + } + for _, rawR := range arr { + r, ok := rawR.(map[string]interface{}) + if !ok { + continue + } + id, _ := r["id"].(string) + creatorID, _ := r["creator_id"].(string) + if id != "" { + result[id] = resourceEntry{creatorPeerID: creatorID, dataType: dt} + } + } + } + return result +} + +// diffAndEmitWatchers emits PB_WATCH_RESOURCE for added non-self resources +// and PB_UNWATCH_RESOURCE for removed ones. +func diffAndEmitWatchers(old, new map[string]resourceEntry) { + for id, entry := range new { + if _, existed := old[id]; !existed { + infrastructure.EmitWatchResource(id, entry.creatorPeerID, entry.dataType) + } + } + for id, entry := range old { + if _, stillPresent := new[id]; !stillPresent { + infrastructure.EmitUnwatchResource(id, entry.creatorPeerID, entry.dataType) + } + } +} + // @Title Create // @Description create workspace // @Param data body json true "body for data content (Json format)" @@ -125,3 +210,4 @@ func (o *WorkspaceController) Delete() { o.Data["json"] = oclib.NewRequest(oclib.LibDataEnum(oclib.WORKSPACE), user, peerID, groups, caller).DeleteOne(id) o.ServeJSON() } + diff --git a/go.mod b/go.mod index 9426335..377834f 100755 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-workspace go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b + cloud.o-forge.io/core/oc-lib v0.0.0-20260605135650-1425a3149455 github.com/beego/beego/v2 v2.3.8 github.com/smartystreets/goconvey v1.7.2 ) diff --git a/go.sum b/go.sum index fa83aba..bc2d658 100755 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260427111114-318fd522895d h1:VDx58DIq91kA4 cloud.o-forge.io/core/oc-lib v0.0.0-20260427111114-318fd522895d/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b h1:TWhmHeurbBmdyevREh4+mHWOBehO2AK587RCIjCfvOc= cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260605135650-1425a3149455 h1:hDsqGw1EUY2b4mB+aUFSuQO75t+l+Ow9vZgjHZDK3uw= +cloud.o-forge.io/core/oc-lib v0.0.0-20260605135650-1425a3149455/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc= github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg= diff --git a/infrastructure/nats.go b/infrastructure/nats.go new file mode 100644 index 0000000..4c4968f --- /dev/null +++ b/infrastructure/nats.go @@ -0,0 +1,415 @@ +package infrastructure + +import ( + "encoding/json" + "fmt" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/models/resources" + workspaceModel "cloud.o-forge.io/core/oc-lib/models/workspace" + "cloud.o-forge.io/core/oc-lib/tools" +) + +const appName = "oc-workspace" + +// ListenNATS subscribes to resource lifecycle events emitted by remote peers. +// On CREATE_RESOURCE: updates the resource in any workspace that already contains it. +// On REMOVE_RESOURCE: removes the resource and adds a notification to affected workspaces. +// oc-workspace handles ALL peer types except blacklisted ones (oc-catalog handles +// nano/member/master/orga-master peers for its own catalog collections). +func ListenNATS() { + tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ + tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { + if resp.FromApp == appName { + return + } + handleResourceUpdate(resp) + }, + tools.REMOVE_RESOURCE: func(resp tools.NATSResponse) { + if resp.FromApp == appName { + return + } + handleResourceDelete(resp) + }, + }) +} + +func handleResourceUpdate(resp tools.NATSResponse) { + var raw map[string]interface{} + if err := json.Unmarshal(resp.Payload, &raw); err != nil { + return + } + id, _ := raw["id"].(string) + if id == "" { + return + } + creatorID, _ := raw["creator_id"].(string) + if isBlacklisted(creatorID) { + return + } + workspaces := findWorkspacesWithResource(id, resp.Datatype) + for _, w := range workspaces { + updateResourceInWorkspace(w, resp.Datatype, resp.Payload) + } + if len(workspaces) > 0 { + EmitWatchResource(id, creatorID, resp.Datatype) + } +} + +// EmitWatchResource tells oc-discovery to register our peer as a watcher for +// a non-self resource, so we receive future CREATE/DELETE events for it. +func EmitWatchResource(resourceID, creatorID string, dt tools.DataType) { + emitWatcherEvent(resourceID, creatorID, dt, tools.PB_WATCH_RESOURCE) +} + +// EmitUnwatchResource tells oc-discovery to deregister our peer as a watcher +// for a resource that was removed from all local workspaces. +func EmitUnwatchResource(resourceID, creatorID string, dt tools.DataType) { + emitWatcherEvent(resourceID, creatorID, dt, tools.PB_UNWATCH_RESOURCE) +} + +func emitWatcherEvent(resourceID, creatorID string, dt tools.DataType, action tools.PubSubAction) { + self, err := oclib.GetMySelf() + if err != nil || self == nil || creatorID == "" || creatorID == self.PeerID { + return + } + results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "peer_id": {{Operator: dbs.EQUAL.String(), Value: creatorID}}, + }, + }, "", false, 0, 1) + if len(results) == 0 { + return + } + p, ok := results[0].(*peer.Peer) + if !ok || p.PeerID == "" { + return + } + watchPayload, _ := json.Marshal(struct { + CreatorPeerID string `json:"creator_peer_id"` + ResourceID string `json:"resource_id"` + }{CreatorPeerID: p.PeerID, ResourceID: resourceID}) + msg, _ := json.Marshal(tools.PropalgationMessage{ + DataType: int(dt), + Action: action, + Payload: watchPayload, + }) + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: appName, + Method: int(tools.PROPALGATION_EVENT), + Payload: msg, + }) +} + +func handleResourceDelete(resp tools.NATSResponse) { + var raw map[string]interface{} + if err := json.Unmarshal(resp.Payload, &raw); err != nil { + return + } + id, _ := raw["id"].(string) + if id == "" { + return + } + name, _ := raw["name"].(string) + for _, w := range findWorkspacesWithResource(id, resp.Datatype) { + removeResourceFromWorkspace(w, resp.Datatype, id, name) + } +} + +// findWorkspacesWithResource queries MongoDB for workspaces that embed a resource with the given ID. +func findWorkspacesWithResource(resourceID string, dt tools.DataType) []*workspaceModel.Workspace { + path := embeddedIDPath(dt) + if path == "" { + return nil + } + results := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).Search( + &dbs.Filters{ + And: map[string][]dbs.Filter{ + path: {{Operator: dbs.EQUAL.String(), Value: resourceID}}, + }, + }, "", false, 0, 0, + ) + var ws []*workspaceModel.Workspace + for _, r := range results.Data { + if w, ok := r.(*workspaceModel.Workspace); ok { + ws = append(ws, w) + } + } + return ws +} + +// embeddedIDPath returns the MongoDB dot-notation path to the resource ID within the typed +// embedded array of a workspace document. AbstractResource embeds AbstractObject, so the +// BSON path follows the struct nesting: .abstractresource.abstractobject.id. +func embeddedIDPath(dt tools.DataType) string { + const idSuffix = ".abstractresource.abstractobject.id" + switch dt { + case tools.DATA_RESOURCE: + return "data_resources" + idSuffix + case tools.COMPUTE_RESOURCE: + return "compute_resources" + idSuffix + case tools.STORAGE_RESOURCE: + return "storage_resources" + idSuffix + case tools.PROCESSING_RESOURCE: + return "processing_resources" + idSuffix + case tools.WORKFLOW_RESOURCE: + return "workflow_resources" + idSuffix + case tools.SERVICE_RESOURCE: + return "service_resources" + idSuffix + case tools.DYNAMIC_RESOURCE: + return "dynamic_resources" + idSuffix + case tools.NATIVE_TOOL: + return "native_tools" + idSuffix + default: + return "" + } +} + +func updateResourceInWorkspace(w *workspaceModel.Workspace, dt tools.DataType, payload []byte) { + updated := false + switch dt { + case tools.DATA_RESOURCE: + var r resources.DataResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.DataResources { + if e.GetID() == r.GetID() { + w.DataResources[i] = &r + updated = true + break + } + } + case tools.COMPUTE_RESOURCE: + var r resources.ComputeResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.ComputeResources { + if e.GetID() == r.GetID() { + w.ComputeResources[i] = &r + updated = true + break + } + } + case tools.STORAGE_RESOURCE: + var r resources.StorageResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.StorageResources { + if e.GetID() == r.GetID() { + w.StorageResources[i] = &r + updated = true + break + } + } + case tools.PROCESSING_RESOURCE: + var r resources.ProcessingResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.ProcessingResources { + if e.GetID() == r.GetID() { + w.ProcessingResources[i] = &r + updated = true + break + } + } + case tools.WORKFLOW_RESOURCE: + var r resources.WorkflowResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.WorkflowResources { + if e.GetID() == r.GetID() { + w.WorkflowResources[i] = &r + updated = true + break + } + } + case tools.SERVICE_RESOURCE: + var r resources.ServiceResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.ServiceResources { + if e.GetID() == r.GetID() { + w.ServiceResources[i] = &r + updated = true + break + } + } + case tools.DYNAMIC_RESOURCE: + var r resources.DynamicResource + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.DynamicResources { + if e.GetID() == r.GetID() { + w.DynamicResources[i] = &r + updated = true + break + } + } + case tools.NATIVE_TOOL: + var r resources.NativeTool + if err := json.Unmarshal(payload, &r); err != nil { + return + } + for i, e := range w.NativeTools { + if e.GetID() == r.GetID() { + w.NativeTools[i] = &r + updated = true + break + } + } + } + if updated { + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).UpdateOne(w.Serialize(w), w.GetID()) + } +} + +func removeResourceFromWorkspace(w *workspaceModel.Workspace, dt tools.DataType, resourceID, name string) { + removed := false + switch dt { + case tools.DATA_RESOURCE: + var kept []*resources.DataResource + for _, r := range w.DataResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.DataResources = kept + w.Datas = removeID(w.Datas, resourceID) + } + case tools.COMPUTE_RESOURCE: + var kept []*resources.ComputeResource + for _, r := range w.ComputeResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.ComputeResources = kept + w.Computes = removeID(w.Computes, resourceID) + } + case tools.STORAGE_RESOURCE: + var kept []*resources.StorageResource + for _, r := range w.StorageResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.StorageResources = kept + w.Storages = removeID(w.Storages, resourceID) + } + case tools.PROCESSING_RESOURCE: + var kept []*resources.ProcessingResource + for _, r := range w.ProcessingResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.ProcessingResources = kept + w.Processings = removeID(w.Processings, resourceID) + } + case tools.WORKFLOW_RESOURCE: + var kept []*resources.WorkflowResource + for _, r := range w.WorkflowResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.WorkflowResources = kept + w.Workflows = removeID(w.Workflows, resourceID) + } + case tools.SERVICE_RESOURCE: + var kept []*resources.ServiceResource + for _, r := range w.ServiceResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.ServiceResources = kept + w.Services = removeID(w.Services, resourceID) + } + case tools.DYNAMIC_RESOURCE: + var kept []*resources.DynamicResource + for _, r := range w.DynamicResources { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.DynamicResources = kept + w.Dynamics = removeID(w.Dynamics, resourceID) + } + case tools.NATIVE_TOOL: + var kept []*resources.NativeTool + for _, r := range w.NativeTools { + if r.GetID() == resourceID { + removed = true + } else { + kept = append(kept, r) + } + } + if removed { + w.NativeTools = kept + w.NativeTool = removeID(w.NativeTool, resourceID) + } + } + if removed { + w.Notifications = append(w.Notifications, + fmt.Sprintf("resource %s (%s) removed: deleted by its creator peer", name, resourceID)) + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKSPACE), nil).UpdateOne(w.Serialize(w), w.GetID()) + } +} + +func isBlacklisted(peerID string) bool { + if peerID == "" { + return false + } + results, _, _ := peer.NewShallowAccessor().Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "peer_id": {{Operator: dbs.EQUAL.String(), Value: peerID}}, + }, + }, "", false, 0, 1) + if len(results) > 0 { + if p, ok := results[0].(*peer.Peer); ok { + return p.Relation == peer.BLACKLIST + } + } + return false +} + +func removeID(ids []string, id string) []string { + var out []string + for _, v := range ids { + if v != id { + out = append(out, v) + } + } + return out +} diff --git a/main.go b/main.go index 0ce5f53..8a45e11 100755 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "oc-workspace/infrastructure" _ "oc-workspace/routers" oclib "cloud.o-forge.io/core/oc-lib" @@ -11,5 +12,6 @@ const appname = "oc-workspace" func main() { oclib.InitAPI(appname) + go infrastructure.ListenNATS() beego.Run() } diff --git a/routers/router.go b/routers/router.go index 542cbdc..1ba6f82 100755 --- a/routers/router.go +++ b/routers/router.go @@ -24,6 +24,5 @@ func init() { ), ), ) - beego.AddNamespace(ns) }