From 8c6b047ab6cf0ec1c452766cad86c95eb2fc7933 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 13 Mar 2026 10:28:45 +0100 Subject: [PATCH] Gracefull stop on websocket --- controllers/peer.go | 29 +++++++++++++++++++++++++---- infrastructure/nats.go | 14 +++++++++++++- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/controllers/peer.go b/controllers/peer.go index a05f773..015bbce 100644 --- a/controllers/peer.go +++ b/controllers/peer.go @@ -237,21 +237,42 @@ func (o *PeerController) DeleteState() { func Websocket(ctx ctx.Context, user string, r http.ResponseWriter, w *http.Request) { websocket.Handler(func(ws *websocket.Conn) { + done := make(chan struct{}) + go func() { + var discard interface{} + for { + if websocket.JSON.Receive(ws, &discard) != nil { + close(done) + return + } + } + }() defer func() { ws.Close() - infrastructure.SearchMu.Lock() if ch, ok := infrastructure.SearchStream[user]; ok { close(ch) + infrastructure.SearchMu.Lock() delete(infrastructure.SearchStream, user) + infrastructure.SearchMu.Unlock() } - infrastructure.SearchMu.Unlock() + fmt.Println("CLOSE !") infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{ Action: tools.PB_CLOSE_SEARCH, DataType: tools.PEER.EnumIndex(), }) }() - for msg := range infrastructure.SearchStream[user] { - if websocket.JSON.Send(ws, msg) != nil { + for { + select { + case msg, ok := <-infrastructure.SearchStream[user]: + if !ok { + continue + } + if websocket.JSON.Send(ws, msg) != nil { + continue + } + case <-done: + return + case <-ctx.Done(): return } } diff --git a/infrastructure/nats.go b/infrastructure/nats.go index 57276f5..1204665 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -8,6 +8,7 @@ import ( oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" + "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -87,7 +88,18 @@ func ListenNATS() { }, p.GetID()) } } else if p.Relation != peer.SELF { - access.StoreOne(p.Serialize(p)) + if p.Relation != peer.SELF { + access.StoreOne(p.Serialize(p)) + } else if data := access.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "relation": {{Operator: dbs.EQUAL.String(), Value: peer.SELF.EnumIndex()}}, + }, + }, "", false); len(data.Data) == 0 { + access.StoreOne(p.Serialize(p)) + } else if len(data.Data) == 1 && data.Data[0].GetID() != p.GetID() { + access.DeleteOne(data.Data[0].GetID()) + access.StoreOne(p.Serialize(p)) + } } }