Compare commits

..

15 Commits

Author SHA1 Message Date
mr
c53e25e69a add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
4aebb48e73 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
c9690fc1ba add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
cd10104089 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
f6bc19123c add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
305e5919f9 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
2ee2320a03 add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
9a4cf686ca add purchase resource in model catalog 2025-02-13 10:32:49 +01:00
mr
5376cf7681 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
2f0eb9ca35 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
c1d0932301 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
82cb2aa76a workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
2cbbb49ce2 workflow execution evolved 2025-02-13 10:32:49 +01:00
mr
3c615fd3bc workflow execution evolved 2025-02-13 10:32:49 +01:00
pb
91d3e5c3cd Added a nil verification to StoreOne and UpdateOne for Graph object 2025-02-13 10:32:49 +01:00
29 changed files with 186 additions and 433 deletions

View File

@ -26,12 +26,12 @@ import (
func GetConfLoader() *onion.Onion { func GetConfLoader() *onion.Onion {
logger := zerolog.New(os.Stdout).With().Timestamp().Logger() logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
AppName := GetAppName() AppName := GetAppName()
EnvPrefix := "OC_" EnvPrefix := strings.ToUpper(AppName[0:2]+AppName[3:]) + "_"
defaultConfigFile := "/etc/oc/" + AppName[3:] + ".json" defaultConfigFile := "/etc/oc/" + AppName[3:] + ".json"
localConfigFile := "./" + AppName[3:] + ".json" localConfigFile := "./" + AppName[3:] + ".json"
var configFile string var configFile string
var o *onion.Onion var o *onion.Onion
l3 := GetEnvVarLayer(EnvPrefix) l3 := onion.NewEnvLayerPrefix("_", EnvPrefix)
l2, err := onion.NewFileLayer(localConfigFile, nil) l2, err := onion.NewFileLayer(localConfigFile, nil)
if err == nil { if err == nil {
logger.Info().Msg("Local config file found " + localConfigFile + ", overriding default file") logger.Info().Msg("Local config file found " + localConfigFile + ", overriding default file")
@ -54,17 +54,3 @@ func GetConfLoader() *onion.Onion {
} }
return o return o
} }
func GetEnvVarLayer(prefix string) onion.Layer {
envVars := make(map[string]interface{})
for _, e := range os.Environ() {
pair := strings.SplitN(e, "=", 2)
key := pair[0]
if strings.HasPrefix(key, prefix) {
envVars[strings.TrimPrefix(key, prefix)] = pair[1]
}
}
return onion.NewMapLayer(envVars)
}

View File

@ -287,7 +287,7 @@ func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.C
return nil, 503, err return nil, 503, err
} }
opts := options.Find() opts := options.Find()
opts.SetLimit(1000) opts.SetLimit(100)
targetDBCollection := CollectionMap[collection_name] targetDBCollection := CollectionMap[collection_name]
orList := bson.A{} orList := bson.A{}
andList := bson.A{} andList := bson.A{}

View File

@ -261,7 +261,7 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc
} }
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool { func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{ ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
Caller: r.caller, Caller: r.caller,
Username: r.user, Username: r.user,
PeerID: r.peerID, PeerID: r.peerID,
@ -563,9 +563,9 @@ func (l *LibData) ToRule() *rule.Rule {
return nil return nil
} }
func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution { func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecutions {
if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION { if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION {
return l.Data.(*workflow_execution.WorkflowExecution) return l.Data.(*workflow_execution.WorkflowExecutions)
} }
return nil return nil
} }

View File

@ -15,7 +15,6 @@ import (
*/ */
type Booking struct { type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions
DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"` ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"`
@ -81,6 +80,10 @@ func (d *Booking) GetDelayOnDuration() time.Duration {
return d.GetRealDuration() - d.GetUsualDuration() return d.GetRealDuration() - d.GetUsualDuration()
} }
func (d *Booking) GetName() string {
return d.GetID() + "_" + d.ExpectedStartDate.String()
}
func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor return NewAccessor(request) // Create a new instance of the accessor
} }
@ -90,7 +93,7 @@ func (d *Booking) VerifyAuth(request *tools.APIRequest) bool {
} }
func (r *Booking) StoreDraftDefault() { func (r *Booking) StoreDraftDefault() {
r.IsDraft = false r.IsDraft = true
} }
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {

View File

@ -1,7 +1,6 @@
package booking package booking
import ( import (
"errors"
"time" "time"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
@ -34,11 +33,7 @@ func (a *bookingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error)
} }
func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
if set.(*Booking).State == 0 { return utils.GenericUpdateOne(set, id, a, &Booking{})
return nil, 400, errors.New("state is required")
}
realSet := &Booking{State: set.(*Booking).State}
return utils.GenericUpdateOne(realSet, id, a, &Booking{})
} }
func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
@ -51,15 +46,13 @@ func (a *bookingMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int
func (a *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { func (a *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*Booking](id, func(d utils.DBObject) (utils.DBObject, int, error) { return utils.GenericLoadOne[*Booking](id, func(d utils.DBObject) (utils.DBObject, int, error) {
now := time.Now() if d.(*Booking).State == enum.DRAFT && time.Now().UTC().After(d.(*Booking).ExpectedStartDate) {
now = now.Add(time.Second * -60)
if d.(*Booking).State == enum.DRAFT && now.UTC().After(d.(*Booking).ExpectedStartDate) {
return utils.GenericDeleteOne(d.GetID(), a) return utils.GenericDeleteOne(d.GetID(), a)
} }
if (d.(*Booking).ExpectedEndDate) == nil { if (d.(*Booking).ExpectedEndDate) == nil {
d.(*Booking).State = enum.FORGOTTEN d.(*Booking).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, id, a) utils.GenericRawUpdateOne(d, id, a)
} else if d.(*Booking).State == enum.SCHEDULED && now.UTC().After(d.(*Booking).ExpectedStartDate) { } else if d.(*Booking).State == enum.SCHEDULED && time.Now().UTC().After(*&d.(*Booking).ExpectedStartDate) {
d.(*Booking).State = enum.DELAYED d.(*Booking).State = enum.DELAYED
utils.GenericRawUpdateOne(d, id, a) utils.GenericRawUpdateOne(d, id, a)
} }
@ -77,13 +70,11 @@ func (a *bookingMongoAccessor) Search(filters *dbs.Filters, search string, isDra
func (a *bookingMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject { func (a *bookingMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject { return func(d utils.DBObject) utils.ShallowDBObject {
now := time.Now() if d.(*Booking).State == enum.DRAFT && time.Now().UTC().After(d.(*Booking).ExpectedStartDate) {
now = now.Add(time.Second * -60)
if d.(*Booking).State == enum.DRAFT && now.UTC().After(d.(*Booking).ExpectedStartDate) {
utils.GenericDeleteOne(d.GetID(), a) utils.GenericDeleteOne(d.GetID(), a)
return nil return nil
} }
if d.(*Booking).State == enum.SCHEDULED && now.UTC().After(d.(*Booking).ExpectedStartDate) { if d.(*Booking).State == enum.SCHEDULED && time.Now().UTC().After(*&d.(*Booking).ExpectedStartDate) {
d.(*Booking).State = enum.DELAYED d.(*Booking).State = enum.DELAYED
utils.GenericRawUpdateOne(d, d.GetID(), a) utils.GenericRawUpdateOne(d, d.GetID(), a)
} }

View File

@ -42,15 +42,13 @@ const (
S3 S3
MEMORY MEMORY
HARDWARE HARDWARE
AZURE
GCS
) )
// String() - Returns the string representation of the storage type // String() - Returns the string representation of the storage type
func (t StorageType) String() string { func (t StorageType) String() string {
return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE", "AZURE", "GCS"}[t] return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE"}[t]
} }
func TypeList() []StorageType { func TypeList() []StorageType {
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS} return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE}
} }

View File

@ -7,7 +7,7 @@ import (
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
) )
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 { func GetPlannerNearestStart(start time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
near := float64(10000000000) // set a high value near := float64(10000000000) // set a high value
for _, items := range planned { // loop through the planned items for _, items := range planned { // loop through the planned items
for _, priced := range items { // loop through the priced items for _, priced := range items { // loop through the priced items
@ -23,7 +23,7 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[stri
return near return near
} }
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 { func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
if end == nil { if end == nil {
return -1 return -1
} }

View File

@ -28,7 +28,7 @@ var models = map[string]func() utils.DBObject{
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} }, tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} }, tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} }, tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} }, tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecutions{} },
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} }, tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} }, tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} }, tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },

View File

@ -68,10 +68,10 @@ func (o *Order) Pay(scheduler *workflow_execution.WorkflowSchedule, request *too
} else { } else {
o.IsDraft = false o.IsDraft = false
} }
for _, exec := range scheduler.WorkflowExecution { for _, exec := range scheduler.WorkflowExecutions {
exec.IsDraft = false exec.IsDraft = false
_, code, err := utils.GenericUpdateOne(exec, exec.GetID(), _, code, err := utils.GenericUpdateOne(exec, exec.GetID(),
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{}) workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecutions{})
if code != 200 || err != nil { if code != 200 || err != nil {
return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err)) return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err))
} }
@ -100,7 +100,7 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu
o.IsDraft = true o.IsDraft = true
o.OrderBy = request.PeerID o.OrderBy = request.PeerID
o.WorkflowExecutionIDs = []string{} // create an array of ids o.WorkflowExecutionIDs = []string{} // create an array of ids
for _, exec := range scheduler.WorkflowExecution { for _, exec := range scheduler.WorkflowExecutions {
o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID()) o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID())
} }
// set the name of the order // set the name of the order
@ -166,12 +166,12 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r
if request == nil { if request == nil {
return draftedBookings, errors.New("no request found") return draftedBookings, errors.New("no request found")
} }
for _, exec := range scheduler.WorkflowExecution { for _, exec := range scheduler.WorkflowExecutions {
_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request) _, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request)
if err != nil { if err != nil {
return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err)) return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err))
} }
bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds) bookings := exec.Book(scheduler.Workflow.UUID, priceds)
for _, booking := range bookings { for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "", _, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller) tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)

View File

@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"regexp"
"strings" "strings"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
@ -28,40 +29,47 @@ type PeerCache struct {
} }
// urlFormat formats the URL of the peer with the data type API function // urlFormat formats the URL of the peer with the data type API function
func (p *PeerCache) urlFormat(hostUrl string, dt tools.DataType) string { func (p *PeerCache) urlFormat(url string, dt tools.DataType) string {
// localhost is replaced by the local peer URL // localhost is replaced by the local peer URL
// because localhost must collide on a web request security protocol // because localhost must collide on a web request security protocol
/*localhost := "" localhost := ""
if strings.Contains(hostUrl, "localhost") { if strings.Contains(url, "localhost") {
localhost = "localhost" localhost = "localhost"
} }
if strings.Contains(hostUrl, "127.0.0.1") { if strings.Contains(url, "127.0.0.1") {
localhost = "127.0.0.1" localhost = "127.0.0.1"
} }
if localhost != "" { if localhost != "" {
r := regexp.MustCompile("(" + localhost + ":[0-9]+)") r := regexp.MustCompile("(" + localhost + ":[0-9]+)")
t := r.FindString(hostUrl) t := r.FindString(url)
if t != "" { if t != "" {
hostUrl = strings.Replace(hostUrl, t, dt.API()+":8080/oc", -1) url = strings.Replace(url, t, dt.API()+":8080/oc", -1)
} else { } else {
hostUrl = strings.ReplaceAll(hostUrl, localhost, dt.API()+":8080/oc") url = strings.ReplaceAll(url, localhost, dt.API()+":8080/oc")
} }
} else {*/ } else {
hostUrl = hostUrl + "/" + strings.ReplaceAll(dt.API(), "oc-", "") url = url + "/" + dt.API()
//} }
fmt.Println("Contacting", hostUrl) return url
return hostUrl
} }
// checkPeerStatus checks the status of a peer // checkPeerStatus checks the status of a peer
func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool) { func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) {
api := tools.API{} api := tools.API{}
access := NewShallowAccessor() access := NewShallowAccessor()
res, code, _ := access.LoadOne(peerID) // Load the peer from db res, code, _ := access.LoadOne(peerID) // Load the peer from db
if code != 200 { // no peer no party if code != 200 { // no peer no party
return nil, false return nil, false
} }
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + "/status" // Format the URL methods := caller.URLS[tools.PEER] // Get the methods url of the peer
if methods == nil {
return res.(*Peer), false
}
meth := methods[tools.POST] // Get the POST method to check status
if meth == "" {
return res.(*Peer), false
}
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + meth // Format the URL
state, services := api.CheckRemotePeer(url) state, services := api.CheckRemotePeer(url)
res.(*Peer).ServicesState = services // Update the services states of the peer res.(*Peer).ServicesState = services // Update the services states of the peer
access.UpdateOne(res, peerID) // Update the peer in the db access.UpdateOne(res, peerID) // Update the peer in the db
@ -69,37 +77,36 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool)
} }
// LaunchPeerExecution launches an execution on a peer // LaunchPeerExecution launches an execution on a peer
// The method contacts the path described by : peer.Url + datatype path (from enums) + replacement of id by dataID
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
fmt.Println("Launching peer execution on", caller.URLS, dt, method) fmt.Println("Launching peer execution on", caller.URLS, dt, method)
methods := caller.URLS[dt] // Get the methods url of the data type methods := caller.URLS[dt] // Get the methods url of the data type
if m, ok := methods[method]; !ok || m == "" { if m, ok := methods[method]; !ok || m == "" {
return nil, errors.New("Requested method " + method.String() + " not declared in HTTPCaller") return nil, errors.New("no path found")
} }
path := methods[method] // Get the path corresponding to the action we want to execute meth := methods[method] // Get the method url to execute
path = strings.ReplaceAll(path, ":id", dataID) // Replace the id in the path in case of a DELETE / UPDATE method (it's a standard naming in OC) meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC)
url := "" url := ""
// Check the status of the peer // Check the status of the peer
if mypeer, ok := p.checkPeerStatus(peerID, dt.API()); !ok && mypeer != nil { if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok && mypeer != nil {
// If the peer is not reachable, add the execution to the failed executions list // If the peer is not reachable, add the execution to the failed executions list
pexec := &PeerExecution{ pexec := &PeerExecution{
Method: method.String(), Method: method.String(),
Url: p.urlFormat((mypeer.Url), dt) + path, // the url is constitued of : host URL + resource path + action path (ex : mypeer.com/datacenter/resourcetype/path/to/action) Url: p.urlFormat((mypeer.Url)+meth, dt),
Body: body, Body: body,
DataType: dt.EnumIndex(), DataType: dt.EnumIndex(),
DataID: dataID, DataID: dataID,
} }
mypeer.AddExecution(*pexec) mypeer.AddExecution(*pexec)
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
return nil, errors.New("peer is " + peerID + " not reachable") return nil, errors.New("peer is not reachable")
} else { } else {
if mypeer == nil { if mypeer == nil {
return nil, errors.New("peer " + peerID + " not found") return nil, errors.New("peer not found")
} }
// If the peer is reachable, launch the execution // If the peer is reachable, launch the execution
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL
tmp := mypeer.FailedExecution // Get the failed executions list tmp := mypeer.FailedExecution // Get the failed executions list
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db

View File

@ -17,7 +17,7 @@ import (
* it defines the resource compute * it defines the resource compute
*/ */
type ComputeResource struct { type ComputeResource struct {
AbstractInstanciatedResource[*ComputeResourceInstance] AbstractIntanciatedResource[*ComputeResourceInstance]
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
} }
@ -35,7 +35,7 @@ func (abs *ComputeResource) ConvertToPricedResource(
if t != tools.COMPUTE_RESOURCE { if t != tools.COMPUTE_RESOURCE {
return nil return nil
} }
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request) p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource) priced := p.(*PricedResource)
return &PricedComputeResource{ return &PricedComputeResource{
PricedResource: *priced, PricedResource: *priced,
@ -52,7 +52,6 @@ type ComputeNode struct {
type ComputeResourceInstance struct { type ComputeResourceInstance struct {
ResourceInstance[*ComputeResourcePartnership] ResourceInstance[*ComputeResourcePartnership]
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"` SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"` PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"` AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`

View File

@ -16,7 +16,7 @@ import (
* it defines the resource data * it defines the resource data
*/ */
type DataResource struct { type DataResource struct {
AbstractInstanciatedResource[*DataInstance] AbstractIntanciatedResource[*DataInstance]
Type string `bson:"type,omitempty" json:"type,omitempty"` Type string `bson:"type,omitempty" json:"type,omitempty"`
Quality string `bson:"quality,omitempty" json:"quality,omitempty"` Quality string `bson:"quality,omitempty" json:"quality,omitempty"`
OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage
@ -42,7 +42,7 @@ func (abs *DataResource) ConvertToPricedResource(
if t != tools.DATA_RESOURCE { if t != tools.DATA_RESOURCE {
return nil return nil
} }
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request) p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource) priced := p.(*PricedResource)
return &PricedDataResource{ return &PricedDataResource{
PricedResource: *priced, PricedResource: *priced,

View File

@ -12,7 +12,6 @@ type ResourceInterface interface {
ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF
GetType() string GetType() string
GetSelectedInstance() utils.DBObject GetSelectedInstance() utils.DBObject
ClearEnv() utils.DBObject
SetAllowedInstances(request *tools.APIRequest) SetAllowedInstances(request *tools.APIRequest)
} }
@ -21,7 +20,6 @@ type ResourceInstanceITF interface {
GetID() string GetID() string
GetName() string GetName() string
StoreDraftDefault() StoreDraftDefault()
ClearEnv()
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string) GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups() ClearPeerGroups()

View File

@ -25,7 +25,7 @@ type ProcessingUsage struct {
* it defines the resource processing * it defines the resource processing
*/ */
type ProcessingResource struct { type ProcessingResource struct {
AbstractInstanciatedResource[*ProcessingInstance] AbstractIntanciatedResource[*ProcessingInstance]
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing

View File

@ -47,12 +47,12 @@ func (r *AbstractResource) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted return r.IsDraft // only draft bookings can be deleted
} }
type AbstractInstanciatedResource[T ResourceInstanceITF] struct { type AbstractIntanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name) AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
} }
func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource( func (abs *AbstractIntanciatedResource[T]) ConvertToPricedResource(
t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF { t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF {
instances := map[string]string{} instances := map[string]string{}
profiles := []pricing.PricingProfileITF{} profiles := []pricing.PricingProfileITF{}
@ -71,14 +71,7 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(
} }
} }
func (abs *AbstractInstanciatedResource[T]) ClearEnv() utils.DBObject { func (r *AbstractIntanciatedResource[T]) GetSelectedInstance() utils.DBObject {
for _, instance := range abs.Instances {
instance.ClearEnv()
}
return abs
}
func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex { if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex {
return r.Instances[*r.SelectedInstanceIndex] return r.Instances[*r.SelectedInstanceIndex]
} }
@ -88,14 +81,11 @@ func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
return nil return nil
} }
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) { func (abs *AbstractIntanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) {
if request != nil && request.PeerID == abs.CreatorID && request.PeerID != "" {
return
}
abs.Instances = verifyAuthAction[T](abs.Instances, request) abs.Instances = verifyAuthAction[T](abs.Instances, request)
} }
func (d *AbstractInstanciatedResource[T]) Trim() { func (d *AbstractIntanciatedResource[T]) Trim() {
d.Type = d.GetType() d.Type = d.GetType()
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok { if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok {
for _, instance := range d.Instances { for _, instance := range d.Instances {
@ -104,7 +94,7 @@ func (d *AbstractInstanciatedResource[T]) Trim() {
} }
} }
func (abs *AbstractInstanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool { func (abs *AbstractIntanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool {
return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request) return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request)
} }
@ -137,11 +127,6 @@ type GeoPoint struct {
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"` Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
} }
type Credentials struct {
Login string `json:"login,omitempty" bson:"login,omitempty"`
Pass string `json:"password,omitempty" bson:"password,omitempty"`
}
type ResourceInstance[T ResourcePartnerITF] struct { type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject utils.AbstractObject
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"` Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
@ -153,12 +138,6 @@ type ResourceInstance[T ResourcePartnerITF] struct {
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"` Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
} }
func (ri *ResourceInstance[T]) ClearEnv() {
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF { func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
pricings := []pricing.PricingProfileITF{} pricings := []pricing.PricingProfileITF{}
for _, p := range ri.Partnerships { for _, p := range ri.Partnerships {

View File

@ -65,12 +65,6 @@ func (wfa *resourceMongoAccessor[T]) LoadAll(isDraft bool) ([]utils.ShallowDBObj
} }
func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) { func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
if filters == nil && search == "*" {
return utils.GenericLoadAll[T](func(d utils.DBObject) utils.ShallowDBObject {
d.(T).SetAllowedInstances(wfa.Request)
return d
}, isDraft, wfa)
}
return utils.GenericSearch[T](filters, search, wfa.getResourceFilter(search), return utils.GenericSearch[T](filters, search, wfa.getResourceFilter(search),
func(d utils.DBObject) utils.ShallowDBObject { func(d utils.DBObject) utils.ShallowDBObject {
d.(T).SetAllowedInstances(wfa.Request) d.(T).SetAllowedInstances(wfa.Request)
@ -79,6 +73,9 @@ func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string,
} }
func (abs *resourceMongoAccessor[T]) getResourceFilter(search string) *dbs.Filters { func (abs *resourceMongoAccessor[T]) getResourceFilter(search string) *dbs.Filters {
if search == "*" {
search = ""
}
return &dbs.Filters{ return &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},

View File

@ -17,7 +17,7 @@ import (
* it defines the resource storage * it defines the resource storage
*/ */
type StorageResource struct { type StorageResource struct {
AbstractInstanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name) AbstractIntanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name)
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
} }
@ -35,7 +35,7 @@ func (abs *StorageResource) ConvertToPricedResource(
if t != tools.STORAGE_RESOURCE { if t != tools.STORAGE_RESOURCE {
return nil return nil
} }
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request) p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource) priced := p.(*PricedResource)
return &PricedStorageResource{ return &PricedStorageResource{
PricedResource: *priced, PricedResource: *priced,
@ -44,7 +44,6 @@ func (abs *StorageResource) ConvertToPricedResource(
type StorageResourceInstance struct { type StorageResourceInstance struct {
ResourceInstance[*StorageResourcePartnership] ResourceInstance[*StorageResourcePartnership]
Credentials *Credentials `json:"credentials,omitempty" bson:"credentials,omitempty"`
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
Local bool `bson:"local" json:"local"` Local bool `bson:"local" json:"local"`
SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"` SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"`
@ -55,13 +54,6 @@ type StorageResourceInstance struct {
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
} }
func (ri *StorageResourceInstance) ClearEnv() {
ri.Credentials = nil
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *StorageResourceInstance) StoreDraftDefault() { func (ri *StorageResourceInstance) StoreDraftDefault() {
found := false found := false
for _, p := range ri.ResourceInstance.Env { for _, p := range ri.ResourceInstance.Env {

View File

@ -23,10 +23,6 @@ func (r *WorkflowResource) GetType() string {
return tools.WORKFLOW_RESOURCE.String() return tools.WORKFLOW_RESOURCE.String()
} }
func (d *WorkflowResource) ClearEnv() utils.DBObject {
return d
}
func (d *WorkflowResource) Trim() { func (d *WorkflowResource) Trim() {
/* EMPTY */ /* EMPTY */
} }

View File

@ -91,7 +91,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
} }
func (ao *AbstractObject) VerifyAuth(request *tools.APIRequest) bool { func (ao *AbstractObject) VerifyAuth(request *tools.APIRequest) bool {
return ao.AccessMode == Public || (request != nil && ao.CreatorID == request.PeerID && request.PeerID != "") return ao.AccessMode == Public || (request != nil && ao.CreatorID == request.PeerID)
} }
func (ao *AbstractObject) GetObjectFilters(search string) *dbs.Filters { func (ao *AbstractObject) GetObjectFilters(search string) *dbs.Filters {

View File

@ -66,6 +66,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String()) return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
} }
if err != nil { if err != nil {
a.GetLogger().Error().Msg("Could not retrieve " + id + " to db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) { if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) {
@ -113,6 +114,7 @@ func GenericLoadOne[T DBObject](id string, f func(DBObject) (DBObject, int, erro
var data T var data T
res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String()) res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String())
if err != nil { if err != nil {
a.GetLogger().Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
res_mongo.Decode(&data) res_mongo.Decode(&data)
@ -126,6 +128,7 @@ func genericLoadAll[T DBObject](res *mgb.Cursor, code int, err error, onlyDraft
objs := []ShallowDBObject{} objs := []ShallowDBObject{}
var results []T var results []T
if err != nil { if err != nil {
a.GetLogger().Error().Msg("Could not retrieve any from db. Error: " + err.Error())
return nil, code, err return nil, code, err
} }
if err = res.All(mongo.MngoCtx, &results); err != nil { if err = res.All(mongo.MngoCtx, &results); err != nil {

View File

@ -128,8 +128,8 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
return true, nil return true, nil
} }
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) { func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) {
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{} priceds := map[tools.DataType][]pricing.PricedItemITF{}
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing, ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) { func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS() return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
@ -189,12 +189,12 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
return longest, priceds, wf, nil return longest, priceds, wf, nil
} }
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest, func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest,
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) { f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType][]pricing.PricedItemITF, error) {
resources := []T{} resources := []T{}
for _, item := range wf.GetGraphItems(f) { for _, item := range wf.GetGraphItems(f) {
if priceds[dt] == nil { if priceds[dt] == nil {
priceds[dt] = map[string]pricing.PricedItemITF{} priceds[dt] = []pricing.PricedItemITF{}
} }
dt, realItem := item.GetResource() dt, realItem := item.GetResource()
if realItem == nil { if realItem == nil {
@ -212,7 +212,7 @@ func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priced
priced.SetLocationEnd(*e) priced.SetLocationEnd(*e)
} }
resources = append(resources, realItem.(T)) resources = append(resources, realItem.(T))
priceds[dt][item.ID] = priced priceds[dt] = append(priceds[dt], priced)
} }
return resources, priceds, nil return resources, priceds, nil
} }

View File

@ -95,12 +95,12 @@ func (a *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.
if set.(*Workflow).Graph != nil && set.(*Workflow).Graph.Partial { if set.(*Workflow).Graph != nil && set.(*Workflow).Graph.Partial {
return nil, 403, errors.New("you are not allowed to update a partial workflow") return nil, 403, errors.New("you are not allowed to update a partial workflow")
} }
res, code, err := utils.GenericUpdateOne(set, id, a, &Workflow{}) res, code, err := utils.GenericUpdateOne(a.verifyResource(set), id, a, &Workflow{})
if code != 200 { if code != 200 {
return nil, code, err return nil, code, err
} }
workflow := res.(*Workflow) workflow := res.(*Workflow)
a.execute(workflow, false, true) // update the workspace for the workflow a.execute(workflow, false, false) // update the workspace for the workflow
a.share(workflow, false, a.GetCaller()) // share the update to the peers a.share(workflow, false, a.GetCaller()) // share the update to the peers
return res, code, nil return res, code, nil
} }
@ -119,19 +119,12 @@ func (a *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, i
workflow := res.(*Workflow) workflow := res.(*Workflow)
a.share(workflow, false, a.GetCaller()) // share the creation to the peers a.share(workflow, false, a.GetCaller()) // share the creation to the peers
a.execute(workflow, false, true) // store the workspace for the workflow a.execute(workflow, false, false) // store the workspace for the workflow
return res, code, nil return res, code, nil
} }
// CopyOne copies a workflow in the database // CopyOne copies a workflow in the database
func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
wf := data.(*Workflow)
for _, item := range wf.Graph.Items {
_, obj := item.GetResource()
if obj != nil {
obj.ClearEnv()
}
}
return utils.GenericStoreOne(data, a) return utils.GenericStoreOne(data, a)
} }
@ -214,10 +207,10 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
} else if t == tools.DATA_RESOURCE { } else if t == tools.DATA_RESOURCE {
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} }) access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} })
} else { } else {
wf.Graph.Clear(resource.GetID()) wf.Graph.Clear(item.Data.GetID())
} }
if error := utils.VerifyAccess(access, resource.GetID()); error != nil { if error := utils.VerifyAccess(access, resource.GetID()); error != nil {
wf.Graph.Clear(resource.GetID()) wf.Graph.Clear(item.Data.GetID())
} }
} }
return wf return wf

View File

@ -15,41 +15,39 @@ import (
) )
/* /*
* WorkflowExecution is a struct that represents a list of workflow executions * WorkflowExecutions is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system * Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
* workflows generate their own executions * workflows generate their own executions
*/ */
type WorkflowExecution struct { type WorkflowExecutions struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PeerBookByGraph map[string]map[string][]string `json:"peer_book_by_graph,omitempty" bson:"peer_book_by_graph,omitempty"` // BookByResource is a map of the resource id and the list of the booking id
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty"`
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
} }
func (r *WorkflowExecution) StoreDraftDefault() { func (r *WorkflowExecutions) StoreDraftDefault() {
r.IsDraft = false // TODO: TEMPORARY r.IsDraft = true // TODO: TEMPORARY
r.State = enum.SCHEDULED r.State = enum.DRAFT
} }
func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) { func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if r.State != set.(*WorkflowExecution).State { if r.State != set.(*WorkflowExecutions).State {
return true, &WorkflowExecution{State: set.(*WorkflowExecution).State} // only state can be updated return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated
} }
return !r.IsDraft, set // only draft buying can be updated return !r.IsDraft, set // only draft buying can be updated
} }
func (r *WorkflowExecution) CanDelete() bool { func (r *WorkflowExecutions) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted return r.IsDraft // only draft bookings can be deleted
} }
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool { func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool {
return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID
} }
func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error { func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error {
if ws.EndDate == nil { if ws.EndDate == nil {
// if no end... then Book like a savage // if no end... then Book like a savage
e := ws.ExecDate.Add(time.Hour) e := ws.ExecDate.Add(time.Hour)
@ -76,7 +74,7 @@ func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
} }
// tool to transform the argo status to a state // tool to transform the argo status to a state
func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution { func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions {
status = strings.ToLower(status) status = strings.ToLower(status)
switch status { switch status {
case "succeeded": // Succeeded case "succeeded": // Succeeded
@ -91,56 +89,38 @@ func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecutio
return wfa return wfa
} }
func (r *WorkflowExecution) GenerateID() { func (r *WorkflowExecutions) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String() r.UUID = uuid.New().String()
} }
}
func (d *WorkflowExecution) GetName() string { func (d *WorkflowExecutions) GetName() string {
return d.UUID + "_" + d.ExecDate.String() return d.UUID + "_" + d.ExecDate.String()
} }
func (d *WorkflowExecution) GetAccessor(request *tools.APIRequest) utils.Accessor { func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor return NewAccessor(request) // Create a new instance of the accessor
} }
func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool { func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool {
return true return true
} }
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking { func (d *WorkflowExecutions) Book(wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking {
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE]) booking := d.bookEach(wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...) booking = append(booking, d.bookEach(wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
booking = append(booking,d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
booking = append(booking,d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
return booking return booking
} }
func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*booking.Booking { func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking {
items := []*booking.Booking{} items := []*booking.Booking{}
for itemID, priced := range priceds { for _, priced := range priceds {
if d.PeerBookByGraph == nil {
d.PeerBookByGraph = map[string]map[string][]string{}
}
if d.PeerBookByGraph[priced.GetCreatorID()] == nil {
d.PeerBookByGraph[priced.GetCreatorID()] = map[string][]string{}
}
if d.PeerBookByGraph[priced.GetCreatorID()][itemID] == nil {
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate start := d.ExecDate
if s := priced.GetLocationStart(); s != nil { if s := priced.GetLocationStart(); s != nil {
start = *s start = *s
} }
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second) end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
bookingItem := &booking.Booking{ bookingItem := &booking.Booking{
AbstractObject: utils.AbstractObject{ State: enum.DRAFT,
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
},
ExecutionsID: executionsID,
State: enum.SCHEDULED,
ResourceID: priced.GetID(), ResourceID: priced.GetID(),
ResourceType: dt, ResourceType: dt,
DestPeerID: priced.GetCreatorID(), DestPeerID: priced.GetCreatorID(),
@ -150,8 +130,6 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
ExpectedEndDate: &end, ExpectedEndDate: &end,
} }
items = append(items, bookingItem) items = append(items, bookingItem)
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(
d.PeerBookByGraph[priced.GetCreatorID()][itemID], bookingItem.GetID())
} }
return items return items
} }

View File

@ -43,11 +43,11 @@ func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject,
} }
func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
if set.(*WorkflowExecution).State == 0 { if set.(*WorkflowExecutions).State == 0 {
return nil, 400, errors.New("state is required") return nil, 400, errors.New("state is required")
} }
realSet := WorkflowExecution{State: set.(*WorkflowExecution).State} realSet := WorkflowExecutions{State: set.(*WorkflowExecutions).State}
return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecution{}) return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecutions{})
} }
func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
@ -59,15 +59,13 @@ func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.D
} }
func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) { return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) {
now := time.Now() if d.(*WorkflowExecutions).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
now = now.Add(time.Second * -60)
if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
return nil, 404, errors.New("not found") return nil, 404, errors.New("not found")
} }
if d.(*WorkflowExecution).State == enum.SCHEDULED && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) { if d.(*WorkflowExecutions).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecution).State = enum.FORGOTTEN d.(*WorkflowExecutions).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request)) utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request))
} }
return d, 200, nil return d, 200, nil
@ -75,23 +73,21 @@ func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int
} }
func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) { func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), isDraft, a) return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a)
} }
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) { func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*WorkflowExecution](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a) return utils.GenericSearch[*WorkflowExecutions](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a)
} }
func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject { func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject { return func(d utils.DBObject) utils.ShallowDBObject {
now := time.Now() if d.(*WorkflowExecutions).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
now = now.Add(time.Second * -60)
if d.(*WorkflowExecution).State == enum.DRAFT && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request)) utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
return nil return nil
} }
if d.(*WorkflowExecution).State == enum.SCHEDULED && now.UTC().After(d.(*WorkflowExecution).ExecDate) { if d.(*WorkflowExecutions).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecution).State = enum.FORGOTTEN d.(*WorkflowExecutions).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request)) utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request))
return d return d
} }

View File

@ -4,16 +4,13 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/tools" "cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron" "github.com/robfig/cron"
) )
@ -23,9 +20,8 @@ import (
*/ */
// it's a flying object only use in a session time. It's not stored in the database // it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct { type WorkflowSchedule struct {
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"` // WorkflowExecutions is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
@ -40,7 +36,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return nil return nil
} }
ws := &WorkflowSchedule{ ws := &WorkflowSchedule{
UUID: uuid.New().String(),
Start: s, Start: s,
DurationS: durationInS, DurationS: durationInS,
Cron: cron, Cron: cron,
@ -52,19 +47,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return ws return ws
} }
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) { func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" { if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined") return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
} }
access := workflow.NewAccessor(request) access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID) res, code, err := access.LoadOne(wfID)
if code != 200 { if code != 200 {
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error()) return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error())
} }
wf := res.(*workflow.Workflow) wf := res.(*workflow.Workflow)
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request) longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecutions{}, err
} }
ws.DurationS = longest ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds." ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
@ -73,131 +68,58 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
} }
execs, err := ws.getExecutions(wf) execs, err := ws.getExecutions(wf)
if err != nil { if err != nil {
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err return false, wf, []*WorkflowExecutions{}, err
} }
bookings := []*booking.Booking{}
for _, exec := range execs { for _, exec := range execs {
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...) bookings := exec.Book(wfID, priceds)
}
errCh := make(chan error, len(bookings))
var m sync.Mutex
for _, b := range bookings { for _, b := range bookings {
go getBooking(b, request, wf, execs, bookings, errCh, &m) meth := request.Caller.URLS[tools.BOOKING][tools.GET]
}
for i := 0; i < len(bookings); i++ {
if err := <-errCh; err != nil {
return false, wf, execs, bookings, err
}
}
return true, wf, execs, bookings, nil
}
func getBooking( b *booking.Booking, request *tools.APIRequest, wf *workflow.Workflow, execs []*WorkflowExecution, bookings []*booking.Booking, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
meth := c.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID) meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":start_date", b.ExpectedStartDate.Format("2006-01-02T15:04:05"))
meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05")) meth = strings.ReplaceAll(meth, ":end_date", b.ExpectedEndDate.Format("2006-01-02T15:04:05"))
c.URLS[tools.BOOKING][tools.GET] = meth request.Caller.URLS[tools.BOOKING][tools.GET] = meth
_, err = (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, &c) _, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
if err != nil { if err != nil {
errCh <- err return false, wf, execs, err
return }
}
}
return true, wf, execs, nil
} }
errCh <- nil func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecutions, error) {
}
func getCallerCopy(request *tools.APIRequest, errCh chan error) (tools.HTTPCaller, error) {
var c tools.HTTPCaller
err := request.Caller.DeepCopy(c)
if err != nil {
errCh <- err
return tools.HTTPCaller{}, nil
}
c.URLS = request.Caller.URLS
return c, err
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil { if request == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no request found") return ws, nil, []*WorkflowExecutions{}, errors.New("no request found")
} }
c := request.Caller c := request.Caller
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil { if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined") return ws, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
} }
methods := c.URLS[tools.BOOKING] methods := c.URLS[tools.BOOKING]
if _, ok := methods[tools.GET]; !ok { if _, ok := methods[tools.GET]; !ok {
return ws, nil, []*WorkflowExecution{}, errors.New("no path found") return ws, nil, []*WorkflowExecutions{}, errors.New("no path found")
} }
ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request) ok, wf, executions, err := ws.CheckBooking(wfID, request)
ws.WorkflowExecution = executions
if !ok || err != nil { if !ok || err != nil {
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err)) return ws, nil, []*WorkflowExecutions{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
} }
ws.Workflow = wf ws.Workflow = wf
ws.WorkflowExecutions = executions
var errCh = make(chan error, len(bookings))
var m sync.Mutex
for _, booking := range bookings {
go ws.BookExecs(booking, request, errCh, &m)
}
for i := 0; i < len(bookings); i++ {
if err := <- errCh ; err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
fmt.Println("Schedules")
for _, exec := range executions { for _, exec := range executions {
err := exec.PurgeDraft(request) err := exec.PurgeDraft(request)
if err != nil { if err != nil {
return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err)) return ws, nil, []*WorkflowExecutions{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
} }
exec.GenerateID()
exec.StoreDraftDefault() exec.StoreDraftDefault()
// Should DELETE the previous execution2
utils.GenericStoreOne(exec, NewAccessor(request)) utils.GenericStoreOne(exec, NewAccessor(request))
} }
fmt.Println("Schedules")
return ws, wf, executions, nil return ws, wf, executions, nil
} }
func (ws *WorkflowSchedule) BookExecs(booking *booking.Booking, request *tools.APIRequest, errCh chan error, m *sync.Mutex) {
m.Lock()
c, err := getCallerCopy(request, errCh)
if err != nil {
errCh <- err
return
}
m.Unlock()
_, err = (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), &c)
if err != nil {
errCh <- err
return
}
errCh <- nil
}
/* /*
BOOKING IMPLIED TIME, not of subscription but of execution BOOKING IMPLIED TIME, not of subscription but of execution
so is processing time execution time applied on computes so is processing time execution time applied on computes
@ -210,19 +132,17 @@ VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
* getExecutions is a function that returns the executions of a workflow * getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution.WorkflowExecution * it returns an array of workflow_execution.WorkflowExecution
*/ */
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecution, error) { func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) {
workflows_executions := []*WorkflowExecution{} workflows_executions := []*WorkflowExecutions{}
dates, err := ws.getDates() dates, err := ws.getDates()
if err != nil { if err != nil {
return workflows_executions, err return workflows_executions, err
} }
for _, date := range dates { for _, date := range dates {
obj := &WorkflowExecution{ obj := &WorkflowExecutions{
AbstractObject: utils.AbstractObject{ AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
}, },
ExecutionsID: ws.UUID,
ExecDate: date.Start, // set the execution date ExecDate: date.Start, // set the execution date
EndDate: date.End, // set the end date EndDate: date.End, // set the end date
State: enum.DRAFT, // set the state to 1 (scheduled) State: enum.DRAFT, // set the state to 1 (scheduled)

View File

@ -73,10 +73,8 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
filters := &dbs.Filters{ filters := &dbs.Filters{
Or: map[string][]dbs.Filter{ Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: data.GetName() + "_workspace"}}, "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: data.GetName() + "_workspace"}},
"abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: a.GetPeerID()}},
}, },
} }
// filters *dbs.Filters, word string, isDraft bool
res, _, err := a.Search(filters, "", true) // Search for the workspace res, _, err := a.Search(filters, "", true) // Search for the workspace
if err == nil && len(res) > 0 { // If the workspace already exists, return an error if err == nil && len(res) > 0 { // If the workspace already exists, return an error
return nil, 409, errors.New("a workspace with the same name already exists") return nil, 409, errors.New("a workspace with the same name already exists")

View File

@ -7,7 +7,6 @@ import (
"cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/logs"
beego "github.com/beego/beego/v2/server/web" beego "github.com/beego/beego/v2/server/web"
) )
@ -116,8 +115,8 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
// CheckRemotePeer checks the state of a remote peer // CheckRemotePeer checks the state of a remote peer
func (a *API) CheckRemotePeer(url string) (State, map[string]int) { func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// Check if the database is up // Check if the database is up
var resp APIStatusResponse
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
var resp APIStatusResponse
b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer
if err != nil { if err != nil {
return DEAD, map[string]int{} // If the peer is not reachable, return dead return DEAD, map[string]int{} // If the peer is not reachable, return dead
@ -136,7 +135,6 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// CheckRemoteAPIs checks the state of remote APIs from your proper OC // CheckRemoteAPIs checks the state of remote APIs from your proper OC
func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) { func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) {
// Check if the database is up // Check if the database is up
l := logs.GetLogger()
new := map[string]string{} new := map[string]string{}
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
code := 0 code := 0
@ -147,7 +145,6 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error)
var resp APIStatusResponse var resp APIStatusResponse
b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint)
if err != nil { if err != nil {
l.Error().Msg(api.String() + " not reachable")
state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service
continue continue
} }
@ -164,7 +161,6 @@ func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error)
reachable = true // If the remote API is reachable, set reachable to true cause we are not dead reachable = true // If the remote API is reachable, set reachable to true cause we are not dead
} }
if !reachable { if !reachable {
l.Error().Msg("Peer check returned no answers")
state = DEAD // If no remote API is reachable, return dead, nobody is alive state = DEAD // If no remote API is reachable, return dead, nobody is alive
} }
if code > 0 { if code > 0 {

View File

@ -21,11 +21,6 @@ const (
WORKSPACE_HISTORY WORKSPACE_HISTORY
ORDER ORDER
PURCHASE_RESOURCE PURCHASE_RESOURCE
ADMIRALTY_SOURCE
ADMIRALTY_TARGET
ADMIRALTY_SECRET
ADMIRALTY_KUBECONFIG
ADMIRALTY_NODES
) )
var NOAPI = "" var NOAPI = ""
@ -35,11 +30,6 @@ var WORKFLOWAPI = "oc-workflow"
var WORKSPACEAPI = "oc-workspace" var WORKSPACEAPI = "oc-workspace"
var PEERSAPI = "oc-peer" var PEERSAPI = "oc-peer"
var DATACENTERAPI = "oc-datacenter" var DATACENTERAPI = "oc-datacenter"
var ADMIRALTY_SOURCEAPI = DATACENTERAPI+"/admiralty/source"
var ADMIRALTY_TARGETAPI = DATACENTERAPI+"/admiralty/target"
var ADMIRALTY_SECRETAPI = DATACENTERAPI+"/admiralty/secret"
var ADMIRALTY_KUBECONFIGAPI = DATACENTERAPI+"/admiralty/kubeconfig"
var ADMIRALTY_NODESAPI = DATACENTERAPI+"/admiralty/node"
// Bind the standard API name to the data type // Bind the standard API name to the data type
var DefaultAPI = [...]string{ var DefaultAPI = [...]string{
@ -60,11 +50,6 @@ var DefaultAPI = [...]string{
NOAPI, NOAPI,
NOAPI, NOAPI,
NOAPI, NOAPI,
ADMIRALTY_SOURCEAPI,
ADMIRALTY_TARGETAPI,
ADMIRALTY_SECRETAPI,
ADMIRALTY_KUBECONFIGAPI,
ADMIRALTY_NODESAPI,
} }
// Bind the standard data name to the data type // Bind the standard data name to the data type
@ -86,11 +71,6 @@ var Str = [...]string{
"workspace_history", "workspace_history",
"order", "order",
"purchase_resource", "purchase_resource",
"admiralty_source",
"admiralty_target",
"admiralty_secret",
"admiralty_kubeconfig",
"admiralty_node",
} }
func FromInt(i int) string { func FromInt(i int) string {
@ -111,5 +91,5 @@ func (d DataType) EnumIndex() int {
} }
func DataTypeList() []DataType { func DataTypeList() []DataType {
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE,ADMIRALTY_SOURCE,ADMIRALTY_TARGET,ADMIRALTY_SECRET,ADMIRALTY_KUBECONFIG,ADMIRALTY_NODES} return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE}
} }

View File

@ -3,7 +3,6 @@ package tools
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"net/http" "net/http"
"net/url" "net/url"
@ -52,7 +51,6 @@ var HTTPCallerInstance = &HTTPCaller{} // Singleton instance of the HTTPCaller
type HTTPCaller struct { type HTTPCaller struct {
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
Disabled bool // Disabled flag Disabled bool // Disabled flag
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
} }
// NewHTTPCaller creates a new instance of the HTTP Caller // NewHTTPCaller creates a new instance of the HTTP Caller
@ -63,16 +61,6 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
} }
} }
// Creates a copy of the current caller, in order to have parallelized executions without race condition
func (c* HTTPCaller) DeepCopy(dst HTTPCaller) error {
bytes, err := json.Marshal(c)
if err != nil {
return err
}
return json.Unmarshal(bytes, &dst)
}
// CallGet calls the GET method on the HTTP server // CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) { func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte(""))) req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))
@ -88,33 +76,17 @@ func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) (
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
err = caller.StoreResp(resp) return io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
} }
// CallPut calls the DELETE method on the HTTP server // CallPut calls the DELETE method on the HTTP server
func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) { func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) {
req, err := http.NewRequest("DELETE", url+subpath, nil) resp, err := http.NewRequest("DELETE", url+subpath, nil)
if err != nil { if err != nil || resp == nil || resp.Body == nil {
return nil, err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil || req == nil || req.Body == nil {
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
return io.ReadAll(resp.Body)
err = caller.StoreResp(resp)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
} }
// CallPost calls the POST method on the HTTP server // CallPost calls the POST method on the HTTP server
@ -133,12 +105,7 @@ func (caller *HTTPCaller) CallPost(url string, subpath string, body interface{},
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
err = caller.StoreResp(resp) return io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
} }
// CallPost calls the POST method on the HTTP server // CallPost calls the POST method on the HTTP server
@ -156,12 +123,7 @@ func (caller *HTTPCaller) CallPut(url string, subpath string, body map[string]in
return nil, err return nil, err
} }
defer resp.Body.Close() defer resp.Body.Close()
err = caller.StoreResp(resp) return io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
} }
// CallRaw calls the Raw method on the HTTP server // CallRaw calls the Raw method on the HTTP server
@ -181,12 +143,7 @@ func (caller *HTTPCaller) CallRaw(method string, url string, subpath string,
req.AddCookie(c) req.AddCookie(c)
} }
client := &http.Client{} client := &http.Client{}
resp, err := client.Do(req) return client.Do(req)
if err != nil {
return nil, err
}
return resp, nil
} }
// CallRaw calls the Raw method on the HTTP server // CallRaw calls the Raw method on the HTTP server
@ -206,17 +163,3 @@ func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
client := &http.Client{} client := &http.Client{}
return client.Do(req) return client.Do(req)
} }
func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
caller.LastResults = make(map[string]interface{})
caller.LastResults["header"] = resp.Header
caller.LastResults["code"] = resp.StatusCode
data, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error reading the body of the last request")
return err
}
caller.LastResults["body"] = data
return nil
}