Order Flow Payment Draft

This commit is contained in:
mr
2025-01-13 11:24:07 +01:00
parent be3b09b683
commit 21a7ff9010
22 changed files with 436 additions and 490 deletions

View File

@@ -7,9 +7,8 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"go.mongodb.org/mongo-driver/bson/primitive"
@@ -105,28 +104,25 @@ func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool {
return true
}
func (d *WorkflowExecutions) Book(wf *workflow.Workflow) []*booking.Booking {
booking := []*booking.Booking{}
for _, p := range wf.ProcessingResources {
booking = append(booking, d.toItemBooking(wf.GetByRelatedProcessing(p.GetID(), wf.IsStorage))...)
booking = append(booking, d.toItemBooking(wf.GetByRelatedProcessing(p.GetID(), wf.IsProcessing))...)
}
func (d *WorkflowExecutions) Book(priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) []*booking.Booking {
booking := d.bookEach(tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE], request)
booking = append(booking, d.bookEach(tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE], request)...)
return booking
}
func (d *WorkflowExecutions) toItemBooking(ss []resources.ShallowResourceInterface) []*booking.Booking {
func (d *WorkflowExecutions) bookEach(dt tools.DataType, priceds []pricing.PricedItemITF, request *tools.APIRequest) []*booking.Booking {
items := []*booking.Booking{}
for _, s := range ss {
for _, priced := range priceds {
start := d.ExecDate
if s := s.GetLocationStart(); s != nil {
if s := priced.GetLocationStart(); s != nil {
start = *s
}
end := start.Add(time.Duration(s.GetExplicitDurationInS()) * time.Second)
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
bookingItem := &booking.Booking{
State: common.DRAFT,
ResourceID: s.GetID(),
ResourceType: s.GetType(),
DestPeerID: s.GetCreatorID(),
ResourceID: priced.GetID(),
ResourceType: dt,
DestPeerID: priced.GetCreatorID(),
ExpectedStartDate: start,
ExpectedEndDate: &end,
}

View File

@@ -8,10 +8,8 @@ import (
"cloud.o-forge.io/core/oc-lib/models/common"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/robfig/cron"
)
@@ -49,8 +47,8 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return ws
}
func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
if caller == nil && caller.URLS == nil && caller.URLS[tools.BOOKING] == nil || caller.URLS[tools.BOOKING][tools.POST] == "" {
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.POST] == "" {
return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
}
access := workflow.NewAccessor(nil)
@@ -59,13 +57,13 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller)
return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error())
}
wf := res.(*workflow.Workflow)
wf, err = ws.planifyWorkflow(wf)
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
if err != nil {
return false, wf, []*WorkflowExecutions{}, err
}
ws.DurationS = wf.GetLongestTime(ws.End)
ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + "seconds."
if ws.End != nil && ws.Start.Add(time.Duration(wf.GetLongestTime(ws.End))*time.Second).After(*ws.End) {
if ws.End != nil && ws.Start.Add(time.Duration(longest)*time.Second).After(*ws.End) {
ws.Warning = "The workflow may be too long to be executed in the given time frame, we will try to book it anyway\n"
}
execs, err := ws.getExecutions(wf)
@@ -73,10 +71,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, caller *tools.HTTPCaller)
return false, wf, []*WorkflowExecutions{}, err
}
for _, exec := range execs {
bookings := exec.Book(wf)
bookings := exec.Book(priceds)
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POSTCHECK, booking.Serialize(booking), caller)
tools.BOOKING, tools.POSTCHECK, booking.Serialize(booking), request.Caller)
if err != nil {
return false, wf, execs, err
}
@@ -97,7 +95,7 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
if _, ok := methods[tools.POST]; !ok {
return nil, []*WorkflowExecutions{}, errors.New("no path found")
}
ok, wf, executions, err := ws.CheckBooking(wfID, request.Caller)
ok, wf, executions, err := ws.CheckBooking(wfID, request)
if !ok || err != nil {
return nil, []*WorkflowExecutions{}, errors.New("could not book the workflow" + fmt.Sprintf("%v", err))
}
@@ -117,57 +115,6 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
return wf, executions, nil
}
func (ws *WorkflowSchedule) planifyWorkflow(wf *workflow.Workflow) (*workflow.Workflow, error) {
processings := []*resources.CustomizedProcessingResource{}
for _, item := range wf.GetGraphItems(wf.IsProcessing) {
realItem := item.GetResource().(*resources.CustomizedProcessingResource)
timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID())
started := ws.Start.Add(time.Duration(timeFromStartS) * time.Second)
wf.Graph.SetItemStartUsage(item.ID, started)
wf.Graph.SetItemEndUsage(item.ID, started.Add(time.Duration(realItem.ExplicitBookingDurationS)))
processings = append(processings, realItem)
}
for _, item := range wf.GetGraphItems(wf.IsData) {
wf.Graph.SetItemStartUsage(item.ID, ws.Start)
wf.Graph.SetItemEndUsage(item.ID, *ws.End)
}
for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} {
for _, item := range wf.GetGraphItems(f) {
nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(ws.Start, processings, item.GetResource(),
func(i graph.GraphItem) resources.ShallowResourceInterface {
if f(i) {
return i.GetResource()
} else {
return nil
}
})
started := ws.Start.Add(time.Duration(nearestStart) * time.Second)
wf.Graph.SetItemStartUsage(item.ID, started)
if longestDuration >= 0 {
wf.Graph.SetItemEndUsage(item.ID, started.Add(time.Duration(longestDuration)))
}
}
}
for _, item := range wf.GetGraphItems(wf.IsWorkflow) {
access := workflow.NewAccessor(nil)
res, code, err := access.LoadOne(item.GetResource().GetID())
if code != 200 || err != nil {
return nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error()))
}
innerWF := res.(*workflow.Workflow)
innerWF, err = ws.planifyWorkflow(innerWF)
started := ws.Start.Add(time.Duration(innerWF.GetNearestStart(ws.Start)) * time.Second)
wf.Graph.SetItemStartUsage(item.ID, started)
durationE := time.Duration(innerWF.GetLongestTime(ws.End))
if durationE < 0 {
continue
}
ended := ws.Start.Add(durationE * time.Second)
wf.Graph.SetItemEndUsage(item.ID, ended)
}
return wf, nil
}
/*
BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes