Files
oc-datacenter/infrastructure/monitor/prometheus.go
2026-02-17 17:23:54 +01:00

195 lines
5.2 KiB
Go

package monitor
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/models/resources"
"github.com/gorilla/websocket"
)
// PrometheusResponse mirrors the JSON envelope returned by the Prometheus
// HTTP API's /api/v1/query endpoint: a top-level status plus a data object
// holding the result type and the result vector.
type PrometheusResponse struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string `json:"resultType"`
		Result     []struct {
			// Metric carries the label set identifying the series.
			Metric map[string]string `json:"metric"`
			// Value is a 2-element pair: [timestamp, value]. Prometheus
			// encodes the sample value itself as a string.
			Value []interface{} `json:"value"` // [timestamp, value]
		} `json:"result"`
	} `json:"data"`
}
// queriesMetrics lists the PromQL expressions polled for each namespace:
// CPU usage rate (%), memory usage (bytes), filesystem usage (%), GPU
// utilization, filesystem read/write rates, network receive/transmit rates,
// HTTP request rate, and HTTP 5xx error ratio (%).
//
// Each "%s" is substituted with the namespace via strings.ReplaceAll (not
// fmt.Sprintf) in queryPrometheus, which is why expressions containing two
// placeholders (filesystem usage, error ratio) work correctly.
var queriesMetrics = []string{
	"rate(container_cpu_usage_seconds_total{namespace=\"%s\"}[1m]) * 100",
	"container_memory_usage_bytes{namespace=\"%s\"}",
	"(container_fs_usage_bytes{namespace=\"%s\"}) / (container_fs_limit_bytes{namespace=\"%s\"}) * 100",
	"DCGM_FI_DEV_GPU_UTIL{namespace=\"%s\"}",
	"rate(container_fs_reads_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_fs_writes_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_network_receive_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_network_transmit_bytes_total{namespace=\"%s\"}[1m])",
	"rate(http_requests_total{namespace=\"%s\"}[1m])",
	"(rate(http_requests_total{status=~\"5..\", namespace=\"%s\"}[1m]) / rate(http_requests_total{namespace=\"%s\"}[1m])) * 100",
}
// httpClient is shared by every Prometheus query; the 10-second timeout
// bounds each request even when the caller's context never fires.
var httpClient = &http.Client{
	Timeout: 10 * time.Second,
}
// StreamRegistry manages cancellation of active monitoring streams by namespace.
// Registering a namespace that already has a live stream cancels the previous
// one first, so at most one stream is active per namespace at any time.
var StreamRegistry = &streamRegistry{
	streams: map[string]context.CancelFunc{},
}

// streamRegistry maps a namespace to the cancel function of its currently
// active stream. mu guards all access to the streams map.
type streamRegistry struct {
	mu      sync.Mutex
	streams map[string]context.CancelFunc
}
// Register returns a fresh cancellable context tracked under namespace.
// If a stream is already registered for that namespace, its context is
// cancelled first, so only one stream per namespace stays alive.
func (r *streamRegistry) Register(namespace string) context.Context {
	r.mu.Lock()
	defer r.mu.Unlock()
	if prev := r.streams[namespace]; prev != nil {
		prev()
	}
	ctx, cancel := context.WithCancel(context.Background())
	r.streams[namespace] = cancel
	return ctx
}
// Cancel stops the stream registered for namespace, if any, and removes
// its entry from the registry. Unknown namespaces are a no-op.
func (r *streamRegistry) Cancel(namespace string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	cancel, ok := r.streams[namespace]
	if !ok {
		return
	}
	cancel()
	delete(r.streams, namespace)
}
// PrometheusService queries a Prometheus endpoint for runtime metrics.
// The zero value is fully usable; the struct carries no state.
type PrometheusService struct{}

// NewPrometheusService returns a ready-to-use PrometheusService.
func NewPrometheusService() *PrometheusService {
	return new(PrometheusService)
}
// queryPrometheus runs a single instant query against the Prometheus HTTP
// API at promURL, substituting namespace for every "%s" placeholder in expr.
//
// It returns a models.Metric named after the raw expression. On any failure
// (request construction, transport, non-200 status, JSON decoding, value
// parsing) the metric's Error field is set and Value stays at -1. An empty
// result set is not an error: the metric is returned with Value -1.
func (p *PrometheusService) queryPrometheus(ctx context.Context, promURL string, expr string, namespace string) models.Metric {
	metric := models.Metric{Name: expr, Value: -1}
	// ReplaceAll (not Sprintf) so expressions with several placeholders work.
	query := strings.ReplaceAll(expr, "%s", namespace)
	reqURL := promURL + "/api/v1/query?query=" + url.QueryEscape(query)
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil)
	if err != nil {
		metric.Error = err
		return metric
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		metric.Error = err
		return metric
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		metric.Error = err
		return metric
	}
	// Surface HTTP-level failures instead of feeding an error page to the
	// JSON decoder (the previous version skipped this check entirely).
	if resp.StatusCode != http.StatusOK {
		metric.Error = fmt.Errorf("prometheus returned status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
		return metric
	}
	var result PrometheusResponse
	if err = json.Unmarshal(body, &result); err != nil {
		metric.Error = err
		return metric
	}
	// An instant-vector sample is [unix_timestamp, "stringified float"];
	// only the first series of the result vector is used. A type assertion
	// replaces the old fmt.Sprintf("%s", v), which produced
	// "%!s(float64=...)" garbage for non-string values and let a failed
	// ParseFloat overwrite the -1 sentinel with 0.
	if len(result.Data.Result) > 0 && len(result.Data.Result[0].Value) == 2 {
		if s, ok := result.Data.Result[0].Value[1].(string); ok {
			metric.Value, metric.Error = strconv.ParseFloat(s, 64)
		} else {
			metric.Error = fmt.Errorf("unexpected prometheus value type %T", result.Data.Result[0].Value[1])
		}
	}
	return metric
}
// Stream polls Prometheus for the booking's namespace on every tick of
// interval, pushes each round of metrics to the websocket client (if ws is
// non-nil), and periodically persists the accumulated snapshots onto the
// booking record.
//
// The loop ends when ctx is cancelled, when the booking's expected end date
// has passed, or when a websocket write fails. A booking with no expected
// end date streams until cancelled.
func (p *PrometheusService) Stream(ctx context.Context, bookingID string, interval time.Duration, ws *websocket.Conn) {
	logger := oclib.GetLogger()
	// Flush accumulated snapshots onto the booking every persistEvery ticks.
	persistEvery := 100
	ticks := 0
	mets := map[string][]models.MetricsSnapshot{}
	bAccess := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.BOOKING), nil)
	book := bAccess.LoadOne(bookingID)
	if book.Err != "" {
		logger.Err(fmt.Errorf("stop because of empty : %s", book.Err))
		return
	}
	isActive := func(e *booking.Booking) bool {
		if e.ExpectedEndDate == nil {
			return true
		}
		return time.Now().Before(*e.ExpectedEndDate)
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for isActive(book.Data.(*booking.Booking)) {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		_, metrics := Call(book.Data.(*booking.Booking),
			func(dc *live.LiveDatacenter, instance *resources.ComputeResourceInstance,
				metrics map[string]models.MetricsSnapshot,
				wg *sync.WaitGroup, mu *sync.Mutex) {
				defer wg.Done()
				for _, expr := range queriesMetrics {
					// Query BEFORE taking the lock: each call is an HTTP
					// round trip (up to 10s) and must not serialize the
					// other instances' goroutines.
					m := p.queryPrometheus(ctx, dc.MonitorPath, expr, book.Data.(*booking.Booking).ExecutionsID)
					// Read-modify-write entirely under the lock. The
					// previous version read the map unlocked (a data race)
					// and appended to a value copy without storing it back,
					// dropping every metric after the first per instance.
					mu.Lock()
					snap, ok := metrics[instance.Name]
					if !ok {
						snap = models.MetricsSnapshot{From: instance.Source}
					}
					snap.Metrics = append(snap.Metrics, m)
					metrics[instance.Name] = snap
					mu.Unlock()
				}
			})
		// Accumulate this tick's snapshots so the periodic persistence
		// below has data to write (mets was previously never populated,
		// so an empty map was persisted every cycle).
		for name, snap := range metrics {
			mets[name] = append(mets[name], snap)
		}
		ticks++
		if ws != nil {
			if err := ws.WriteJSON(metrics); err != nil {
				logger.Err(fmt.Errorf("websocket write error: %w", err))
				return
			}
		}
		if ticks < persistEvery {
			continue
		}
		// Merge the accumulated snapshots into the booking and save it.
		bk := book.Data.(*booking.Booking)
		if bk.ExecutionMetrics == nil {
			bk.ExecutionMetrics = mets
		} else {
			for kk, vv := range mets {
				bk.ExecutionMetrics[kk] = append(bk.ExecutionMetrics[kk], vv...)
			}
		}
		bk.GetAccessor(nil).UpdateOne(bk, bookingID)
		// Start a fresh accumulator (never reuse the map the booking now
		// holds a reference to).
		mets = map[string][]models.MetricsSnapshot{}
		ticks = 0
	}
}