21 Commits

Author SHA1 Message Date
pb
778ffa05a1 what is collaborative 2025-06-02 18:05:08 +02:00
pb
3c15907427 added id for logger 2025-06-02 10:34:58 +02:00
pb
9ae5f3b91d timing status checks 2025-05-28 18:19:07 +02:00
pb
3a2141aab5 adding log to measure search time 2025-05-28 16:22:26 +02:00
pb
6ab6383144 corrected an use of the original http caller instead of the deep copy 2025-05-27 18:07:00 +02:00
pb
690d60f9d6 corrected the getBooking function parameters 2025-05-27 16:13:46 +02:00
pb
da0de80afd Booking check and booking post have been transformed in goroutine to improve performance when booking several execution with cron expressions 2025-05-27 15:38:24 +02:00
pb
cd7ae788b1 didn't put the blocking loop in the right place for post booking 2025-05-27 12:06:10 +02:00
pb
0d96cc53bf transformed the loop that posted the booking on oc-datacenter to a threaded operation where each call is done in a goroutine 2025-05-27 11:58:55 +02:00
pb
66fc3c5b35 added the passing of the request.Caller's URL to the deep copy 2025-05-27 11:34:44 +02:00
pb
5ab3eb8a38 forgot to pass the mutex as pointer and unlock it 2025-05-27 11:17:10 +02:00
pb
fec23b4acd modified HTTP caller to have a DeepCopy() method in order to parallelize calls without race conditions 2025-05-27 11:08:35 +02:00
pb
901622fee0 logging on the booking uuid before the post booking 2025-05-27 09:51:41 +02:00
pb
527e622774 correct the error channel 2025-05-26 19:21:28 +02:00
pb
7223b79fe8 correct the error channel 2025-05-26 19:16:39 +02:00
pb
1ade41aeae moved the code that execute the booking into a separated function so that it can be launched as goroutine and parallelize get booking$ 2025-05-26 19:05:17 +02:00
pb
58dc579255 added debug logging 2025-05-26 18:30:56 +02:00
pb
370dac201b In CheckBooking mooved the loop on bookings outside of the loop of execs, which seems to repeat the Peer execution on booking an exponential number of time 2025-05-26 17:55:45 +02:00
pb
2a763006db counting round in exec 2025-05-26 17:41:26 +02:00
pb
522c66653b Added logging for debug 2025-05-26 17:22:09 +02:00
pb
b57f050b81 increased the limit of returns by Mongo find() 2025-05-22 14:41:38 +02:00
5 changed files with 105 additions and 32 deletions

View File

@@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
return nil, 503, err return nil, 503, err
} }
opts := options.Find() opts := options.Find()
opts.SetLimit(100) opts.SetLimit(1000)
targetDBCollection := CollectionMap[collection_name] targetDBCollection := CollectionMap[collection_name]
orList := bson.A{} orList := bson.A{}
andList := bson.A{} andList := bson.A{}

View File

@@ -4,6 +4,7 @@ import (
"time" "time"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@@ -39,6 +40,8 @@ func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelA
end = &e end = &e
} }
accessor := NewAccessor(nil) accessor := NewAccessor(nil)
l := logs.GetLogger().With().Str("Search Check", "Booking").Logger()
l.Debug().Msg("Starting to search")
res, code, err := accessor.Search(&dbs.Filters{ res, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date
"resource_id": {{Operator: dbs.EQUAL.String(), Value: id}}, "resource_id": {{Operator: dbs.EQUAL.String(), Value: id}},
@@ -49,6 +52,9 @@ func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelA
}, },
}, },
}, "", wfa.IsDraft) }, "", wfa.IsDraft)
l.Debug().Msg("Search finished")
if code != 200 { if code != 200 {
return false, err return false, err
} }

View File

@@ -1,13 +1,12 @@
package workflow_execution package workflow_execution
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
@@ -79,22 +78,60 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
bookings := []*booking.Booking{} bookings := []*booking.Booking{}
for _, exec := range execs { for _, exec := range execs {
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
if err != nil {
return false, wf, execs, bookings, err
}
}
} }
errCh := make(chan error, len(bookings))
var m sync.Mutex
for _, b := range bookings {
go getBooking(b, request, wf, execs, bookings, errCh, &m)
}
for i := 0; i < len(bookings); i++ {
if err := <-errCh; err != nil {
return false, wf, execs, bookings, err
}
}
return true, wf, execs, bookings, nil return true, wf, execs, bookings, nil
} }
func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
meth := c.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
c.URLS[tools.BOOKING][tools.GET] = meth
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
@@ -113,15 +150,20 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings { for _, booking := range bookings {
l := logs.GetLogger() go ws.BookExecs(booking, request, errCh, &m)
l.Info().Msg("Booking on " + booking.DestPeerID) }
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) for i := 0; i < len(bookings); i++ {
if err != nil { if err := <- errCh ; err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
} }
} }
fmt.Println("Schedules") fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
@@ -135,6 +177,27 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes
@@ -154,9 +217,6 @@ func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*Workf
return workflows_executions, err return workflows_executions, err
} }
for _, date := range dates { for _, date := range dates {
fmt.Println("============")
fmt.Println("Date : " + fmt.Sprint(date))
fmt.Println("============")
obj := &WorkflowExecution{ obj := &WorkflowExecution{
AbstractObject: utils.AbstractObject{ AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution UUID: uuid.New().String(), // set the uuid of the execution
@@ -193,17 +253,8 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
if err != nil { if err != nil {
return schedule, errors.New("Bad cron message: " + err.Error()) return schedule, errors.New("Bad cron message: " + err.Error())
} }
toto, _ := json.MarshalIndent(sched,"","")
fmt.Println(string(toto))
s := sched.Next(ws.Start)
fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " + fmt.Sprint(s.Before(*ws.End)) )
tata, _ := json.MarshalIndent(s,"","")
fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End))
// loop through the cron schedule to set the executions // loop through the cron schedule to set the executions
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
fmt.Println("next execution :" + fmt.Sprint(s))
e := s.Add(time.Duration(ws.DurationS) * time.Second) e := s.Add(time.Duration(ws.DurationS) * time.Second)
schedule = append(schedule, Schedule{ schedule = append(schedule, Schedule{
Start: s, Start: s,

View File

@@ -7,7 +7,9 @@ import (
"cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/logs"
beego "github.com/beego/beego/v2/server/web" beego "github.com/beego/beego/v2/server/web"
"github.com/google/uuid"
) )
type APIRequest struct { type APIRequest struct {
@@ -134,6 +136,9 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// CheckRemoteAPIs checks the state of remote APIs from your proper OC // CheckRemoteAPIs checks the state of remote APIs from your proper OC
func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) { func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) {
id := uuid.New()
l := logs.GetLogger().With().Str("id",id.String()).Logger()
l.Debug().Msg("Start checking")
// Check if the database is up // Check if the database is up
new := map[string]string{} new := map[string]string{}
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
@@ -142,6 +147,7 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error)
state := ALIVE state := ALIVE
reachable := false reachable := false
for _, api := range apis { // Check the state of each remote API in the list for _, api := range apis { // Check the state of each remote API in the list
l.Debug().Msg("Checking : " + api.String() + " at " + api.API())
var resp APIStatusResponse var resp APIStatusResponse
b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint)
if err != nil { if err != nil {

View File

@@ -63,6 +63,16 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
} }
} }
// Creates a copy of the current caller, in order to have parallelized executions without race condition
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
bytes, err := json.Marshal(c)
if err != nil {
return err
}
return json.Unmarshal(bytes, &dst)
}
// CallGet calls the GET method on the HTTP server // CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))