From adbab0f5d7c9000c4a444ad7c9c00b7debe9cb76 Mon Sep 17 00:00:00 2001 From: pb Date: Wed, 30 Apr 2025 16:13:49 +0200 Subject: [PATCH 01/23] added more info on error returned by LaunchPeerExecution() --- models/peer/peer_cache.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/peer/peer_cache.go b/models/peer/peer_cache.go index 789c0e1..c57e843 100644 --- a/models/peer/peer_cache.go +++ b/models/peer/peer_cache.go @@ -93,10 +93,10 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, } mypeer.AddExecution(*pexec) NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db - return nil, errors.New("peer is not reachable") + return nil, errors.New("peer is " + peerID + " not reachable") } else { if mypeer == nil { - return nil, errors.New("peer not found") + return nil, errors.New("peer " + peerID + " not found") } // If the peer is reachable, launch the execution url = p.urlFormat((mypeer.Url), dt) + path // Format the URL From 1499def6ad863b89ff32c7348905a9c86500c1fa Mon Sep 17 00:00:00 2001 From: pb Date: Wed, 7 May 2025 14:54:31 +0200 Subject: [PATCH 02/23] added booking on the computing and data resource's peers --- models/workflow_execution/workflow_execution.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index fdc21ea..014bc9c 100644 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -112,6 +112,8 @@ func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool { func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking { booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) + booking = append(booking,d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...) + booking = append(booking,d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...) return booking } From 41ebcf150a2e1480adcd6690e7d72c19ac45a52e Mon Sep 17 00:00:00 2001 From: pb Date: Wed, 7 May 2025 18:16:38 +0200 Subject: [PATCH 03/23] added logging when booking --- models/workflow_execution/workflow_scheduler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index d32fd49..0f314b1 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -6,6 +6,7 @@ import ( "strings" "time" + "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/peer" @@ -112,6 +113,8 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* } ws.Workflow = wf for _, booking := range bookings { + l := logs.GetLogger() + l.Info().Msg("Booking on " + booking.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) if err != nil { From b57f050b81103fa38299072b5c69231f2d6765b3 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 22 May 2025 14:41:38 +0200 Subject: [PATCH 04/23] increased the limit of returns by Mongo find() --- dbs/mongo/mongo.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbs/mongo/mongo.go b/dbs/mongo/mongo.go index 3bf9055..76f0877 100644 --- a/dbs/mongo/mongo.go +++ b/dbs/mongo/mongo.go @@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C return nil, 503, err } opts := options.Find() - opts.SetLimit(100) + opts.SetLimit(1000) targetDBCollection := CollectionMap[collection_name] orList := bson.A{} andList := bson.A{} From 522c66653b821f5c2cf3b063bfa2070b27b8d86c Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 17:22:09 +0200 Subject: [PATCH 05/23] Added logging for debug --- models/workflow_execution/workflow_scheduler.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 0f314b1..f41a23b 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -53,6 +53,8 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W } func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { + l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() + l.Debug().Msg("Checking booking") if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") } @@ -71,12 +73,14 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" } + l.Debug().Msg("Getting executions") execs, err := ws.getExecutions(wf) if err != nil { return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err } bookings := []*booking.Booking{} for _, exec := range execs { + l.Debug().Msg("looping throughs execs") bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) for _, b := range bookings { meth := request.Caller.URLS[tools.BOOKING][tools.GET] @@ -84,7 +88,9 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) request.Caller.URLS[tools.BOOKING][tools.GET] = meth + l.Debug().Msg("Get booking on" + b.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) + l.Debug().Msg("Received response from Get booking on" + b.DestPeerID) if err != nil { return false, wf, execs, bookings, err } @@ -113,10 +119,11 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* } ws.Workflow = wf for _, booking := range bookings { - l := logs.GetLogger() + l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() l.Info().Msg("Booking on " + booking.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) + l.Debug().Msg("Received answer for booking on " + booking.DestPeerID) if err != nil { return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) } From 2a763006db1bb605d3c1c0a2c03e0a383ef293b6 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 17:41:26 +0200 Subject: [PATCH 06/23] counting round in exec --- models/workflow_execution/workflow_scheduler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index f41a23b..33743cf 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -79,8 +79,8 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err } bookings := []*booking.Booking{} - for _, exec := range execs { - l.Debug().Msg("looping throughs execs") + for i, exec := range execs { + l.Debug().Msg("looping throughs execs : " + string(i)) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) for _, b := range bookings { meth := request.Caller.URLS[tools.BOOKING][tools.GET] @@ -90,7 +90,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) request.Caller.URLS[tools.BOOKING][tools.GET] = meth l.Debug().Msg("Get booking on" + b.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) - l.Debug().Msg("Received response from Get booking on" + b.DestPeerID) + l.Debug().Msg("Received response from Get booking on " + b.DestPeerID) if err != nil { return false, wf, execs, bookings, err } From 370dac201b43a7cec83bd811344aba1d00252254 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 17:55:45 +0200 Subject: [PATCH 07/23] In CheckBooking mooved the loop on bookings outside of the loop of execs, which seems to repeat the Peer execution on booking an exponential number of time --- .../workflow_execution/workflow_scheduler.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 33743cf..eaf983c 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -82,18 +82,18 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) for i, exec := range execs { l.Debug().Msg("looping throughs execs : " + string(i)) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) - for _, b := range bookings { - meth := request.Caller.URLS[tools.BOOKING][tools.GET] - meth = strings.ReplaceAll(meth, ":id", b.ResourceID) - meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) - meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) - request.Caller.URLS[tools.BOOKING][tools.GET] = meth - l.Debug().Msg("Get booking on" + b.DestPeerID) - _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) - l.Debug().Msg("Received response from Get booking on " + b.DestPeerID) - if err != nil { - return false, wf, execs, bookings, err - } + } + for _, b := range bookings { + meth := request.Caller.URLS[tools.BOOKING][tools.GET] + meth = strings.ReplaceAll(meth, ":id", b.ResourceID) + meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) + meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) + request.Caller.URLS[tools.BOOKING][tools.GET] = meth + l.Debug().Msg("Get booking on" + b.DestPeerID) + _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) + l.Debug().Msg("Received response from Get booking on " + b.DestPeerID) + if err != nil { + return false, wf, execs, bookings, err } } From 58dc579255f04e736c3e5d4eb352aa5eaee62833 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 18:30:56 +0200 Subject: [PATCH 08/23] added debug logging --- models/workflow_execution/workflow_scheduler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index eaf983c..7ec29e2 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -84,14 +84,15 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) } for _, b := range bookings { + bl := l.With().Str("booking",b.UUID).Logger() meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) request.Caller.URLS[tools.BOOKING][tools.GET] = meth - l.Debug().Msg("Get booking on" + b.DestPeerID) + bl.Debug().Msg("Get booking on" + b.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) - l.Debug().Msg("Received response from Get booking on " + b.DestPeerID) + bl.Debug().Msg("Received response from Get booking on " + b.DestPeerID) if err != nil { return false, wf, execs, bookings, err } From 1ade41aeae84563e8748d26d76abe78dbe8cecf7 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 19:05:17 +0200 Subject: [PATCH 09/23] moved the code that execute the booking into a separated function so that it can be launched as goroutine and parallelize get booking$ --- .../workflow_execution/workflow_scheduler.go | 36 +++++++++++++------ 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 7ec29e2..8b9ca26 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -15,6 +15,7 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" "github.com/google/uuid" "github.com/robfig/cron" + "github.com/rs/zerolog" ) /* @@ -83,24 +84,39 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) l.Debug().Msg("looping throughs execs : " + string(i)) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) } + + errCh := make(chan error, len(bookings)) + for _, b := range bookings { - bl := l.With().Str("booking",b.UUID).Logger() - meth := request.Caller.URLS[tools.BOOKING][tools.GET] - meth = strings.ReplaceAll(meth, ":id", b.ResourceID) - meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) - meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) - request.Caller.URLS[tools.BOOKING][tools.GET] = meth - bl.Debug().Msg("Get booking on" + b.DestPeerID) - _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) - bl.Debug().Msg("Received response from Get booking on " + b.DestPeerID) + go getBooking(l, b, request, wf, execs, bookings, errCh) + } + + for err := range(errCh){ if err != nil { return false, wf, execs, bookings, err } - } + return true, wf, execs, bookings, nil } +func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error) { + bl := l.With().Str("booking", b.UUID).Logger() + meth := request.Caller.URLS[tools.BOOKING][tools.GET] + meth = strings.ReplaceAll(meth, ":id", b.ResourceID) + meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) + meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) + request.Caller.URLS[tools.BOOKING][tools.GET] = meth + bl.Debug().Msg("Get booking " + b.UUID + "on" + b.DestPeerID) + _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) + bl.Debug().Msg("Received response from Get booking on " + b.DestPeerID) + if err != nil { + errCh <- err + } + + errCh <- nil +} + func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { if request == nil { return ws, nil, []*WorkflowExecution{}, errors.New("no request found") From 7223b79fe8ceab1563f833edee1d19578c999a49 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 19:16:39 +0200 Subject: [PATCH 10/23] correct the error channel --- models/workflow_execution/workflow_scheduler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 8b9ca26..91888e3 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -112,6 +112,7 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, bl.Debug().Msg("Received response from Get booking on " + b.DestPeerID) if err != nil { errCh <- err + return } errCh <- nil From 527e622774167a65236219f3ca4d6f805cae61a3 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 26 May 2025 19:21:28 +0200 Subject: [PATCH 11/23] correct the error channel --- models/workflow_execution/workflow_scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 91888e3..b1c6b38 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -91,8 +91,8 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) go getBooking(l, b, request, wf, execs, bookings, errCh) } - for err := range(errCh){ - if err != nil { + for i := 0; i < len(bookings); i++ { + if err := <-errCh; err != nil { return false, wf, execs, bookings, err } } From 901622fee0e16ef34bc3c8b08c63bbabe3a8918c Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 09:51:41 +0200 Subject: [PATCH 12/23] logging on the booking uuid before the post booking --- models/workflow_execution/workflow_scheduler.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index b1c6b38..e7bc20e 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -107,9 +107,9 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) request.Caller.URLS[tools.BOOKING][tools.GET] = meth - bl.Debug().Msg("Get booking " + b.UUID + "on" + b.DestPeerID) + bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) - bl.Debug().Msg("Received response from Get booking on " + b.DestPeerID) + bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID) if err != nil { errCh <- err return @@ -138,10 +138,10 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* ws.Workflow = wf for _, booking := range bookings { l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() - l.Info().Msg("Booking on " + booking.DestPeerID) + l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) - l.Debug().Msg("Received answer for booking on " + booking.DestPeerID) + l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) if err != nil { return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) } From fec23b4acdc513aeb8bada35c8fb530c5565b198 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 11:08:35 +0200 Subject: [PATCH 13/23] modified HTTP caller to have a DeepCopy() method in order to parallelize calls without race conditions --- .../workflow_execution/workflow_scheduler.go | 18 +++++++++++++++--- tools/remote_caller.go | 10 ++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index e7bc20e..e08f960 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "cloud.o-forge.io/core/oc-lib/logs" @@ -86,9 +87,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) } errCh := make(chan error, len(bookings)) + var m sync.Mutex for _, b := range bookings { - go getBooking(l, b, request, wf, execs, bookings, errCh) + go getBooking(l, b, request, wf, execs, bookings, errCh, m) } for i := 0; i < len(bookings); i++ { @@ -100,7 +102,17 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) return true, wf, execs, bookings, nil } -func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error) { +func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m sync.Mutex) { + + m.Lock() + // deep copy du caller + var c tools.HTTPCaller + err := request.Caller.DeepCopy(c) + if err != nil{ + errCh <- err + return + } + // Delock bl := l.With().Str("booking", b.UUID).Logger() meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) @@ -108,7 +120,7 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) request.Caller.URLS[tools.BOOKING][tools.GET] = meth bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID) - _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) + _, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID) if err != nil { errCh <- err diff --git a/tools/remote_caller.go b/tools/remote_caller.go index 96fae94..c90ab0a 100644 --- a/tools/remote_caller.go +++ b/tools/remote_caller.go @@ -63,6 +63,16 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller { } } +// Creates a copy of the current caller, in order to have parallelized executions without race condition +func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error { + bytes, err := json.Marshal(c) + if err != nil { + return err + } + + return json.Unmarshal(bytes, &dst) +} + // CallGet calls the GET method on the HTTP server func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) From 5ab3eb8a386f500fb49fa5e76cc71195e8b39fa4 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 11:17:10 +0200 Subject: [PATCH 14/23] forgot to pass the mutex as pointer and unlock it --- models/workflow_execution/workflow_scheduler.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index e08f960..7206d0f 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -90,7 +90,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) var m sync.Mutex for _, b := range bookings { - go getBooking(l, b, request, wf, execs, bookings, errCh, m) + go getBooking(l, b, request, wf, execs, bookings, errCh, &m) } for i := 0; i < len(bookings); i++ { @@ -102,7 +102,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) return true, wf, execs, bookings, nil } -func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m sync.Mutex) { +func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { m.Lock() // deep copy du caller @@ -112,6 +112,7 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, errCh <- err return } + m.Unlock() // Delock bl := l.With().Str("booking", b.UUID).Logger() meth := request.Caller.URLS[tools.BOOKING][tools.GET] From 66fc3c5b355487d20266dbc88197e0d4b153331f Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 11:34:44 +0200 Subject: [PATCH 15/23] added the passing of the request.Caller's URL to the deep copy --- models/workflow_execution/workflow_scheduler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 7206d0f..087086f 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -112,6 +112,7 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, errCh <- err return } + c.URLS = request.Caller.URLS m.Unlock() // Delock bl := l.With().Str("booking", b.UUID).Logger() From 0d96cc53bf894dd281ea719301e76bad18c430a4 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 11:58:55 +0200 Subject: [PATCH 16/23] transformed the loop that posted the booking on oc-datacenter to a threaded operation where each call is done in a goroutine --- .../workflow_execution/workflow_scheduler.go | 70 ++++++++++++++----- 1 file changed, 53 insertions(+), 17 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 087086f..16ee325 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -55,7 +55,7 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W } func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { - l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() + l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() l.Debug().Msg("Checking booking") if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") @@ -88,9 +88,9 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) errCh := make(chan error, len(bookings)) var m sync.Mutex - + for _, b := range bookings { - go getBooking(l, b, request, wf, execs, bookings, errCh, &m) + go getBooking(l, b, request, wf, execs, bookings, errCh, &m) } for i := 0; i < len(bookings); i++ { @@ -103,18 +103,15 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) } func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { - + m.Lock() - // deep copy du caller - var c tools.HTTPCaller - err := request.Caller.DeepCopy(c) - if err != nil{ + c, err := getCallerCopy(request, errCh) + if err != nil { errCh <- err return } - c.URLS = request.Caller.URLS m.Unlock() - // Delock + bl := l.With().Str("booking", b.UUID).Logger() meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) @@ -132,6 +129,17 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, errCh <- nil } +func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) { + var c tools.HTTPCaller + err := request.Caller.DeepCopy(c) + if err != nil { + errCh <- err + return tools.HTTPCaller{}, nil + } + c.URLS = request.Caller.URLS + return c, err +} + func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { if request == nil { return ws, nil, []*WorkflowExecution{}, errors.New("no request found") @@ -150,16 +158,20 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) } ws.Workflow = wf + + var errCh = make(chan error, len(bookings)) + var m sync.Mutex + for _, booking := range bookings { - l := logs.GetLogger().With().Str("SchedulerID",ws.UUID).Logger() - l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) - _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", - tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) - l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) - if err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + go ws.BookExecs(booking, request, errCh, &m) + + for i := 0; i < len(bookings); i++ { + if err := <- errCh ; err != nil { + return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) + } } } + fmt.Println("Schedules") for _, exec := range executions { err := exec.PurgeDraft(request) @@ -173,6 +185,30 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return ws, wf, executions, nil } +func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { + + m.Lock() + c, err := getCallerCopy(request, errCh) + if err != nil { + errCh <- err + return + } + m.Unlock() + + l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() + l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) + _, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", + tools.BOOKING, tools.POST, booking.Serialize(booking), &c) + l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) + + if err != nil { + errCh <- err + return + } + + errCh <- nil +} + /* BOOKING IMPLIED TIME, not of subscription but of execution so is processing time execution time applied on computes From cd7ae788b16a57dd0b040e3aaacfa51c1ad94cc9 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 12:06:10 +0200 Subject: [PATCH 17/23] didn't put the blocking loop in the right place for post booking --- models/workflow_execution/workflow_scheduler.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 16ee325..d956092 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -164,11 +164,11 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* for _, booking := range bookings { go ws.BookExecs(booking, request, errCh, &m) - - for i := 0; i < len(bookings); i++ { - if err := <- errCh ; err != nil { - return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) - } + } + + for i := 0; i < len(bookings); i++ { + if err := <- errCh ; err != nil { + return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) } } From da0de80afdd825c9ea214f6700fd6fd7d8f9df56 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 15:38:24 +0200 Subject: [PATCH 18/23] Booking check and booking post have been transformed in goroutine to improve performance when booking several execution with cron expressions --- models/workflow_execution/workflow_scheduler.go | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index d956092..68b27cf 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -7,7 +7,6 @@ import ( "sync" "time" - "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/peer" @@ -55,8 +54,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W } func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { - l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() - l.Debug().Msg("Checking booking") if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") } @@ -75,14 +72,12 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" } - l.Debug().Msg("Getting executions") execs, err := ws.getExecutions(wf) if err != nil { return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err } bookings := []*booking.Booking{} - for i, exec := range execs { - l.Debug().Msg("looping throughs execs : " + string(i)) + for _, exec := range execs { bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) } @@ -112,15 +107,13 @@ func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, } m.Unlock() - bl := l.With().Str("booking", b.UUID).Logger() meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) request.Caller.URLS[tools.BOOKING][tools.GET] = meth - bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID) _, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) - bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID) + if err != nil { errCh <- err return @@ -195,11 +188,8 @@ func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.A } m.Unlock() - l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger() - l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID) _, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", tools.BOOKING, tools.POST, booking.Serialize(booking), &c) - l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID) if err != nil { errCh <- err From 690d60f9d6c9f002f5af1a4c47448ef0af28f9b2 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 16:13:46 +0200 Subject: [PATCH 19/23] corrected the getBooking function parameters --- models/workflow_execution/workflow_scheduler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index 68b27cf..e4be90e 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -15,7 +15,6 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" "github.com/google/uuid" "github.com/robfig/cron" - "github.com/rs/zerolog" ) /* @@ -85,7 +84,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) var m sync.Mutex for _, b := range bookings { - go getBooking(l, b, request, wf, execs, bookings, errCh, &m) + go getBooking(b, request, wf, execs, bookings, errCh, &m) } for i := 0; i < len(bookings); i++ { @@ -97,7 +96,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) return true, wf, execs, bookings, nil } -func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { +func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { m.Lock() c, err := getCallerCopy(request, errCh) From 6ab6383144e56f31516537e4bbbe38f811ac62f0 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 27 May 2025 18:07:00 +0200 Subject: [PATCH 20/23] corrected an use of the original http caller instead of the deep copy --- models/workflow_execution/workflow_scheduler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index e4be90e..23e87d7 100644 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -106,11 +106,11 @@ func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Wor } m.Unlock() - meth := request.Caller.URLS[tools.BOOKING][tools.GET] + meth := c.URLS[tools.BOOKING][tools.GET] meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) - request.Caller.URLS[tools.BOOKING][tools.GET] = meth + c.URLS[tools.BOOKING][tools.GET] = meth _, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) if err != nil { From 7b8aa989f6ec0e6923a0c806470757f7b7053fe6 Mon Sep 17 00:00:00 2001 From: pb Date: Mon, 2 Jun 2025 18:22:08 +0200 Subject: [PATCH 21/23] added a log when an API is not reachable --- tools/api.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tools/api.go b/tools/api.go index e213ecd..08cb9ef 100644 --- a/tools/api.go +++ b/tools/api.go @@ -5,6 +5,7 @@ import ( "errors" "strings" + oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/dbs/mongo" beego "github.com/beego/beego/v2/server/web" @@ -145,6 +146,8 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) var resp APIStatusResponse b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) if err != nil { + l := oclib.GetLogger() + l.Error().Msg(api.String() + " not reachable") state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service continue } From 03dea551315b317123fc580365fe084a0778a2a2 Mon Sep 17 00:00:00 2001 From: pb Date: Tue, 3 Jun 2025 10:00:47 +0200 Subject: [PATCH 22/23] added another error log when the status is dead --- tools/api.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/api.go b/tools/api.go index 08cb9ef..c4d17f7 100644 --- a/tools/api.go +++ b/tools/api.go @@ -136,6 +136,7 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) { // CheckRemoteAPIs checks the state of remote APIs from your proper OC func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) { // Check if the database is up + l := oclib.GetLogger() new := map[string]string{} caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller code := 0 @@ -146,7 +147,6 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) var resp APIStatusResponse b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) if err != nil { - l := oclib.GetLogger() l.Error().Msg(api.String() + " not reachable") state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service continue @@ -164,6 +164,7 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) reachable = true // If the remote API is reachable, set reachable to true cause we are not dead } if !reachable { + l.Error().Msg("Peer check returned no answers") state = DEAD // If no remote API is reachable, return dead, nobody is alive } if code > 0 { From 387785b40cb0bfe792c0c7f19862c28aa9b6cfea Mon Sep 17 00:00:00 2001 From: pb Date: Wed, 4 Jun 2025 10:33:00 +0200 Subject: [PATCH 23/23] reimported logs without import cycle --- tools/api.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/api.go b/tools/api.go index c4d17f7..7b09cb0 100644 --- a/tools/api.go +++ b/tools/api.go @@ -5,9 +5,9 @@ import ( "errors" "strings" - oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/logs" beego "github.com/beego/beego/v2/server/web" ) @@ -136,7 +136,7 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) { // CheckRemoteAPIs checks the state of remote APIs from your proper OC func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) { // Check if the database is up - l := oclib.GetLogger() + l := logs.GetLogger() new := map[string]string{} caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller code := 0