launch streaming to update booking

This commit is contained in:
mr 2025-06-18 09:14:30 +02:00
parent f61c9f5df9
commit c0b8ac1eee
2 changed files with 44 additions and 34 deletions

View File

@ -281,6 +281,12 @@ func (o *BookingController) Post() {
o.ServeJSON() o.ServeJSON()
return return
} }
if b.Data.(*booking.Booking).ResourceType == tools.COMPUTE_RESOURCE {
go func() {
time.Sleep(time.Until(b.Data.(*booking.Booking).ExpectedStartDate))
infrastructure.NewPrometheusService().Stream(b.Data.GetID(), b.Data.(*booking.Booking).ExpectedEndDate, 1*time.Second, nil, nil)
}()
}
if err := o.createNamespace(resp.ExecutionsID); err != nil { if err := o.createNamespace(resp.ExecutionsID); err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())

View File

@ -131,46 +131,50 @@ func (p *PrometheusService) Call(bookingID string) (*booking.Booking, map[string
return book.Data.(*booking.Booking), metrics return book.Data.(*booking.Booking), metrics
} }
func (p *PrometheusService) Stream(bookingID string, end *time.Time, interval time.Duration, flusher http.Flusher, encoder *json.Encoder) { func (p *PrometheusService) Stream(bookingID string, end *time.Time, interval time.Duration, flusher *http.Flusher, encoder *json.Encoder) {
e := time.Now().UTC().Add(time.Hour * 1)
if end != nil { if end != nil {
max := 100 e = (*end).UTC()
bookIDS := []string{} }
mets := map[string][]models.MetricsSnapshot{} max := 100
for time.Now().Before(*end) { bookIDS := []string{}
go func() { mets := map[string][]models.MetricsSnapshot{}
book, metrics := p.Call(bookingID) for time.Now().Before(e) {
for k, v := range metrics { go func() {
if me, ok := mets[k]; !ok { book, metrics := p.Call(bookingID)
mets[k] = []models.MetricsSnapshot{v} for k, v := range metrics {
} else { if me, ok := mets[k]; !ok {
me = append(me, v) mets[k] = []models.MetricsSnapshot{v}
mets[k] = me } else {
} me = append(me, v)
mets[k] = me
} }
bookIDS = append(bookIDS, bookingID) }
bookIDS = append(bookIDS, bookingID)
if flusher != nil {
encoder.Encode(metrics) encoder.Encode(metrics)
flusher.Flush() (*flusher).Flush()
if len(bookIDS) == max { }
if book.ExecutionMetrics == nil { if len(bookIDS) != max {
book.ExecutionMetrics = mets return
}
if book.ExecutionMetrics == nil {
book.ExecutionMetrics = mets
} else {
for kk, vv := range mets {
if em, ok := book.ExecutionMetrics[kk]; !ok {
book.ExecutionMetrics[kk] = vv
} else { } else {
for kk, vv := range mets { em = append(em, vv...)
if em, ok := book.ExecutionMetrics[kk]; !ok { book.ExecutionMetrics[kk] = em
book.ExecutionMetrics[kk] = vv
} else {
em = append(em, vv...)
book.ExecutionMetrics[kk] = em
}
}
} }
book.GetAccessor(nil).UpdateOne(book, bookingID)
bookIDS = []string{}
} }
}() }
time.Sleep(interval) book.GetAccessor(nil).UpdateOne(book, bookingID)
} bookIDS = []string{}
} else {
// todo an anchor... detecting the end of a task. }()
time.Sleep(interval)
} }
} }