From c2105225021820be90f36546f6d1085c946f534f Mon Sep 17 00:00:00 2001 From: mr Date: Mon, 23 Feb 2026 17:18:16 +0100 Subject: [PATCH] Decentralized Peer Search --- controllers/peer.go | 45 ++++++++++++++++++++++++++++++ go.mod | 2 +- go.sum | 2 ++ infrastructure/nats.go | 62 ++++++++++++++++++++++++++++++++++++++++++ main.go | 2 ++ 5 files changed, 112 insertions(+), 1 deletion(-) create mode 100644 infrastructure/nats.go diff --git a/controllers/peer.go b/controllers/peer.go index acacf0f..3705cfa 100644 --- a/controllers/peer.go +++ b/controllers/peer.go @@ -1,7 +1,10 @@ package controllers import ( + ctx "context" "encoding/json" + "net/http" + "oc-peer/infrastructure" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" @@ -9,6 +12,8 @@ import ( "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/tools" beego "github.com/beego/beego/v2/server/web" + "github.com/beego/beego/v2/server/web/context" + "golang.org/x/net/websocket" ) // Operations about workflow @@ -241,6 +246,46 @@ func (o *PeerController) DeleteState() { o.ServeJSON() } +// @Title Search Decentralized +// @Description find workflow by key word +// @Param search path string true "the search you want to get" +// @Param is_draft query string false "draft wished" +// @Success 200 {workflow} models.workflow +// @router /decentralized/search/:search[get] +func (o *PeerController) SearchDecentralized() { + user, _, _ := oclib.ExtractTokenInfo(*o.Ctx.Request) + search := o.Ctx.Input.Param(":search") + b, err := json.Marshal(map[string]string{ + "search": search, + }) + infrastructure.EmitNATS(user, tools.PropalgationMessage{ + Action: tools.PB_SEARCH, + DataType: tools.PEER.EnumIndex(), + Payload: b, + }) + if err != nil { + o.Data["json"] = map[string]interface{}{ + "data": nil, + "code": 400, + "error": err, + } + o.ServeJSON() + return + } + Websocket(o.Ctx.Request.Context(), user, o.Ctx.ResponseWriter, o.Ctx.Request) +} + +func Websocket(ctx ctx.Context, user string, r *context.Response, w *http.Request) { + websocket.Handler(func(ws *websocket.Conn) { + defer ws.Close() + for { + if msg, ok := <-infrastructure.SearchStream[user]; !ok || websocket.Message.Send(ws, msg) != nil { + return + } + } + }).ServeHTTP(r, w) +} + /* Un pair change le statut d'un autre pair, alors ce dernier est joins automatiquement, on ne peut pas s'auto lié seul un externe peut faire ce processus de demande. On change le pair pour pouvoir le mettre à jour, alors, le lien se met à jour automatiquement. p1 -> update status -> link (p2) -> p2 response -> update status -> link (p1) diff --git a/go.mod b/go.mod index 6310e22..8618b68 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.24.6 toolchain go1.24.11 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260218132556-0b41e2505e2f + cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c github.com/beego/beego/v2 v2.3.8 github.com/smartystreets/goconvey v1.7.2 ) diff --git a/go.sum b/go.sum index 612e6f9..4303b26 100644 --- a/go.sum +++ b/go.sum @@ -22,6 +22,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13 h1:DNIPQ7C+7wjbj cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= cloud.o-forge.io/core/oc-lib v0.0.0-20260218132556-0b41e2505e2f h1:OFuJhi23D/UNwn8Jo30HDt/Sm2Ea1ljUk6IVicYSuAQ= cloud.o-forge.io/core/oc-lib v0.0.0-20260218132556-0b41e2505e2f/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c h1:3PRvQdSSGjmw+Txkf0zWs3F+V9URq22zQCLR3o7bNBY= +cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= diff --git a/infrastructure/nats.go b/infrastructure/nats.go new file mode 100644 index 0000000..d8b062c --- /dev/null +++ b/infrastructure/nats.go @@ -0,0 +1,62 @@ +package infrastructure + +import ( + "encoding/json" + "slices" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/peer" + "cloud.o-forge.io/core/oc-lib/tools" +) + +var ressourceCols = []oclib.LibDataEnum{ + oclib.LibDataEnum(oclib.PEER), +} + +var SearchStream = map[string]chan *peer.Peer{} + +func EmitNATS(user string, message tools.PropalgationMessage) { + b, _ := json.Marshal(message) + if message.Action == tools.PB_SEARCH { + SearchStream[user] = make(chan *peer.Peer, 128) + } + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-peer", + Datatype: -1, + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) +} + +// un ressource quand on l'ajoute à notre catalogue elle nous est étrangère. +// pour se la réaffecté à soit, on peut alors changer le créator ID. +// pour protéger une ressource l'idée serait de la signée. +// si on la stocke en base, elle va se dépréciée plus encore si le user n'est pas un partenaire. +// elle ne sera pas maintenue à jour. Si c'est une ressource publique et qu'elle change +// l'offre peut disparaitre mais subsisté chez nous. +// alors si on en dispose et qu'on souhaite l'exploité. On doit en vérifier la validité... ou... +// la mettre à jour. Le problème de la mise à jour c'est qu'on peut facilement +// overflow.... de stream pour avoir à jour sa ressource. +// donc l'idée est que la vérification soit manuelle... ou lors d'une vérification de dernière instance. + +// si une ressource est exploitée dans un workflow ou un shared workspace. +// elle doit être vérifié par les pairs engagés. +// si la donnée est déclaré comme donnée de l'emmetteur alors on vérifie que la signature est bien émise, par +// l'emmetteur. Sinon... on doit interrogé le pair qui a émit la donnée. Est ce que la donnée est à jour. +// lui va vérifier la signature de la ressource qu'il possède correspondante si elle existe, si non. AIE, +// on met à jour mais on pète une erreur. + +func ListenNATS() { + tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){ + tools.SEARCH_EVENT: func(resp tools.NATSResponse) { + if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) { + return + } + p := &peer.Peer{} + err := json.Unmarshal(resp.Payload, p) + if err == nil { + SearchStream[resp.User] <- p // TODO when do we update it in our catalog ? + } + }, + }) +} diff --git a/main.go b/main.go index c2ae041..feccdb5 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "oc-peer/infrastructure" _ "oc-peer/routers" oclib "cloud.o-forge.io/core/oc-lib" @@ -13,5 +14,6 @@ const appname = "oc-peer" func main() { oclib.InitAPI(appname) go oclib.InitNATSDecentralizedEmitter(tools.PEER) + go infrastructure.ListenNATS() beego.Run() }