add sets up

This commit is contained in:
mr 2025-06-20 08:10:52 +02:00
parent 4c2ecd3f41
commit d3cfe019e3
5 changed files with 21 additions and 19 deletions

View File

@ -266,7 +266,7 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc
} }
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool { func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{ ok, _, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).GetBuyAndBook(wfID, &tools.APIRequest{
Caller: r.caller, Caller: r.caller,
Username: r.user, Username: r.user,
PeerID: r.peerID, PeerID: r.peerID,

View File

@ -9,6 +9,7 @@ import (
type PurchaseResource struct { type PurchaseResource struct {
utils.AbstractObject utils.AbstractObject
DestPeerID string
EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"` EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"`
ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"` ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"`
ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"` ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"`

View File

@ -147,6 +147,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
UUID: uuid.New().String(), UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID, Name: d.GetName() + "_" + executionsID + "_" + wfID,
}, },
DestPeerID: priced.GetCreatorID(),
ResourceID: priced.GetID(), ResourceID: priced.GetID(),
ResourceType: dt, ResourceType: dt,
EndDate: &end, EndDate: &end,

View File

@ -58,19 +58,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return ws return ws
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*purchase_resource.PurchaseResource, []*booking.Booking, error) {
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("no caller defined")
} }
access := workflow.NewAccessor(request) access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID) res, code, err := access.LoadOne(wfID)
if code != 200 { if code != 200 {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) return false, nil, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
} }
wf := res.(*workflow.Workflow) wf := res.(*workflow.Workflow)
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request, int(ws.SelectedBuyingStrategy), ws.SelectedPricingStrategy) longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request, int(ws.SelectedBuyingStrategy), ws.SelectedPricingStrategy)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
} }
ws.DurationS = longest ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
@ -79,7 +79,7 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
} }
execs, err := ws.GetExecutions(wf) execs, err := ws.GetExecutions(wf)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecution{}, []*purchase_resource.PurchaseResource{}, []*booking.Booking{}, err
} }
purchased := []*purchase_resource.PurchaseResource{} purchased := []*purchase_resource.PurchaseResource{}
bookings := []*booking.Booking{} bookings := []*booking.Booking{}
@ -97,11 +97,11 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
for i := 0; i < len(bookings); i++ { for i := 0; i < len(bookings); i++ {
if err := <-errCh; err != nil { if err := <-errCh; err != nil {
return false, wf, execs, bookings, err return false, wf, execs, purchased, bookings, err
} }
} }
return true, wf, execs, bookings, nil return true, wf, execs, purchased, bookings, nil
} }
func getBooking(b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) { func getBooking(b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
@ -152,7 +152,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
if _, ok := methods[tools.GET]; !ok { if _, ok := methods[tools.GET]; !ok {
return ws, nil, []*WorkflowExecution{}, errors.New("no path found") return ws, nil, []*WorkflowExecution{}, errors.New("no path found")
} }
ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request) ok, wf, executions, purchases, bookings, err := ws.GetBuyAndBook(wfID, request)
ws.WorkflowExecution = executions ws.WorkflowExecution = executions
if !ok || err != nil { if !ok || err != nil {
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
@ -162,8 +162,13 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
var errCh = make(chan error, len(bookings)) var errCh = make(chan error, len(bookings))
var m sync.Mutex var m sync.Mutex
for _, purchase := range purchases {
go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m)
}
errCh = make(chan error, len(bookings))
for _, booking := range bookings { for _, booking := range bookings {
go ws.BookExecs(booking, request, errCh, &m) go ws.CallDatacenter(booking, booking.DestPeerID, tools.BOOKING, request, errCh, &m)
} }
for i := 0; i < len(bookings); i++ { for i := 0; i < len(bookings); i++ {
@ -185,8 +190,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID string, dt tools.DataType, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock() m.Lock()
c, err := getCallerCopy(request, errCh) c, err := getCallerCopy(request, errCh)
if err != nil { if err != nil {
@ -194,15 +198,10 @@ func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.A
return return
} }
m.Unlock() m.Unlock()
if _, err = (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
if err != nil {
errCh <- err errCh <- err
return return
} }
errCh <- nil errCh <- nil
} }

View File

@ -37,6 +37,7 @@ var WORKFLOWAPI = "oc-workflow"
var WORKSPACEAPI = "oc-workspace" var WORKSPACEAPI = "oc-workspace"
var PEERSAPI = "oc-peer" var PEERSAPI = "oc-peer"
var DATACENTERAPI = "oc-datacenter" var DATACENTERAPI = "oc-datacenter"
var PURCHASEAPI = "oc-catalog/purchase"
var ADMIRALTY_SOURCEAPI = DATACENTERAPI + "/admiralty/source" var ADMIRALTY_SOURCEAPI = DATACENTERAPI + "/admiralty/source"
var ADMIRALTY_TARGETAPI = DATACENTERAPI + "/admiralty/target" var ADMIRALTY_TARGETAPI = DATACENTERAPI + "/admiralty/target"
var ADMIRALTY_SECRETAPI = DATACENTERAPI + "/admiralty/secret" var ADMIRALTY_SECRETAPI = DATACENTERAPI + "/admiralty/secret"
@ -61,7 +62,7 @@ var DefaultAPI = [...]string{
NOAPI, NOAPI,
NOAPI, NOAPI,
NOAPI, NOAPI,
NOAPI, PURCHASEAPI,
ADMIRALTY_SOURCEAPI, ADMIRALTY_SOURCEAPI,
ADMIRALTY_TARGETAPI, ADMIRALTY_TARGETAPI,
ADMIRALTY_SECRETAPI, ADMIRALTY_SECRETAPI,