Adapt With Close
This commit is contained in:
@@ -60,9 +60,21 @@ func (o *GeneralController) GetAll() {
|
|||||||
o.ServeJSON()
|
o.ServeJSON()
|
||||||
}
|
}
|
||||||
|
|
||||||
func Websocket(user string, w http.ResponseWriter, r *http.Request) {
|
func Websocket(user string, groups []string, dataType int, w http.ResponseWriter, r *http.Request) {
|
||||||
websocket.Handler(func(ws *websocket.Conn) {
|
websocket.Handler(func(ws *websocket.Conn) {
|
||||||
defer ws.Close()
|
defer func() {
|
||||||
|
ws.Close()
|
||||||
|
infrastructure.SearchMu.Lock()
|
||||||
|
if ch, ok := infrastructure.SearchStream[user]; ok {
|
||||||
|
close(ch)
|
||||||
|
delete(infrastructure.SearchStream, user)
|
||||||
|
}
|
||||||
|
infrastructure.SearchMu.Unlock()
|
||||||
|
infrastructure.EmitNATS(user, groups, tools.PropalgationMessage{
|
||||||
|
Action: tools.PB_CLOSE_SEARCH,
|
||||||
|
DataType: dataType,
|
||||||
|
})
|
||||||
|
}()
|
||||||
for msg := range infrastructure.SearchStream[user] {
|
for msg := range infrastructure.SearchStream[user] {
|
||||||
if websocket.JSON.Send(ws, msg) != nil {
|
if websocket.JSON.Send(ws, msg) != nil {
|
||||||
return
|
return
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module oc-catalog
|
|||||||
go 1.25.0
|
go 1.25.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2
|
||||||
github.com/beego/beego/v2 v2.3.8
|
github.com/beego/beego/v2 v2.3.8
|
||||||
github.com/smartystreets/goconvey v1.7.2
|
github.com/smartystreets/goconvey v1.7.2
|
||||||
)
|
)
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -44,6 +44,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260304125443-a426bdf655e1 h1:bvVWoPSoxVmLm
|
|||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260304125443-a426bdf655e1/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260304125443-a426bdf655e1/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6iicxIydrjfCNU+UHhKuklFgfzDa174=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6iicxIydrjfCNU+UHhKuklFgfzDa174=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 h1:DuB6SDThFVJVQ0iI0pZnBqtCE0uW+SNI7R7ndKixu2k=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/beego/beego/v2 v2.3.4 h1:HurQEOGIEhLlPFCTR6ZDuQkybrUl2Ag2i6CdVD2rGiI=
|
github.com/beego/beego/v2 v2.3.4 h1:HurQEOGIEhLlPFCTR6ZDuQkybrUl2Ag2i6CdVD2rGiI=
|
||||||
github.com/beego/beego/v2 v2.3.4/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
|
github.com/beego/beego/v2 v2.3.4/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"slices"
|
"slices"
|
||||||
|
"sync"
|
||||||
|
|
||||||
oclib "cloud.o-forge.io/core/oc-lib"
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||||
@@ -18,12 +19,18 @@ var ressourceCols = []oclib.LibDataEnum{
|
|||||||
oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE),
|
oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var SearchMu sync.RWMutex
|
||||||
var SearchStream = map[string]chan resources.ResourceInterface{}
|
var SearchStream = map[string]chan resources.ResourceInterface{}
|
||||||
|
|
||||||
func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
|
func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
|
||||||
b, _ := json.Marshal(message)
|
b, _ := json.Marshal(message)
|
||||||
if message.Action == tools.PB_SEARCH {
|
switch message.Action {
|
||||||
SearchStream[user] = make(chan resources.ResourceInterface, 128)
|
case tools.PB_SEARCH:
|
||||||
|
if slices.Contains(ressourceCols, oclib.LibDataEnum(message.DataType)) {
|
||||||
|
SearchMu.Lock()
|
||||||
|
SearchStream[user] = make(chan resources.ResourceInterface, 128)
|
||||||
|
SearchMu.Unlock()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
||||||
FromApp: "oc-catalog",
|
FromApp: "oc-catalog",
|
||||||
@@ -62,7 +69,9 @@ func ListenNATS() {
|
|||||||
p, err := resources.ToResource(int(resp.Datatype), resp.Payload)
|
p, err := resources.ToResource(int(resp.Datatype), resp.Payload)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
fmt.Println("SearchStream", p)
|
fmt.Println("SearchStream", p)
|
||||||
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?
|
SearchMu.Lock()
|
||||||
|
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?*
|
||||||
|
SearchMu.Unlock()
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ func wsSearchHandler(dataType int) http.HandlerFunc {
|
|||||||
DataType: dataType,
|
DataType: dataType,
|
||||||
Payload: b,
|
Payload: b,
|
||||||
})
|
})
|
||||||
controllers.Websocket(user, w, r)
|
controllers.Websocket(user, groups, dataType, w, r)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user