15 Commits

Author SHA1 Message Date
mr
c53e25e69a add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
4aebb48e73 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
c9690fc1ba add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
cd10104089 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
f6bc19123c add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
305e5919f9 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
2ee2320a03 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
9a4cf686ca add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
5376cf7681 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
2f0eb9ca35 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
c1d0932301 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
82cb2aa76a workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
2cbbb49ce2 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
3c615fd3bc workflow execution evolved 2025-02-13 10:32:49 +01:00
pb
91d3e5c3cd Added a nil verification to StoreOne and UpdateOne for Graph object 2025-02-13 10:32:49 +01:00
20 changed files with 124 additions and 197 deletions

View File

@@ -261,7 +261,7 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc
} }
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool { func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{ ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
Caller: r.caller, Caller: r.caller,
Username: r.user, Username: r.user,
PeerID: r.peerID, PeerID: r.peerID,
@@ -563,9 +563,9 @@ func (l *LibData) ToRule() *rule.Rule {
return nil return nil
} }
func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution { func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecutions {
if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION { if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION {
return l.Data.(*workflow_execution.WorkflowExecution) return l.Data.(*workflow_execution.WorkflowExecutions)
} }
return nil return nil
} }

View File

@@ -15,7 +15,6 @@ import (
*/ */
type Booking struct { type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty"` // ExecutionsID is the ID of the executions
DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"`
@@ -81,6 +80,10 @@ func (d *Booking) GetDelayOnDuration() time.Duration {
return d.GetRealDuration() - d.GetUsualDuration() return d.GetRealDuration() - d.GetUsualDuration()
} }
func (d *Booking) GetName() string {
return d.GetID() + "_" + d.ExpectedStartDate.String()
}
func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor return NewAccessor(request) // Create a new instance of the accessor
} }
@@ -90,7 +93,7 @@ func (d *Booking) VerifyAuth(request *tools.APIRequest) bool {
} }
func (r *Booking) StoreDraftDefault() { func (r *Booking) StoreDraftDefault() {
r.IsDraft = false r.IsDraft = true
} }
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {

View File

@@ -1,7 +1,6 @@
package booking package booking
import ( import (
"errors"
"time" "time"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
@@ -34,11 +33,7 @@ func (a *bookingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error)
} }
func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
if set.(*Booking).State == 0 { return utils.GenericUpdateOne(set, id, a, &Booking{})
return nil, 400, errors.New("state is required")
}
realSet := &Booking{State: set.(*Booking).State}
return utils.GenericUpdateOne(realSet, id, a, &Booking{})
} }
func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {

View File

@@ -42,15 +42,13 @@ const (
S3 S3
MEMORY MEMORY
HARDWARE HARDWARE
AZURE
GCS
) )
// String() - Returns the string representation of the storage type // String() - Returns the string representation of the storage type
func (t StorageType) String() string { func (t StorageType) String() string {
return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE", "AZURE", "GCS"}[t] return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE"}[t]
} }
func TypeList() []StorageType { func TypeList() []StorageType {
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS} return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE}
} }

View File

@@ -7,7 +7,7 @@ import (
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
) )
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 { func GetPlannerNearestStart(start time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
near := float64(10000000000) // set a high value near := float64(10000000000) // set a high value
for _, items := range planned { // loop through the planned items for _, items := range planned { // loop through the planned items
for _, priced := range items { // loop through the priced items for _, priced := range items { // loop through the priced items
@@ -23,7 +23,7 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[stri
return near return near
} }
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 { func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
if end == nil { if end == nil {
return -1 return -1
} }

View File

@@ -28,7 +28,7 @@ var models = map[string]func() utils.DBObject{
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} }, tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} }, tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} }, tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} }, tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecutions{} },
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} }, tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} }, tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} }, tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },

View File

@@ -68,10 +68,10 @@ func (o *Order) Pay(scheduler *workflow_execution.WorkflowSchedule, request *too
} else { } else {
o.IsDraft = false o.IsDraft = false
} }
for _, exec := range scheduler.WorkflowExecution { for _, exec := range scheduler.WorkflowExecutions {
exec.IsDraft = false exec.IsDraft = false
_, code, err := utils.GenericUpdateOne(exec, exec.GetID(), _, code, err := utils.GenericUpdateOne(exec, exec.GetID(),
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{}) workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecutions{})
if code != 200 || err != nil { if code != 200 || err != nil {
return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err)) return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err))
} }
@@ -100,7 +100,7 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu
o.IsDraft = true o.IsDraft = true
o.OrderBy = request.PeerID o.OrderBy = request.PeerID
o.WorkflowExecutionIDs = []string{} // create an array of ids o.WorkflowExecutionIDs = []string{} // create an array of ids
for _, exec := range scheduler.WorkflowExecution { for _, exec := range scheduler.WorkflowExecutions {
o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID()) o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID())
} }
// set the name of the order // set the name of the order
@@ -166,12 +166,12 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r
if request == nil { if request == nil {
return draftedBookings, errors.New("no request found") return draftedBookings, errors.New("no request found")
} }
for _, exec := range scheduler.WorkflowExecution { for _, exec := range scheduler.WorkflowExecutions {
_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request) _, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request)
if err != nil { if err != nil {
return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err))
} }
bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds) bookings := exec.Book(scheduler.Workflow.UUID, priceds)
for _, booking := range bookings { for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)

View File

@@ -17,7 +17,7 @@ import (
* it defines the resource compute * it defines the resource compute
*/ */
type ComputeResource struct { type ComputeResource struct {
AbstractInstanciatedResource[*ComputeResourceInstance] AbstractIntanciatedResource[*ComputeResourceInstance]
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
} }
@@ -35,7 +35,7 @@ func (abs *ComputeResource) ConvertToPricedResource(
if t != tools.COMPUTE_RESOURCE { if t != tools.COMPUTE_RESOURCE {
return nil return nil
} }
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request) p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource) priced := p.(*PricedResource)
return &PricedComputeResource{ return &PricedComputeResource{
PricedResource: *priced, PricedResource: *priced,
@@ -52,7 +52,6 @@ type ComputeNode struct {
type ComputeResourceInstance struct { type ComputeResourceInstance struct {
ResourceInstance[*ComputeResourcePartnership] ResourceInstance[*ComputeResourcePartnership]
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"` SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"` PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"` AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`

View File

@@ -16,7 +16,7 @@ import (
* it defines the resource data * it defines the resource data
*/ */
type DataResource struct { type DataResource struct {
AbstractInstanciatedResource[*DataInstance] AbstractIntanciatedResource[*DataInstance]
Type string `bson:"type,omitempty" json:"type,omitempty"` Type string `bson:"type,omitempty" json:"type,omitempty"`
Quality string `bson:"quality,omitempty" json:"quality,omitempty"` Quality string `bson:"quality,omitempty" json:"quality,omitempty"`
OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage
@@ -42,7 +42,7 @@ func (abs *DataResource) ConvertToPricedResource(
if t != tools.DATA_RESOURCE { if t != tools.DATA_RESOURCE {
return nil return nil
} }
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request) p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource) priced := p.(*PricedResource)
return &PricedDataResource{ return &PricedDataResource{
PricedResource: *priced, PricedResource: *priced,

View File

@@ -12,7 +12,6 @@ type ResourceInterface interface {
ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF
GetType() string GetType() string
GetSelectedInstance() utils.DBObject GetSelectedInstance() utils.DBObject
ClearEnv() utils.DBObject
SetAllowedInstances(request *tools.APIRequest) SetAllowedInstances(request *tools.APIRequest)
} }
@@ -21,7 +20,6 @@ type ResourceInstanceITF interface {
GetID() string GetID() string
GetName() string GetName() string
StoreDraftDefault() StoreDraftDefault()
ClearEnv()
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups() ClearPeerGroups()

View File

@@ -25,7 +25,7 @@ type ProcessingUsage struct {
* it defines the resource processing * it defines the resource processing
*/ */
type ProcessingResource struct { type ProcessingResource struct {
AbstractInstanciatedResource[*ProcessingInstance] AbstractIntanciatedResource[*ProcessingInstance]
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing

View File

@@ -47,12 +47,12 @@ func (r *AbstractResource) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted return r.IsDraft // only draft bookings can be deleted
} }
type AbstractInstanciatedResource[T ResourceInstanceITF] struct { type AbstractIntanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name) AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
} }
func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource( func (abs *AbstractIntanciatedResource[T]) ConvertToPricedResource(
t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF {
instances := map[string]string{} instances := map[string]string{}
profiles := []pricing.PricingProfileITF{} profiles := []pricing.PricingProfileITF{}
@@ -71,14 +71,7 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(
} }
} }
func (abs *AbstractInstanciatedResource[T]) ClearEnv() utils.DBObject { func (r *AbstractIntanciatedResource[T]) GetSelectedInstance() utils.DBObject {
for _, instance := range abs.Instances {
instance.ClearEnv()
}
return abs
}
func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex { if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex {
return r.Instances[*r.SelectedInstanceIndex] return r.Instances[*r.SelectedInstanceIndex]
} }
@@ -88,11 +81,11 @@ func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
return nil return nil
} }
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) { func (abs *AbstractIntanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) {
abs.Instances = verifyAuthAction[T](abs.Instances, request) abs.Instances = verifyAuthAction[T](abs.Instances, request)
} }
func (d *AbstractInstanciatedResource[T]) Trim() { func (d *AbstractIntanciatedResource[T]) Trim() {
d.Type = d.GetType() d.Type = d.GetType()
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok { if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok {
for _, instance := range d.Instances { for _, instance := range d.Instances {
@@ -101,7 +94,7 @@ func (d *AbstractInstanciatedResource[T]) Trim() {
} }
} }
func (abs *AbstractInstanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool { func (abs *AbstractIntanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool {
return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request) return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request)
} }
@@ -134,11 +127,6 @@ type GeoPoint struct {
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"` Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
} }
type Credentials struct {
Login string `json:"login,omitempty" bson:"login,omitempty"`
Pass string `json:"password,omitempty" bson:"password,omitempty"`
}
type ResourceInstance[T ResourcePartnerITF] struct { type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject utils.AbstractObject
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"` Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
@@ -150,12 +138,6 @@ type ResourceInstance[T ResourcePartnerITF] struct {
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"` Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
} }
func (ri *ResourceInstance[T]) ClearEnv() {
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF { func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
pricings := []pricing.PricingProfileITF{} pricings := []pricing.PricingProfileITF{}
for _, p := range ri.Partnerships { for _, p := range ri.Partnerships {

View File

@@ -17,7 +17,7 @@ import (
* it defines the resource storage * it defines the resource storage
*/ */
type StorageResource struct { type StorageResource struct {
AbstractInstanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name) AbstractIntanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name)
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
} }
@@ -35,7 +35,7 @@ func (abs *StorageResource) ConvertToPricedResource(
if t != tools.STORAGE_RESOURCE { if t != tools.STORAGE_RESOURCE {
return nil return nil
} }
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request) p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource) priced := p.(*PricedResource)
return &PricedStorageResource{ return &PricedStorageResource{
PricedResource: *priced, PricedResource: *priced,
@@ -44,7 +44,6 @@ func (abs *StorageResource) ConvertToPricedResource(
type StorageResourceInstance struct { type StorageResourceInstance struct {
ResourceInstance[*StorageResourcePartnership] ResourceInstance[*StorageResourcePartnership]
Credentials *Credentials `json:"credentials,omitempty" bson:"credentials,omitempty"`
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
Local bool `bson:"local" json:"local"` Local bool `bson:"local" json:"local"`
SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"` SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"`
@@ -55,13 +54,6 @@ type StorageResourceInstance struct {
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
} }
func (ri *StorageResourceInstance) ClearEnv() {
ri.Credentials = nil
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *StorageResourceInstance) StoreDraftDefault() { func (ri *StorageResourceInstance) StoreDraftDefault() {
found := false found := false
for _, p := range ri.ResourceInstance.Env { for _, p := range ri.ResourceInstance.Env {

View File

@@ -23,10 +23,6 @@ func (r *WorkflowResource) GetType() string {
return tools.WORKFLOW_RESOURCE.String() return tools.WORKFLOW_RESOURCE.String()
} }
func (d *WorkflowResource) ClearEnv() utils.DBObject {
return d
}
func (d *WorkflowResource) Trim() { func (d *WorkflowResource) Trim() {
/* EMPTY */ /* EMPTY */
} }

View File

@@ -66,6 +66,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String()) return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
} }
if err != nil { if err != nil {
a.GetLogger().Error().Msg("Could not retrieve " + id + " to db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) { if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) {
@@ -113,6 +114,7 @@ func GenericLoadOne[T DBObject](id string, f func(DBObject) (DBObject, int, erro
var data T var data T
res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String()) res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String())
if err != nil { if err != nil {
a.GetLogger().Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
res_mongo.Decode(&data) res_mongo.Decode(&data)
@@ -126,6 +128,7 @@ func genericLoadAll[T DBObject](res *mgb.Cursor, code int, err error, onlyDraft
objs := []ShallowDBObject{} objs := []ShallowDBObject{}
var results []T var results []T
if err != nil { if err != nil {
a.GetLogger().Error().Msg("Could not retrieve any from db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
if err = res.All(mongo.MngoCtx, &results); err != nil { if err = res.All(mongo.MngoCtx, &results); err != nil {

View File

@@ -128,8 +128,8 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
return true, nil return true, nil
} }
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) { func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) {
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{} priceds := map[tools.DataType][]pricing.PricedItemITF{}
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing, ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) { func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS() return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
@@ -189,12 +189,12 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
return longest, priceds, wf, nil return longest, priceds, wf, nil
} }
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest, func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest,
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) { f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType][]pricing.PricedItemITF, error) {
resources := []T{} resources := []T{}
for _, item := range wf.GetGraphItems(f) { for _, item := range wf.GetGraphItems(f) {
if priceds[dt] == nil { if priceds[dt] == nil {
priceds[dt] = map[string]pricing.PricedItemITF{} priceds[dt] = []pricing.PricedItemITF{}
} }
dt, realItem := item.GetResource() dt, realItem := item.GetResource()
if realItem == nil { if realItem == nil {
@@ -212,7 +212,7 @@ func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priced
priced.SetLocationEnd(*e) priced.SetLocationEnd(*e)
} }
resources = append(resources, realItem.(T)) resources = append(resources, realItem.(T))
priceds[dt][item.ID] = priced priceds[dt] = append(priceds[dt], priced)
} }
return resources, priceds, nil return resources, priceds, nil
} }

View File

@@ -100,7 +100,7 @@ func (a *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.
return nil, code, err return nil, code, err
} }
workflow := res.(*Workflow) workflow := res.(*Workflow)
a.execute(workflow, false, true) // update the workspace for the workflow a.execute(workflow, false, false) // update the workspace for the workflow
a.share(workflow, false, a.GetCaller()) // share the update to the peers a.share(workflow, false, a.GetCaller()) // share the update to the peers
return res, code, nil return res, code, nil
} }
@@ -119,19 +119,12 @@ func (a *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, i
workflow := res.(*Workflow) workflow := res.(*Workflow)
a.share(workflow, false, a.GetCaller()) // share the creation to the peers a.share(workflow, false, a.GetCaller()) // share the creation to the peers
a.execute(workflow, false, true) // store the workspace for the workflow a.execute(workflow, false, false) // store the workspace for the workflow
return res, code, nil return res, code, nil
} }
// CopyOne copies a workflow in the database // CopyOne copies a workflow in the database
func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
wf := data.(*Workflow)
for _, item := range wf.Graph.Items {
_, obj := item.GetResource()
if obj != nil {
obj.ClearEnv()
}
}
return utils.GenericStoreOne(data, a) return utils.GenericStoreOne(data, a)
} }
@@ -214,10 +207,10 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
} else if t == tools.DATA_RESOURCE { } else if t == tools.DATA_RESOURCE {
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} }) access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} })
} else { } else {
wf.Graph.Clear(resource.GetID()) wf.Graph.Clear(item.Data.GetID())
} }
if error := utils.VerifyAccess(access, resource.GetID()); error != nil { if error := utils.VerifyAccess(access, resource.GetID()); error != nil {
wf.Graph.Clear(resource.GetID()) wf.Graph.Clear(item.Data.GetID())
} }
} }
return wf return wf

View File

@@ -15,41 +15,39 @@ import (
) )
/* /*
* WorkflowExecution is a struct that represents a list of workflow executions * WorkflowExecutions is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system * Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
* workflows generate their own executions * workflows generate their own executions
*/ */
type WorkflowExecution struct { type WorkflowExecutions struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PeerBookByGraph map[string]map[string][]string `json:"peer_book_by_graph,omitempty" bson:"peer_book_by_graph,omitempty"` // BookByResource is a map of the resource id and the list of the booking id
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty"`
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
} }
func (r *WorkflowExecution) StoreDraftDefault() { func (r *WorkflowExecutions) StoreDraftDefault() {
r.IsDraft = false // TODO: TEMPORARY r.IsDraft = true // TODO: TEMPORARY
r.State = enum.SCHEDULED r.State = enum.DRAFT
} }
func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if r.State != set.(*WorkflowExecution).State { if r.State != set.(*WorkflowExecutions).State {
return true, &WorkflowExecution{State: set.(*WorkflowExecution).State} // only state can be updated return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated
} }
return !r.IsDraft, set // only draft buying can be updated return !r.IsDraft, set // only draft buying can be updated
} }
func (r *WorkflowExecution) CanDelete() bool { func (r *WorkflowExecutions) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted return r.IsDraft // only draft bookings can be deleted
} }
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool { func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool {
return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID
} }
func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error { func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error {
if ws.EndDate == nil { if ws.EndDate == nil {
// if no end... then Book like a savage // if no end... then Book like a savage
e := ws.ExecDate.Add(time.Hour) e := ws.ExecDate.Add(time.Hour)
@@ -76,7 +74,7 @@ func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
} }
// tool to transform the argo status to a state // tool to transform the argo status to a state
func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution { func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions {
status = strings.ToLower(status) status = strings.ToLower(status)
switch status { switch status {
case "succeeded": // Succeeded case "succeeded": // Succeeded
@@ -91,54 +89,38 @@ func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecutio
return wfa return wfa
} }
func (r *WorkflowExecution) GenerateID() { func (r *WorkflowExecutions) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String() r.UUID = uuid.New().String()
}
} }
func (d *WorkflowExecution) GetName() string { func (d *WorkflowExecutions) GetName() string {
return d.UUID + "_" + d.ExecDate.String() return d.UUID + "_" + d.ExecDate.String()
} }
func (d *WorkflowExecution) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor return NewAccessor(request) // Create a new instance of the accessor
} }
func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool { func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool {
return true return true
} }
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking { func (d *WorkflowExecutions) Book(wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking {
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) booking := d.bookEach(wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) booking = append(booking, d.bookEach(wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
return booking return booking
} }
func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*booking.Booking { func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking {
items := []*booking.Booking{} items := []*booking.Booking{}
for itemID, priced := range priceds { for _, priced := range priceds {
if d.PeerBookByGraph == nil {
d.PeerBookByGraph = map[string]map[string][]string{}
}
if d.PeerBookByGraph[priced.GetCreatorID()] == nil {
d.PeerBookByGraph[priced.GetCreatorID()] = map[string][]string{}
}
if d.PeerBookByGraph[priced.GetCreatorID()][itemID] == nil {
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate start := d.ExecDate
if s := priced.GetLocationStart(); s != nil { if s := priced.GetLocationStart(); s != nil {
start = *s start = *s
} }
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
bookingItem := &booking.Booking{ bookingItem := &booking.Booking{
AbstractObject: utils.AbstractObject{ State: enum.DRAFT,
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
},
ExecutionsID: executionsID,
State: enum.SCHEDULED,
ResourceID: priced.GetID(), ResourceID: priced.GetID(),
ResourceType: dt, ResourceType: dt,
DestPeerID: priced.GetCreatorID(), DestPeerID: priced.GetCreatorID(),
@@ -148,8 +130,6 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
ExpectedEndDate: &end, ExpectedEndDate: &end,
} }
items = append(items, bookingItem) items = append(items, bookingItem)
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(
d.PeerBookByGraph[priced.GetCreatorID()][itemID], bookingItem.GetID())
} }
return items return items
} }

View File

@@ -43,11 +43,11 @@ func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject,
} }
func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
if set.(*WorkflowExecution).State == 0 { if set.(*WorkflowExecutions).State == 0 {
return nil, 400, errors.New("state is required") return nil, 400, errors.New("state is required")
} }
realSet := WorkflowExecution{State: set.(*WorkflowExecution).State} realSet := WorkflowExecutions{State: set.(*WorkflowExecutions).State}
return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecution{}) return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecutions{})
} }
func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
@@ -59,13 +59,13 @@ func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.D
} }
func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) { return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) {
if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { if d.(*WorkflowExecutions).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
return nil, 404, errors.New("not found") return nil, 404, errors.New("not found")
} }
if d.(*WorkflowExecution).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { if d.(*WorkflowExecutions).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecution).State = enum.FORGOTTEN d.(*WorkflowExecutions).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request)) utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request))
} }
return d, 200, nil return d, 200, nil
@@ -73,21 +73,21 @@ func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int
} }
func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) { func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), isDraft, a) return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a)
} }
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) { func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*WorkflowExecution](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a) return utils.GenericSearch[*WorkflowExecutions](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a)
} }
func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject { func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject { return func(d utils.DBObject) utils.ShallowDBObject {
if d.(*WorkflowExecution).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { if d.(*WorkflowExecutions).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
return nil return nil
} }
if d.(*WorkflowExecution).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecution).ExecDate) { if d.(*WorkflowExecutions).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecution).State = enum.FORGOTTEN d.(*WorkflowExecutions).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request)) utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request))
return d return d
} }

View File

@@ -6,13 +6,11 @@ import (
"strings" "strings"
"time" "time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron" "github.com/robfig/cron"
) )
@@ -22,9 +20,8 @@ import (
*/ */
// it's a flying object only use in a session time. It's not stored in the database // it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct { type WorkflowSchedule struct {
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"` // WorkflowExecutions is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
@@ -39,7 +36,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return nil return nil
} }
ws := &WorkflowSchedule{ ws := &WorkflowSchedule{
UUID: uuid.New().String(),
Start: s, Start: s,
DurationS: durationInS, DurationS: durationInS,
Cron: cron, Cron: cron,
@@ -51,19 +47,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return ws return ws
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
} }
access := workflow.NewAccessor(request) access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID) res, code, err := access.LoadOne(wfID)
if code != 200 { if code != 200 {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error())
} }
wf := res.(*workflow.Workflow) wf := res.(*workflow.Workflow)
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecutions{}, err
} }
ws.DurationS = longest ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
@@ -72,11 +68,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
} }
execs, err := ws.getExecutions(wf) execs, err := ws.getExecutions(wf)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecutions{}, err
} }
bookings := []*booking.Booking{}
for _, exec := range execs { for _, exec := range execs {
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings := exec.Book(wfID, priceds)
for _, b := range bookings { for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET] meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
@@ -85,47 +80,42 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
request.Caller.URLS[tools.BOOKING][tools.GET] = meth request.Caller.URLS[tools.BOOKING][tools.GET] = meth
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
if err != nil { if err != nil {
return false, wf, execs, bookings, err return false, wf, execs, err
} }
} }
} }
return true, wf, execs, bookings, nil return true, wf, execs, nil
} }
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) { func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecutions, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecutions{}, errors.New("no request found")
} }
c := request.Caller c := request.Caller
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined") return ws, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
} }
methods := c.URLS[tools.BOOKING] methods := c.URLS[tools.BOOKING]
if _, ok := methods[tools.GET]; !ok { if _, ok := methods[tools.GET]; !ok {
return ws, nil, []*WorkflowExecution{}, errors.New("no path found") return ws, nil, []*WorkflowExecutions{}, errors.New("no path found")
} }
ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request) ok, wf, executions, err := ws.CheckBooking(wfID, request)
ws.WorkflowExecution = executions
if !ok || err != nil { if !ok || err != nil {
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, []*WorkflowExecutions{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
for _, booking := range bookings { ws.WorkflowExecutions = executions
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
if err != nil { if err != nil {
return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) return ws, nil, []*WorkflowExecutions{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
} }
exec.GenerateID()
exec.StoreDraftDefault() exec.StoreDraftDefault()
// Should DELETE the previous execution2 // Should DELETE the previous execution2
fmt.Println(utils.GenericStoreOne(exec, NewAccessor(request))) utils.GenericStoreOne(exec, NewAccessor(request))
} }
return ws, wf, executions, nil return ws, wf, executions, nil
} }
@@ -142,19 +132,17 @@ VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
* getExecutions is a function that returns the executions of a workflow * getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution.WorkflowExecution * it returns an array of workflow_execution.WorkflowExecution
*/ */
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecution, error) { func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) {
workflows_executions := []*WorkflowExecution{} workflows_executions := []*WorkflowExecutions{}
dates, err := ws.getDates() dates, err := ws.getDates()
if err != nil { if err != nil {
return workflows_executions, err return workflows_executions, err
} }
for _, date := range dates { for _, date := range dates {
obj := &WorkflowExecution{ obj := &WorkflowExecutions{
AbstractObject: utils.AbstractObject{ AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
}, },
ExecutionsID: ws.UUID,
ExecDate: date.Start, // set the execution date ExecDate: date.Start, // set the execution date
EndDate: date.End, // set the end date EndDate: date.End, // set the end date
State: enum.DRAFT, // set the state to 1 (scheduled) State: enum.DRAFT, // set the state to 1 (scheduled)