diff --git a/controllers/general.go b/controllers/general.go index eb97256..a1f3b9b 100755 --- a/controllers/general.go +++ b/controllers/general.go @@ -1,6 +1,8 @@ package controllers import ( + "context" + "fmt" "net/http" "oc-catalog/infrastructure" @@ -60,25 +62,47 @@ func (o *GeneralController) GetAll() { o.ServeJSON() } -func Websocket(user string, groups []string, dataType int, w http.ResponseWriter, r *http.Request) { +func Websocket(ctx context.Context, user string, groups []string, dataType int, r http.ResponseWriter, w *http.Request) { websocket.Handler(func(ws *websocket.Conn) { + done := make(chan struct{}) + go func() { + var discard interface{} + for { + if websocket.JSON.Receive(ws, &discard) != nil { + close(done) + return + } + } + }() defer func() { ws.Close() - infrastructure.SearchMu.Lock() if ch, ok := infrastructure.SearchStream[user]; ok { close(ch) + infrastructure.SearchMu.Lock() delete(infrastructure.SearchStream, user) + infrastructure.SearchMu.Unlock() } - infrastructure.SearchMu.Unlock() - infrastructure.EmitNATS(user, groups, tools.PropalgationMessage{ + fmt.Println("CLOSE !") + infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{ Action: tools.PB_CLOSE_SEARCH, DataType: dataType, }) }() - for msg := range infrastructure.SearchStream[user] { - if websocket.JSON.Send(ws, msg) != nil { + for { + select { + case msg, ok := <-infrastructure.SearchStream[user]: + fmt.Println("FOR", msg, ok) + if !ok { + continue + } + if websocket.JSON.Send(ws, msg) != nil { + continue + } + case <-done: + return + case <-ctx.Done(): return } } - }).ServeHTTP(w, r) + }).ServeHTTP(r, w) } diff --git a/infrastructure/nats.go b/infrastructure/nats.go index dd7751c..ba76e9f 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -26,20 +26,18 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) switch message.Action { case tools.PB_SEARCH: - if slices.Contains(ressourceCols, oclib.LibDataEnum(message.DataType)) { - SearchMu.Lock() - SearchStream[user] = make(chan resources.ResourceInterface, 128) - SearchMu.Unlock() - } + SearchMu.Lock() + SearchStream[user] = make(chan resources.ResourceInterface, 128) + SearchMu.Unlock() + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-catalog", + Datatype: -1, + User: user, + Groups: groups, + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) } - tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ - FromApp: "oc-catalog", - Datatype: -1, - User: user, - Groups: groups, - Method: int(tools.PROPALGATION_EVENT), - Payload: b, - }) } // un ressource quand on l'ajoute à notre catalogue elle nous est étrangère. @@ -70,6 +68,7 @@ func ListenNATS() { if err == nil { fmt.Println("SearchStream", p) SearchMu.Lock() + fmt.Println(SearchStream, resp.User) SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?* SearchMu.Unlock() } diff --git a/routers/router.go b/routers/router.go index 0b2caf7..84073b9 100755 --- a/routers/router.go +++ b/routers/router.go @@ -36,7 +36,7 @@ func wsSearchHandler(dataType int) http.HandlerFunc { DataType: dataType, Payload: b, }) - controllers.Websocket(user, groups, dataType, w, r) + controllers.Websocket(r.Context(), user, groups, dataType, w, r) } } diff --git a/ws.go b/ws.go index a0bd567..20c628f 100644 --- a/ws.go +++ b/ws.go @@ -26,7 +26,7 @@ func main() { // ws://localhost:8087/oc/processing/decentralized/all/search/ // ws://localhost:8087/oc/storage/decentralized/all/search/ // ws://localhost:8087/oc/workflow/decentralized/all/search/ - url := "ws://localhost:8087/oc/resource/decentralized/all/search/demo" + url := "ws://localhost:8087/oc/resource/decentralized/known/search/builder" token := "" if len(args) >= 1 {