Decentralized Peer Search

This commit is contained in:
mr
2026-02-23 17:18:16 +01:00
parent ac700deca5
commit c210522502
5 changed files with 112 additions and 1 deletions

View File

@@ -1,7 +1,10 @@
package controllers
import (
ctx "context"
"encoding/json"
"net/http"
"oc-peer/infrastructure"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/config"
@@ -9,6 +12,8 @@ import (
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
beego "github.com/beego/beego/v2/server/web"
"github.com/beego/beego/v2/server/web/context"
"golang.org/x/net/websocket"
)
// Operations about workflow
@@ -241,6 +246,46 @@ func (o *PeerController) DeleteState() {
o.ServeJSON()
}
// @Title Search Decentralized
// @Description find workflow by key word
// @Param search path string true "the search you want to get"
// @Param is_draft query string false "draft wished"
// @Success 200 {workflow} models.workflow
// @router /decentralized/search/:search[get]
func (o *PeerController) SearchDecentralized() {
user, _, _ := oclib.ExtractTokenInfo(*o.Ctx.Request)
search := o.Ctx.Input.Param(":search")
b, err := json.Marshal(map[string]string{
"search": search,
})
infrastructure.EmitNATS(user, tools.PropalgationMessage{
Action: tools.PB_SEARCH,
DataType: tools.PEER.EnumIndex(),
Payload: b,
})
if err != nil {
o.Data["json"] = map[string]interface{}{
"data": nil,
"code": 400,
"error": err,
}
o.ServeJSON()
return
}
Websocket(o.Ctx.Request.Context(), user, o.Ctx.ResponseWriter, o.Ctx.Request)
}
func Websocket(ctx ctx.Context, user string, r *context.Response, w *http.Request) {
websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close()
for {
if msg, ok := <-infrastructure.SearchStream[user]; !ok || websocket.Message.Send(ws, msg) != nil {
return
}
}
}).ServeHTTP(r, w)
}
/*
Un pair change le statut d'un autre pair, alors ce dernier est joins automatiquement, on ne peut pas s'auto lié seul un externe peut faire ce processus de demande.
On change le pair pour pouvoir le mettre à jour, alors, le lien se met à jour automatiquement. p1 -> update status -> link (p2) -> p2 response -> update status -> link (p1)

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.24.6
toolchain go1.24.11
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260218132556-0b41e2505e2f
cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c
github.com/beego/beego/v2 v2.3.8
github.com/smartystreets/goconvey v1.7.2
)

2
go.sum
View File

@@ -22,6 +22,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13 h1:DNIPQ7C+7wjbj
cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo=
cloud.o-forge.io/core/oc-lib v0.0.0-20260218132556-0b41e2505e2f h1:OFuJhi23D/UNwn8Jo30HDt/Sm2Ea1ljUk6IVicYSuAQ=
cloud.o-forge.io/core/oc-lib v0.0.0-20260218132556-0b41e2505e2f/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo=
cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c h1:3PRvQdSSGjmw+Txkf0zWs3F+V9URq22zQCLR3o7bNBY=
cloud.o-forge.io/core/oc-lib v0.0.0-20260223145640-e039fa56b64c/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=

62
infrastructure/nats.go Normal file
View File

@@ -0,0 +1,62 @@
package infrastructure
import (
"encoding/json"
"slices"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/tools"
)
var ressourceCols = []oclib.LibDataEnum{
oclib.LibDataEnum(oclib.PEER),
}
var SearchStream = map[string]chan *peer.Peer{}
func EmitNATS(user string, message tools.PropalgationMessage) {
b, _ := json.Marshal(message)
if message.Action == tools.PB_SEARCH {
SearchStream[user] = make(chan *peer.Peer, 128)
}
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-peer",
Datatype: -1,
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
// un ressource quand on l'ajoute à notre catalogue elle nous est étrangère.
// pour se la réaffecté à soit, on peut alors changer le créator ID.
// pour protéger une ressource l'idée serait de la signée.
// si on la stocke en base, elle va se dépréciée plus encore si le user n'est pas un partenaire.
// elle ne sera pas maintenue à jour. Si c'est une ressource publique et qu'elle change
// l'offre peut disparaitre mais subsisté chez nous.
// alors si on en dispose et qu'on souhaite l'exploité. On doit en vérifier la validité... ou...
// la mettre à jour. Le problème de la mise à jour c'est qu'on peut facilement
// overflow.... de stream pour avoir à jour sa ressource.
// donc l'idée est que la vérification soit manuelle... ou lors d'une vérification de dernière instance.
// si une ressource est exploitée dans un workflow ou un shared workspace.
// elle doit être vérifié par les pairs engagés.
// si la donnée est déclaré comme donnée de l'emmetteur alors on vérifie que la signature est bien émise, par
// l'emmetteur. Sinon... on doit interrogé le pair qui a émit la donnée. Est ce que la donnée est à jour.
// lui va vérifier la signature de la ressource qu'il possède correspondante si elle existe, si non. AIE,
// on met à jour mais on pète une erreur.
func ListenNATS() {
tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
tools.SEARCH_EVENT: func(resp tools.NATSResponse) {
if !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) {
return
}
p := &peer.Peer{}
err := json.Unmarshal(resp.Payload, p)
if err == nil {
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?
}
},
})
}

View File

@@ -1,6 +1,7 @@
package main
import (
"oc-peer/infrastructure"
_ "oc-peer/routers"
oclib "cloud.o-forge.io/core/oc-lib"
@@ -13,5 +14,6 @@ const appname = "oc-peer"
func main() {
oclib.InitAPI(appname)
go oclib.InitNATSDecentralizedEmitter(tools.PEER)
go infrastructure.ListenNATS()
beego.Run()
}