modified HTTP caller to have a DeepCopy() method in order to parallelize calls without race conditions
This commit is contained in:
parent
901622fee0
commit
fec23b4acd
@ -4,6 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/logs"
|
"cloud.o-forge.io/core/oc-lib/logs"
|
||||||
@ -86,9 +87,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
|
|||||||
}
|
}
|
||||||
|
|
||||||
errCh := make(chan error, len(bookings))
|
errCh := make(chan error, len(bookings))
|
||||||
|
var m sync.Mutex
|
||||||
|
|
||||||
for _, b := range bookings {
|
for _, b := range bookings {
|
||||||
go getBooking(l, b, request, wf, execs, bookings, errCh)
|
go getBooking(l, b, request, wf, execs, bookings, errCh, m)
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < len(bookings); i++ {
|
for i := 0; i < len(bookings); i++ {
|
||||||
@ -100,7 +102,17 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
|
|||||||
return true, wf, execs, bookings, nil
|
return true, wf, execs, bookings, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error) {
|
func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m sync.Mutex) {
|
||||||
|
|
||||||
|
m.Lock()
|
||||||
|
// deep copy du caller
|
||||||
|
var c tools.HTTPCaller
|
||||||
|
err := request.Caller.DeepCopy(c)
|
||||||
|
if err != nil{
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Delock
|
||||||
bl := l.With().Str("booking", b.UUID).Logger()
|
bl := l.With().Str("booking", b.UUID).Logger()
|
||||||
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
|
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
|
||||||
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
|
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
|
||||||
@ -108,7 +120,7 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest,
|
|||||||
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
|
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
|
||||||
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
|
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
|
||||||
bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID)
|
bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID)
|
||||||
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
|
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
|
||||||
bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID)
|
bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
|
@ -63,6 +63,16 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Creates a copy of the current caller, in order to have parallelized executions without race condition
|
||||||
|
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
|
||||||
|
bytes, err := json.Marshal(c)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return json.Unmarshal(bytes, &dst)
|
||||||
|
}
|
||||||
|
|
||||||
// CallGet calls the GET method on the HTTP server
|
// CallGet calls the GET method on the HTTP server
|
||||||
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
|
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
|
||||||
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))
|
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))
|
||||||
|
Loading…
Reference in New Issue
Block a user