// Package infrastructure collects runtime metrics for booked executions by
// querying the Prometheus endpoint of each compute unit hosting the booking.
package infrastructure

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"sync"
	"time"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/models/booking"
	"cloud.o-forge.io/core/oc-lib/models/common/models"
	"cloud.o-forge.io/core/oc-lib/models/compute_units"
)

// MetricsSnapshot groups the metrics collected from one origin (compute unit).
type MetricsSnapshot struct {
	From    string   `json:"origin"`
	Metrics []Metric `json:"metrics"`
}

// Metric is one sampled value; Value is -1 when the query failed.
type Metric struct {
	Name  string  `json:"name"`
	Value float64 `json:"value"`
	Error error   `json:"error"`
}

// PrometheusResponse mirrors the subset of the Prometheus HTTP API
// /api/v1/query response that this service reads.
type PrometheusResponse struct {
	Status string `json:"status"`
	Data   struct {
		ResultType string `json:"resultType"`
		Result     []struct {
			Metric map[string]string `json:"metric"`
			Value  []interface{}     `json:"value"` // [timestamp, value]
		} `json:"result"`
	} `json:"data"`
}

// queriesMetrics are PromQL templates; every "%s" placeholder is filled with
// the execution namespace (some templates reference it more than once).
var queriesMetrics = []string{
	"rate(container_cpu_usage_seconds_total{namespace=\"%s\"}[1m]) * 100",
	"container_memory_usage_bytes{namespace=\"%s\"}",
	"(container_fs_usage_bytes{namespace=\"%s\"}) / (container_fs_limit_bytes{namespace=\"%s\"}) * 100",
	"DCGM_FI_DEV_GPU_UTIL{namespace=\"%s\"}",
	// "system_load_average",
	"rate(container_fs_reads_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_fs_writes_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_network_receive_bytes_total{namespace=\"%s\"}[1m])",
	"rate(container_network_transmit_bytes_total{namespace=\"%s\"}[1m])",
	// "system_network_latency_ms",
	"rate(http_requests_total{namespace=\"%s\"}[1m])",
	"(rate(http_requests_total{status=~\"5..\", namespace=\"%s\"}[1m]) / rate(http_requests_total{namespace=\"%s\"}[1m])) * 100",
	// "app_mean_time_to_repair_seconds",
	// "app_mean_time_between_failure_seconds",
}

// promClient is shared by all queries; the timeout prevents a slow or
// unreachable Prometheus endpoint from blocking a snapshot indefinitely.
var promClient = &http.Client{Timeout: 10 * time.Second}

// PrometheusService queries Prometheus endpoints of compute units to build
// per-booking metrics snapshots.
type PrometheusService struct {
}

// NewPrometheusService returns a ready-to-use PrometheusService.
func NewPrometheusService() *PrometheusService {
	return &PrometheusService{}
}

// queryPrometheus evaluates one PromQL template against promURL for the given
// namespace and returns the first sample value. On any failure the returned
// metric has Value == -1 and Error set.
func (p *PrometheusService) queryPrometheus(promURL string, expr string, namespace string) models.Metric {
	metric := models.Metric{Name: expr, Value: -1}
	// Some templates contain "%s" more than once (fs-usage ratio, 5xx rate);
	// repeat the namespace argument so Sprintf fills every placeholder
	// instead of emitting "%!s(MISSING)".
	args := make([]interface{}, strings.Count(expr, "%s"))
	for i := range args {
		args[i] = namespace
	}
	resp, err := promClient.Get(promURL + "/api/v1/query?query=" +
		url.QueryEscape(fmt.Sprintf(expr, args...)))
	if err != nil {
		metric.Error = err
		return metric
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		metric.Error = err
		return metric
	}
	var result PrometheusResponse
	if err := json.Unmarshal(body, &result); err == nil &&
		len(result.Data.Result) > 0 && len(result.Data.Result[0].Value) == 2 {
		// Value[1] is the sample value; Prometheus encodes it as a JSON string.
		metric.Value, metric.Error = strconv.ParseFloat(
			fmt.Sprintf("%v", result.Data.Result[0].Value[1]), 64)
	}
	return metric
}

// Call loads the booking, resolves its compute resource and, for every
// instance with a reachable monitor endpoint, collects one MetricsSnapshot
// (all PromQL queries) concurrently. It returns the booking (nil when it
// could not be loaded) and the snapshots keyed by instance name.
func (p *PrometheusService) Call(bookingID string) (*booking.Booking, map[string]models.MetricsSnapshot) {
	var wg sync.WaitGroup
	var mu sync.Mutex // guards metrics: one goroutine per compute unit writes into it
	metrics := map[string]models.MetricsSnapshot{}

	// Load the booking; its ExecutionsID is used as the metrics namespace.
	bAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", "", []string{}, nil)
	book := bAccess.LoadOne(bookingID)
	if book.Err != "" {
		// NOTE: the original discarded a fmt.Errorf here; log instead.
		fmt.Printf("stop because of empty : %s\n", book.Err)
		return nil, metrics
	}
	b := book.Data.(*booking.Booking)

	cUAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_UNITS), "", "", []string{}, nil)
	cRAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), "", "", []string{}, nil)
	rr := cRAccess.LoadOne(b.ResourceID)
	if rr.Err != "" {
		fmt.Printf("can't proceed because of unfound resource %s : %s\n", b.ResourceID, rr.Err)
		return b, metrics
	}
	computeRes := rr.ToComputeResource()
	for _, instance := range computeRes.Instances {
		res := cUAccess.Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				"source": {{Operator: dbs.EQUAL.String(), Value: instance.Source}},
			},
		}, "", false)
		if res.Err != "" {
			continue
		}
		for _, r := range res.Data {
			cu := r.(*compute_units.ComputeUnits)
			// TODO watch out ... do not exec on an absent datacenter.
			if cu.MonitorPath == "" {
				continue
			}
			// Shadow loop variables for the closure (pre-Go-1.22 capture
			// semantics share one variable across iterations).
			instance := instance
			cu := cu
			wg.Add(1)
			go func() {
				defer wg.Done()
				snapshot := models.MetricsSnapshot{From: instance.Source, Metrics: []models.Metric{}}
				for _, expr := range queriesMetrics {
					snapshot.Metrics = append(snapshot.Metrics,
						p.queryPrometheus(cu.MonitorPath, expr, b.ExecutionsID))
				}
				mu.Lock()
				metrics[instance.Name] = snapshot
				mu.Unlock()
			}()
		}
	}
	wg.Wait()
	return b, metrics
}

// Stream polls Call every interval until end, pushing each snapshot batch to
// the client through encoder/flusher. After max polls the accumulated history
// is merged into the booking's ExecutionMetrics and persisted.
//
// Polling is sequential on purpose: the previous goroutine-per-tick version
// raced on the poll counter and the accumulator map, and wrote to the shared
// encoder/flusher from several goroutines at once.
func (p *PrometheusService) Stream(bookingID string, end *time.Time, interval time.Duration, flusher http.Flusher, encoder *json.Encoder) {
	if end == nil {
		// TODO: an anchor... detecting the end of a task.
		return
	}
	const max = 100
	count := 0
	mets := map[string][]models.MetricsSnapshot{}
	for time.Now().Before(*end) {
		count++
		book, metrics := p.Call(bookingID)
		for k, v := range metrics {
			mets[k] = append(mets[k], v)
		}
		if err := encoder.Encode(metrics); err != nil {
			fmt.Printf("stream encode error: %v\n", err)
		}
		flusher.Flush()
		// Guard against the nil booking Call returns on load failure.
		if count == max && book != nil {
			if book.ExecutionMetrics == nil {
				book.ExecutionMetrics = mets
			} else {
				for kk, vv := range mets {
					book.ExecutionMetrics[kk] = append(book.ExecutionMetrics[kk], vv...)
				}
			}
			book.GetAccessor(nil).UpdateOne(book, bookingID)
		}
		time.Sleep(interval)
	}
}

// should add a datacenter... under juridiction... of opencloud...