6 Commits

Author SHA1 Message Date
pb
d4139fa270 oui ok 2025-05-21 16:08:10 +02:00
pb
db09d2ea89 dispaying te dates in getExecutions 2025-05-21 16:02:20 +02:00
pb
e79e909805 dispaying te dates in getExecutions 2025-05-21 15:55:47 +02:00
pb
f7e4e11705 dispaying te dates in getExecutions 2025-05-21 15:44:37 +02:00
pb
77554cbcf5 dispaying te dates in getExecutions 2025-05-21 15:41:21 +02:00
pb
0783395121 dispaying te dates in getExecutions 2025-05-21 15:25:02 +02:00
3 changed files with 31 additions and 103 deletions

View File

@@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
return nil, 503, err return nil, 503, err
} }
opts := options.Find() opts := options.Find()
opts.SetLimit(1000) opts.SetLimit(100)
targetDBCollection := CollectionMap[collection_name] targetDBCollection := CollectionMap[collection_name]
orList := bson.A{} orList := bson.A{}
andList := bson.A{} andList := bson.A{}

View File

@@ -1,10 +1,10 @@
package workflow_execution package workflow_execution
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
@@ -16,7 +16,6 @@ import (
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/robfig/cron" "github.com/robfig/cron"
"github.com/rs/zerolog"
) )
/* /*
@@ -55,8 +54,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) {
l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger()
l.Debug().Msg("Checking booking")
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined")
} }
@@ -75,71 +72,29 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
} }
l.Debug().Msg("Getting executions")
execs, err := ws.getExecutions(wf) execs, err := ws.getExecutions(wf)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
} }
bookings := []*booking.Booking{} bookings := []*booking.Booking{}
for i, exec := range execs { for _, exec := range execs {
l.Debug().Msg("looping throughs execs : " + string(i))
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
} for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
errCh := make(chan error, len(bookings)) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
var m sync.Mutex meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
for _, b := range bookings { request.Caller.URLS[tools.BOOKING][tools.GET] = meth
go getBooking(l, b, request, wf, execs, bookings, errCh, &m) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
} if err != nil {
return false, wf, execs, bookings, err
for i := 0; i < len(bookings); i++ { }
if err := <-errCh; err != nil {
return false, wf, execs, bookings, err
} }
}
}
return true, wf, execs, bookings, nil return true, wf, execs, bookings, nil
} }
func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
bl := l.With().Str("booking", b.UUID).Logger()
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID)
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
@@ -158,20 +113,15 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings { for _, booking := range bookings {
go ws.BookExecs(booking, request, errCh, &m) l := logs.GetLogger()
} l.Info().Msg("Booking on " + booking.DestPeerID)
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
for i := 0; i < len(bookings); i++ { tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err := <- errCh ; err != nil { if err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
} }
} }
fmt.Println("Schedules") fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
@@ -185,30 +135,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger()
l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID)
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes
@@ -228,6 +154,9 @@ func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*Workf
return workflows_executions, err return workflows_executions, err
} }
for _, date := range dates { for _, date := range dates {
fmt.Println("============")
fmt.Println("Date : " + fmt.Sprint(date))
fmt.Println("============")
obj := &WorkflowExecution{ obj := &WorkflowExecution{
AbstractObject: utils.AbstractObject{ AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution UUID: uuid.New().String(), // set the uuid of the execution
@@ -264,8 +193,17 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
if err != nil { if err != nil {
return schedule, errors.New("Bad cron message: " + err.Error()) return schedule, errors.New("Bad cron message: " + err.Error())
} }
toto, _ := json.MarshalIndent(sched,"","")
fmt.Println(string(toto))
s := sched.Next(ws.Start)
fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " + fmt.Sprint(s.Before(*ws.End)) )
tata, _ := json.MarshalIndent(s,"","")
fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End))
// loop through the cron schedule to set the executions // loop through the cron schedule to set the executions
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
fmt.Println("next execution :" + fmt.Sprint(s))
e := s.Add(time.Duration(ws.DurationS) * time.Second) e := s.Add(time.Duration(ws.DurationS) * time.Second)
schedule = append(schedule, Schedule{ schedule = append(schedule, Schedule{
Start: s, Start: s,

View File

@@ -63,16 +63,6 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
} }
} }
// Creates a copy of the current caller, in order to have parallelized executions without race condition
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
bytes, err := json.Marshal(c)
if err != nil {
return err
}
return json.Unmarshal(bytes, &dst)
}
// CallGet calls the GET method on the HTTP server // CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))