From 0d96cc53bf894dd281ea719301e76bad18c430a4 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 11:58:55 +0200 Subject: [PATCH] transformed the loop that posted the booking on oc-datacenter to a threaded operation where each call is done in a goroutine --- .../workflow_execution/workflow_scheduler.go | 70 ++++++++++++++----- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 087086f..16ee325 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -55,7 +55,7 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W } func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { - l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() + l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() l.Debug().Msg("Checking booking") if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") @@ -88,9 +88,9 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) errCh := make(chan error, len(bookings)) var m sync.Mutex - + for _, b := range bookings { - go getBooking(l, b, request, wf, execs, bookings, errCh, &m) + go getBooking(l, b, request, wf, execs, bookings, errCh, &m) } for i := 0; i < len(bookings); i++ { @@ -103,18 +103,15 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) } func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { - + m.Lock() - // deep copy du caller - var c tools.HTTPCaller - err := request.Caller.DeepCopy(c) - if err != nil{ + c, err := getCallerCopy(request, errCh) + if err != nil { errCh <- err return } - c.URLS = request.Caller.URLS m.Unlock() - // Delock + bl := l.With().Str("booking", b.UUID).Logger() meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) @@ -132,6 +129,17 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, errCh <- nil } +func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { + var c tools.HTTPCaller + err := request.Caller.DeepCopy(c) + if err != nil { + errCh <- err + return tools.HTTPCaller{}, nil + } + c.URLS = request.Caller.URLS + return c, err +} + func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { if request == nil { return ws, nil, []*WorkflowExecution{}, errors.New("no request found") @@ -150,16 +158,20 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) } ws.Workflow = wf + + var errCh = make(chan error, len(bookings)) + var m sync.Mutex + for _, booking := range bookings { - l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() - l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) - _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", - tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) - l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) - if err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + go ws.BookExecs(booking, request, errCh, &m) + + for i := 0; i < len(bookings); i++ { + if err := <- errCh ; err != nil { + return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + } } } + fmt.Println("Schedules") for _, exec := range executions { err := exec.PurgeDraft(request) @@ -173,6 +185,30 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return ws, wf, executions, nil } +func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { + + m.Lock() + c, err := getCallerCopy(request, errCh) + if err != nil { + errCh <- err + return + } + m.Unlock() + + l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() + l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) + _, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", + tools.BOOKING, tools.POST, booking.Serialize(booking), &c) + l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) + + if err != nil { + errCh <- err + return + } + + errCh <- nil +} + /* BOOKING IMPLIED TIME, not of subscription but of execution so is processing time execution time applied on computes