5 Commits

Author SHA1 Message Date
pb
f026e30c6a printing in schedules 2025-04-30 12:59:57 +02:00
pb
2a737ad559 printing in schedules 2025-04-30 12:47:46 +02:00
pb
0de37348db printing in schedules 2025-04-30 12:37:55 +02:00
pb
64d1544951 printing in schedules 2025-04-30 11:39:40 +02:00
pb
bd194e6d47 printing compute id 2025-04-29 16:50:52 +02:00
6 changed files with 30 additions and 108 deletions

View File

@@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
return nil, 503, err return nil, 503, err
} }
opts := options.Find() opts := options.Find()
opts.SetLimit(1000) opts.SetLimit(100)
targetDBCollection := CollectionMap[collection_name] targetDBCollection := CollectionMap[collection_name]
orList := bson.A{} orList := bson.A{}
andList := bson.A{} andList := bson.A{}

View File

@@ -93,10 +93,10 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
} }
mypeer.AddExecution(*pexec) mypeer.AddExecution(*pexec)
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
return nil, errors.New("peer is " + peerID + " not reachable") return nil, errors.New("peer is not reachable")
} else { } else {
if mypeer == nil { if mypeer == nil {
return nil, errors.New("peer " + peerID + " not found") return nil, errors.New("peer not found")
} }
// If the peer is reachable, launch the execution // If the peer is reachable, launch the execution
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL url = p.urlFormat((mypeer.Url), dt) + path // Format the URL

View File

@@ -2,6 +2,7 @@ package workflow
import ( import (
"errors" "errors"
"fmt"
"time" "time"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
@@ -110,6 +111,7 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
accessor := (&resources.ComputeResource{}).GetAccessor(&tools.APIRequest{Caller: caller}) accessor := (&resources.ComputeResource{}).GetAccessor(&tools.APIRequest{Caller: caller})
for _, link := range wfa.Graph.Links { for _, link := range wfa.Graph.Links {
if ok, compute_id := link.IsComputeLink(*wfa.Graph); ok { // check if the link is a link between a compute and a resource if ok, compute_id := link.IsComputeLink(*wfa.Graph); ok { // check if the link is a link between a compute and a resource
fmt.Println("compute :" + compute_id)
compute, code, _ := accessor.LoadOne(compute_id) compute, code, _ := accessor.LoadOne(compute_id)
if code != 200 { if code != 200 {
continue continue

View File

@@ -112,8 +112,6 @@ func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool {
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking { func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking {
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
booking = append(booking,d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
booking = append(booking,d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
return booking return booking
} }

View File

@@ -1,13 +1,12 @@
package workflow_execution package workflow_execution
import ( import (
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
@@ -16,7 +15,6 @@ import (
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/robfig/cron" "github.com/robfig/cron"
"github.com/rs/zerolog"
) )
/* /*
@@ -55,8 +53,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) {
l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger()
l.Debug().Msg("Checking booking")
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined")
} }
@@ -75,69 +71,33 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) { if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n" ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
} }
l.Debug().Msg("Getting executions")
execs, err := ws.getExecutions(wf) execs, err := ws.getExecutions(wf)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
} }
bookings := []*booking.Booking{} bookings := []*booking.Booking{}
for i, exec := range execs { for _, exec := range execs {
l.Debug().Msg("looping throughs execs : " + string(i))
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
}
errCh := make(chan error, len(bookings))
var m sync.Mutex
for _, b := range bookings { for _, b := range bookings {
go getBooking(l, b, request, wf, execs, bookings, errCh, &m) // ------------ DELETE
} fmt.Println("Booking :")
book, _ := json.Marshal(b)
for i := 0; i < len(bookings); i++ { fmt.Println(string(book))
if err := <-errCh; err != nil { // ----------------
return false, wf, execs, bookings, err
}
}
return true, wf, execs, bookings, nil
}
func getBooking(l zerolog.Logger, b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
bl := l.With().Str("booking", b.UUID).Logger()
meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
request.Caller.URLS[tools.BOOKING][tools.GET] = meth request.Caller.URLS[tools.BOOKING][tools.GET] = meth
bl.Debug().Msg("Get booking " + b.UUID + " on " + b.DestPeerID) fmt.Println("Peer exec on " + b.DestPeerID)
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
bl.Debug().Msg("Received response from Get booking " + b.UUID + " on " + b.DestPeerID)
if err != nil { if err != nil {
errCh <- err return false, wf, execs, bookings, err
return }
} }
errCh <- nil
} }
return true, wf, execs, bookings, nil
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
} }
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
@@ -155,23 +115,19 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request) ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request)
ws.WorkflowExecution = executions ws.WorkflowExecution = executions
if !ok || err != nil { if !ok || err != nil {
fmt.Println("Error here when scheduling")
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings { for _, booking := range bookings {
go ws.BookExecs(booking, request, errCh, &m) fmt.Println("Trying to schedule on ")
} fmt.Println(booking.DestPeerID)
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
for i := 0; i < len(bookings); i++ { tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err := <- errCh ; err != nil { if err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
} }
} }
fmt.Println("Schedules") fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
@@ -185,30 +141,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
l := logs.GetLogger().With().Str("SchedulerID", ws.UUID).Logger()
l.Debug().Msg("Booking " + booking.UUID + " on " + booking.DestPeerID)
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
l.Debug().Msg("Received answer for booking " + booking.UUID + " on " + booking.DestPeerID)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes

View File

@@ -63,16 +63,6 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
} }
} }
// Creates a copy of the current caller, in order to have parallelized executions without race condition
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
bytes, err := json.Marshal(c)
if err != nil {
return err
}
return json.Unmarshal(bytes, &dst)
}
// CallGet calls the GET method on the HTTP server // CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))