debug catalog

This commit is contained in:
mr
2026-04-30 14:37:36 +02:00
parent c03b43f844
commit 74f01193fa
6 changed files with 36 additions and 15 deletions

View File

@@ -2,6 +2,7 @@ package infrastructure
import (
"encoding/json"
"fmt"
"slices"
"sync"
@@ -18,6 +19,7 @@ var ressourceCols = []oclib.LibDataEnum{
oclib.LibDataEnum(oclib.PROCESSING_RESOURCE),
oclib.LibDataEnum(oclib.STORAGE_RESOURCE),
oclib.LibDataEnum(oclib.WORKFLOW_RESOURCE),
oclib.LibDataEnum(oclib.SERVICE_RESOURCE),
}
var SearchMu sync.RWMutex
@@ -29,12 +31,15 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
b, _ := json.Marshal(message)
switch message.Action {
case tools.PB_SEARCH:
fmt.Println("EMITNATS")
SearchMu.Lock()
fmt.Println("afterlock")
SearchStream[user] = make(chan []byte, 128)
SearchStreamSeen[user] = make([]string, 128)
SearchStreamExtend[user] = make([]string, 128)
SearchStreamSeen[user] = []string{}
SearchStreamExtend[user] = []string{}
SearchMu.Unlock()
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-catalog",
Datatype: -1,
User: user,
@@ -69,6 +74,7 @@ func ListenNATS() {
if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) {
return
}
p, err := resources.ToResource(int(resp.Datatype), resp.Payload)
if err != nil {
return
@@ -76,31 +82,41 @@ func ListenNATS() {
// Exclude resources already present in the local catalog
if check := oclib.NewRequestAdmin(oclib.LibDataEnum(resp.Datatype), nil).LoadOne(p.GetID()); check.Data == nil {
p.SetNotInCatalog(true)
return
}
SearchMu.Lock()
wrapped, merr := json.Marshal(map[string]interface{}{
"dtype": p.GetType(),
"data": p,
})
fmt.Println("WRAPPED CHECK")
SearchMu.Lock()
if a := SearchStreamExtend[resp.User]; len(a) > 0 {
wrapped, merr = json.Marshal(map[string]interface{}{
"dtype": p.GetType(),
"data": oclib.GetExtend(p, p.Extend(a...), map[tools.DataType]map[string]interface{}{}),
})
}
if merr != nil {
alreadySeen := slices.Contains(SearchStreamSeen[resp.User], p.GetID())
if !alreadySeen {
SearchStreamSeen[resp.User] = append(SearchStreamSeen[resp.User], p.GetID())
}
if SearchStream[resp.User] == nil {
SearchStream[resp.User] = make(chan []byte)
}
ch := SearchStream[resp.User]
SearchMu.Unlock()
fmt.Println(resp.User, ch, merr, alreadySeen)
if merr != nil || alreadySeen {
return
}
if SearchStreamSeen[resp.User] != nil && slices.Contains(SearchStreamSeen[resp.User], p.GetID()) {
SearchStream[resp.User] <- wrapped // TODO when do we update it in our catalog ?
if ch != nil {
select {
case ch <- wrapped:
default:
fmt.Println("SEARCH channel full or closed, dropping", p.GetID())
}
}
if SearchStreamSeen[resp.User] == nil {
SearchStreamSeen[resp.User] = []string{}
}
SearchStreamSeen[resp.User] = append(SearchStreamSeen[resp.User], p.GetID())
SearchMu.Unlock()
},
// ── WORKFLOW_STEP_DONE_EVENT ─────────────────────────────────────────