add sets up

This commit is contained in:
mr 2025-06-20 10:47:33 +02:00
parent 8b38249df7
commit 181b3249b8
7 changed files with 93 additions and 219 deletions

View File

@ -261,7 +261,6 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc
if err != nil {
return nil, err
}
fmt.Println("BAM", ws)
return ws, nil
}
@ -279,19 +278,6 @@ func (r *Request) CheckBooking(wfID string, start string, end string, durationIn
return ok
}
func (r *Request) DraftOrder(scheduler *workflow_execution.WorkflowSchedule) (*order.Order, error) {
o := &order.Order{}
/*if err := o.DraftOrder(scheduler, &tools.APIRequest{
Caller: r.caller,
Username: r.user,
PeerID: r.peerID,
Groups: r.groups,
}); err != nil {
return nil, err
}*/
return o, nil
}
func (r *Request) PaymentTunnel(o *order.Order, scheduler *workflow_execution.WorkflowSchedule) error {
/*return o.Pay(scheduler, &tools.APIRequest{
Caller: r.caller,

View File

@ -1,19 +1,16 @@
package bill
import (
"errors"
"fmt"
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
)
@ -23,13 +20,10 @@ import (
type Bill struct {
utils.AbstractObject
OrderID string `json:"order_id" bson:"order_id" validate:"required"`
OrderBy string `json:"order_by" bson:"order_by" validate:"required"`
WorkflowID string `json:"workflow_id" bson:"workflow_id" validate:"required"`
WorkflowExecutionIDs []string `json:"workflow_execution_ids" bson:"workflow_execution_ids" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
SubOrders map[string]*PeerOrder `json:"sub_orders" bson:"sub_orders"`
Total float64 `json:"total" bson:"total" validate:"required"`
OrderID string `json:"order_id" bson:"order_id" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
SubOrders map[string]*PeerOrder `json:"sub_orders" bson:"sub_orders"`
Total float64 `json:"total" bson:"total" validate:"required"`
}
func (r *Bill) StoreDraftDefault() {
@ -47,210 +41,78 @@ func (r *Bill) CanDelete() bool {
return r.IsDraft // only draft order can be deleted
}
func (o *Bill) DraftOrder(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) error {
// set the draft order from the model
if err := o.draftStoreFromModel(scheduler, request); err != nil {
return err
}
return nil
}
func (o *Bill) Pay(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) error {
if _, err := o.draftBookOrder(scheduler, request); err != nil {
return err
}
o.Status = enum.PENDING
_, code, err := o.GetAccessor(request).UpdateOne(o, o.GetID())
if code != 200 || err != nil {
return errors.New("could not update the order" + fmt.Sprintf("%v", err))
}
if err := o.pay(request); err != nil { // pay the order
return err
} else {
o.IsDraft = false
}
for _, exec := range scheduler.WorkflowExecution {
exec.IsDraft = false
_, code, err := utils.GenericUpdateOne(exec, exec.GetID(),
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{})
if code != 200 || err != nil {
return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err))
func DraftBill(order *order.Order, request *tools.APIRequest) (*Bill, error) {
peers := map[string][]*PeerItemOrder{}
for _, p := range order.Purchases {
// TODO : if once
if _, ok := peers[p.DestPeerID]; !ok {
peers[p.DestPeerID] = []*PeerItemOrder{}
}
peers[p.DestPeerID] = append(peers[p.DestPeerID], &PeerItemOrder{
Purchase: p,
Item: p.PricedItem,
})
}
_, code, err = o.GetAccessor(request).UpdateOne(o, o.GetID())
if code != 200 || err != nil {
return errors.New("could not update the order" + fmt.Sprintf("%v", err))
}
/*
TODO : TEMPORARY SET BOOKINGS TO UNDRAFT TO AVOID DELETION
BUT NEXT ONLY WHO IS PAYED WILL BE ALLOWED TO CHANGE IT
*/
return nil
}
func (o *Bill) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) error {
if request == nil {
return errors.New("no request found")
}
fmt.Println("Drafting order", scheduler.Workflow)
if scheduler.Workflow == nil || scheduler.Workflow.Graph == nil { // if the workflow has no graph, return an error
return errors.New("no graph found")
}
o.SetName()
o.WorkflowID = scheduler.Workflow.GetID()
o.IsDraft = true
o.OrderBy = request.PeerID
o.WorkflowExecutionIDs = []string{} // create an array of ids
for _, exec := range scheduler.WorkflowExecution {
o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID())
}
// set the name of the order
resourcesByPeer := map[string][]pricing.PricedItemITF{} // create a map of resources by peer
processings := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsProcessing, request,
int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the processing items
datas := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsData, request,
int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the data items
storages := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsStorage, request,
int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the storage items
workflows := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsWorkflow, request,
int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the workflow items
for _, items := range []map[string]pricing.PricedItemITF{processings, datas, storages, workflows} {
for _, item := range items {
if _, ok := resourcesByPeer[item.GetCreatorID()]; !ok {
resourcesByPeer[item.GetCreatorID()] = []pricing.PricedItemITF{}
for _, b := range order.Bookings {
// TODO : if once
isPurchased := false
for _, p := range order.Purchases {
if p.ResourceID == b.ResourceID {
isPurchased = true
break
}
resourcesByPeer[item.GetCreatorID()] = append(resourcesByPeer[item.GetCreatorID()], item)
}
}
for peerID, resources := range resourcesByPeer {
peerOrder := &PeerOrder{
Status: enum.DRAFTED,
PeerID: peerID,
if isPurchased {
continue
}
peerOrder.GenerateID()
for _, resource := range resources {
peerOrder.AddItem(resource, len(resources)) // TODO SPECIALS REF ADDITIONALS NOTES
if _, ok := peers[b.DestPeerID]; !ok {
peers[b.DestPeerID] = []*PeerItemOrder{}
}
if o.SubOrders == nil {
o.SubOrders = map[string]*PeerOrder{}
}
o.SubOrders[peerOrder.GetID()] = peerOrder
peers[b.DestPeerID] = append(peers[b.DestPeerID], &PeerItemOrder{
Item: b.PricedItem,
})
}
// search an order with same user name and same session id
err := o.SumUpBill(request)
if err != nil {
return err
}
// should store the order
res, code, err := o.GetAccessor(request).Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"workflow_id": {{Operator: dbs.EQUAL.String(), Value: o.WorkflowID}},
"order_by": {{Operator: dbs.EQUAL.String(), Value: request.PeerID}},
},
}, "", o.IsDraft)
if code != 200 || err != nil {
return errors.New("could not search the order" + fmt.Sprintf("%v", err))
}
if len(res) > 0 {
_, code, err := utils.GenericUpdateOne(o, res[0].GetID(), o.GetAccessor(request), o)
if code != 200 || err != nil {
return errors.New("could not update the order" + fmt.Sprintf("%v", err))
}
} else {
_, code, err := utils.GenericStoreOne(o, o.GetAccessor(request))
if code != 200 || err != nil {
return errors.New("could not store the order" + fmt.Sprintf("%v", err))
}
}
return nil
}
func (o *Bill) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) ([]*booking.Booking, error) {
draftedBookings := []*booking.Booking{}
if request == nil {
return draftedBookings, errors.New("no request found")
}
for _, exec := range scheduler.WorkflowExecution {
_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request,
int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy)
peerOrders := map[string]*PeerOrder{}
for peerID, items := range peers {
pr, _, err := peer.NewAccessor(request).LoadOne(peerID)
if err != nil {
return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err))
return nil, err
}
bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds)
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err != nil {
return draftedBookings, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
draftedBookings = append(draftedBookings, booking)
pp := pr.(*peer.Peer)
peerOrders[peerID] = &PeerOrder{
PeerID: peerID,
BillingAddress: pp.WalletAddress,
Items: items,
}
}
return draftedBookings, nil
}
func (o *Bill) Quantity() int {
return len(o.WorkflowExecutionIDs)
}
func (d *Bill) SetName() {
d.Name = d.UUID + "_order_" + "_" + time.Now().UTC().Format("2006-01-02T15:04:05")
bill := &Bill{
AbstractObject: utils.AbstractObject{
Name: "bill_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"),
IsDraft: true,
},
OrderID: order.UUID,
Status: enum.PENDING,
SubOrders: peerOrders,
}
return bill.SumUpBill(request)
}
func (d *Bill) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}
func (d *Bill) SumUpBill(request *tools.APIRequest) error {
func (d *Bill) SumUpBill(request *tools.APIRequest) (*Bill, error) {
for _, b := range d.SubOrders {
err := b.SumUpBill(request)
if err != nil {
return err
return d, err
}
d.Total += b.Total
}
return nil
}
// TO FINISH
func (d *Bill) pay(request *tools.APIRequest) error {
responses := make(chan *PeerOrder, len(d.SubOrders))
var wg *sync.WaitGroup
wg.Add(len(d.SubOrders))
for _, b := range d.SubOrders {
go b.Pay(request, responses, wg)
}
wg.Wait()
errs := ""
gotAnUnpaid := false
count := 0
for range responses {
res := <-responses
count++
if res != nil {
if res.Error != "" {
errs += res.Error
}
if res.Status != enum.PAID {
gotAnUnpaid = true
}
d.Status = enum.PARTIAL
d.SubOrders[res.GetID()] = res
if count == len(d.SubOrders) && !gotAnUnpaid {
d.Status = enum.PAID
}
}
}
if errs != "" {
return errors.New(errs)
}
return nil
return d, nil
}
type PeerOrder struct {
utils.AbstractObject
Error string `json:"error,omitempty" bson:"error,omitempty"`
PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
@ -303,14 +165,10 @@ func (d *PeerOrder) AddItem(item pricing.PricedItemITF, quantity int) {
})
}
func (d *PeerOrder) SetName() {
d.Name = d.UUID + "_order_" + d.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05")
}
type PeerItemOrder struct {
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
Purchase purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"`
Item pricing.PricedItemITF `json:"item,omitempty" bson:"item,omitempty"`
Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"`
Purchase *purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"`
Item pricing.PricedItemITF `json:"item,omitempty" bson:"item,omitempty"`
}
func (d *PeerItemOrder) GetPrice(request *tools.APIRequest) (float64, error) {

View File

@ -6,6 +6,7 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
@ -15,7 +16,8 @@ import (
* Booking is a struct that represents a booking
*/
type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PricedItem pricing.PricedItemITF `json:"priced_item,omitempty" bson:"priced_item,omitempty" validate:"required"`
ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"`
ExecutionMetrics map[string][]models.MetricsSnapshot `json:"metrics,omitempty" bson:"metrics,omitempty"`

View File

@ -17,8 +17,6 @@ import (
type Order struct {
utils.AbstractObject
ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"`
OrderBy string `json:"order_by" bson:"order_by" validate:"required"`
WorkflowID string `json:"workflow_id" bson:"workflow_id" validate:"required"`
Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"`
Bookings []*booking.Booking `json:"bookings" bson:"bookings"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`

View File

@ -3,6 +3,7 @@ package purchase_resource
import (
"time"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@ -10,10 +11,11 @@ import (
type PurchaseResource struct {
utils.AbstractObject
DestPeerID string
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions
EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"`
ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"`
ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"`
PricedItem pricing.PricedItemITF `json:"priced_item,omitempty" bson:"priced_item,omitempty" validate:"required"`
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions
EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"`
ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"`
ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"`
}
func (d *PurchaseResource) GetAccessor(request *tools.APIRequest) utils.Accessor {

View File

@ -125,7 +125,7 @@ func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string,
func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*purchase_resource.PurchaseResource {
items := []*purchase_resource.PurchaseResource{}
for itemID, priced := range priceds {
if !priced.IsPurchasable() && bs == pricing.BILL_ONCE { // buy only that must be buy
if !priced.IsPurchasable() || bs != pricing.BILL_ONCE { // buy only that must be buy
continue
}
if d.PeerBuyByGraph == nil {
@ -147,6 +147,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
},
PricedItem: priced,
ExecutionsID: executionsID,
DestPeerID: priced.GetCreatorID(),
ResourceID: priced.GetID(),
@ -193,6 +194,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
},
PricedItem: priced,
ExecutionsID: executionsID,
State: enum.SCHEDULED,
ResourceID: priced.GetID(),

View File

@ -7,9 +7,11 @@ import (
"sync"
"time"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/models/utils"
@ -88,6 +90,10 @@ func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
}
if err := ws.GenerateOrder(purchased, bookings, request); err != nil {
return false, wf, execs, purchased, bookings, err
}
errCh := make(chan error, len(bookings))
var m sync.Mutex
@ -104,8 +110,28 @@ func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest
return true, wf, execs, purchased, bookings, nil
}
func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error {
newOrder := &order.Order{
AbstractObject: utils.AbstractObject{
Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"),
IsDraft: true,
},
ExecutionsID: ws.UUID,
Purchases: purchases,
Bookings: bookings,
Status: enum.PENDING,
}
if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil {
if _, err := bill.DraftBill(res.(*order.Order), request); err != nil {
return err
}
return nil
} else {
return err
}
}
func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {