Adapt With Close

This commit is contained in:
mr
2026-03-12 16:20:28 +01:00
parent 57ec428d63
commit a574b55b8f
5 changed files with 26 additions and 4 deletions

View File

@@ -237,7 +237,19 @@ func (o *PeerController) DeleteState() {
func Websocket(ctx ctx.Context, user string, r http.ResponseWriter, w *http.Request) { func Websocket(ctx ctx.Context, user string, r http.ResponseWriter, w *http.Request) {
websocket.Handler(func(ws *websocket.Conn) { websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close() defer func() {
ws.Close()
infrastructure.SearchMu.Lock()
if ch, ok := infrastructure.SearchStream[user]; ok {
close(ch)
delete(infrastructure.SearchStream, user)
}
infrastructure.SearchMu.Unlock()
infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{
Action: tools.PB_CLOSE_SEARCH,
DataType: tools.PEER.EnumIndex(),
})
}()
for msg := range infrastructure.SearchStream[user] { for msg := range infrastructure.SearchStream[user] {
if websocket.JSON.Send(ws, msg) != nil { if websocket.JSON.Send(ws, msg) != nil {
return return

2
go.mod
View File

@@ -3,7 +3,7 @@ module oc-peer
go 1.25.0 go 1.25.0
require ( require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2
github.com/beego/beego/v2 v2.3.8 github.com/beego/beego/v2 v2.3.8
github.com/smartystreets/goconvey v1.7.2 github.com/smartystreets/goconvey v1.7.2
) )

4
go.sum
View File

@@ -30,6 +30,10 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5 h1:h+Fkyj6cfwAir
cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260302152414-542b0b73aba5/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6iicxIydrjfCNU+UHhKuklFgfzDa174= cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a h1:oCkb9l/Cvn0x6iicxIydrjfCNU+UHhKuklFgfzDa174=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260312073634-2c9c42dd516a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62 h1:sHzacZxPIKHyjL4EkgG/c7MI8gM1xmLdhaoUx2ZsH+M=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2 h1:DuB6SDThFVJVQ0iI0pZnBqtCE0uW+SNI7R7ndKixu2k=
cloud.o-forge.io/core/oc-lib v0.0.0-20260312141150-a335c905b3a2/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"slices" "slices"
"sync"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/config"
@@ -15,12 +16,15 @@ var ressourceCols = []oclib.LibDataEnum{
oclib.LibDataEnum(oclib.PEER), oclib.LibDataEnum(oclib.PEER),
} }
var SearchMu sync.RWMutex
var SearchStream = map[string]chan *peer.Peer{} var SearchStream = map[string]chan *peer.Peer{}
func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
b, _ := json.Marshal(message) b, _ := json.Marshal(message)
if message.Action == tools.PB_SEARCH { if message.Action == tools.PB_SEARCH {
SearchMu.Lock()
SearchStream[user] = make(chan *peer.Peer, 128) SearchStream[user] = make(chan *peer.Peer, 128)
SearchMu.Unlock()
} }
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-peer", FromApp: "oc-peer",
@@ -53,7 +57,7 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
func ListenNATS() { func ListenNATS() {
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
tools.CREATE_RESOURCE: func(resp tools.NATSResponse) { tools.CREATE_RESOURCE: func(resp tools.NATSResponse) {
if resp.FromApp == config.GetAppName() || resp.Datatype != tools.PEER { if resp.FromApp == config.GetAppName() || !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) {
return return
} }
p := &peer.Peer{} p := &peer.Peer{}
@@ -96,7 +100,9 @@ func ListenNATS() {
err := json.Unmarshal(resp.Payload, p) err := json.Unmarshal(resp.Payload, p)
if err == nil { if err == nil {
fmt.Println("ADD in SEARCH STREAM") fmt.Println("ADD in SEARCH STREAM")
SearchMu.Lock()
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ? SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?
SearchMu.Unlock()
} }
}, },
}) })

2
ws.go
View File

@@ -19,7 +19,7 @@ func main() {
flag.Parse() flag.Parse()
args := flag.Args() args := flag.Args()
url := "ws://localhost:8093/oc/decentralized/search/test" url := "ws://localhost:8093/oc/decentralized/search/demo"
token := "" token := ""
if len(args) >= 1 { if len(args) >= 1 {