workflow scheduler create booking with a booking execution lot id

This commit is contained in:
mr 2025-02-12 14:14:28 +01:00
parent 4833bcb710
commit 3e85fdc779
2 changed files with 19 additions and 18 deletions

View File

@ -261,7 +261,7 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc
} }
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool { func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{ ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
Caller: r.caller, Caller: r.caller,
Username: r.user, Username: r.user,
PeerID: r.peerID, PeerID: r.peerID,

View File

@ -6,6 +6,7 @@ import (
"strings" "strings"
"time" "time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
@ -50,19 +51,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return ws return ws
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) {
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, errors.New("no caller defined") return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined")
} }
access := workflow.NewAccessor(request) access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID) res, code, err := access.LoadOne(wfID)
if code != 200 { if code != 200 {
return false, nil, []*WorkflowExecution{}, errors.New("could not load the workflow with id: " + err.Error()) return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
} }
wf := res.(*workflow.Workflow) wf := res.(*workflow.Workflow)
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, err return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
} }
ws.DurationS = longest ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
@ -71,10 +72,11 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
} }
execs, err := ws.getExecutions(wf) execs, err := ws.getExecutions(wf)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, err return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
} }
bookings := []*booking.Booking{}
for _, exec := range execs { for _, exec := range execs {
bookings := exec.Book(ws.UUID, wfID, priceds) bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
for _, b := range bookings { for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
@ -83,19 +85,11 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
request.Caller.URLS[tools.BOOKING][tools.GET] = meth request.Caller.URLS[tools.BOOKING][tools.GET] = meth
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
if err != nil { if err != nil {
return false, wf, execs, err return false, wf, execs, bookings, err
} }
} }
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err != nil {
return false, wf, execs, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
} }
return true, wf, execs, nil return true, wf, execs, bookings, nil
} }
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
@ -110,12 +104,19 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
if _, ok := methods[tools.GET]; !ok { if _, ok := methods[tools.GET]; !ok {
return ws, nil, []*WorkflowExecution{}, errors.New("no path found") return ws, nil, []*WorkflowExecution{}, errors.New("no path found")
} }
ok, wf, executions, err := ws.CheckBooking(wfID, request) ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request)
ws.WorkflowExecution = executions ws.WorkflowExecution = executions
if !ok || err != nil { if !ok || err != nil {
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
if err != nil { if err != nil {