From d3cfe019e3a38c589751f06f060f6cbdc4d9a318 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 20 Jun 2025 08:10:52 +0200 Subject: [PATCH] add sets up --- entrypoint.go | 2 +- .../purchase_resource/purchase_resource.go | 1 + .../workflow_execution/workflow_execution.go | 1 + .../workflow_execution/workflow_scheduler.go | 33 +++++++++---------- tools/enums.go | 3 +- 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/entrypoint.go b/entrypoint.go index 7f2cbd7..04124d2 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -266,7 +266,7 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc } func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool { - ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{ + ok, _, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).GetBuyAndBook(wfID, &tools.APIRequest{ Caller: r.caller, Username: r.user, PeerID: r.peerID, diff --git a/models/resources/purchase_resource/purchase_resource.go b/models/resources/purchase_resource/purchase_resource.go index 0f02de2..cccb3b0 100644 --- a/models/resources/purchase_resource/purchase_resource.go +++ b/models/resources/purchase_resource/purchase_resource.go @@ -9,6 +9,7 @@ import ( type PurchaseResource struct { utils.AbstractObject + DestPeerID string EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"` ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"` ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"` diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index 2cf65e9..06840de 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -147,6 +147,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str UUID: uuid.New().String(), Name: d.GetName() + "_" + executionsID + "_" + wfID, }, + DestPeerID: priced.GetCreatorID(), ResourceID: priced.GetID(), ResourceType: dt, EndDate: &end, diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index b0728bc..4863010 100755 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -58,19 +58,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W return ws } -func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { +func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { - return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") + return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined") } access := workflow.NewAccessor(request) res, code, err := access.LoadOne(wfID) if code != 200 { - return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) + return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) } wf := res.(*workflow.Workflow) longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request, int(ws.SelectedBuyingStrategy), ws.SelectedPricingStrategy) if err != nil { - return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err + return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err } ws.DurationS = longest ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." @@ -79,7 +79,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) } execs, err := ws.GetExecutions(wf) if err != nil { - return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err + return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err } purchased := []*purchase_resource.PurchaseResource{} bookings := []*booking.Booking{} @@ -97,11 +97,11 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) for i := 0; i < len(bookings); i++ { if err := <-errCh; err != nil { - return false, wf, execs, bookings, err + return false, wf, execs, purchased, bookings, err } } - return true, wf, execs, bookings, nil + return true, wf, execs, purchased, bookings, nil } func getBooking(b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { @@ -152,7 +152,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* if _, ok := methods[tools.GET]; !ok { return ws, nil, []*WorkflowExecution{}, errors.New("no path found") } - ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request) + ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request) ws.WorkflowExecution = executions if !ok || err != nil { return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) @@ -162,8 +162,13 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* var errCh = make(chan error, len(bookings)) var m sync.Mutex + for _, purchase := range purchases { + go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m) + } + errCh = make(chan error, len(bookings)) + for _, booking := range bookings { - go ws.BookExecs(booking, request, errCh, &m) + go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m) } for i := 0; i < len(bookings); i++ { @@ -185,8 +190,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (* return ws, wf, executions, nil } -func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { - +func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { m.Lock() c, err := getCallerCopy(request, errCh) if err != nil { @@ -194,15 +198,10 @@ func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.A return } m.Unlock() - - _, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", - tools.BOOKING, tools.POST, booking.Serialize(booking), &c) - - if err != nil { + if _, err = (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil { errCh <- err return } - errCh <- nil } diff --git a/tools/enums.go b/tools/enums.go index c0f1d42..38017cb 100644 --- a/tools/enums.go +++ b/tools/enums.go @@ -37,6 +37,7 @@ var WORKFLOWAPI = "oc-workflow" var WORKSPACEAPI = "oc-workspace" var PEERSAPI = "oc-peer" var DATACENTERAPI = "oc-datacenter" +var PURCHASEAPI = "oc-catalog/purchase" var ADMIRALTY_SOURCEAPI = DATACENTERAPI + "/admiralty/source" var ADMIRALTY_TARGETAPI = DATACENTERAPI + "/admiralty/target" var ADMIRALTY_SECRETAPI = DATACENTERAPI + "/admiralty/secret" @@ -61,7 +62,7 @@ var DefaultAPI = [...]string{ NOAPI, NOAPI, NOAPI, - NOAPI, + PURCHASEAPI, ADMIRALTY_SOURCEAPI, ADMIRALTY_TARGETAPI, ADMIRALTY_SECRETAPI,