package infrastructure

import (
	"encoding/json"
	"fmt"
	"sync"
	"time"

	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// Timing knobs for peer observation.
const (
	// observeTimeout is the delay passed to the per-peer expiry timer armed
	// on every heartbeat; when it elapses the peer is marked offline.
	observeTimeout = 60 * time.Second

	// offlineRetryInterval is the tick period of the offline retry loop.
	offlineRetryInterval = 60 * time.Second
)

// PeerConnectivityMetrics is the quality snapshot received from oc-discovery
// via PEER_OBSERVE_RESPONSE_EVENT and forwarded to the frontend via WebSocket.
type PeerConnectivityMetrics struct {
	LatencyMs   float64   `json:"latency_ms"`
	Speed       string    `json:"speed"`       // "fast" | "slow"
	Reliability string    `json:"reliability"` // "reliable" | "watch"
	TrustScore  float64   `json:"trust_score"`
	LastSeenAt  time.Time `json:"last_seen_at"`
	MissRate    float64   `json:"miss_rate"`
}

// WSMessage is the envelope sent on every WebSocket endpoint (search and
// observe). Which fields are populated depends on Type:
//
//	Type == "peer"         → Peer is set
//	Type == "connectivity" → PeerID + Online + Metrics are set
//
// NOTE(review): Online carries `omitempty`, so a false value is left out of
// the encoded JSON entirely; consumers presumably treat a missing "online"
// key as offline — confirm against the frontend.
type WSMessage struct {
	Type    string                   `json:"type"`
	Peer    *peer.Peer               `json:"peer,omitempty"`
	PeerID  string                   `json:"peer_id,omitempty"`
	Online  bool                     `json:"online,omitempty"`
	Metrics *PeerConnectivityMetrics `json:"metrics,omitempty"`
}

// ShallowPeer is the minimal peer representation the frontend sends when
// requesting observation. oc-discovery uses Address/StreamAddress to reach it.
type ShallowPeer struct {
	ID            string `json:"id"`
	PeerID        string `json:"peer_id"`
	Address       string `json:"address"`
	StreamAddress string `json:"stream_address"`
}

// peerObserveCmd is the NATS payload sent to oc-discovery.
// // Observe → User + Peers populated // Close → User + PeerIDs + Close=true // CloseAll → User + CloseAll=true type peerObserveCmd struct { User string `json:"user"` Peers []ShallowPeer `json:"peers,omitempty"` PeerIDs []string `json:"peer_ids,omitempty"` Close bool `json:"close,omitempty"` CloseAll bool `json:"close_all,omitempty"` } // ── online cache ───────────────────────────────────────────────────────────── type onlineState struct { online bool timer *time.Timer metrics *PeerConnectivityMetrics } var ( onlineMu sync.Mutex onlineCache = map[string]*onlineState{} offlineMu sync.Mutex offlineCache = map[string]struct{}{} ) // IsOnline returns whether peerID is currently considered online. func IsOnline(peerID string) bool { onlineMu.Lock() defer onlineMu.Unlock() if s, ok := onlineCache[peerID]; ok { return s.online } return false } // HandleHeartbeatBatch processes a batch of peer heartbeats with optional metrics. // Called from the PEER_OBSERVE_RESPONSE_EVENT NATS listener. func HandleHeartbeatBatch(peerIDs []string, metricsMap map[string]*PeerConnectivityMetrics) { for _, id := range peerIDs { var m *PeerConnectivityMetrics if metricsMap != nil { m = metricsMap[id] } setOnline(id, m) } } func setOnline(peerID string, metrics *PeerConnectivityMetrics) { fmt.Println("SET ONLINE ", peerID, metrics) onlineMu.Lock() s, exists := onlineCache[peerID] if !exists { s = &onlineState{} onlineCache[peerID] = s } s.online = true s.metrics = metrics if s.timer != nil { s.timer.Stop() } s.timer = time.AfterFunc(observeTimeout, func() { setOffline(peerID) }) onlineMu.Unlock() offlineMu.Lock() delete(offlineCache, peerID) offlineMu.Unlock() broadcastConnectivity(peerID, true, metrics) } func setOffline(peerID string) { onlineMu.Lock() s, ok := onlineCache[peerID] if !ok || !s.online { onlineMu.Unlock() return } s.online = false s.timer = nil onlineMu.Unlock() offlineMu.Lock() offlineCache[peerID] = struct{}{} offlineMu.Unlock() broadcastConnectivity(peerID, false, nil) } 
// broadcastConnectivity fans out a connectivity change to all observe sessions // watching peerID. func broadcastConnectivity(peerID string, online bool, metrics *PeerConnectivityMetrics) { fmt.Println("BORADCAST METRICS", metrics) msg := WSMessage{Type: "connectivity", PeerID: peerID, Online: online, Metrics: metrics} sessionsMu.RLock() for _, s := range sessions { for _, p := range s.peers { if p.PeerID == peerID { select { case s.ch <- msg: default: } break } } } sessionsMu.RUnlock() } // ── observe sessions ────────────────────────────────────────────────────────── // observeSession holds per-WS-connection state (user, watched peers, output channel). type observeSession struct { user string peers []ShallowPeer ch chan WSMessage } var ( sessionsMu sync.RWMutex sessions = map[string]*observeSession{} ) // RegisterObserveSession creates an empty session for connID / user. func RegisterObserveSession(connID, user string, ch chan WSMessage) { sessionsMu.Lock() sessions[connID] = &observeSession{user: user, ch: ch} sessionsMu.Unlock() } // AddObservedPeers merges new peers into the session, emits a NATS observe // command (user-scoped), and returns the current online state for each peer. func AddObservedPeers(connID string, peers []ShallowPeer) []WSMessage { fmt.Println("AddObservedPeers Concrete") sessionsMu.Lock() s, ok := sessions[connID] if !ok { sessionsMu.Unlock() fmt.Println("sessions IS NOT") return nil } existing := make(map[string]struct{}, len(s.peers)) for _, p := range s.peers { existing[p.PeerID] = struct{}{} } var newPeers []ShallowPeer for _, p := range peers { if _, dup := existing[p.PeerID]; !dup { s.peers = append(s.peers, p) newPeers = append(newPeers, p) } } user := s.user sessionsMu.Unlock() fmt.Println("newPeers", newPeers) if len(newPeers) > 0 { EmitObserve(user, newPeers) } return GetCurrentStates(peerIDsFrom(peers)) } // CloseObserveSession emits NATS close for all peers in the session, then // removes it. 
Call this on WS disconnect (normal or error). func CloseObserveSession(connID string) { sessionsMu.Lock() s, ok := sessions[connID] if !ok { sessionsMu.Unlock() return } delete(sessions, connID) user := s.user ids := peerIDsFrom(s.peers) sessionsMu.Unlock() if len(ids) > 0 { EmitClose(user, ids) } } // GetCurrentStates returns one WSMessage per peer ID from the online cache // (defaults to offline for unknown peers). func GetCurrentStates(peerIDs []string) []WSMessage { onlineMu.Lock() defer onlineMu.Unlock() msgs := make([]WSMessage, 0, len(peerIDs)) for _, id := range peerIDs { online := false var metrics *PeerConnectivityMetrics if s, ok := onlineCache[id]; ok { online = s.online metrics = s.metrics } msgs = append(msgs, WSMessage{Type: "connectivity", PeerID: id, Online: online, Metrics: metrics}) } return msgs } func peerIDsFrom(peers []ShallowPeer) []string { ids := make([]string, 0, len(peers)) for _, p := range peers { ids = append(ids, p.PeerID) } return ids } // ── NATS emission ───────────────────────────────────────────────────────────── // EmitObserve asks oc-discovery to start observing peers on behalf of user. func EmitObserve(user string, peers []ShallowPeer) { emitCmd(peerObserveCmd{User: user, Peers: peers}) } // EmitClose asks oc-discovery to stop observing peerIDs for user. func EmitClose(user string, peerIDs []string) { emitCmd(peerObserveCmd{User: user, PeerIDs: peerIDs, Close: true}) } // EmitCloseAll resets all observations for user in oc-discovery. // Pass an empty user to clear everything (startup reset). 
func EmitCloseAll(user string) { emitCmd(peerObserveCmd{User: user, CloseAll: true}) } func emitCmd(cmd peerObserveCmd) { b, err := json.Marshal(cmd) if err != nil { return } tools.NewNATSCaller().SetNATSPub(tools.PEER_OBSERVE_EVENT, tools.NATSResponse{ FromApp: "oc-peer", Datatype: tools.PEER, Method: int(tools.PEER_OBSERVE_EVENT), Payload: b, }) } // ── offline retry loop ──────────────────────────────────────────────────────── // StartOfflineRetryLoop re-requests observation for offline peers every // offlineRetryInterval, scoped to each active session's user. func StartOfflineRetryLoop() { go func() { ticker := time.NewTicker(offlineRetryInterval) defer ticker.Stop() for range ticker.C { offlineMu.Lock() offlineIDs := make(map[string]struct{}, len(offlineCache)) for id := range offlineCache { offlineIDs[id] = struct{}{} } offlineMu.Unlock() if len(offlineIDs) == 0 { continue } sessionsMu.RLock() for _, s := range sessions { var retry []ShallowPeer for _, p := range s.peers { if _, off := offlineIDs[p.PeerID]; off { retry = append(retry, p) } } if len(retry) > 0 { go EmitObserve(s.user, retry) } } sessionsMu.RUnlock() } }() }