From 74f01193fa2dee1512822fd6a906d22e123469f4 Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 30 Apr 2026 14:37:36 +0200 Subject: [PATCH] debug catalog --- controllers/general.go | 3 ++- docker-compose.yml | 1 + go.mod | 2 +- go.sum | 2 ++ infrastructure/nats.go | 42 +++++++++++++++++++++++++++++------------- routers/router.go | 1 + 6 files changed, 36 insertions(+), 15 deletions(-) diff --git a/controllers/general.go b/controllers/general.go index 7a978c0..f3fa9ec 100755 --- a/controllers/general.go +++ b/controllers/general.go @@ -85,7 +85,7 @@ func Websocket(ctx context.Context, user string, groups []string, dataType int, infrastructure.SearchMu.Unlock() } fmt.Println("CLOSE !") - infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{ + go infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{ Action: tools.PB_CLOSE_SEARCH, DataType: dataType, }) @@ -93,6 +93,7 @@ func Websocket(ctx context.Context, user string, groups []string, dataType int, for { select { case msg, ok := <-infrastructure.SearchStream[user]: + fmt.Println("msg", msg, ok) if !ok { continue } diff --git a/docker-compose.yml b/docker-compose.yml index 8ed612c..abe3e5f 100755 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,6 +11,7 @@ services: networks: - oc labels: + - "traefik.stack=peer1" - "traefik.enable=true" - "traefik.http.routers.catalog.entrypoints=web" - "traefik.http.routers.catalog.rule=PathPrefix(`/catalog`)" diff --git a/go.mod b/go.mod index c17fc04..a9959c4 100755 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-catalog go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260429050913-47d487ea8011 + cloud.o-forge.io/core/oc-lib v0.0.0-20260429095623-9bb3d897b305 github.com/beego/beego/v2 v2.3.8 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/smartystreets/goconvey v1.7.2 diff --git a/go.sum b/go.sum index e146cb2..2f898bb 100755 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260427111114-318fd522895d h1:VDx58DIq91kA4 cloud.o-forge.io/core/oc-lib v0.0.0-20260427111114-318fd522895d/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= cloud.o-forge.io/core/oc-lib v0.0.0-20260429050913-47d487ea8011 h1:owV5pQ+mS5xDCKEcGTO+BgsyYrKjkISL8LDsmjEb/3s= cloud.o-forge.io/core/oc-lib v0.0.0-20260429050913-47d487ea8011/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260429095623-9bb3d897b305 h1:1A6enYMMjK+2nFd187doD8LOMbnHxl+8EZRf6gqs8Yw= +cloud.o-forge.io/core/oc-lib v0.0.0-20260429095623-9bb3d897b305/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/infrastructure/nats.go b/infrastructure/nats.go index 8a5c6bb..350376e 100644 --- a/infrastructure/nats.go +++ b/infrastructure/nats.go @@ -2,6 +2,7 @@ package infrastructure import ( "encoding/json" + "fmt" "slices" "sync" @@ -18,6 +19,7 @@ var ressourceCols = []oclib.LibDataEnum{ oclib.LibDataEnum(oclib.PROCESSING_RESOURCE), oclib.LibDataEnum(oclib.STORAGE_RESOURCE), oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE), + oclib.LibDataEnum(oclib.SERVICE_RESOURCE), } var SearchMu sync.RWMutex @@ -29,12 +31,15 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) { b, _ := json.Marshal(message) switch message.Action { case tools.PB_SEARCH: + fmt.Println("EMITNATS") SearchMu.Lock() + fmt.Println("afterlock") SearchStream[user] = make(chan []byte, 128) - SearchStreamSeen[user] = make([]string, 128) - SearchStreamExtend[user] = make([]string, 128) + SearchStreamSeen[user] = []string{} + SearchStreamExtend[user] = []string{} SearchMu.Unlock() - tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + + go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ FromApp: "oc-catalog", Datatype: -1, User: user, @@ -69,6 +74,7 @@ func ListenNATS() { if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { return } + p, err := resources.ToResource(int(resp.Datatype), resp.Payload) if err != nil { return @@ -76,31 +82,41 @@ func ListenNATS() { // Exclude resources already present in the local catalog if check := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil).LoadOne(p.GetID()); check.Data == nil { p.SetNotInCatalog(true) - return } - SearchMu.Lock() wrapped, merr := json.Marshal(map[string]interface{}{ "dtype": p.GetType(), "data": p, }) + fmt.Println("WRAPPED CHECK") + + SearchMu.Lock() if a := SearchStreamExtend[resp.User]; len(a) > 0 { wrapped, merr = json.Marshal(map[string]interface{}{ "dtype": p.GetType(), "data": oclib.GetExtend(p, p.Extend(a...), map[tools.DataType]map[string]interface{}{}), }) } - if merr != nil { + alreadySeen := slices.Contains(SearchStreamSeen[resp.User], p.GetID()) + if !alreadySeen { + SearchStreamSeen[resp.User] = append(SearchStreamSeen[resp.User], p.GetID()) + } + if SearchStream[resp.User] == nil { + SearchStream[resp.User] = make(chan []byte) + } + ch := SearchStream[resp.User] + SearchMu.Unlock() + fmt.Println(resp.User, ch, merr, alreadySeen) + if merr != nil || alreadySeen { return } - if SearchStreamSeen[resp.User] != nil && slices.Contains(SearchStreamSeen[resp.User], p.GetID()) { - SearchStream[resp.User] <- wrapped // TODO when do we update it in our catalog ? + if ch != nil { + select { + case ch <- wrapped: + default: + fmt.Println("SEARCH channel full or closed, dropping", p.GetID()) + } } - if SearchStreamSeen[resp.User] == nil { - SearchStreamSeen[resp.User] = []string{} - } - SearchStreamSeen[resp.User] = append(SearchStreamSeen[resp.User], p.GetID()) - SearchMu.Unlock() }, // ── WORKFLOW_STEP_DONE_EVENT ───────────────────────────────────────── diff --git a/routers/router.go b/routers/router.go index 6172fae..e6bd4d5 100755 --- a/routers/router.go +++ b/routers/router.go @@ -54,6 +54,7 @@ func TypedSearchHandler(w http.ResponseWriter, r *http.Request) { } user, _, groups := oclib.ExtractTokenInfoWs(*r) b, _ := json.Marshal(map[string]string{"search": search, "type": t}) + fmt.Println("SENDE", user, search) infrastructure.EmitNATS(user, groups, tools.PropalgationMessage{ Action: tools.PB_SEARCH, DataType: dataType,