transformed the loop that posted the booking on oc-datacenter to a threaded operation where each call is done in a goroutine

This commit is contained in:
pb 2025-05-27 11:58:55 +02:00
parent 66fc3c5b35
commit 0d96cc53bf

View File

@ -55,7 +55,7 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) {
l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger()
l.Debug().Msg("Checking booking") l.Debug().Msg("Checking booking")
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined")
@ -88,9 +88,9 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
errCh := make(chan error, len(bookings)) errCh := make(chan error, len(bookings))
var m sync.Mutex var m sync.Mutex
for _, b := range bookings { for _, b := range bookings {
go getBooking(l, b, request, wf, execs, bookings, errCh, &m) go getBooking(l, b, request, wf, execs, bookings, errCh, &m)
} }
for i := 0; i < len(bookings); i++ { for i := 0; i < len(bookings); i++ {
@ -103,18 +103,15 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
} }
func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock() m.Lock()
// deep copy du caller c, err := getCallerCopy(request, errCh)
var c tools.HTTPCaller if err != nil {
err := request.Caller.DeepCopy(c)
if err != nil{
errCh <- err errCh <- err
return return
} }
c.URLS = request.Caller.URLS
m.Unlock() m.Unlock()
// Delock
bl := l.With().Str("booking", b.UUID).Logger() bl := l.With().Str("booking", b.UUID).Logger()
meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
@ -132,6 +129,17 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest,
errCh <- nil errCh <- nil
} }
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
@ -150,16 +158,20 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings { for _, booking := range bookings {
l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() go ws.BookExecs(booking, request, errCh, &m)
l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID)
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", for i := 0; i < len(bookings); i++ {
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) if err := <- errCh ; err != nil {
l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
if err != nil { }
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
} }
} }
fmt.Println("Schedules") fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
@ -173,6 +185,30 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger()
l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID)
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes