Search & Gracefull Close on Websocket

This commit is contained in:
mr
2026-03-13 10:28:24 +01:00
parent a11d764729
commit 80bb9ffbed
4 changed files with 45 additions and 22 deletions

View File

@@ -1,6 +1,8 @@
package controllers
import (
"context"
"fmt"
"net/http"
"oc-catalog/infrastructure"
@@ -60,25 +62,47 @@ func (o *GeneralController) GetAll() {
o.ServeJSON()
}
func Websocket(user string, groups []string, dataType int, w http.ResponseWriter, r *http.Request) {
func Websocket(ctx context.Context, user string, groups []string, dataType int, r http.ResponseWriter, w *http.Request) {
websocket.Handler(func(ws *websocket.Conn) {
done := make(chan struct{})
go func() {
var discard interface{}
for {
if websocket.JSON.Receive(ws, &discard) != nil {
close(done)
return
}
}
}()
defer func() {
ws.Close()
infrastructure.SearchMu.Lock()
if ch, ok := infrastructure.SearchStream[user]; ok {
close(ch)
infrastructure.SearchMu.Lock()
delete(infrastructure.SearchStream, user)
}
infrastructure.SearchMu.Unlock()
infrastructure.EmitNATS(user, groups, tools.PropalgationMessage{
}
fmt.Println("CLOSE !")
infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{
Action: tools.PB_CLOSE_SEARCH,
DataType: dataType,
})
}()
for msg := range infrastructure.SearchStream[user] {
for {
select {
case msg, ok := <-infrastructure.SearchStream[user]:
fmt.Println("FOR", msg, ok)
if !ok {
continue
}
if websocket.JSON.Send(ws, msg) != nil {
continue
}
case <-done:
return
case <-ctx.Done():
return
}
}
}).ServeHTTP(w, r)
}).ServeHTTP(r, w)
}

View File

@@ -26,12 +26,9 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
b, _ := json.Marshal(message)
switch message.Action {
case tools.PB_SEARCH:
if slices.Contains(ressourceCols, oclib.LibDataEnum(message.DataType)) {
SearchMu.Lock()
SearchStream[user] = make(chan resources.ResourceInterface, 128)
SearchMu.Unlock()
}
}
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-catalog",
Datatype: -1,
@@ -41,6 +38,7 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
Payload: b,
})
}
}
// un ressource quand on l'ajoute à notre catalogue elle nous est étrangère.
// pour se la réaffecté à soit, on peut alors changer le créator ID.
@@ -70,6 +68,7 @@ func ListenNATS() {
if err == nil {
fmt.Println("SearchStream", p)
SearchMu.Lock()
fmt.Println(SearchStream, resp.User)
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?*
SearchMu.Unlock()
}

View File

@@ -36,7 +36,7 @@ func wsSearchHandler(dataType int) http.HandlerFunc {
DataType: dataType,
Payload: b,
})
controllers.Websocket(user, groups, dataType, w, r)
controllers.Websocket(r.Context(), user, groups, dataType, w, r)
}
}

2
ws.go
View File

@@ -26,7 +26,7 @@ func main() {
// ws://localhost:8087/oc/processing/decentralized/all/search/<term>
// ws://localhost:8087/oc/storage/decentralized/all/search/<term>
// ws://localhost:8087/oc/workflow/decentralized/all/search/<term>
url := "ws://localhost:8087/oc/resource/decentralized/all/search/demo"
url := "ws://localhost:8087/oc/resource/decentralized/known/search/builder"
token := ""
if len(args) >= 1 {