Observe + metrics
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/config"
|
||||
@@ -19,14 +20,15 @@ var ressourceCols = []oclib.LibDataEnum{
|
||||
|
||||
var SearchMu sync.RWMutex
|
||||
var SearchStreamAction = map[string][]*peer.Peer{}
|
||||
var SearchStream = map[string]chan *peer.Peer{}
|
||||
var SearchStream = map[string]chan WSMessage{}
|
||||
|
||||
func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
|
||||
b, _ := json.Marshal(message)
|
||||
if message.Action == tools.PB_SEARCH {
|
||||
SearchMu.Lock()
|
||||
SearchStream[user] = make(chan *peer.Peer, 128)
|
||||
SearchStream[user] = make(chan WSMessage, 128)
|
||||
SearchStreamAction[user] = []*peer.Peer{}
|
||||
fmt.Println("NEW PB")
|
||||
SearchMu.Unlock()
|
||||
}
|
||||
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
||||
@@ -39,24 +41,6 @@ func EmitNATS(user string, groups []string, message tools.PropalgationMessage) {
|
||||
})
|
||||
}
|
||||
|
||||
// un ressource quand on l'ajoute à notre catalogue elle nous est étrangère.
|
||||
// pour se la réaffecté à soit, on peut alors changer le créator ID.
|
||||
// pour protéger une ressource l'idée serait de la signée.
|
||||
// si on la stocke en base, elle va se dépréciée plus encore si le user n'est pas un partenaire.
|
||||
// elle ne sera pas maintenue à jour. Si c'est une ressource publique et qu'elle change
|
||||
// l'offre peut disparaitre mais subsisté chez nous.
|
||||
// alors si on en dispose et qu'on souhaite l'exploité. On doit en vérifier la validité... ou...
|
||||
// la mettre à jour. Le problème de la mise à jour c'est qu'on peut facilement
|
||||
// overflow.... de stream pour avoir à jour sa ressource.
|
||||
// donc l'idée est que la vérification soit manuelle... ou lors d'une vérification de dernière instance.
|
||||
|
||||
// si une ressource est exploitée dans un workflow ou un shared workspace.
|
||||
// elle doit être vérifié par les pairs engagés.
|
||||
// si la donnée est déclaré comme donnée de l'emmetteur alors on vérifie que la signature est bien émise, par
|
||||
// l'emmetteur. Sinon... on doit interrogé le pair qui a émit la donnée. Est ce que la donnée est à jour.
|
||||
// lui va vérifier la signature de la ressource qu'il possède correspondante si elle existe, si non. AIE,
|
||||
// on met à jour mais on pète une erreur.
|
||||
|
||||
var self *peer.Peer
|
||||
|
||||
func ListenNATS() {
|
||||
@@ -65,15 +49,15 @@ func ListenNATS() {
|
||||
if resp.FromApp == config.GetAppName() || !slices.Contains(ressourceCols, oclib.LibDataEnum(resp.Datatype)) {
|
||||
return
|
||||
}
|
||||
self, _ := oclib.GetMySelf()
|
||||
if self != nil && self.IsNano {
|
||||
return
|
||||
}
|
||||
p := &peer.Peer{}
|
||||
if err := json.Unmarshal(resp.Payload, &p); err == nil {
|
||||
/*if err := verify(resp.Payload); err != nil {
|
||||
return // don't trust anyone... only friends and foes are privilege
|
||||
}*/
|
||||
|
||||
fmt.Println("CREATE_RESOURCE", p.GetID())
|
||||
|
||||
if ok, _ := oclib.IsMySelf(p.GetID()); ok {
|
||||
if self.GetID() == p.GetID() {
|
||||
fmt.Println("it's ourselve !")
|
||||
return
|
||||
}
|
||||
@@ -99,11 +83,24 @@ func ListenNATS() {
|
||||
"relation": p.Relation,
|
||||
}, p.GetID())
|
||||
}
|
||||
} else if p.Relation == peer.PENDING_NANO || p.Relation == peer.NANO {
|
||||
if data.ToPeer().Verify {
|
||||
access.UpdateOne(map[string]interface{}{
|
||||
"verify": false,
|
||||
"relation": peer.MASTER,
|
||||
}, p.GetID())
|
||||
} else {
|
||||
access.UpdateOne(map[string]interface{}{
|
||||
"verify": true,
|
||||
"relation": peer.PENDING_MASTER,
|
||||
}, p.GetID())
|
||||
}
|
||||
} else if p.Relation != peer.SELF && p.Relation != peer.BLACKLIST {
|
||||
if p.Relation == peer.PARTNER || p.Relation == peer.PENDING_PARTNER {
|
||||
p.Verify = true
|
||||
p.Relation = peer.PENDING_PARTNER
|
||||
}
|
||||
p.IsNano = config.GetConfig().IsNano
|
||||
access.StoreOne(p.Serialize(p))
|
||||
}
|
||||
}
|
||||
@@ -113,23 +110,54 @@ func ListenNATS() {
|
||||
return
|
||||
}
|
||||
p := &peer.Peer{}
|
||||
err := json.Unmarshal(resp.Payload, p)
|
||||
if err == nil {
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
fmt.Println("ADD in SEARCH STREAM", p.GetID())
|
||||
if s := access.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}},
|
||||
},
|
||||
}, "", false, 0, 1); len(s.Data) > 0 {
|
||||
p.Relation = s.Data[0].(*peer.Peer).Relation
|
||||
} else {
|
||||
p.NotInCatalog = true
|
||||
}
|
||||
SearchMu.Lock()
|
||||
SearchStream[resp.User] <- p // TODO when do we update it in our catalog ?
|
||||
SearchMu.Unlock()
|
||||
if err := json.Unmarshal(resp.Payload, p); err != nil {
|
||||
return
|
||||
}
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.PEER), nil)
|
||||
fmt.Println("ADD in SEARCH STREAM", p.GetID())
|
||||
if s := access.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"peer_id": {{Operator: dbs.EQUAL.String(), Value: p.PeerID}},
|
||||
},
|
||||
}, "", false, 0, 1); len(s.Data) > 0 {
|
||||
p.Relation = s.Data[0].(*peer.Peer).Relation
|
||||
} else {
|
||||
p.NotInCatalog = true
|
||||
}
|
||||
|
||||
// Stamp volatile online state from cache.
|
||||
p.Online = IsOnline(p.PeerID)
|
||||
now := time.Now()
|
||||
if p.Online {
|
||||
p.LastHeartbeat = &now
|
||||
}
|
||||
|
||||
SearchMu.RLock()
|
||||
if ch, ok := SearchStream[resp.User]; ok {
|
||||
select {
|
||||
case ch <- WSMessage{Type: "peer", Peer: p}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
SearchMu.RUnlock()
|
||||
},
|
||||
|
||||
// PEER_OBSERVE_RESPONSE_EVENT is emitted by oc-discovery when it
|
||||
// receives a heartbeat from an observed remote peer.
|
||||
tools.PEER_OBSERVE_RESPONSE_EVENT: func(resp tools.NATSResponse) {
|
||||
var batch struct {
|
||||
PeerIDs []string `json:"peer_ids"`
|
||||
Metrics map[string]*PeerConnectivityMetrics `json:"metrics"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(resp.Payload, &batch); err != nil {
|
||||
return
|
||||
}
|
||||
if len(batch.PeerIDs) == 0 {
|
||||
return
|
||||
}
|
||||
fmt.Println("METRICS", batch.Metrics)
|
||||
HandleHeartbeatBatch(batch.PeerIDs, batch.Metrics)
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
326
infrastructure/observe.go
Normal file
326
infrastructure/observe.go
Normal file
@@ -0,0 +1,326 @@
|
||||
package infrastructure
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
// observeTimeout is how long a peer stays marked online after its most
// recent heartbeat (see setOnline / setOffline).
const observeTimeout = 60 * time.Second

// offlineRetryInterval is how often StartOfflineRetryLoop re-requests
// observation of peers currently in offlineCache.
const offlineRetryInterval = 60 * time.Second
|
||||
|
||||
// PeerConnectivityMetrics is the quality snapshot received from oc-discovery
// via PEER_OBSERVE_RESPONSE_EVENT and forwarded to the frontend via WebSocket.
type PeerConnectivityMetrics struct {
	// Round-trip latency — presumably milliseconds; confirm with oc-discovery.
	LatencyMs float64 `json:"latency_ms"`
	Speed     string  `json:"speed"` // "fast" | "slow"
	Reliability string `json:"reliability"` // "reliable" | "watch"
	// NOTE(review): scale/range of TrustScore is not visible here — confirm
	// with the oc-discovery producer.
	TrustScore float64 `json:"trust_score"`
	LastSeenAt time.Time `json:"last_seen_at"`
	// NOTE(review): presumably the fraction of missed heartbeats — confirm.
	MissRate float64 `json:"miss_rate"`
}
|
||||
|
||||
// WSMessage is sent on every WebSocket endpoint (search and observe).
//
// Type == "peer"         → Peer field is set
// Type == "connectivity" → PeerID + Online + Metrics are set
type WSMessage struct {
	Type   string     `json:"type"`
	Peer   *peer.Peer `json:"peer,omitempty"`
	PeerID string     `json:"peer_id,omitempty"`
	// NOTE(review): omitempty on a bool drops the field when false, so
	// "connectivity" messages for an offline peer omit "online" entirely —
	// confirm the frontend treats the missing field as offline.
	Online  bool                     `json:"online,omitempty"`
	Metrics *PeerConnectivityMetrics `json:"metrics,omitempty"`
}
|
||||
|
||||
// ShallowPeer is the minimal peer representation the frontend sends when
// requesting observation. oc-discovery uses Address/StreamAddress to reach it.
type ShallowPeer struct {
	ID string `json:"id"`
	// PeerID is the stable identity used as the cache/session key
	// throughout this file.
	PeerID        string `json:"peer_id"`
	Address       string `json:"address"`
	StreamAddress string `json:"stream_address"`
}
|
||||
|
||||
// peerObserveCmd is the NATS payload sent to oc-discovery.
//
// Observe  → User + Peers populated
// Close    → User + PeerIDs + Close=true
// CloseAll → User + CloseAll=true
type peerObserveCmd struct {
	User string `json:"user"`
	// Peers to start observing (Observe form).
	Peers []ShallowPeer `json:"peers,omitempty"`
	// PeerIDs to stop observing (Close form).
	PeerIDs  []string `json:"peer_ids,omitempty"`
	Close    bool     `json:"close,omitempty"`
	CloseAll bool     `json:"close_all,omitempty"`
}
|
||||
|
||||
// ── online cache ─────────────────────────────────────────────────────────────
|
||||
|
||||
// onlineState is the cached liveness entry for a single peer.
// All fields are guarded by onlineMu.
type onlineState struct {
	online bool // last known liveness
	// timer is the one-shot expiry re-armed on every heartbeat; when it
	// fires, setOffline flips the peer offline (see setOnline).
	timer   *time.Timer
	metrics *PeerConnectivityMetrics // latest snapshot; nil when none received
}
|
||||
|
||||
var (
	// onlineMu guards onlineCache and every onlineState it contains.
	onlineMu    sync.Mutex
	onlineCache = map[string]*onlineState{}

	// offlineMu guards offlineCache: the set of peer IDs that timed out and
	// are awaiting re-observation by StartOfflineRetryLoop.
	offlineMu    sync.Mutex
	offlineCache = map[string]struct{}{}
)
|
||||
|
||||
// IsOnline returns whether peerID is currently considered online.
|
||||
func IsOnline(peerID string) bool {
|
||||
onlineMu.Lock()
|
||||
defer onlineMu.Unlock()
|
||||
if s, ok := onlineCache[peerID]; ok {
|
||||
return s.online
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// HandleHeartbeatBatch processes a batch of peer heartbeats with optional metrics.
|
||||
// Called from the PEER_OBSERVE_RESPONSE_EVENT NATS listener.
|
||||
func HandleHeartbeatBatch(peerIDs []string, metricsMap map[string]*PeerConnectivityMetrics) {
|
||||
for _, id := range peerIDs {
|
||||
var m *PeerConnectivityMetrics
|
||||
if metricsMap != nil {
|
||||
m = metricsMap[id]
|
||||
}
|
||||
setOnline(id, m)
|
||||
}
|
||||
}
|
||||
|
||||
// setOnline marks peerID online, stores its latest metrics snapshot, and
// (re)arms a one-shot expiry timer: if no further heartbeat arrives within
// observeTimeout the peer flips back to offline. It also removes the peer
// from the offline retry set and broadcasts the change to watching sessions.
func setOnline(peerID string, metrics *PeerConnectivityMetrics) {
	fmt.Println("SET ONLINE ", peerID, metrics)
	onlineMu.Lock()
	s, exists := onlineCache[peerID]
	if !exists {
		s = &onlineState{}
		onlineCache[peerID] = s
	}
	s.online = true
	s.metrics = metrics
	// Re-arm the expiry timer. NOTE(review): Timer.Stop cannot cancel a
	// callback that has already fired; a setOffline goroutine scheduled just
	// before this refresh may still run afterwards, see s.online == true, and
	// wrongly flip the peer offline. A generation counter checked under
	// onlineMu would close this window — confirm whether it matters at a
	// 60-second timeout.
	if s.timer != nil {
		s.timer.Stop()
	}
	s.timer = time.AfterFunc(observeTimeout, func() { setOffline(peerID) })
	onlineMu.Unlock()

	// The peer is reachable again: drop it from the offline retry set.
	offlineMu.Lock()
	delete(offlineCache, peerID)
	offlineMu.Unlock()

	broadcastConnectivity(peerID, true, metrics)
}
|
||||
|
||||
// setOffline marks peerID offline after observeTimeout elapsed without a
// heartbeat. Invoked from the time.AfterFunc callback armed in setOnline.
// It records the peer in offlineCache (picked up by the retry loop) and
// broadcasts an offline connectivity message with nil metrics.
func setOffline(peerID string) {
	onlineMu.Lock()
	s, ok := onlineCache[peerID]
	// Unknown or already offline: nothing to do. This also absorbs most
	// stale timer callbacks, but see the race note in setOnline — a timer
	// that fired just before a refresh can still reach the code below.
	if !ok || !s.online {
		onlineMu.Unlock()
		return
	}
	s.online = false
	s.timer = nil
	onlineMu.Unlock()

	offlineMu.Lock()
	offlineCache[peerID] = struct{}{}
	offlineMu.Unlock()

	broadcastConnectivity(peerID, false, nil)
}
|
||||
|
||||
// broadcastConnectivity fans out a connectivity change to all observe sessions
|
||||
// watching peerID.
|
||||
func broadcastConnectivity(peerID string, online bool, metrics *PeerConnectivityMetrics) {
|
||||
fmt.Println("BORADCAST METRICS", metrics)
|
||||
msg := WSMessage{Type: "connectivity", PeerID: peerID, Online: online, Metrics: metrics}
|
||||
|
||||
sessionsMu.RLock()
|
||||
for _, s := range sessions {
|
||||
for _, p := range s.peers {
|
||||
if p.PeerID == peerID {
|
||||
select {
|
||||
case s.ch <- msg:
|
||||
default:
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
sessionsMu.RUnlock()
|
||||
}
|
||||
|
||||
// ── observe sessions ──────────────────────────────────────────────────────────
|
||||
|
||||
// observeSession holds per-WS-connection state (user, watched peers, output channel).
type observeSession struct {
	user  string        // account the observation is scoped to
	peers []ShallowPeer // peers this connection is watching
	ch    chan WSMessage // outbound channel drained by the WS writer
}

var (
	// sessionsMu guards sessions and each session's peers slice.
	sessionsMu sync.RWMutex
	// sessions maps WS connection ID → its observe session.
	sessions = map[string]*observeSession{}
)
|
||||
|
||||
// RegisterObserveSession creates an empty session for connID / user.
|
||||
func RegisterObserveSession(connID, user string, ch chan WSMessage) {
|
||||
sessionsMu.Lock()
|
||||
sessions[connID] = &observeSession{user: user, ch: ch}
|
||||
sessionsMu.Unlock()
|
||||
}
|
||||
|
||||
// AddObservedPeers merges new peers into the session, emits a NATS observe
|
||||
// command (user-scoped), and returns the current online state for each peer.
|
||||
func AddObservedPeers(connID string, peers []ShallowPeer) []WSMessage {
|
||||
fmt.Println("AddObservedPeers Concrete")
|
||||
sessionsMu.Lock()
|
||||
s, ok := sessions[connID]
|
||||
if !ok {
|
||||
sessionsMu.Unlock()
|
||||
fmt.Println("sessions IS NOT")
|
||||
return nil
|
||||
}
|
||||
existing := make(map[string]struct{}, len(s.peers))
|
||||
for _, p := range s.peers {
|
||||
existing[p.PeerID] = struct{}{}
|
||||
}
|
||||
var newPeers []ShallowPeer
|
||||
for _, p := range peers {
|
||||
if _, dup := existing[p.PeerID]; !dup {
|
||||
s.peers = append(s.peers, p)
|
||||
newPeers = append(newPeers, p)
|
||||
}
|
||||
}
|
||||
user := s.user
|
||||
sessionsMu.Unlock()
|
||||
|
||||
fmt.Println("newPeers", newPeers)
|
||||
|
||||
if len(newPeers) > 0 {
|
||||
EmitObserve(user, newPeers)
|
||||
}
|
||||
return GetCurrentStates(peerIDsFrom(peers))
|
||||
}
|
||||
|
||||
// CloseObserveSession emits NATS close for all peers in the session, then
|
||||
// removes it. Call this on WS disconnect (normal or error).
|
||||
func CloseObserveSession(connID string) {
|
||||
sessionsMu.Lock()
|
||||
s, ok := sessions[connID]
|
||||
if !ok {
|
||||
sessionsMu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(sessions, connID)
|
||||
user := s.user
|
||||
ids := peerIDsFrom(s.peers)
|
||||
sessionsMu.Unlock()
|
||||
|
||||
if len(ids) > 0 {
|
||||
EmitClose(user, ids)
|
||||
}
|
||||
}
|
||||
|
||||
// GetCurrentStates returns one WSMessage per peer ID from the online cache
|
||||
// (defaults to offline for unknown peers).
|
||||
func GetCurrentStates(peerIDs []string) []WSMessage {
|
||||
onlineMu.Lock()
|
||||
defer onlineMu.Unlock()
|
||||
msgs := make([]WSMessage, 0, len(peerIDs))
|
||||
for _, id := range peerIDs {
|
||||
online := false
|
||||
var metrics *PeerConnectivityMetrics
|
||||
if s, ok := onlineCache[id]; ok {
|
||||
online = s.online
|
||||
metrics = s.metrics
|
||||
}
|
||||
msgs = append(msgs, WSMessage{Type: "connectivity", PeerID: id, Online: online, Metrics: metrics})
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
|
||||
func peerIDsFrom(peers []ShallowPeer) []string {
|
||||
ids := make([]string, 0, len(peers))
|
||||
for _, p := range peers {
|
||||
ids = append(ids, p.PeerID)
|
||||
}
|
||||
return ids
|
||||
}
|
||||
|
||||
// ── NATS emission ─────────────────────────────────────────────────────────────
|
||||
|
||||
// EmitObserve asks oc-discovery to start observing peers on behalf of user.
|
||||
func EmitObserve(user string, peers []ShallowPeer) {
|
||||
emitCmd(peerObserveCmd{User: user, Peers: peers})
|
||||
}
|
||||
|
||||
// EmitClose asks oc-discovery to stop observing peerIDs for user.
|
||||
func EmitClose(user string, peerIDs []string) {
|
||||
emitCmd(peerObserveCmd{User: user, PeerIDs: peerIDs, Close: true})
|
||||
}
|
||||
|
||||
// EmitCloseAll resets all observations for user in oc-discovery.
|
||||
// Pass an empty user to clear everything (startup reset).
|
||||
func EmitCloseAll(user string) {
|
||||
emitCmd(peerObserveCmd{User: user, CloseAll: true})
|
||||
}
|
||||
|
||||
func emitCmd(cmd peerObserveCmd) {
|
||||
b, err := json.Marshal(cmd)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_EVENT, tools.NATSResponse{
|
||||
FromApp: "oc-peer",
|
||||
Datatype: tools.PEER,
|
||||
Method: int(tools.PEER_OBSERVE_EVENT),
|
||||
Payload: b,
|
||||
})
|
||||
}
|
||||
|
||||
// ── offline retry loop ────────────────────────────────────────────────────────
|
||||
|
||||
// StartOfflineRetryLoop spawns a background goroutine that, every
// offlineRetryInterval, re-emits an observe command for each session's peers
// that are currently in offlineCache, scoped to that session's user.
//
// NOTE(review): the goroutine has no cancellation mechanism and runs for the
// lifetime of the process — confirm this is intentional.
func StartOfflineRetryLoop() {
	go func() {
		ticker := time.NewTicker(offlineRetryInterval)
		defer ticker.Stop()
		for range ticker.C {
			// Snapshot the offline set so the sessions pass below does not
			// hold offlineMu while iterating sessions.
			offlineMu.Lock()
			offlineIDs := make(map[string]struct{}, len(offlineCache))
			for id := range offlineCache {
				offlineIDs[id] = struct{}{}
			}
			offlineMu.Unlock()

			if len(offlineIDs) == 0 {
				continue
			}

			sessionsMu.RLock()
			for _, s := range sessions {
				// Collect this session's peers that are currently offline.
				var retry []ShallowPeer
				for _, p := range s.peers {
					if _, off := offlineIDs[p.PeerID]; off {
						retry = append(retry, p)
					}
				}
				if len(retry) > 0 {
					// Fire-and-forget so a slow NATS publish cannot stall
					// the loop while sessionsMu is held.
					go EmitObserve(s.user, retry)
				}
			}
			sessionsMu.RUnlock()
		}
	}()
}
|
||||
Reference in New Issue
Block a user