diff --git a/conf/config.go b/conf/config.go index 29f431e..f2b1879 100644 --- a/conf/config.go +++ b/conf/config.go @@ -3,14 +3,16 @@ package conf import "sync" type Config struct { - Mode string - KubeHost string - KubePort string - KubeCA string - KubeCert string - KubeData string - MinioRootKey string - MinioRootSecret string + Mode string + KubeHost string + KubePort string + KubeCA string + KubeCert string + KubeData string + MinioRootKey string + MinioRootSecret string + MonitorMode string + MonitorAddress string } var instance *Config diff --git a/controllers/booking.go b/controllers/booking.go index c825086..cb1e569 100644 --- a/controllers/booking.go +++ b/controllers/booking.go @@ -6,6 +6,7 @@ import ( "fmt" "net/http" "oc-datacenter/infrastructure" + "oc-datacenter/infrastructure/monitor" "strconv" "time" @@ -123,7 +124,7 @@ var upgrader = websocket.Upgrader{ // @Success 200 {booking} models.booking // @router /:id [get] func (o *BookingController) Log() { - user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + // user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) id := o.Ctx.Input.Param(":id") conn, err := upgrader.Upgrade(o.Ctx.ResponseWriter, o.Ctx.Request, nil) if err != nil { @@ -131,7 +132,13 @@ func (o *BookingController) Log() { return } defer conn.Close() - infrastructure.NewPrometheusService().Stream(id, 1*time.Second, user, peerID, groups, conn) + monitors, err := monitor.NewMonitorService() + if err != nil { + o.Ctx.WriteString("Monitor service unavailable: " + err.Error()) + return + } + ctx := monitor.StreamRegistry.Register(id) + monitors.Stream(ctx, id, 1*time.Second, conn) } // @Title Update @@ -246,7 +253,7 @@ func (o *BookingController) Post() { o.ServeJSON() return } - + if resp.ResourceType == tools.COMPUTE_RESOURCE { // later should check... health for any such as docker... 
res := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), user, peerID, groups, nil).LoadOne(resp.ResourceID) @@ -308,10 +315,14 @@ func (o *BookingController) Post() { return } if b.Data.(*booking.Booking).ResourceType == tools.COMPUTE_RESOURCE { - go func() { - time.Sleep(time.Until(b.Data.(*booking.Booking).ExpectedStartDate)) - infrastructure.NewPrometheusService().Stream(b.Data.GetID(), 1*time.Second, user, peerID, groups, nil) - }() + monitors, monErr := monitor.NewMonitorService() + if monErr == nil { + go func() { + time.Sleep(time.Until(b.Data.(*booking.Booking).ExpectedStartDate)) + ctx := monitor.StreamRegistry.Register(b.Data.(*booking.Booking).ExecutionsID) + monitors.Stream(ctx, b.Data.GetID(), 1*time.Second, nil) + }() + } } logger.Info().Msg("Creating new namespace : " + resp.ExecutionsID) @@ -452,7 +463,7 @@ func (o *BookingController) createNamespace(ns string) error { } ok, err := serv.GetNamespace(o.Ctx.Request.Context(), ns) - if ok != nil && err == nil { + if ok != nil && err == nil { logger.Debug().Msg("A namespace with name " + ns + " already exists") return nil } diff --git a/go.mod b/go.mod index 5700be6..f728911 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module oc-datacenter -go 1.24.2 - -toolchain go1.24.4 +go 1.24.6 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260204083845-d9f646aac28b + cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13 github.com/beego/beego/v2 v2.3.8 github.com/golang-jwt/jwt/v5 v5.2.2 github.com/gorilla/websocket v1.5.3 @@ -24,6 +22,7 @@ require ( github.com/biter777/countries v1.7.5 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.4 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect @@ -54,6 +53,7 @@ require ( 
github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/libp2p/go-libp2p/core v0.43.0-rc2 // indirect github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.14 // indirect diff --git a/go.sum b/go.sum index c065538..8ecd3e5 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995 h1:ZDRvnzTTNHgMm cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= cloud.o-forge.io/core/oc-lib v0.0.0-20260204083845-d9f646aac28b h1:/TkmuO5ERpHJCqNpKBlmzw8pYTVDGcFcDo+e1ndXlm0= cloud.o-forge.io/core/oc-lib v0.0.0-20260204083845-d9f646aac28b/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= +cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13 h1:DNIPQ7C+7wjbj5RUx29wLxuIe/wiSOcuUMlLRIv6Fvs= +cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc= github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg= @@ -26,6 +28,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod 
h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= @@ -120,6 +124,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/libp2p/go-libp2p/core v0.43.0-rc2 h1:1X1aDJNWhMfodJ/ynbaGLkgnC8f+hfBIqQDrzxFZOqI= +github.com/libp2p/go-libp2p/core v0.43.0-rc2/go.mod h1:NYeJ9lvyBv9nbDk2IuGb8gFKEOkIv/W5YRIy1pAJB2Q= github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35 h1:PpXWgLPs+Fqr325bN2FD2ISlRRztXibcX6e8f5FR5Dc= github.com/lufia/plan9stats v0.0.0-20250317134145-8bc96cf8fc35/go.mod h1:autxFIvghDt3jPTLoqZ9OZ7s9qTGNAWmYCjVFWPX/zg= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/infrastructure/kubernetes.go b/infrastructure/kubernetes.go index 3478553..7433107 100644 --- a/infrastructure/kubernetes.go +++ b/infrastructure/kubernetes.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "oc-datacenter/conf" + "oc-datacenter/infrastructure/monitor" "strings" "time" @@ -27,7 +28,6 @@ import ( var gvrSources = schema.GroupVersionResource{Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "sources"} var gvrTargets = schema.GroupVersionResource{Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "targets"} - type KubernetesService struct { Set *kubernetes.Clientset } @@ -223,9 +223,7 @@ func (k *KubernetesService) DeleteNamespace(ctx context.Context, ns string) erro if err := k.Set.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{}); 
err != nil { return errors.New("Error deleting namespace: " + err.Error()) } - LockKill.Lock() - Kill = append(Kill, ns) - LockKill.Unlock() + monitor.StreamRegistry.Cancel(ns) fmt.Println("Namespace deleted successfully!") return nil } @@ -385,7 +383,7 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube return nil, err } - secretApplyConfig := apply.Secret("kube-secret-"+ oclib.GetConcatenatedName(peerId, executionId), + secretApplyConfig := apply.Secret("kube-secret-"+oclib.GetConcatenatedName(peerId, executionId), executionId). WithData(map[string][]byte{ "config": config, @@ -460,8 +458,8 @@ func (k *KubernetesService) DeleteKubeConfigSecret(executionID string) ([]byte, return []byte{}, nil } -func (k *KubernetesService) GetNamespace(context context.Context, executionID string)(*v1.Namespace,error){ - resp, err := k.Set.CoreV1().Namespaces().Get(context,executionID,metav1.GetOptions{}) +func (k *KubernetesService) GetNamespace(context context.Context, executionID string) (*v1.Namespace, error) { + resp, err := k.Set.CoreV1().Namespaces().Get(context, executionID, metav1.GetOptions{}) if apierrors.IsNotFound(err) { return nil, nil } @@ -474,7 +472,6 @@ func (k *KubernetesService) GetNamespace(context context.Context, executionID st return resp, nil } - func getCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string) ([]byte, error) { resp, err := client.RESTClient().Get(). AbsPath(path). 
@@ -594,11 +591,11 @@ func (k *KubernetesService) CreateSecret(context context.Context, minioId string Type: v1.SecretTypeOpaque, Data: data, ObjectMeta: metav1.ObjectMeta{ - Name: minioId+"-secret-s3", + Name: minioId + "-secret-s3", }, } - _, err := k.Set.CoreV1().Secrets(executionID).Create(context,&s,metav1.CreateOptions{}) + _, err := k.Set.CoreV1().Secrets(executionID).Create(context, &s, metav1.CreateOptions{}) if err != nil { logger := oclib.GetLogger() logger.Error().Msg("An error happened when creating the secret holding minio credentials in namespace " + executionID + " : " + err.Error()) diff --git a/infrastructure/monitor/monitor.go b/infrastructure/monitor/monitor.go new file mode 100644 index 0000000..87be824 --- /dev/null +++ b/infrastructure/monitor/monitor.go @@ -0,0 +1,75 @@ +package monitor + +import ( + "context" + "errors" + "fmt" + "oc-datacenter/conf" + "sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/common/models" + "cloud.o-forge.io/core/oc-lib/models/live" + "cloud.o-forge.io/core/oc-lib/models/resources" + "github.com/gorilla/websocket" +) + +type MonitorInterface interface { + Stream(ctx context.Context, bookingID string, interval time.Duration, ws *websocket.Conn) +} + +var _monitorService = map[string]func() MonitorInterface{ + "prometheus": func() MonitorInterface { return NewPrometheusService() }, + "vector": func() MonitorInterface { return NewVectorService() }, +} + +func NewMonitorService() (MonitorInterface, error) { + service, ok := _monitorService[conf.GetConfig().MonitorMode] + if !ok { + return nil, errors.New("monitor service not found") + } + return service(), nil +} + +func Call(book *booking.Booking, + f func(*live.LiveDatacenter, *resources.ComputeResourceInstance, + map[string]models.MetricsSnapshot, *sync.WaitGroup, *sync.Mutex)) (*booking.Booking, map[string]models.MetricsSnapshot) { + 
logger := oclib.GetLogger() + metrics := map[string]models.MetricsSnapshot{} + var wg sync.WaitGroup + var mu sync.Mutex + + cUAccess := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.LIVE_DATACENTER), nil) + cRAccess := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), nil) + + rr := cRAccess.LoadOne(book.ResourceID) + if rr.Err != "" { + logger.Err(fmt.Errorf("can't proceed because of unfound resource %s : %s", book.ResourceID, rr.Err)) + return book, metrics + } + computeRes := rr.ToComputeResource() + for _, instance := range computeRes.Instances { + res := cUAccess.Search(&dbs.Filters{ + And: map[string][]dbs.Filter{ + "source": {{Operator: dbs.EQUAL.String(), Value: instance.Source}}, + "abstractlive.resources_id": {{Operator: dbs.EQUAL.String(), Value: computeRes.GetID()}}, + }, + }, "", false) + if res.Err != "" { + continue + } + for _, r := range res.Data { + dc := r.(*live.LiveDatacenter) + if dc.MonitorPath == "" { + continue + } + wg.Add(1) + go f(dc, instance, metrics, &wg, &mu) + } + } + wg.Wait() + return book, metrics +} diff --git a/infrastructure/monitor/prometheus.go b/infrastructure/monitor/prometheus.go new file mode 100644 index 0000000..1b0bf47 --- /dev/null +++ b/infrastructure/monitor/prometheus.go @@ -0,0 +1,194 @@ +package monitor + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/common/models" + "cloud.o-forge.io/core/oc-lib/models/live" + "cloud.o-forge.io/core/oc-lib/models/resources" + "github.com/gorilla/websocket" +) + +type PrometheusResponse struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Metric map[string]string `json:"metric"` + Value []interface{} `json:"value"` // [timestamp, value] + } `json:"result"` + } `json:"data"` +} + +var queriesMetrics 
// streamRegistry maps a namespace to the cancel function of the monitoring
// stream currently registered for it. All access goes through mu.
type streamRegistry struct {
	mu      sync.Mutex
	streams map[string]context.CancelFunc
}

// StreamRegistry is the package-wide registry used to stop active monitoring
// streams by namespace.
var StreamRegistry = &streamRegistry{
	streams: make(map[string]context.CancelFunc),
}

// Register creates a cancellable context for namespace and records its cancel
// function. If a stream was already registered under the same namespace, that
// earlier stream's context is cancelled and its entry replaced.
func (r *streamRegistry) Register(namespace string) context.Context {
	fresh, stop := context.WithCancel(context.Background())

	r.mu.Lock()
	previous, replaced := r.streams[namespace]
	r.streams[namespace] = stop
	if replaced {
		previous()
	}
	r.mu.Unlock()

	return fresh
}

// Cancel cancels the context registered for namespace and forgets the entry.
// It does nothing when the namespace has no registered stream.
func (r *streamRegistry) Cancel(namespace string) {
	r.mu.Lock()
	defer r.mu.Unlock()

	stop, found := r.streams[namespace]
	if !found {
		return
	}
	delete(r.streams, namespace)
	stop()
}
http.NewRequestWithContext(ctx, http.MethodGet, reqURL, nil) + if err != nil { + metric.Error = err + return metric + } + resp, err := httpClient.Do(req) + if err != nil { + metric.Error = err + return metric + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + metric.Error = err + return metric + } + var result PrometheusResponse + if err = json.Unmarshal(body, &result); err != nil { + metric.Error = err + return metric + } + if len(result.Data.Result) > 0 && len(result.Data.Result[0].Value) == 2 { + metric.Value, metric.Error = strconv.ParseFloat(fmt.Sprintf("%s", result.Data.Result[0].Value[1]), 64) + } + return metric +} + +func (p *PrometheusService) Stream(ctx context.Context, bookingID string, interval time.Duration, ws *websocket.Conn) { + logger := oclib.GetLogger() + max := 100 + count := 0 + mets := map[string][]models.MetricsSnapshot{} + bAccess := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.BOOKING), nil) + book := bAccess.LoadOne(bookingID) + if book.Err != "" { + logger.Err(fmt.Errorf("stop because of empty : %s", book.Err)) + return + } + + isActive := func(e *booking.Booking) bool { + if e.ExpectedEndDate == nil { + return true + } + return time.Now().Before(*e.ExpectedEndDate) + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for isActive(book.Data.(*booking.Booking)) { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + b, metrics := Call(book.Data.(*booking.Booking), + func(dc *live.LiveDatacenter, instance *resources.ComputeResourceInstance, + metrics map[string]models.MetricsSnapshot, + wg *sync.WaitGroup, mu *sync.Mutex) { + defer wg.Done() + for _, expr := range queriesMetrics { + if mm, ok := metrics[instance.Name]; !ok { + mu.Lock() + metrics[instance.Name] = models.MetricsSnapshot{ + From: instance.Source, + Metrics: []models.Metric{p.queryPrometheus(ctx, dc.MonitorPath, expr, book.Data.(*booking.Booking).ExecutionsID)}, + } + mu.Unlock() + } else { + mu.Lock() + 
mm.Metrics = append(mm.Metrics, p.queryPrometheus(ctx, dc.MonitorPath, expr, book.Data.(*booking.Booking).ExecutionsID)) + mu.Unlock() + } + } + }) + _ = b + count++ + + if ws != nil { + if err := ws.WriteJSON(metrics); err != nil { + logger.Err(fmt.Errorf("websocket write error: %w", err)) + return + } + } + + if count < max { + continue + } + + bk := book.Data.(*booking.Booking) + if bk.ExecutionMetrics == nil { + bk.ExecutionMetrics = mets + } else { + for kk, vv := range mets { + bk.ExecutionMetrics[kk] = append(bk.ExecutionMetrics[kk], vv...) + } + } + bk.GetAccessor(nil).UpdateOne(bk, bookingID) + mets = map[string][]models.MetricsSnapshot{} + count = 0 + } +} diff --git a/infrastructure/monitor/vector.go b/infrastructure/monitor/vector.go new file mode 100644 index 0000000..c5c1661 --- /dev/null +++ b/infrastructure/monitor/vector.go @@ -0,0 +1,153 @@ +package monitor + +import ( + "context" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/common/models" + "cloud.o-forge.io/core/oc-lib/models/live" + "cloud.o-forge.io/core/oc-lib/models/resources" + "github.com/gorilla/websocket" +) + +// --- Structure métrique Vector --- +type VectorMetric struct { + Name string `json:"name"` + Value float64 `json:"value"` + Labels map[string]string `json:"labels"` + Timestamp int64 `json:"timestamp"` +} + +// --- Service Vector --- +type VectorService struct { + mu sync.Mutex + ExecutionMetrics map[string][]models.MetricsSnapshot // bookingID -> snapshots + sessions map[string]context.CancelFunc // optional: WS clients +} + +func NewVectorService() *VectorService { + return &VectorService{ + ExecutionMetrics: make(map[string][]models.MetricsSnapshot), + sessions: make(map[string]context.CancelFunc), + } +} + +// --- Connexion à un flux Vector en WS --- +func (v *VectorService) ListenVector(ctx context.Context, b *booking.Booking, interval 
time.Duration, ws *websocket.Conn) error { + max := 100 + count := 0 + mets := map[string][]models.MetricsSnapshot{} + isActive := func() bool { + if b.ExpectedEndDate == nil { + return true + } + return time.Now().Before(*b.ExpectedEndDate) + } + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for isActive() { + select { + case <-ctx.Done(): + return nil + case <-ticker.C: + } + + bb, metrics := Call(b, + func(dc *live.LiveDatacenter, instance *resources.ComputeResourceInstance, metrics map[string]models.MetricsSnapshot, + wg *sync.WaitGroup, mu *sync.Mutex) { + c, _, err := websocket.DefaultDialer.Dial(dc.MonitorPath, nil) + if err != nil { + return + } + defer c.Close() + _, msg, err := c.ReadMessage() + if err != nil { + log.Println("vector ws read error:", err) + return + } + + var m models.Metric + if err := json.Unmarshal(msg, &m); err != nil { + log.Println("json unmarshal error:", err) + return + } + mu.Lock() + if mm, ok := metrics[instance.Name]; !ok { + metrics[instance.Name] = models.MetricsSnapshot{ + From: instance.Source, + Metrics: []models.Metric{m}, + } + } else { + mm.Metrics = append(mm.Metrics, m) + } + + mu.Unlock() + }) + _ = bb + for k, v := range metrics { + mets[k] = append(mets[k], v) + } + count++ + + if ws != nil { + if err := ws.WriteJSON(metrics); err != nil { + return fmt.Errorf("websocket write error: %w", err) + } + } + + if count < max { + continue + } + + if b.ExecutionMetrics == nil { + b.ExecutionMetrics = mets + } else { + for kk, vv := range mets { + b.ExecutionMetrics[kk] = append(b.ExecutionMetrics[kk], vv...) 
+ } + } + b.GetAccessor(nil).UpdateOne(b, b.GetID()) + mets = map[string][]models.MetricsSnapshot{} + count = 0 + } + + return nil +} + +// --- Permet d'ajouter un front WebSocket pour recevoir les metrics live --- +// --- Permet de récupérer le cache historique pour un booking --- +func (v *VectorService) GetCache(bookingID string) []models.MetricsSnapshot { + v.mu.Lock() + defer v.mu.Unlock() + snapshots := v.ExecutionMetrics[bookingID] + return snapshots +} + +// --- Exemple d'intégration avec un booking --- +func (v *VectorService) Stream(ctx context.Context, bookingID string, interval time.Duration, ws *websocket.Conn) { + logger := oclib.GetLogger() + go func() { + bAccess := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.BOOKING), nil) + book := bAccess.LoadOne(bookingID) + if book.Err != "" { + logger.Err(fmt.Errorf("stop because of empty : %s", book.Err)) + return + } + b := book.ToBookings() + if b == nil { + logger.Err(fmt.Errorf("stop because of empty is not a booking")) + return + } + if err := v.ListenVector(ctx, b, interval, ws); err != nil { + log.Printf("Vector listen error for booking %s: %v\n", b.GetID(), err) + } + }() +} diff --git a/infrastructure/prometheus.go b/infrastructure/prometheus.go deleted file mode 100644 index 8a21c28..0000000 --- a/infrastructure/prometheus.go +++ /dev/null @@ -1,204 +0,0 @@ -package infrastructure - -import ( - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "slices" - "strconv" - "sync" - "time" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/dbs" - "cloud.o-forge.io/core/oc-lib/models/booking" - "cloud.o-forge.io/core/oc-lib/models/common/models" - "cloud.o-forge.io/core/oc-lib/models/live" - "github.com/gorilla/websocket" -) - -type MetricsSnapshot struct { - From string `json:"origin"` - Metrics []Metric `json:"metrics"` -} - -type Metric struct { - Name string `json:"name"` - Value float64 `json:"value"` - Error error `json:"error"` -} - -type PrometheusResponse struct { - 
Status string `json:"status"` - Data struct { - ResultType string `json:"resultType"` - Result []struct { - Metric map[string]string `json:"metric"` - Value []interface{} `json:"value"` // [timestamp, value] - } `json:"result"` - } `json:"data"` -} - -var queriesMetrics = []string{ - "rate(container_cpu_usage_seconds_total{namespace=\"%s\"}[1m]) * 100", - "container_memory_usage_bytes{namespace=\"%s\"}", - "(container_fs_usage_bytes{namespace=\"%s\"}) / (container_fs_limit_bytes{namespace=\"%s\"}) * 100", - "DCGM_FI_DEV_GPU_UTIL{namespace=\"%s\"}", - // "system_load_average", - "rate(container_fs_reads_bytes_total{namespace=\"%s\"}[1m])", - "rate(container_fs_writes_bytes_total{namespace=\"%s\"}[1m])", - "rate(container_network_receive_bytes_total{namespace=\"%s\"}[1m])", - "rate(container_network_transmit_bytes_total{namespace=\"%s\"}[1m])", - // "system_network_latency_ms", - "rate(http_requests_total{namespace=\"%s\"}[1m])", - "(rate(http_requests_total{status=~\"5..\", namespace=\"%s\"}[1m]) / rate(http_requests_total{namespace=\"%s\"}[1m])) * 100", - // "app_mean_time_to_repair_seconds", - // "app_mean_time_between_failure_seconds", -} - -type PrometheusService struct { -} - -func NewPrometheusService() *PrometheusService { - return &PrometheusService{} -} - -func (p *PrometheusService) queryPrometheus(promURL string, expr string, namespace string) models.Metric { - metric := models.Metric{Name: expr, Value: -1} - resp, err := http.Get(promURL + "/api/v1/query?query=" + url.QueryEscape(fmt.Sprintf(expr, namespace))) - if err != nil { - metric.Error = err - } else { - defer resp.Body.Close() - if body, err := io.ReadAll(resp.Body); err == nil { - var result PrometheusResponse - if err = json.Unmarshal(body, &result); err == nil && len(result.Data.Result) > 0 && len(result.Data.Result[0].Value) == 2 { - metric.Value, metric.Error = strconv.ParseFloat(fmt.Sprintf("%s", result.Data.Result[0].Value[1]), 64) - } - } - } - return metric -} - -func (p 
*PrometheusService) Call(book *booking.Booking, user string, peerID string, groups []string) (*booking.Booking, map[string]models.MetricsSnapshot) { - logger := oclib.GetLogger() - var wg sync.WaitGroup - - metrics := map[string]models.MetricsSnapshot{} - // get all booking... from executions_id == namespace typed datacenter. - - cUAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_DATACENTER), user, peerID, groups, nil) - cRAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), user, peerID, groups, nil) - - rr := cRAccess.LoadOne(book.ResourceID) - if rr.Err != "" { - logger.Err(fmt.Errorf("can't proceed because of unfound resource %s : %s", book.ResourceID, rr.Err)) - return book, metrics - } - computeRes := rr.ToComputeResource() - for _, instance := range computeRes.Instances { - res := cUAccess.Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ - "source": {{Operator: dbs.EQUAL.String(), Value: instance.Source}}, - "abstractlive.resources_id": {{Operator: dbs.EQUAL.String(), Value: computeRes.GetID()}}, - }, - }, "", false) - if res.Err != "" { - continue - } - for _, r := range res.Data { - // TODO watch out ... to not exec on an absent datacenter... 
- if r.(*live.LiveDatacenter).MonitorPath == "" { - continue - } - wg.Add(1) - snapshot := models.MetricsSnapshot{From: instance.Source, Metrics: []models.Metric{}} - go func() { - defer wg.Done() - for _, expr := range queriesMetrics { - snapshot.Metrics = append(snapshot.Metrics, - p.queryPrometheus(r.(*live.LiveDatacenter).MonitorPath, expr, book.ExecutionsID)) - } - metrics[instance.Name] = snapshot - }() - } - } - wg.Wait() - return book, metrics -} - -var LockKill = &sync.Mutex{} - -// TODO kill procedure -func (p *PrometheusService) Stream(bookingID string, interval time.Duration, user string, peerID string, groups []string, websocket *websocket.Conn) { - logger := oclib.GetLogger() - max := 100 - bookIDS := []string{} - mets := map[string][]models.MetricsSnapshot{} - bAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), user, peerID, groups, nil) - book := bAccess.LoadOne(bookingID) - if book.Err != "" { - logger.Err(fmt.Errorf("stop because of empty : %s", book.Err)) - } - f := func(e *booking.Booking) bool { - if e.ExpectedEndDate == nil { - return true - } - return time.Now().Before(*e.ExpectedEndDate) - } - for f(book.Data.(*booking.Booking)) { - if slices.Contains(Kill, book.Data.(*booking.Booking).ExecutionsID) { - newKill := []string{} - for _, k := range Kill { - if k != book.Data.(*booking.Booking).ExecutionsID { - newKill = append(newKill, k) - } - } - LockKill.Lock() - Kill = newKill - LockKill.Unlock() - break - } - - go func() { - book, metrics := p.Call(book.Data.(*booking.Booking), user, peerID, groups) - for k, v := range metrics { - if me, ok := mets[k]; !ok { - mets[k] = []models.MetricsSnapshot{v} - } else { - me = append(me, v) - mets[k] = me - } - } - bookIDS = append(bookIDS, bookingID) - if websocket != nil { - (*websocket).WriteJSON(metrics) - } - if len(bookIDS) != max { - return - } - if book.ExecutionMetrics == nil { - book.ExecutionMetrics = mets - } else { - for kk, vv := range mets { - if em, ok := 
book.ExecutionMetrics[kk]; !ok { - book.ExecutionMetrics[kk] = vv - } else { - em = append(em, vv...) - book.ExecutionMetrics[kk] = em - } - } - } - book.GetAccessor(nil).UpdateOne(book, bookingID) - bookIDS = []string{} - - }() - time.Sleep(interval) - } -} - -var Kill = []string{} - -// should add a datacenter... under juridiction... of opencloud... diff --git a/main.go b/main.go index fe780d2..dd4a982 100644 --- a/main.go +++ b/main.go @@ -7,6 +7,7 @@ import ( "os" oclib "cloud.o-forge.io/core/oc-lib" + beego "github.com/beego/beego/v2/server/web" ) const appname = "oc-datacenter" @@ -31,7 +32,9 @@ func main() { conf.GetConfig().KubeData = string(sDec) } + conf.GetConfig().MonitorMode = o.GetStringDefault("MONITOR_MODE", "prometheus") conf.GetConfig().MinioRootKey = o.GetStringDefault("MINIO_ADMIN_ACCESS", "") conf.GetConfig().MinioRootSecret = o.GetStringDefault("MINIO_ADMIN_SECRET", "") oclib.InitAPI(appname) + beego.Run() } diff --git a/routers/commentsRouter.go b/routers/commentsRouter.go index c6f63cd..23a737d 100644 --- a/routers/commentsRouter.go +++ b/routers/commentsRouter.go @@ -214,6 +214,15 @@ func init() { Filters: nil, Params: nil}) + beego.GlobalControllerRouter["oc-datacenter/controllers:VectorController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:VectorController"], + beego.ControllerComments{ + Method: "Receive", + Router: `/`, + AllowHTTPMethods: []string{"post"}, + MethodParams: param.Make(), + Filters: nil, + Params: nil}) + beego.GlobalControllerRouter["oc-datacenter/controllers:VersionController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:VersionController"], beego.ControllerComments{ Method: "GetAll",