Fully Working Network Peers
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
ctx "context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"oc-peer/infrastructure"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
@@ -12,7 +11,7 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
beego "github.com/beego/beego/v2/server/web"
|
||||
"golang.org/x/net/websocket"
|
||||
"github.com/gorilla/websocket"
|
||||
)
|
||||
|
||||
// Operations about workflow
|
||||
@@ -25,7 +24,7 @@ type PeerController struct {
|
||||
// @Param search path string true "the word search you want to get"
|
||||
// @Param is_draft query string false
|
||||
// @Success 200 {workspace} models.workspace
|
||||
// @router /peer/search/:search [get]
|
||||
// @router /search/:search [get]
|
||||
func (o *PeerController) Search() {
|
||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
// store and return Id or post with UUIDLibDataEnum
|
||||
@@ -39,7 +38,7 @@ func (o *PeerController) Search() {
|
||||
// @Description find all peer
|
||||
// @Param is_draft query string false
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer [get]
|
||||
// @router / [get]
|
||||
func (o *PeerController) GetAll() {
|
||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
isDraft := o.Ctx.Input.Query("is_draft")
|
||||
@@ -60,7 +59,7 @@ func (o *PeerController) GetAll() {
|
||||
// @Description find peer by peerid
|
||||
// @Param id path string true "the peer id you want to get"
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer/:id [get]
|
||||
// @router /:id [get]
|
||||
func (o *PeerController) Get() {
|
||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
@@ -72,19 +71,10 @@ func (o *PeerController) Get() {
|
||||
// @Description find peer by peerid
|
||||
// @Param id path string true "the peer id you want to get"
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer/:id/valid [get]
|
||||
// @router /valid/:id [get]
|
||||
func (o *PeerController) Valid() {
|
||||
user, _, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
/*if ok, _ := oclib.IsMySelf(peerID); !ok {
|
||||
o.Data["json"] = map[string]interface{}{
|
||||
"data": nil,
|
||||
"code": 400,
|
||||
"error": "can't validate a link relation",
|
||||
}
|
||||
o.ServeJSON()
|
||||
return
|
||||
}*/
|
||||
if ok, _ := oclib.IsMySelf(id); ok {
|
||||
o.Data["json"] = map[string]interface{}{
|
||||
"data": nil,
|
||||
@@ -94,6 +84,7 @@ func (o *PeerController) Valid() {
|
||||
o.ServeJSON()
|
||||
return
|
||||
}
|
||||
fmt.Println(id)
|
||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
l := req.LoadOne(id)
|
||||
if l.Data != nil && l.ToPeer().Verify && (l.ToPeer().Relation == peer.PARTNER || l.ToPeer().Relation == peer.PENDING_PARTNER) {
|
||||
@@ -122,67 +113,71 @@ func (o *PeerController) Valid() {
|
||||
o.ServeJSON()
|
||||
}
|
||||
|
||||
// @Title unknown
|
||||
// @Description add unknown peer by peerid
|
||||
// @Title known
|
||||
// @Description add kwown peer by peerid
|
||||
// @Param id path string true "the peer id you want to blacklist"
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer/:id/unknown [post]
|
||||
func (o *PeerController) Unknown() {
|
||||
// @router /:id/known [post]
|
||||
func (o *PeerController) Known() {
|
||||
user, _, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
data := req.LoadOne(id)
|
||||
o.changeRelation(data.ToPeer(), user, groups, peer.NONE, req)
|
||||
|
||||
o.changeRelation(id, data.ToPeer(), user, groups, peer.NONE, req)
|
||||
}
|
||||
|
||||
// @Title Partner
|
||||
// @Description add partner peer by peerid
|
||||
// @Param id path string true "the peer id you want to blacklist"
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer/:id/partner [post]
|
||||
// @router /:id/partner [post]
|
||||
func (o *PeerController) Partner() {
|
||||
user, _, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
data := req.LoadOne(id)
|
||||
o.changeRelation(data.ToPeer(), user, groups, peer.PARTNER, req)
|
||||
o.changeRelation(id, data.ToPeer(), user, groups, peer.PARTNER, req)
|
||||
}
|
||||
|
||||
// @Title Blacklist
|
||||
// @Description add blacklist peer by peerid
|
||||
// @Param id path string true "the peer id you want to blacklist"
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer/:id/blacklist [post]
|
||||
// @router /:id/blacklist [post]
|
||||
func (o *PeerController) Blacklist() {
|
||||
user, _, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
data := req.LoadOne(id)
|
||||
o.changeRelation(data.ToPeer(), user, groups, peer.BLACKLIST, req)
|
||||
o.changeRelation(id, data.ToPeer(), user, groups, peer.BLACKLIST, req)
|
||||
}
|
||||
|
||||
// used from : peer ask, or response, only from peer origin is authorized to change...
|
||||
func (o *PeerController) changeRelation(dest *peer.Peer, user string, groups []string, relation peer.PeerRelation, request *oclib.Request) {
|
||||
/*if ok, _ := oclib.IsMySelf(request.PeerID); !ok {
|
||||
o.Data["json"] = map[string]interface{}{
|
||||
"data": nil,
|
||||
"code": 400,
|
||||
"error": "can't change relation if not ourself",
|
||||
func (o *PeerController) changeRelation(id string, dest *peer.Peer, user string, groups []string, relation peer.PeerRelation, request *oclib.Request) {
|
||||
infrastructure.SearchMu.Lock()
|
||||
if dest == nil && infrastructure.SearchStreamAction[user] != nil { // add auto in base if not existing
|
||||
req := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
for _, pp := range infrastructure.SearchStreamAction[user] {
|
||||
if pp.GetID() == id {
|
||||
data := req.StoreOne(pp.Serialize(pp))
|
||||
dest = data.ToPeer()
|
||||
break
|
||||
}
|
||||
}
|
||||
o.ServeJSON()
|
||||
return
|
||||
}*/
|
||||
if ok, _ := oclib.IsMySelf(dest.GetID()); ok {
|
||||
o.Data["json"] = map[string]interface{}{
|
||||
"data": nil,
|
||||
"code": 400,
|
||||
"error": "can't change ourself",
|
||||
}
|
||||
o.ServeJSON()
|
||||
return
|
||||
}
|
||||
infrastructure.SearchMu.Unlock()
|
||||
// store and return Id or post with UUID
|
||||
if dest != nil {
|
||||
if ok, _ := oclib.IsMySelf(dest.GetID()); ok {
|
||||
o.Data["json"] = map[string]interface{}{
|
||||
"data": nil,
|
||||
"code": 400,
|
||||
"error": "can't change ourself",
|
||||
}
|
||||
o.ServeJSON()
|
||||
return
|
||||
}
|
||||
if !dest.Verify {
|
||||
switch relation {
|
||||
case peer.PARTNER:
|
||||
@@ -194,6 +189,7 @@ func (o *PeerController) changeRelation(dest *peer.Peer, user string, groups []s
|
||||
if dest.Verify && relation == peer.PENDING_PARTNER {
|
||||
relation = peer.PARTNER
|
||||
}
|
||||
fmt.Println("CHANGE REL", dest.GetID())
|
||||
data := request.UpdateOne(map[string]interface{}{
|
||||
"relation": relation,
|
||||
}, dest.GetID())
|
||||
@@ -225,7 +221,7 @@ func (o *PeerController) changeRelation(dest *peer.Peer, user string, groups []s
|
||||
// @Description delete state peer by peerid
|
||||
// @Param id path string true "the peer id you want to delete state"
|
||||
// @Success 200 {peer} models.peer
|
||||
// @router /peer/:id/undo_state [post]
|
||||
// @router /:id/undo_state [post]
|
||||
func (o *PeerController) DeleteState() {
|
||||
user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request)
|
||||
id := o.Ctx.Input.Param(":id")
|
||||
@@ -235,48 +231,52 @@ func (o *PeerController) DeleteState() {
|
||||
o.ServeJSON()
|
||||
}
|
||||
|
||||
func Websocket(ctx ctx.Context, user string, r http.ResponseWriter, w *http.Request) {
|
||||
websocket.Handler(func(ws *websocket.Conn) {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
var discard interface{}
|
||||
for {
|
||||
if websocket.JSON.Receive(ws, &discard) != nil {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
ws.Close()
|
||||
if ch, ok := infrastructure.SearchStream[user]; ok {
|
||||
close(ch)
|
||||
infrastructure.SearchMu.Lock()
|
||||
delete(infrastructure.SearchStream, user)
|
||||
infrastructure.SearchMu.Unlock()
|
||||
}
|
||||
fmt.Println("CLOSE !")
|
||||
infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{
|
||||
Action: tools.PB_CLOSE_SEARCH,
|
||||
DataType: tools.PEER.EnumIndex(),
|
||||
})
|
||||
}()
|
||||
func Websocket(ctx ctx.Context, user string, conn *websocket.Conn) {
|
||||
defer conn.Close()
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
var discard interface{}
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-infrastructure.SearchStream[user]:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if websocket.JSON.Send(ws, msg) != nil {
|
||||
continue
|
||||
}
|
||||
case <-done:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
if conn.ReadJSON(&discard) != nil {
|
||||
close(done)
|
||||
return
|
||||
}
|
||||
}
|
||||
}).ServeHTTP(r, w)
|
||||
}()
|
||||
defer func() {
|
||||
if ch, ok := infrastructure.SearchStream[user]; ok {
|
||||
close(ch)
|
||||
infrastructure.SearchMu.Lock()
|
||||
delete(infrastructure.SearchStream, user)
|
||||
delete(infrastructure.SearchStreamAction, user)
|
||||
infrastructure.SearchMu.Unlock()
|
||||
}
|
||||
fmt.Println("CLOSE !")
|
||||
infrastructure.EmitNATS(user, nil, tools.PropalgationMessage{
|
||||
Action: tools.PB_CLOSE_SEARCH,
|
||||
DataType: tools.PEER.EnumIndex(),
|
||||
})
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case msg, ok := <-infrastructure.SearchStream[user]:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
infrastructure.SearchMu.Lock()
|
||||
infrastructure.SearchStreamAction[user] = append(infrastructure.SearchStreamAction[user], msg)
|
||||
infrastructure.SearchMu.Unlock()
|
||||
|
||||
if conn.WriteJSON(msg) != nil {
|
||||
continue
|
||||
}
|
||||
case <-done:
|
||||
return
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Reference in New Issue
Block a user