diff --git a/entrypoint.go b/entrypoint.go index 747ef76..f36ecc2 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -261,7 +261,6 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc if err != nil { return nil, err } - fmt.Println("BAM", ws) return ws, nil } @@ -279,19 +278,6 @@ func (r *Request) CheckBooking(wfID string, start string, end string, durationIn return ok } -func (r *Request) DraftOrder(scheduler *workflow_execution.WorkflowSchedule) (*order.Order, error) { - o := &order.Order{} - /*if err := o.DraftOrder(scheduler, &tools.APIRequest{ - Caller: r.caller, - Username: r.user, - PeerID: r.peerID, - Groups: r.groups, - }); err != nil { - return nil, err - }*/ - return o, nil -} - func (r *Request) PaymentTunnel(o *order.Order, scheduler *workflow_execution.WorkflowSchedule) error { /*return o.Pay(scheduler, &tools.APIRequest{ Caller: r.caller, diff --git a/models/bill/bill.go b/models/bill/bill.go index 1919f9f..17e8888 100644 --- a/models/bill/bill.go +++ b/models/bill/bill.go @@ -1,19 +1,16 @@ package bill import ( - "errors" - "fmt" "sync" "time" "cloud.o-forge.io/core/oc-lib/dbs" - "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/pricing" + "cloud.o-forge.io/core/oc-lib/models/order" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" - "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -23,13 +20,10 @@ import ( type Bill struct { utils.AbstractObject - OrderID string `json:"order_id" bson:"order_id" validate:"required"` - OrderBy string `json:"order_by" bson:"order_by" validate:"required"` - WorkflowID string `json:"workflow_id" bson:"workflow_id" validate:"required"` - WorkflowExecutionIDs []string `json:"workflow_execution_ids" bson:"workflow_execution_ids" validate:"required"` - Status enum.CompletionStatus `json:"status" bson:"status" default:"0"` - SubOrders map[string]*PeerOrder `json:"sub_orders" bson:"sub_orders"` - Total float64 `json:"total" bson:"total" validate:"required"` + OrderID string `json:"order_id" bson:"order_id" validate:"required"` + Status enum.CompletionStatus `json:"status" bson:"status" default:"0"` + SubOrders map[string]*PeerOrder `json:"sub_orders" bson:"sub_orders"` + Total float64 `json:"total" bson:"total" validate:"required"` } func (r *Bill) StoreDraftDefault() { @@ -47,210 +41,78 @@ func (r *Bill) CanDelete() bool { return r.IsDraft // only draft order can be deleted } -func (o *Bill) DraftOrder(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) error { - // set the draft order from the model - if err := o.draftStoreFromModel(scheduler, request); err != nil { - return err - } - return nil -} - -func (o *Bill) Pay(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) error { - if _, err := o.draftBookOrder(scheduler, request); err != nil { - return err - } - o.Status = enum.PENDING - _, code, err := o.GetAccessor(request).UpdateOne(o, o.GetID()) - if code != 200 || err != nil { - return errors.New("could not update the order" + fmt.Sprintf("%v", err)) - } - if err := o.pay(request); err != nil { // pay the order - return err - } else { - o.IsDraft = false - } - for _, exec := range scheduler.WorkflowExecution { - exec.IsDraft = false - _, code, err := utils.GenericUpdateOne(exec, exec.GetID(), - workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{}) - if code != 200 || err != nil { - return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err)) +func DraftBill(order *order.Order, request *tools.APIRequest) (*Bill, error) { + peers := map[string][]*PeerItemOrder{} + for _, p := range order.Purchases { + // TODO : if once + if _, ok := peers[p.DestPeerID]; !ok { + peers[p.DestPeerID] = []*PeerItemOrder{} } + peers[p.DestPeerID] = append(peers[p.DestPeerID], &PeerItemOrder{ + Purchase: p, + Item: p.PricedItem, + }) } - _, code, err = o.GetAccessor(request).UpdateOne(o, o.GetID()) - if code != 200 || err != nil { - return errors.New("could not update the order" + fmt.Sprintf("%v", err)) - } - /* - TODO : TEMPORARY SET BOOKINGS TO UNDRAFT TO AVOID DELETION - BUT NEXT ONLY WHO IS PAYED WILL BE ALLOWED TO CHANGE IT - */ - return nil -} - -func (o *Bill) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) error { - if request == nil { - return errors.New("no request found") - } - fmt.Println("Drafting order", scheduler.Workflow) - if scheduler.Workflow == nil || scheduler.Workflow.Graph == nil { // if the workflow has no graph, return an error - return errors.New("no graph found") - } - o.SetName() - o.WorkflowID = scheduler.Workflow.GetID() - o.IsDraft = true - o.OrderBy = request.PeerID - o.WorkflowExecutionIDs = []string{} // create an array of ids - for _, exec := range scheduler.WorkflowExecution { - o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID()) - } - // set the name of the order - resourcesByPeer := map[string][]pricing.PricedItemITF{} // create a map of resources by peer - - processings := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsProcessing, request, - int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the processing items - datas := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsData, request, - int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the data items - storages := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsStorage, request, - int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the storage items - workflows := scheduler.Workflow.GetPricedItem(scheduler.Workflow.Graph.IsWorkflow, request, - int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) // get the workflow items - for _, items := range []map[string]pricing.PricedItemITF{processings, datas, storages, workflows} { - for _, item := range items { - if _, ok := resourcesByPeer[item.GetCreatorID()]; !ok { - resourcesByPeer[item.GetCreatorID()] = []pricing.PricedItemITF{} + for _, b := range order.Bookings { + // TODO : if once + isPurchased := false + for _, p := range order.Purchases { + if p.ResourceID == b.ResourceID { + isPurchased = true + break } - resourcesByPeer[item.GetCreatorID()] = append(resourcesByPeer[item.GetCreatorID()], item) } - } - for peerID, resources := range resourcesByPeer { - peerOrder := &PeerOrder{ - Status: enum.DRAFTED, - PeerID: peerID, + if isPurchased { + continue } - peerOrder.GenerateID() - for _, resource := range resources { - peerOrder.AddItem(resource, len(resources)) // TODO SPECIALS REF ADDITIONALS NOTES + if _, ok := peers[b.DestPeerID]; !ok { + peers[b.DestPeerID] = []*PeerItemOrder{} } - if o.SubOrders == nil { - o.SubOrders = map[string]*PeerOrder{} - } - o.SubOrders[peerOrder.GetID()] = peerOrder + peers[b.DestPeerID] = append(peers[b.DestPeerID], &PeerItemOrder{ + Item: b.PricedItem, + }) } - // search an order with same user name and same session id - err := o.SumUpBill(request) - if err != nil { - return err - } - // should store the order - res, code, err := o.GetAccessor(request).Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ - "workflow_id": {{Operator: dbs.EQUAL.String(), Value: o.WorkflowID}}, - "order_by": {{Operator: dbs.EQUAL.String(), Value: request.PeerID}}, - }, - }, "", o.IsDraft) - if code != 200 || err != nil { - return errors.New("could not search the order" + fmt.Sprintf("%v", err)) - } - if len(res) > 0 { - _, code, err := utils.GenericUpdateOne(o, res[0].GetID(), o.GetAccessor(request), o) - if code != 200 || err != nil { - return errors.New("could not update the order" + fmt.Sprintf("%v", err)) - } - } else { - _, code, err := utils.GenericStoreOne(o, o.GetAccessor(request)) - if code != 200 || err != nil { - return errors.New("could not store the order" + fmt.Sprintf("%v", err)) - } - } - return nil -} - -func (o *Bill) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, request *tools.APIRequest) ([]*booking.Booking, error) { - draftedBookings := []*booking.Booking{} - if request == nil { - return draftedBookings, errors.New("no request found") - } - for _, exec := range scheduler.WorkflowExecution { - _, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request, - int(scheduler.SelectedBuyingStrategy), scheduler.SelectedPricingStrategy) + peerOrders := map[string]*PeerOrder{} + for peerID, items := range peers { + pr, _, err := peer.NewAccessor(request).LoadOne(peerID) if err != nil { - return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) + return nil, err } - bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds) - for _, booking := range bookings { - _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", - tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) - if err != nil { - return draftedBookings, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err)) - } - draftedBookings = append(draftedBookings, booking) + pp := pr.(*peer.Peer) + peerOrders[peerID] = &PeerOrder{ + PeerID: peerID, + BillingAddress: pp.WalletAddress, + Items: items, } } - return draftedBookings, nil -} - -func (o *Bill) Quantity() int { - return len(o.WorkflowExecutionIDs) -} - -func (d *Bill) SetName() { - d.Name = d.UUID + "_order_" + "_" + time.Now().UTC().Format("2006-01-02T15:04:05") + bill := &Bill{ + AbstractObject: utils.AbstractObject{ + Name: "bill_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"), + IsDraft: true, + }, + OrderID: order.UUID, + Status: enum.PENDING, + SubOrders: peerOrders, + } + return bill.SumUpBill(request) } func (d *Bill) GetAccessor(request *tools.APIRequest) utils.Accessor { return NewAccessor(request) // Create a new instance of the accessor } -func (d *Bill) SumUpBill(request *tools.APIRequest) error { +func (d *Bill) SumUpBill(request *tools.APIRequest) (*Bill, error) { for _, b := range d.SubOrders { err := b.SumUpBill(request) if err != nil { - return err + return d, err } d.Total += b.Total } - return nil -} - -// TO FINISH -func (d *Bill) pay(request *tools.APIRequest) error { - responses := make(chan *PeerOrder, len(d.SubOrders)) - var wg *sync.WaitGroup - wg.Add(len(d.SubOrders)) - for _, b := range d.SubOrders { - go b.Pay(request, responses, wg) - } - wg.Wait() - errs := "" - gotAnUnpaid := false - count := 0 - for range responses { - res := <-responses - count++ - if res != nil { - if res.Error != "" { - errs += res.Error - } - if res.Status != enum.PAID { - gotAnUnpaid = true - } - d.Status = enum.PARTIAL - d.SubOrders[res.GetID()] = res - if count == len(d.SubOrders) && !gotAnUnpaid { - d.Status = enum.PAID - } - } - } - - if errs != "" { - return errors.New(errs) - } - return nil + return d, nil } type PeerOrder struct { - utils.AbstractObject Error string `json:"error,omitempty" bson:"error,omitempty"` PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty"` Status enum.CompletionStatus `json:"status" bson:"status" default:"0"` @@ -303,14 +165,10 @@ func (d *PeerOrder) AddItem(item pricing.PricedItemITF, quantity int) { }) } -func (d *PeerOrder) SetName() { - d.Name = d.UUID + "_order_" + d.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05") -} - type PeerItemOrder struct { - Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"` - Purchase purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"` - Item pricing.PricedItemITF `json:"item,omitempty" bson:"item,omitempty"` + Quantity int `json:"quantity,omitempty" bson:"quantity,omitempty"` + Purchase *purchase_resource.PurchaseResource `json:"purchase,omitempty" bson:"purchase,omitempty"` + Item pricing.PricedItemITF `json:"item,omitempty" bson:"item,omitempty"` } func (d *PeerItemOrder) GetPrice(request *tools.APIRequest) (float64, error) { diff --git a/models/booking/booking.go b/models/booking/booking.go index 74e9ffe..bdc031b 100644 --- a/models/booking/booking.go +++ b/models/booking/booking.go @@ -6,6 +6,7 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/models" + "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" "go.mongodb.org/mongo-driver/bson/primitive" @@ -15,7 +16,8 @@ import ( * Booking is a struct that represents a booking */ type Booking struct { - utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + PricedItem pricing.PricedItemITF `json:"priced_item,omitempty" bson:"priced_item,omitempty" validate:"required"` ResumeMetrics map[string]map[string]models.MetricResume `json:"resume_metrics,omitempty" bson:"resume_metrics,omitempty"` ExecutionMetrics map[string][]models.MetricsSnapshot `json:"metrics,omitempty" bson:"metrics,omitempty"` diff --git a/models/order/order.go b/models/order/order.go index 655c6a9..574c91d 100644 --- a/models/order/order.go +++ b/models/order/order.go @@ -17,8 +17,6 @@ import ( type Order struct { utils.AbstractObject ExecutionsID string `json:"executions_id" bson:"executions_id" validate:"required"` - OrderBy string `json:"order_by" bson:"order_by" validate:"required"` - WorkflowID string `json:"workflow_id" bson:"workflow_id" validate:"required"` Purchases []*purchase_resource.PurchaseResource `json:"purchases" bson:"purchases"` Bookings []*booking.Booking `json:"bookings" bson:"bookings"` Status enum.CompletionStatus `json:"status" bson:"status" default:"0"` diff --git a/models/resources/purchase_resource/purchase_resource.go b/models/resources/purchase_resource/purchase_resource.go index a7496f8..ca528ae 100644 --- a/models/resources/purchase_resource/purchase_resource.go +++ b/models/resources/purchase_resource/purchase_resource.go @@ -3,6 +3,7 @@ package purchase_resource import ( "time" + "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -10,10 +11,11 @@ import ( type PurchaseResource struct { utils.AbstractObject DestPeerID string - ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions - EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"` - ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"` - ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"` + PricedItem pricing.PricedItemITF `json:"priced_item,omitempty" bson:"priced_item,omitempty" validate:"required"` + ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions + EndDate *time.Time `json:"end_buying_date,omitempty" bson:"end_buying_date,omitempty"` + ResourceID string `json:"resource_id" bson:"resource_id" validate:"required"` + ResourceType tools.DataType `json:"resource_type" bson:"resource_type" validate:"required"` } func (d *PurchaseResource) GetAccessor(request *tools.APIRequest) utils.Accessor { diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index cb70b24..39ef327 100755 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -125,7 +125,7 @@ func (d *WorkflowExecution) Buy(bs pricing.BillingStrategy, executionsID string, func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*purchase_resource.PurchaseResource { items := []*purchase_resource.PurchaseResource{} for itemID, priced := range priceds { - if !priced.IsPurchasable() && bs == pricing.BILL_ONCE { // buy only that must be buy + if !priced.IsPurchasable() || bs != pricing.BILL_ONCE { // buy only that must be buy continue } if d.PeerBuyByGraph == nil { @@ -147,6 +147,7 @@ func (d *WorkflowExecution) buyEach(bs pricing.BillingStrategy, executionsID str UUID: uuid.New().String(), Name: d.GetName() + "_" + executionsID + "_" + wfID, }, + PricedItem: priced, ExecutionsID: executionsID, DestPeerID: priced.GetCreatorID(), ResourceID: priced.GetID(), @@ -193,6 +194,7 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools. UUID: uuid.New().String(), Name: d.GetName() + "_" + executionsID + "_" + wfID, }, + PricedItem: priced, ExecutionsID: executionsID, State: enum.SCHEDULED, ResourceID: priced.GetID(), diff --git a/models/workflow_execution/workflow_scheduler.go b/models/workflow_execution/workflow_scheduler.go index e32947a..3590047 100755 --- a/models/workflow_execution/workflow_scheduler.go +++ b/models/workflow_execution/workflow_scheduler.go @@ -7,9 +7,11 @@ import ( "sync" "time" + "cloud.o-forge.io/core/oc-lib/models/bill" "cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/pricing" + "cloud.o-forge.io/core/oc-lib/models/order" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource" "cloud.o-forge.io/core/oc-lib/models/utils" @@ -88,6 +90,10 @@ func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) } + if err := ws.GenerateOrder(purchased, bookings, request); err != nil { + return false, wf, execs, purchased, bookings, err + } + errCh := make(chan error, len(bookings)) var m sync.Mutex @@ -104,8 +110,28 @@ func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest return true, wf, execs, purchased, bookings, nil } -func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { +func (ws *WorkflowSchedule) GenerateOrder(purchases []*purchase_resource.PurchaseResource, bookings []*booking.Booking, request *tools.APIRequest) error { + newOrder := &order.Order{ + AbstractObject: utils.AbstractObject{ + Name: "order_" + request.PeerID + "_" + time.Now().UTC().Format("2006-01-02T15:04:05"), + IsDraft: true, + }, + ExecutionsID: ws.UUID, + Purchases: purchases, + Bookings: bookings, + Status: enum.PENDING, + } + if res, _, err := order.NewAccessor(request).StoreOne(newOrder); err == nil { + if _, err := bill.DraftBill(res.(*order.Order), request); err != nil { + return err + } + return nil + } else { + return err + } +} +func getBooking(b *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) { m.Lock() c, err := getCallerCopy(request, errCh) if err != nil {