From 5cc04ee490153f43b1deed2470b09caaedf8bf11 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 17 Apr 2026 09:45:00 +0200 Subject: [PATCH] oc-lib --- go.mod | 20 +++++++++++++++++++- go.sum | 2 ++ models/peer/peer.go | 6 ++++++ tools/enums.go | 35 +++++++++++++++++++++++++++++++++-- tools/nats_caller.go | 15 +++++++++++++-- 5 files changed, 73 insertions(+), 5 deletions(-) diff --git a/go.mod b/go.mod index d09024f..6ccd277 100755 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-playground/validator/v10 v10.22.0 github.com/google/uuid v1.6.0 github.com/goraz/onion v0.1.3 + github.com/libp2p/go-libp2p/core v0.43.0-rc2 github.com/nats-io/nats.go v1.37.0 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.11.1 @@ -22,18 +23,33 @@ require ( github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/google/gnostic-models v0.7.0 // indirect + github.com/ipfs/go-cid v0.5.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect + github.com/minio/sha256-simd v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect + github.com/mr-tron/base58 v1.2.0 // indirect + github.com/multiformats/go-base32 v0.1.0 // indirect + github.com/multiformats/go-base36 v0.2.0 // indirect + github.com/multiformats/go-multiaddr v0.16.0 // indirect + github.com/multiformats/go-multibase v0.2.0 // indirect + github.com/multiformats/go-multicodec v0.9.1 // indirect + github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-multistream v0.6.1 // indirect + github.com/multiformats/go-varint v0.0.7 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/nkeys v0.4.7 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/x448/float16 v0.8.4 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect go.yaml.in/yaml/v3 v3.0.4 // indirect + golang.org/x/exp v0.0.0-20250606033433-dcc06ee1d476 // indirect golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/term v0.37.0 // indirect golang.org/x/time v0.9.0 // indirect @@ -42,6 +58,7 @@ require ( k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + lukechampine.com/blake3 v1.4.1 // indirect sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/randfill v1.0.0 // indirect sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect @@ -67,7 +84,6 @@ require ( github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/leodido/go-urn v1.4.0 // indirect - github.com/libp2p/go-libp2p/core v0.43.0-rc2 github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -89,3 +105,5 @@ require ( k8s.io/api v0.35.1 k8s.io/client-go v0.35.1 ) + +replace github.com/libp2p/go-libp2p/core => github.com/libp2p/go-libp2p v0.47.0 \ No newline at end of file diff --git a/go.sum b/go.sum index e064456..ee8e75c 100755 --- a/go.sum +++ b/go.sum @@ -133,6 +133,8 @@ github.com/multiformats/go-multicodec v0.9.1 h1:x/Fuxr7ZuR4jJV4Os5g444F7xC4XmyUa github.com/multiformats/go-multicodec v0.9.1/go.mod h1:LLWNMtyV5ithSBUo3vFIMaeDy+h3EbkMTek1m+Fybbo= github.com/multiformats/go-multihash v0.2.3 h1:7Lyc8XfX/IY2jWb/gI7JP+o7JEq9hOa7BFvVU9RSh+U= github.com/multiformats/go-multihash v0.2.3/go.mod h1:dXgKXCXjBzdscBLk9JkjINiEsCKRVch90MdaGiKsvSM= +github.com/multiformats/go-multistream v0.6.1 h1:4aoX5v6T+yWmc2raBHsTvzmFhOI8WVOer28DeBBEYdQ= +github.com/multiformats/go-multistream v0.6.1/go.mod h1:ksQf6kqHAb6zIsyw7Zm+gAuVo57Qbq84E27YlYqavqw= github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= diff --git a/models/peer/peer.go b/models/peer/peer.go index a4a8490..efa8e6e 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -92,6 +92,12 @@ type Peer struct { TrustScore float64 `json:"trust_score" bson:"trust_score" default:"100"` BlacklistReason string `json:"blacklist_reason,omitempty" bson:"blacklist_reason,omitempty"` BehaviorWarnings []BehaviorWarning `json:"behavior_warnings,omitempty" bson:"behavior_warnings,omitempty"` + + // Volatile connectivity state — never persisted to DB (bson:"-"). + // Set in-memory by oc-peer when it receives a PEER_OBSERVE_RESPONSE_EVENT. + // Considered offline when LastHeartbeat is older than 60 s (30 s interval + 30 s grace). + Online bool `json:"online" bson:"-"` + LastHeartbeat *time.Time `json:"last_heartbeat,omitempty" bson:"-"` } func (ao *Peer) VerifyAuth(callName string, request *tools.APIRequest) bool { diff --git a/tools/enums.go b/tools/enums.go index 33256d7..1d9c021 100644 --- a/tools/enums.go +++ b/tools/enums.go @@ -177,6 +177,11 @@ const ( PB_PVC_CONFIG PB_CLOSE_SEARCH NONE + PB_OBSERVE + PB_OBSERVE_CLOSE + // PB_PROPAGATE is used by oc-discovery to broadcast a peer's online/offline + // state to other oc-discovery nodes in the federation via PROPALGATION_EVENT. + PB_PROPAGATE ) func GetActionString(ss string) PubSubAction { @@ -203,14 +208,40 @@ func GetActionString(ss string) PubSubAction { return PB_MINIO_CONFIG case "close_search": return PB_CLOSE_SEARCH + case "observe": + return PB_OBSERVE + case "observe_close": + return PB_OBSERVE_CLOSE + case "propagate": + return PB_PROPAGATE default: return NONE } } -var path = []string{"search", "search_response", "create", "update", "delete", "planner", "close_planner", - "considers", "admiralty_config", "minio_config", "close_search"} +// path aligns with PubSubAction iota values for String(). +var path = []string{ + "search", // 0 PB_SEARCH + "search_response", // 1 PB_SEARCH_RESPONSE + "create", // 2 PB_CREATE + "update", // 3 PB_UPDATE + "delete", // 4 PB_DELETE + "planner", // 5 PB_PLANNER + "close_planner", // 6 PB_CLOSE_PLANNER + "considers", // 7 PB_CONSIDERS + "admiralty_config", // 8 PB_ADMIRALTY_CONFIG + "minio_config", // 9 PB_MINIO_CONFIG + "pvc_config", // 10 PB_PVC_CONFIG + "close_search", // 11 PB_CLOSE_SEARCH + "none", // 12 NONE + "observe", // 13 PB_OBSERVE + "observe_close", // 14 PB_OBSERVE_CLOSE + "propagate", // 15 PB_PROPAGATE +} func (m PubSubAction) String() string { + if int(m) >= len(path) { + return "unknown" + } return strings.ToUpper(path[m]) } diff --git a/tools/nats_caller.go b/tools/nats_caller.go index 29a41ab..324590c 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -31,7 +31,7 @@ var meths = []string{"remove execution", "create execution", "planner execution" "propalgation event", "search event", "confirm event", "considers event", "admiralty config event", "minio config event", "pvc config event", "workflow started event", "workflow step done event", "workflow done event", - "peer behavior event", + "peer behavior event", "peer observe response event", "peer observe event", } const ( @@ -68,6 +68,16 @@ const ( // oc-discovery consumes it to update the peer's trust score and auto-blacklist // below threshold. PEER_BEHAVIOR_EVENT + + // PEER_OBSERVE_RESPONSE_EVENT is emitted by oc-discovery each time it receives + // a heartbeat from an observed remote peer. oc-peer listens to this event to + // update the WS connectivity state for its clients. + PEER_OBSERVE_RESPONSE_EVENT + + // PEER_OBSERVE_EVENT is emitted by oc-peer to request oc-discovery to start + // or stop observing a remote peer. Payload contains the target peer_id and a + // boolean close flag. + PEER_OBSERVE_EVENT ) func (n NATSMethod) String() string { @@ -79,7 +89,8 @@ func NameToMethod(name string) NATSMethod { for _, v := range [...]NATSMethod{REMOVE_EXECUTION, CREATE_EXECUTION, PLANNER_EXECUTION, DISCOVERY, WORKFLOW_EVENT, ARGO_KUBE_EVENT, CREATE_RESOURCE, REMOVE_RESOURCE, PROPALGATION_EVENT, SEARCH_EVENT, CONFIRM_EVENT, CONSIDERS_EVENT, ADMIRALTY_CONFIG_EVENT, MINIO_CONFIG_EVENT, PVC_CONFIG_EVENT, - WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT} { + WORKFLOW_STARTED_EVENT, WORKFLOW_STEP_DONE_EVENT, WORKFLOW_DONE_EVENT, + PEER_BEHAVIOR_EVENT, PEER_OBSERVE_RESPONSE_EVENT, PEER_OBSERVE_EVENT} { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { return v }