20 Commits

Author SHA1 Message Date
pb
387785b40c reimported logs without import cycle 2025-06-04 10:33:00 +02:00
pb
03dea55131 added another error log when the status is dead 2025-06-03 10:00:47 +02:00
pb
7b8aa989f6 added a log when an API is not reachable 2025-06-02 18:22:08 +02:00
pb
6ab6383144 corrected an use of the original http caller instead of the deep copy 2025-05-27 18:07:00 +02:00
pb
690d60f9d6 corrected the getBooking function parameters 2025-05-27 16:13:46 +02:00
pb
da0de80afd Booking check and booking post have been transformed in goroutine to improve performance when booking several execution with cron expressions 2025-05-27 15:38:24 +02:00
pb
cd7ae788b1 didn't put the blocking loop in the right place for post booking 2025-05-27 12:06:10 +02:00
pb
0d96cc53bf transformed the loop that posted the booking on oc-datacenter to a threaded operation where each call is done in a goroutine 2025-05-27 11:58:55 +02:00
pb
66fc3c5b35 added the passing of the request.Caller's URL to the deep copy 2025-05-27 11:34:44 +02:00
pb
5ab3eb8a38 forgot to pass the mutex as pointer and unlock it 2025-05-27 11:17:10 +02:00
pb
fec23b4acd modified HTTP caller to have a DeepCopy() method in order to parallelize calls without race conditions 2025-05-27 11:08:35 +02:00
pb
901622fee0 logging on the booking uuid before the post booking 2025-05-27 09:51:41 +02:00
pb
527e622774 correct the error channel 2025-05-26 19:21:28 +02:00
pb
7223b79fe8 correct the error channel 2025-05-26 19:16:39 +02:00
pb
1ade41aeae moved the code that execute the booking into a separated function so that it can be launched as goroutine and parallelize get booking$ 2025-05-26 19:05:17 +02:00
pb
58dc579255 added debug logging 2025-05-26 18:30:56 +02:00
pb
370dac201b In CheckBooking mooved the loop on bookings outside of the loop of execs, which seems to repeat the Peer execution on booking an exponential number of time 2025-05-26 17:55:45 +02:00
pb
2a763006db counting round in exec 2025-05-26 17:41:26 +02:00
pb
522c66653b Added logging for debug 2025-05-26 17:22:09 +02:00
pb
b57f050b81 increased the limit of returns by Mongo find() 2025-05-22 14:41:38 +02:00
4 changed files with 97 additions and 19 deletions

View File

@@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
return nil, 503, err return nil, 503, err
} }
opts := options.Find() opts := options.Find()
opts.SetLimit(100) opts.SetLimit(1000)
targetDBCollection := CollectionMap[collection_name] targetDBCollection := CollectionMap[collection_name]
orList := bson.A{} orList := bson.A{}
andList := bson.A{} andList := bson.A{}

View File

@@ -4,9 +4,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
@@ -78,22 +78,60 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
bookings := []*booking.Booking{} bookings := []*booking.Booking{}
for _, exec := range execs { for _, exec := range execs {
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
}
errCh := make(chan error, len(bookings))
var m sync.Mutex
for _, b := range bookings { for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET] go getBooking(b, request, wf, execs, bookings, errCh, &m)
meth = strings.ReplaceAll(meth, ":id", b.ResourceID) }
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) for i := 0; i < len(bookings); i++ {
request.Caller.URLS[tools.BOOKING][tools.GET] = meth if err := <-errCh; err != nil {
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
if err != nil {
return false, wf, execs, bookings, err return false, wf, execs, bookings, err
} }
} }
}
return true, wf, execs, bookings, nil return true, wf, execs, bookings, nil
} }
func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
meth := c.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
c.URLS[tools.BOOKING][tools.GET] = meth
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
@@ -112,15 +150,20 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings { for _, booking := range bookings {
l := logs.GetLogger() go ws.BookExecs(booking, request, errCh, &m)
l.Info().Msg("Booking on " + booking.DestPeerID) }
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) for i := 0; i < len(bookings); i++ {
if err != nil { if err := <- errCh ; err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
} }
} }
fmt.Println("Schedules") fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
@@ -134,6 +177,27 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes

View File

@@ -7,6 +7,7 @@ import (
"cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/logs"
beego "github.com/beego/beego/v2/server/web" beego "github.com/beego/beego/v2/server/web"
) )
@@ -135,6 +136,7 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// CheckRemoteAPIs checks the state of remote APIs from your proper OC // CheckRemoteAPIs checks the state of remote APIs from your proper OC
func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) { func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) {
// Check if the database is up // Check if the database is up
l := logs.GetLogger()
new := map[string]string{} new := map[string]string{}
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
code := 0 code := 0
@@ -145,6 +147,7 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error)
var resp APIStatusResponse var resp APIStatusResponse
b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint)
if err != nil { if err != nil {
l.Error().Msg(api.String() + " not reachable")
state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service
continue continue
} }
@@ -161,6 +164,7 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error)
reachable = true // If the remote API is reachable, set reachable to true cause we are not dead reachable = true // If the remote API is reachable, set reachable to true cause we are not dead
} }
if !reachable { if !reachable {
l.Error().Msg("Peer check returned no answers")
state = DEAD // If no remote API is reachable, return dead, nobody is alive state = DEAD // If no remote API is reachable, return dead, nobody is alive
} }
if code > 0 { if code > 0 {

View File

@@ -63,6 +63,16 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
} }
} }
// Creates a copy of the current caller, in order to have parallelized executions without race condition
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
bytes, err := json.Marshal(c)
if err != nil {
return err
}
return json.Unmarshal(bytes, &dst)
}
// CallGet calls the GET method on the HTTP server // CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))