oc-datacenter/infrastructure/prometheus.go
2025-06-18 11:18:12 +02:00

204 lines
5.9 KiB
Go

package infrastructure
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"slices"
"strconv"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/compute_units"
"github.com/gorilla/websocket"
)
// MetricsSnapshot groups a set of metrics together with the origin they were
// collected from. In JSON the origin is exposed under the "origin" key and the
// metric list under "metrics".
type MetricsSnapshot struct {
	// From identifies where the metrics come from (JSON key "origin").
	From string `json:"origin"`
	// Metrics holds the collected values (JSON key "metrics").
	Metrics []Metric `json:"metrics"`
}
// Metric is a single named measurement, optionally carrying the error that
// occurred while it was being collected.
type Metric struct {
	// Name labels the measurement (JSON key "name").
	Name string `json:"name"`
	// Value is the measured number (JSON key "value").
	Value float64 `json:"value"`
	// Error is the collection error, if any (JSON key "error").
	Error error `json:"error"`
}
// PrometheusResponse mirrors the JSON envelope decoded from a Prometheus
// query: a status string and a data section holding the result type and the
// list of results.
type PrometheusResponse struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string `json:"resultType"`
		Result     []struct {
			// Metric carries the label set attached to the sample.
			Metric map[string]string `json:"metric"`
			// Value is the pair [timestamp, value].
			Value []any `json:"value"`
		} `json:"result"`
	} `json:"data"`
}
// queriesMetrics lists the PromQL templates evaluated for each monitored
// target. Every entry is a fmt template expanded with exactly ONE argument,
// the namespace to filter on. Templates that need the namespace more than
// once must therefore use the explicit argument index %[1]s: a second plain
// %s would be rendered as "%!s(MISSING)" and produce an invalid query.
var queriesMetrics = []string{
	"rate(container_cpu_usage_seconds_total{namespace=\"%s\"}[1m]) * 100",
	"container_memory_usage_bytes{namespace=\"%s\"}",
	"(container_fs_usage_bytes{namespace=\"%[1]s\"}) / (container_fs_limit_bytes{namespace=\"%[1]s\"}) * 100",
	"DCGM_FI_DEV_GPU_UTIL{namespace=\"%s\"}",
	// "system_load_average",
	"rate(container_fs_reads_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_fs_writes_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_network_receive_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_network_transmit_bytes_total{namespace=\"%s\"}[1m])",
	// "system_network_latency_ms",
	"rate(http_requests_total{namespace=\"%s\"}[1m])",
	"(rate(http_requests_total{status=~\"5..\", namespace=\"%[1]s\"}[1m]) / rate(http_requests_total{namespace=\"%[1]s\"}[1m])) * 100",
	// "app_mean_time_to_repair_seconds",
	// "app_mean_time_between_failure_seconds",
}
// PrometheusService carries the Prometheus-related methods of this package.
// It has no fields, so every value of the type is interchangeable.
type PrometheusService struct{}
// NewPrometheusService allocates a PrometheusService and returns a pointer to
// it. The result is never nil.
func NewPrometheusService() *PrometheusService {
	return new(PrometheusService)
}
// queryPrometheus evaluates one query through the /api/v1/query endpoint of
// the server at promURL and returns the first sample of the answer.
//
// expr is a fmt template expanded with namespace as its only argument; the
// expanded text is URL-escaped and sent as the "query" parameter. The returned
// metric is always named after the raw template expr.
//
// Value is -1 whenever no number could be extracted. Error is set when the
// request fails, the server answers with a non-200 status, the body cannot be
// read or decoded, or the sample is not a parsable number. An answer without
// any result is NOT reported as an error: the metric simply keeps Value -1.
//
// NOTE(review): http.Get uses the default client, which has no timeout — a
// stalled server blocks the caller indefinitely.
func (p *PrometheusService) queryPrometheus(promURL string, expr string, namespace string) models.Metric {
	metric := models.Metric{Name: expr, Value: -1}

	resp, err := http.Get(promURL + "/api/v1/query?query=" + url.QueryEscape(fmt.Sprintf(expr, namespace)))
	if err != nil {
		metric.Error = err
		return metric
	}
	defer resp.Body.Close()

	// Read the body even on failure statuses so the connection can be reused.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		metric.Error = fmt.Errorf("reading prometheus response: %w", err)
		return metric
	}
	if resp.StatusCode != http.StatusOK {
		metric.Error = fmt.Errorf("prometheus query returned %s", resp.Status)
		return metric
	}

	var result PrometheusResponse
	if err := json.Unmarshal(body, &result); err != nil {
		metric.Error = fmt.Errorf("decoding prometheus response: %w", err)
		return metric
	}

	// Only the first series is considered, and only when it is a well-formed
	// [timestamp, value] pair.
	if len(result.Data.Result) == 0 || len(result.Data.Result[0].Value) != 2 {
		return metric
	}
	raw, ok := result.Data.Result[0].Value[1].(string)
	if !ok {
		metric.Error = fmt.Errorf("unexpected prometheus sample value %v", result.Data.Result[0].Value[1])
		return metric
	}
	value, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		// Keep the -1 sentinel instead of a misleading 0.
		metric.Error = err
		return metric
	}
	metric.Value = value
	return metric
}
// Call collects one metrics snapshot per monitored instance of the compute
// resource booked by book.
//
// It loads the compute resource identified by book.ResourceID, then, for each
// of its instances, searches the compute units whose "source" equals the
// instance source. Every unit exposing a non-empty MonitorPath is queried in
// its own goroutine with all the expressions of queriesMetrics, using
// book.ExecutionsID as namespace. Call waits for all goroutines before
// returning.
//
// user, peerID and groups are forwarded to the oclib requests. The booking is
// returned untouched together with a map keyed by instance name; when several
// units match the same instance they share that key and the last one to finish
// wins. The map is empty when book is nil, when the resource cannot be loaded
// (the reason is printed) or when nothing is monitored.
func (p *PrometheusService) Call(book *booking.Booking, user string, peerID string, groups []string) (*booking.Booking, map[string]models.MetricsSnapshot) {
	metrics := map[string]models.MetricsSnapshot{}
	if book == nil {
		return book, metrics
	}

	// get all booking... from executions_id == namespace typed datacenter.
	cUAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_UNITS), user, peerID, groups, nil)
	cRAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), user, peerID, groups, nil)
	rr := cRAccess.LoadOne(book.ResourceID)
	if rr.Err != "" {
		fmt.Println(fmt.Errorf("can't proceed because of unfound resource %s : %s", book.ResourceID, rr.Err))
		return book, metrics
	}
	computeRes := rr.ToComputeResource()
	namespace := book.ExecutionsID

	var (
		wg sync.WaitGroup
		mu sync.Mutex // guards metrics, which is written by every worker
	)
	for _, instance := range computeRes.Instances {
		res := cUAccess.Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				"source": {{Operator: dbs.EQUAL.String(), Value: instance.Source}},
			},
		}, "", false)
		if res.Err != "" {
			continue
		}
		for _, r := range res.Data {
			// TODO watch out ... to not exec on an absent datacenter...
			unit, ok := r.(*compute_units.ComputeUnits)
			if !ok || unit.MonitorPath == "" {
				continue
			}
			// Per-iteration copies: the goroutine must not depend on the loop
			// variables, which are shared across iterations before Go 1.22.
			name, source, monitorPath := instance.Name, instance.Source, unit.MonitorPath

			wg.Add(1)
			go func() {
				defer wg.Done()
				snapshot := models.MetricsSnapshot{
					From:    source,
					Metrics: make([]models.Metric, 0, len(queriesMetrics)),
				}
				for _, expr := range queriesMetrics {
					snapshot.Metrics = append(snapshot.Metrics, p.queryPrometheus(monitorPath, expr, namespace))
				}
				mu.Lock()
				metrics[name] = snapshot
				mu.Unlock()
			}()
		}
	}
	wg.Wait()
	return book, metrics
}
// LockKill is the mutex taken around updates of the Kill list.
var LockKill = new(sync.Mutex)
// Stream periodically collects the metrics of a booking until it ends or is
// killed.
//
// The booking identified by bookingID is loaded once; Stream prints the reason
// and returns immediately when it cannot be loaded. Then, every interval and
// for as long as the booking has no ExpectedEndDate or that date is still in
// the future, a goroutine runs Call and:
//   - appends the returned snapshots to an in-memory batch,
//   - sends them as JSON on websocket when it is not nil,
//   - once 100 samples are batched, merges the batch into the booking's
//     ExecutionMetrics, persists the booking with UpdateOne and starts a new
//     batch.
//
// The loop also stops as soon as the booking's ExecutionsID is found in Kill;
// the ID is removed from Kill at that point. user, peerID and groups are
// forwarded to the oclib requests.
//
// NOTE(review): samples batched since the last flush are not persisted when
// the loop ends.
//
// TODO kill procedure
func (p *PrometheusService) Stream(bookingID string, interval time.Duration, user string, peerID string, groups []string, websocket *websocket.Conn) {
	const flushEvery = 100

	bAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), user, peerID, groups, nil)
	loaded := bAccess.LoadOne(bookingID)
	if loaded.Err != "" {
		fmt.Println(fmt.Errorf("stop because of empty : %s", loaded.Err))
		return
	}
	bk, ok := loaded.Data.(*booking.Booking)
	if !ok || bk == nil {
		fmt.Println(fmt.Errorf("stop because booking %s could not be read", bookingID))
		return
	}
	execID := bk.ExecutionsID

	var (
		mu      sync.Mutex // serialises the collectors: batch, websocket, booking
		samples int
		pending = map[string][]models.MetricsSnapshot{}
	)

	for bk.ExpectedEndDate == nil || time.Now().Before(*bk.ExpectedEndDate) {
		// Check and consume the kill request under the lock so concurrent
		// updates of Kill are neither missed nor overwritten.
		LockKill.Lock()
		killed := slices.Contains(Kill, execID)
		if killed {
			remaining := make([]string, 0, len(Kill))
			for _, k := range Kill {
				if k != execID {
					remaining = append(remaining, k)
				}
			}
			Kill = remaining
		}
		LockKill.Unlock()
		if killed {
			break
		}

		go func() {
			// The slow part runs outside the lock so a long collection does
			// not delay the others.
			updated, metrics := p.Call(bk, user, peerID, groups)

			mu.Lock()
			defer mu.Unlock()

			for k, v := range metrics {
				pending[k] = append(pending[k], v)
			}
			samples++
			if websocket != nil {
				// A websocket connection supports a single concurrent writer,
				// hence the write under the lock.
				if err := websocket.WriteJSON(metrics); err != nil {
					fmt.Println(fmt.Errorf("streaming metrics of booking %s: %w", bookingID, err))
				}
			}
			if samples < flushEvery {
				return
			}

			if updated.ExecutionMetrics == nil {
				updated.ExecutionMetrics = pending
			} else {
				for k, v := range pending {
					updated.ExecutionMetrics[k] = append(updated.ExecutionMetrics[k], v...)
				}
			}
			updated.GetAccessor(nil).UpdateOne(updated, bookingID)

			// Start a fresh batch: reusing the old map would append samples
			// that were already persisted again on the next flush.
			pending = map[string][]models.MetricsSnapshot{}
			samples = 0
		}()
		time.Sleep(interval)
	}
}
// Kill holds the execution IDs whose metric streaming must stop. It starts as
// an empty, non-nil list.
var Kill = make([]string, 0)
// should add a datacenter... under the jurisdiction... of opencloud...