// Package monitor queries Prometheus for per-namespace container metrics
// and streams them over websockets while periodically persisting snapshots
// onto the originating booking.
package monitor

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/models/booking"
	"cloud.o-forge.io/core/oc-lib/models/common/models"
	"cloud.o-forge.io/core/oc-lib/models/live"
	"cloud.o-forge.io/core/oc-lib/models/resources"
	"github.com/gorilla/websocket"
)

// PrometheusResponse mirrors the JSON envelope returned by the Prometheus
// HTTP API instant-query endpoint (/api/v1/query): an overall status plus a
// list of metric-label/value pairs.
type PrometheusResponse struct {
	Status string `json:"status"`
	Data struct {
		ResultType string `json:"resultType"`
		Result []struct {
			Metric map[string]string `json:"metric"`
			Value []interface{} `json:"value"` // [timestamp, value]
		} `json:"result"`
	} `json:"data"`
}

// queriesMetrics lists the PromQL expressions evaluated for each namespace.
// Every "%s" placeholder is substituted with the namespace via
// strings.ReplaceAll in queryPrometheus, so expressions with more than one
// placeholder (fs usage, error rate) are handled correctly.
var queriesMetrics = []string{
	"rate(container_cpu_usage_seconds_total{namespace=\"%s\"}[1m]) * 100", // CPU usage (percent of one core)
	"container_memory_usage_bytes{namespace=\"%s\"}", // memory usage (bytes)
	"(container_fs_usage_bytes{namespace=\"%s\"}) / (container_fs_limit_bytes{namespace=\"%s\"}) * 100", // filesystem usage (percent)
	"DCGM_FI_DEV_GPU_UTIL{namespace=\"%s\"}", // GPU utilization (NVIDIA DCGM exporter)
	"rate(container_fs_reads_bytes_total{namespace=\"%s\"}[1m])", // disk read throughput
	"rate(container_fs_writes_bytes_total{namespace=\"%s\"}[1m])", // disk write throughput
	"rate(container_network_receive_bytes_total{namespace=\"%s\"}[1m])", // network receive throughput
	"rate(container_network_transmit_bytes_total{namespace=\"%s\"}[1m])", // network transmit throughput
	"rate(http_requests_total{namespace=\"%s\"}[1m])", // HTTP request rate
	"(rate(http_requests_total{status=~\"5..\", namespace=\"%s\"}[1m]) / rate(http_requests_total{namespace=\"%s\"}[1m])) * 100", // 5xx error rate (percent)
}

// httpClient is the shared client for all Prometheus queries; its timeout
// bounds each query even when the caller's context carries no deadline.
var httpClient = &http.Client{
	Timeout: 10 * time.Second,
}

// StreamRegistry manages cancellation of active monitoring streams by namespace.
var StreamRegistry = &streamRegistry{ streams: map[string]context.CancelFunc{}, } type streamRegistry struct { mu sync.Mutex streams map[string]context.CancelFunc } func (r *streamRegistry) Register(namespace string) context.Context { r.mu.Lock() defer r.mu.Unlock() if cancel, ok := r.streams[namespace]; ok { cancel() } ctx, cancel := context.WithCancel(context.Background()) r.streams[namespace] = cancel return ctx } func (r *streamRegistry) Cancel(namespace string) { r.mu.Lock() defer r.mu.Unlock() if cancel, ok := r.streams[namespace]; ok { cancel() delete(r.streams, namespace) } } type PrometheusService struct { } func NewPrometheusService() *PrometheusService { return &PrometheusService{} } func (p *PrometheusService) queryPrometheus(ctx context.Context, promURL string, expr string, namespace string) models.Metric { metric := models.Metric{Name: expr, Value: -1} query := strings.ReplaceAll(expr, "%s", namespace) reqURL := promURL + "/api/v1/query?query=" + url.QueryEscape(query) req, err := http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) if err != nil { metric.Error = err return metric } resp, err := httpClient.Do(req) if err != nil { metric.Error = err return metric } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { metric.Error = err return metric } var result PrometheusResponse if err = json.Unmarshal(body, &result); err != nil { metric.Error = err return metric } if len(result.Data.Result) > 0 && len(result.Data.Result[0].Value) == 2 { metric.Value, metric.Error = strconv.ParseFloat(fmt.Sprintf("%s", result.Data.Result[0].Value[1]), 64) } return metric } func (p *PrometheusService) Stream(ctx context.Context, bookingID string, interval time.Duration, ws *websocket.Conn) { logger := oclib.GetLogger() max := 100 count := 0 mets := map[string][]models.MetricsSnapshot{} bAccess := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.BOOKING), nil) book := bAccess.LoadOne(bookingID) if book.Err != "" { logger.Err(fmt.Errorf("stop 
because of empty : %s", book.Err)) return } isActive := func(e *booking.Booking) bool { if e.ExpectedEndDate == nil { return true } return time.Now().Before(*e.ExpectedEndDate) } ticker := time.NewTicker(interval) defer ticker.Stop() for isActive(book.Data.(*booking.Booking)) { select { case <-ctx.Done(): return case <-ticker.C: } b, metrics := Call(book.Data.(*booking.Booking), func(dc *live.LiveDatacenter, instance *resources.ComputeResourceInstance, metrics map[string]models.MetricsSnapshot, wg *sync.WaitGroup, mu *sync.Mutex) { defer wg.Done() for _, expr := range queriesMetrics { if mm, ok := metrics[instance.Name]; !ok { mu.Lock() metrics[instance.Name] = models.MetricsSnapshot{ From: instance.Source, Metrics: []models.Metric{p.queryPrometheus(ctx, dc.MonitorPath, expr, book.Data.(*booking.Booking).ExecutionsID)}, } mu.Unlock() } else { mu.Lock() mm.Metrics = append(mm.Metrics, p.queryPrometheus(ctx, dc.MonitorPath, expr, book.Data.(*booking.Booking).ExecutionsID)) mu.Unlock() } } }) _ = b count++ if ws != nil { if err := ws.WriteJSON(metrics); err != nil { logger.Err(fmt.Errorf("websocket write error: %w", err)) return } } if count < max { continue } bk := book.Data.(*booking.Booking) if bk.ExecutionMetrics == nil { bk.ExecutionMetrics = mets } else { for kk, vv := range mets { bk.ExecutionMetrics[kk] = append(bk.ExecutionMetrics[kk], vv...) } } bk.GetAccessor(nil).UpdateOne(bk, bookingID) mets = map[string][]models.MetricsSnapshot{} count = 0 } }