From 4e1e3f20afe7b06d2042279a8fd8e3918fa57ea0 Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 18 Jun 2025 11:18:12 +0200 Subject: [PATCH] add websocket route --- controllers/booking.go | 25 ++++++++++++++++++++++++- go.mod | 1 + go.sum | 2 ++ infrastructure/prometheus.go | 35 ++++++++++++++++++----------------- 4 files changed, 45 insertions(+), 18 deletions(-) diff --git a/controllers/booking.go b/controllers/booking.go index aedfbad..e385f37 100644 --- a/controllers/booking.go +++ b/controllers/booking.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "net/http" "oc-datacenter/infrastructure" "time" @@ -13,6 +14,7 @@ import ( b "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/tools" beego "github.com/beego/beego/v2/server/web" + "github.com/gorilla/websocket" "go.mongodb.org/mongo-driver/bson/primitive" ) @@ -109,6 +111,27 @@ func (o *BookingController) Get() { o.ServeJSON() } +var upgrader = websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { return true }, // allow all origins +} + +// @Title Log +// @Description find booking by id +// @Param id path string true "the id you want to get" +// @Success 200 {booking} models.booking +// @router /:id [get] +func (o *BookingController) Log() { + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + id := o.Ctx.Input.Param(":id") + conn, err := upgrader.Upgrade(o.Ctx.ResponseWriter, o.Ctx.Request, nil) + if err != nil { + o.Ctx.WriteString("WebSocket upgrade failed: " + err.Error()) + return + } + defer conn.Close() + infrastructure.NewPrometheusService().Stream(id, 1*time.Second, user, peerID, groups, conn) +} + // @Title Update // @Description create computes // @Param id path string true "the compute id you want to get" @@ -284,7 +307,7 @@ func (o *BookingController) Post() { if b.Data.(*booking.Booking).ResourceType == tools.COMPUTE_RESOURCE { go func() { time.Sleep(time.Until(b.Data.(*booking.Booking).ExpectedStartDate)) - infrastructure.NewPrometheusService().Stream(b.Data.GetID(), b.Data.(*booking.Booking).ExpectedEndDate, 1*time.Second, nil, nil) + infrastructure.NewPrometheusService().Stream(b.Data.GetID(), 1*time.Second, user, peerID, groups, nil) }() } diff --git a/go.mod b/go.mod index f0f2801..c47cceb 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( require ( github.com/go-ole/go-ole v1.2.6 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect diff --git a/go.sum b/go.sum index 75a40a0..fb8d906 100644 --- a/go.sum +++ b/go.sum @@ -87,6 +87,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/goraz/onion v0.1.3 h1:KhyvbDA2b70gcz/d5izfwTiOH8SmrvV43AsVzpng3n0= github.com/goraz/onion v0.1.3/go.mod h1:XEmz1XoBz+wxTgWB8NwuvRm4RAu3vKxvrmYtzK+XCuQ= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= diff --git a/infrastructure/prometheus.go b/infrastructure/prometheus.go index 90e98c4..14111fa 100644 --- a/infrastructure/prometheus.go +++ b/infrastructure/prometheus.go @@ -16,6 +16,7 @@ import ( "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/compute_units" + "github.com/gorilla/websocket" ) type MetricsSnapshot struct { @@ -81,14 +82,14 @@ func (p *PrometheusService) queryPrometheus(promURL string, expr string, namespa return metric } -func (p *PrometheusService) Call(book *booking.Booking) (*booking.Booking, map[string]models.MetricsSnapshot) { +func (p *PrometheusService) Call(book *booking.Booking, user string, peerID string, groups []string) (*booking.Booking, map[string]models.MetricsSnapshot) { var wg sync.WaitGroup metrics := map[string]models.MetricsSnapshot{} // get all booking... from executions_id == namespace typed datacenter. - cUAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_UNITS), "", "", []string{}, nil) - cRAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), "", "", []string{}, nil) + cUAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_UNITS), user, peerID, groups, nil) + cRAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.COMPUTE_RESOURCE), user, peerID, groups, nil) rr := cRAccess.LoadOne(book.ResourceID) if rr.Err != "" { @@ -129,23 +130,24 @@ func (p *PrometheusService) Call(book *booking.Booking) (*booking.Booking, map[s var LockKill = &sync.Mutex{} -func (p *PrometheusService) Stream(bookingID string, end *time.Time, interval time.Duration, flusher *http.Flusher, encoder *json.Encoder) { - f := func(e *time.Time) bool { - if end == nil { - return true - } - return time.Now().Before(*e) - } +// TODO kill procedure +func (p *PrometheusService) Stream(bookingID string, interval time.Duration, user string, peerID string, groups []string, websocket *websocket.Conn) { + max := 100 bookIDS := []string{} mets := map[string][]models.MetricsSnapshot{} - bAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", "", []string{}, nil) + bAccess := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), user, peerID, groups, nil) book := bAccess.LoadOne(bookingID) if book.Err != "" { fmt.Errorf("stop because of empty : %s", book.Err) } - - for f(end) { + f := func(e *booking.Booking) bool { + if e.ExpectedEndDate == nil { + return true + } + return time.Now().Before(*e.ExpectedEndDate) + } + for f(book.Data.(*booking.Booking)) { if slices.Contains(Kill, book.Data.(*booking.Booking).ExecutionsID) { newKill := []string{} for _, k := range Kill { @@ -160,7 +162,7 @@ func (p *PrometheusService) Stream(bookingID string, end *time.Time, interval ti } go func() { - book, metrics := p.Call(book.Data.(*booking.Booking)) + book, metrics := p.Call(book.Data.(*booking.Booking), user, peerID, groups) for k, v := range metrics { if me, ok := mets[k]; !ok { mets[k] = []models.MetricsSnapshot{v} @@ -170,9 +172,8 @@ func (p *PrometheusService) Stream(bookingID string, end *time.Time, interval ti } } bookIDS = append(bookIDS, bookingID) - if flusher != nil { - encoder.Encode(metrics) - (*flusher).Flush() + if websocket != nil { + (*websocket).WriteJSON(metrics) } if len(bookIDS) != max { return