From a574b55b8f92356d8870a80b61b2ad914005b99d Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 12 Mar 2026 16:20:28 +0100 Subject: [PATCH] Adapt With Close --- controllers/peer.go | 14 +++++++++++++- go.mod | 2 +- go.sum | 4 ++++ infrastructure/nats.go | 8 +++++++- ws.go | 2 +- 5 files changed, 26 insertions(+), 4 deletions(-) diff --git a/controllers/peer.go b/controllers/peer.go index 030c152..a05f773 100644 --- a/controllers/peer.go +++ b/controllers/peer.go @@ -237,7 +237,19 @@ func (o *PeerController) DeleteState() { func Websocket(ctx ctx.Context, user string, r http.ResponseWriter, w *http.Request) { websocket.Handler(func(ws *websocket.Conn) { - defer ws.Close() + defer func() { + ws.Close() + infrastructure.SearchMu.Lock() + if ch, ok := infrastructure.SearchStream[user]; ok { + close(ch) + delete(infrastructure.SearchStream, user) + } + infrastructure.SearchMu.Unlock() + infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{ + Action: tools.PB_CLOSE_SEARCH, + DataType: tools.PEER.EnumIndex(), + }) + }() for msg := range infrastructure.SearchStream[user] { if websocket.JSON.Send(ws, msg) != nil { return diff --git a/go.mod b/go.mod index ded99c1..08edcb7 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-peer go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a + cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 github.com/beego/beego/v2 v2.3.8 github.com/smartystreets/goconvey v1.7.2 ) diff --git a/go.sum b/go.sum index df62527..03df4d1 100644 --- a/go.sum +++ b/go.sum @@ -30,6 +30,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5 h1:h+Fkyj6cfwAir cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6iicxIydrjfCNU+UHhKuklFgfzDa174= cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62 h1:sHzacZxPIKHyjL4EkgG/c7MI8gM1xmLdhaoUx2ZsH+M= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 h1:DuB6SDThFVJVQ0iI0pZnBqtCE0uW+SNI7R7ndKixu2k= +cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/infrastructure/nats.go b/infrastructure/nats.go index f5fdfc6..57276f5 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "slices" + "sync" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" @@ -15,12 +16,15 @@ var ressourceCols = []oclib.LibDataEnum{ oclib.LibDataEnum(oclib.PEER), } +var SearchMu sync.RWMutex var SearchStream = map[string]chan *peer.Peer{} func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) if message.Action == tools.PB_SEARCH { + SearchMu.Lock() SearchStream[user] = make(chan *peer.Peer, 128) + SearchMu.Unlock() } tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-peer", @@ -53,7 +57,7 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { func ListenNATS() { tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { - if resp.FromApp == config.GetAppName() || resp.Datatype != tools.PEER { + if resp.FromApp == config.GetAppName() || !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } p := &peer.Peer{} @@ -96,7 +100,9 @@ func ListenNATS() { err := json.Unmarshal(resp.Payload, p) if err == nil { fmt.Println("ADD in SEARCH STREAM") + SearchMu.Lock() SearchStream[resp.User] <- p // TODO when do we update it in our catalog ? + SearchMu.Unlock() } }, }) diff --git a/ws.go b/ws.go index 5defcc5..e387db7 100644 --- a/ws.go +++ b/ws.go @@ -19,7 +19,7 @@ func main() { flag.Parse() args := flag.Args() - url := "ws://localhost:8093/oc/decentralized/search/test" + url := "ws://localhost:8093/oc/decentralized/search/demo" token := "" if len(args) >= 1 {