6 Commits

Author SHA1 Message Date
pb
d4139fa270 oui ok 2025-05-21 16:08:10 +02:00
pb
db09d2ea89 dispaying te dates in getExecutions 2025-05-21 16:02:20 +02:00
pb
e79e909805 dispaying te dates in getExecutions 2025-05-21 15:55:47 +02:00
pb
f7e4e11705 dispaying te dates in getExecutions 2025-05-21 15:44:37 +02:00
pb
77554cbcf5 dispaying te dates in getExecutions 2025-05-21 15:41:21 +02:00
pb
0783395121 dispaying te dates in getExecutions 2025-05-21 15:25:02 +02:00
5 changed files with 31 additions and 104 deletions

View File

@@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
return nil, 503, err return nil, 503, err
} }
opts := options.Find() opts := options.Find()
opts.SetLimit(1000) opts.SetLimit(100)
targetDBCollection := CollectionMap[collection_name] targetDBCollection := CollectionMap[collection_name]
orList := bson.A{} orList := bson.A{}
andList := bson.A{} andList := bson.A{}

View File

@@ -4,7 +4,6 @@ import (
"time" "time"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@@ -40,8 +39,6 @@ func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelA
end = &e end = &e
} }
accessor := NewAccessor(nil) accessor := NewAccessor(nil)
l := logs.GetLogger().With().Str("Search Check", "Booking").Logger()
l.Debug().Msg("Starting to search")
res, code, err := accessor.Search(&dbs.Filters{ res, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date
"resource_id": {{Operator: dbs.EQUAL.String(), Value: id}}, "resource_id": {{Operator: dbs.EQUAL.String(), Value: id}},
@@ -52,9 +49,6 @@ func (wfa *Booking) Check(id string, start time.Time, end *time.Time, parrallelA
}, },
}, },
}, "", wfa.IsDraft) }, "", wfa.IsDraft)
l.Debug().Msg("Search finished")
if code != 200 { if code != 200 {
return false, err return false, err
} }

View File

@@ -1,12 +1,13 @@
package workflow_execution package workflow_execution
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
@@ -78,60 +79,22 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
bookings := []*booking.Booking{} bookings := []*booking.Booking{}
for _, exec := range execs { for _, exec := range execs {
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
} for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
errCh := make(chan error, len(bookings)) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
var m sync.Mutex meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
for _, b := range bookings { request.Caller.URLS[tools.BOOKING][tools.GET] = meth
go getBooking(b, request, wf, execs, bookings, errCh, &m) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
} if err != nil {
return false, wf, execs, bookings, err
for i := 0; i < len(bookings); i++ { }
if err := <-errCh; err != nil {
return false, wf, execs, bookings, err
} }
}
}
return true, wf, execs, bookings, nil return true, wf, execs, bookings, nil
} }
func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
meth := c.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
c.URLS[tools.BOOKING][tools.GET] = meth
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
@@ -150,20 +113,15 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings { for _, booking := range bookings {
go ws.BookExecs(booking, request, errCh, &m) l := logs.GetLogger()
} l.Info().Msg("Booking on " + booking.DestPeerID)
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
for i := 0; i < len(bookings); i++ { tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err := <- errCh ; err != nil { if err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
} }
} }
fmt.Println("Schedules") fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
@@ -177,27 +135,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes
@@ -217,6 +154,9 @@ func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*Workf
return workflows_executions, err return workflows_executions, err
} }
for _, date := range dates { for _, date := range dates {
fmt.Println("============")
fmt.Println("Date : " + fmt.Sprint(date))
fmt.Println("============")
obj := &WorkflowExecution{ obj := &WorkflowExecution{
AbstractObject: utils.AbstractObject{ AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution UUID: uuid.New().String(), // set the uuid of the execution
@@ -253,8 +193,17 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
if err != nil { if err != nil {
return schedule, errors.New("Bad cron message: " + err.Error()) return schedule, errors.New("Bad cron message: " + err.Error())
} }
toto, _ := json.MarshalIndent(sched,"","")
fmt.Println(string(toto))
s := sched.Next(ws.Start)
fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " + fmt.Sprint(s.Before(*ws.End)) )
tata, _ := json.MarshalIndent(s,"","")
fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End))
// loop through the cron schedule to set the executions // loop through the cron schedule to set the executions
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) { for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
fmt.Println("next execution :" + fmt.Sprint(s))
e := s.Add(time.Duration(ws.DurationS) * time.Second) e := s.Add(time.Duration(ws.DurationS) * time.Second)
schedule = append(schedule, Schedule{ schedule = append(schedule, Schedule{
Start: s, Start: s,

View File

@@ -7,9 +7,7 @@ import (
"cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/logs"
beego "github.com/beego/beego/v2/server/web" beego "github.com/beego/beego/v2/server/web"
"github.com/google/uuid"
) )
type APIRequest struct { type APIRequest struct {
@@ -136,9 +134,6 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// CheckRemoteAPIs checks the state of remote APIs from your proper OC // CheckRemoteAPIs checks the state of remote APIs from your proper OC
func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) { func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) {
id := uuid.New()
l := logs.GetLogger().With().Str("id",id.String()).Logger()
l.Debug().Msg("Start checking")
// Check if the database is up // Check if the database is up
new := map[string]string{} new := map[string]string{}
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
@@ -147,7 +142,6 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error)
state := ALIVE state := ALIVE
reachable := false reachable := false
for _, api := range apis { // Check the state of each remote API in the list for _, api := range apis { // Check the state of each remote API in the list
l.Debug().Msg("Checking : " + api.String() + " at " + api.API())
var resp APIStatusResponse var resp APIStatusResponse
b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint)
if err != nil { if err != nil {

View File

@@ -63,16 +63,6 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
} }
} }
// Creates a copy of the current caller, in order to have parallelized executions without race condition
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
bytes, err := json.Marshal(c)
if err != nil {
return err
}
return json.Unmarshal(bytes, &dst)
}
// CallGet calls the GET method on the HTTP server // CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))