Compare commits
15 Commits
bugfix/sch
...
payment
Author | SHA1 | Date | |
---|---|---|---|
|
c53e25e69a | ||
|
4aebb48e73 | ||
|
c9690fc1ba | ||
|
cd10104089 | ||
|
f6bc19123c | ||
|
305e5919f9 | ||
|
2ee2320a03 | ||
|
9a4cf686ca | ||
|
5376cf7681 | ||
|
2f0eb9ca35 | ||
|
c1d0932301 | ||
|
82cb2aa76a | ||
|
2cbbb49ce2 | ||
|
3c615fd3bc | ||
91d3e5c3cd |
@@ -26,12 +26,12 @@ import (
|
|||||||
func GetConfLoader() *onion.Onion {
|
func GetConfLoader() *onion.Onion {
|
||||||
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
|
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
|
||||||
AppName := GetAppName()
|
AppName := GetAppName()
|
||||||
EnvPrefix := "OC_"
|
EnvPrefix := strings.ToUpper(AppName[0:2]+AppName[3:]) + "_"
|
||||||
defaultConfigFile := "/etc/oc/" + AppName[3:] + ".json"
|
defaultConfigFile := "/etc/oc/" + AppName[3:] + ".json"
|
||||||
localConfigFile := "./" + AppName[3:] + ".json"
|
localConfigFile := "./" + AppName[3:] + ".json"
|
||||||
var configFile string
|
var configFile string
|
||||||
var o *onion.Onion
|
var o *onion.Onion
|
||||||
l3 := GetEnvVarLayer(EnvPrefix)
|
l3 := onion.NewEnvLayerPrefix("_", EnvPrefix)
|
||||||
l2, err := onion.NewFileLayer(localConfigFile, nil)
|
l2, err := onion.NewFileLayer(localConfigFile, nil)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
logger.Info().Msg("Local config file found " + localConfigFile + ", overriding default file")
|
logger.Info().Msg("Local config file found " + localConfigFile + ", overriding default file")
|
||||||
@@ -54,17 +54,3 @@ func GetConfLoader() *onion.Onion {
|
|||||||
}
|
}
|
||||||
return o
|
return o
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetEnvVarLayer(prefix string) onion.Layer {
|
|
||||||
envVars := make(map[string]interface{})
|
|
||||||
|
|
||||||
for _, e := range os.Environ() {
|
|
||||||
pair := strings.SplitN(e, "=", 2)
|
|
||||||
key := pair[0]
|
|
||||||
if strings.HasPrefix(key, prefix) {
|
|
||||||
envVars[strings.TrimPrefix(key, prefix)] = pair[1]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return onion.NewMapLayer(envVars)
|
|
||||||
}
|
|
||||||
|
@@ -261,7 +261,7 @@ func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSc
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
|
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
|
||||||
ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
|
ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
|
||||||
Caller: r.caller,
|
Caller: r.caller,
|
||||||
Username: r.user,
|
Username: r.user,
|
||||||
PeerID: r.peerID,
|
PeerID: r.peerID,
|
||||||
@@ -563,9 +563,9 @@ func (l *LibData) ToRule() *rule.Rule {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution {
|
func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecutions {
|
||||||
if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION {
|
if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION {
|
||||||
return l.Data.(*workflow_execution.WorkflowExecution)
|
return l.Data.(*workflow_execution.WorkflowExecutions)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@@ -15,7 +15,6 @@ import (
|
|||||||
*/
|
*/
|
||||||
type Booking struct {
|
type Booking struct {
|
||||||
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
||||||
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions
|
|
||||||
DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
|
DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
|
||||||
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
|
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
|
||||||
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"`
|
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"`
|
||||||
@@ -81,6 +80,10 @@ func (d *Booking) GetDelayOnDuration() time.Duration {
|
|||||||
return d.GetRealDuration() - d.GetUsualDuration()
|
return d.GetRealDuration() - d.GetUsualDuration()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *Booking) GetName() string {
|
||||||
|
return d.GetID() + "_" + d.ExpectedStartDate.String()
|
||||||
|
}
|
||||||
|
|
||||||
func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
||||||
return NewAccessor(request) // Create a new instance of the accessor
|
return NewAccessor(request) // Create a new instance of the accessor
|
||||||
}
|
}
|
||||||
@@ -90,7 +93,7 @@ func (d *Booking) VerifyAuth(request *tools.APIRequest) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (r *Booking) StoreDraftDefault() {
|
func (r *Booking) StoreDraftDefault() {
|
||||||
r.IsDraft = false
|
r.IsDraft = true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
|
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
|
||||||
|
@@ -1,7 +1,6 @@
|
|||||||
package booking
|
package booking
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||||
@@ -34,11 +33,7 @@ func (a *bookingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
||||||
if set.(*Booking).State == 0 {
|
return utils.GenericUpdateOne(set, id, a, &Booking{})
|
||||||
return nil, 400, errors.New("state is required")
|
|
||||||
}
|
|
||||||
realSet := &Booking{State: set.(*Booking).State}
|
|
||||||
return utils.GenericUpdateOne(realSet, id, a, &Booking{})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
@@ -51,15 +46,13 @@ func (a *bookingMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int
|
|||||||
|
|
||||||
func (a *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
|
func (a *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
|
||||||
return utils.GenericLoadOne[*Booking](id, func(d utils.DBObject) (utils.DBObject, int, error) {
|
return utils.GenericLoadOne[*Booking](id, func(d utils.DBObject) (utils.DBObject, int, error) {
|
||||||
now := time.Now()
|
if d.(*Booking).State == enum.DRAFT && time.Now().UTC().After(d.(*Booking).ExpectedStartDate) {
|
||||||
now = now.Add(time.Second * -60)
|
|
||||||
if d.(*Booking).State == enum.DRAFT && now.UTC().After(d.(*Booking).ExpectedStartDate) {
|
|
||||||
return utils.GenericDeleteOne(d.GetID(), a)
|
return utils.GenericDeleteOne(d.GetID(), a)
|
||||||
}
|
}
|
||||||
if (d.(*Booking).ExpectedEndDate) == nil {
|
if (d.(*Booking).ExpectedEndDate) == nil {
|
||||||
d.(*Booking).State = enum.FORGOTTEN
|
d.(*Booking).State = enum.FORGOTTEN
|
||||||
utils.GenericRawUpdateOne(d, id, a)
|
utils.GenericRawUpdateOne(d, id, a)
|
||||||
} else if d.(*Booking).State == enum.SCHEDULED && now.UTC().After(d.(*Booking).ExpectedStartDate) {
|
} else if d.(*Booking).State == enum.SCHEDULED && time.Now().UTC().After(*&d.(*Booking).ExpectedStartDate) {
|
||||||
d.(*Booking).State = enum.DELAYED
|
d.(*Booking).State = enum.DELAYED
|
||||||
utils.GenericRawUpdateOne(d, id, a)
|
utils.GenericRawUpdateOne(d, id, a)
|
||||||
}
|
}
|
||||||
@@ -77,13 +70,11 @@ func (a *bookingMongoAccessor) Search(filters *dbs.Filters, search string, isDra
|
|||||||
|
|
||||||
func (a *bookingMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
|
func (a *bookingMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
|
||||||
return func(d utils.DBObject) utils.ShallowDBObject {
|
return func(d utils.DBObject) utils.ShallowDBObject {
|
||||||
now := time.Now()
|
if d.(*Booking).State == enum.DRAFT && time.Now().UTC().After(d.(*Booking).ExpectedStartDate) {
|
||||||
now = now.Add(time.Second * -60)
|
|
||||||
if d.(*Booking).State == enum.DRAFT && now.UTC().After(d.(*Booking).ExpectedStartDate) {
|
|
||||||
utils.GenericDeleteOne(d.GetID(), a)
|
utils.GenericDeleteOne(d.GetID(), a)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if d.(*Booking).State == enum.SCHEDULED && now.UTC().After(d.(*Booking).ExpectedStartDate) {
|
if d.(*Booking).State == enum.SCHEDULED && time.Now().UTC().After(*&d.(*Booking).ExpectedStartDate) {
|
||||||
d.(*Booking).State = enum.DELAYED
|
d.(*Booking).State = enum.DELAYED
|
||||||
utils.GenericRawUpdateOne(d, d.GetID(), a)
|
utils.GenericRawUpdateOne(d, d.GetID(), a)
|
||||||
}
|
}
|
||||||
|
@@ -42,15 +42,13 @@ const (
|
|||||||
S3
|
S3
|
||||||
MEMORY
|
MEMORY
|
||||||
HARDWARE
|
HARDWARE
|
||||||
AZURE
|
|
||||||
GCS
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// String() - Returns the string representation of the storage type
|
// String() - Returns the string representation of the storage type
|
||||||
func (t StorageType) String() string {
|
func (t StorageType) String() string {
|
||||||
return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE", "AZURE", "GCS"}[t]
|
return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE"}[t]
|
||||||
}
|
}
|
||||||
|
|
||||||
func TypeList() []StorageType {
|
func TypeList() []StorageType {
|
||||||
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS}
|
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE}
|
||||||
}
|
}
|
||||||
|
@@ -7,7 +7,7 @@ import (
|
|||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
|
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
|
||||||
near := float64(10000000000) // set a high value
|
near := float64(10000000000) // set a high value
|
||||||
for _, items := range planned { // loop through the planned items
|
for _, items := range planned { // loop through the planned items
|
||||||
for _, priced := range items { // loop through the priced items
|
for _, priced := range items { // loop through the priced items
|
||||||
@@ -23,7 +23,7 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[stri
|
|||||||
return near
|
return near
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
|
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
|
||||||
if end == nil {
|
if end == nil {
|
||||||
return -1
|
return -1
|
||||||
}
|
}
|
||||||
|
@@ -28,7 +28,7 @@ var models = map[string]func() utils.DBObject{
|
|||||||
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
|
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
|
||||||
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
|
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
|
||||||
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
|
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
|
||||||
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
|
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecutions{} },
|
||||||
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
|
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
|
||||||
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
|
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
|
||||||
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },
|
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },
|
||||||
|
@@ -68,10 +68,10 @@ func (o *Order) Pay(scheduler *workflow_execution.WorkflowSchedule, request *too
|
|||||||
} else {
|
} else {
|
||||||
o.IsDraft = false
|
o.IsDraft = false
|
||||||
}
|
}
|
||||||
for _, exec := range scheduler.WorkflowExecution {
|
for _, exec := range scheduler.WorkflowExecutions {
|
||||||
exec.IsDraft = false
|
exec.IsDraft = false
|
||||||
_, code, err := utils.GenericUpdateOne(exec, exec.GetID(),
|
_, code, err := utils.GenericUpdateOne(exec, exec.GetID(),
|
||||||
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{})
|
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecutions{})
|
||||||
if code != 200 || err != nil {
|
if code != 200 || err != nil {
|
||||||
return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err))
|
return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err))
|
||||||
}
|
}
|
||||||
@@ -100,7 +100,7 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu
|
|||||||
o.IsDraft = true
|
o.IsDraft = true
|
||||||
o.OrderBy = request.PeerID
|
o.OrderBy = request.PeerID
|
||||||
o.WorkflowExecutionIDs = []string{} // create an array of ids
|
o.WorkflowExecutionIDs = []string{} // create an array of ids
|
||||||
for _, exec := range scheduler.WorkflowExecution {
|
for _, exec := range scheduler.WorkflowExecutions {
|
||||||
o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID())
|
o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID())
|
||||||
}
|
}
|
||||||
// set the name of the order
|
// set the name of the order
|
||||||
@@ -166,12 +166,12 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r
|
|||||||
if request == nil {
|
if request == nil {
|
||||||
return draftedBookings, errors.New("no request found")
|
return draftedBookings, errors.New("no request found")
|
||||||
}
|
}
|
||||||
for _, exec := range scheduler.WorkflowExecution {
|
for _, exec := range scheduler.WorkflowExecutions {
|
||||||
_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request)
|
_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err))
|
return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err))
|
||||||
}
|
}
|
||||||
bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds)
|
bookings := exec.Book(scheduler.Workflow.UUID, priceds)
|
||||||
for _, booking := range bookings {
|
for _, booking := range bookings {
|
||||||
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
|
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
|
||||||
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
|
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
|
||||||
|
@@ -4,6 +4,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
@@ -28,40 +29,47 @@ type PeerCache struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// urlFormat formats the URL of the peer with the data type API function
|
// urlFormat formats the URL of the peer with the data type API function
|
||||||
func (p *PeerCache) urlFormat(hostUrl string, dt tools.DataType) string {
|
func (p *PeerCache) urlFormat(url string, dt tools.DataType) string {
|
||||||
// localhost is replaced by the local peer URL
|
// localhost is replaced by the local peer URL
|
||||||
// because localhost must collide on a web request security protocol
|
// because localhost must collide on a web request security protocol
|
||||||
/*localhost := ""
|
localhost := ""
|
||||||
if strings.Contains(hostUrl, "localhost") {
|
if strings.Contains(url, "localhost") {
|
||||||
localhost = "localhost"
|
localhost = "localhost"
|
||||||
}
|
}
|
||||||
if strings.Contains(hostUrl, "127.0.0.1") {
|
if strings.Contains(url, "127.0.0.1") {
|
||||||
localhost = "127.0.0.1"
|
localhost = "127.0.0.1"
|
||||||
}
|
}
|
||||||
if localhost != "" {
|
if localhost != "" {
|
||||||
r := regexp.MustCompile("(" + localhost + ":[0-9]+)")
|
r := regexp.MustCompile("(" + localhost + ":[0-9]+)")
|
||||||
t := r.FindString(hostUrl)
|
t := r.FindString(url)
|
||||||
if t != "" {
|
if t != "" {
|
||||||
hostUrl = strings.Replace(hostUrl, t, dt.API()+":8080/oc", -1)
|
url = strings.Replace(url, t, dt.API()+":8080/oc", -1)
|
||||||
} else {
|
} else {
|
||||||
hostUrl = strings.ReplaceAll(hostUrl, localhost, dt.API()+":8080/oc")
|
url = strings.ReplaceAll(url, localhost, dt.API()+":8080/oc")
|
||||||
}
|
}
|
||||||
} else {*/
|
} else {
|
||||||
hostUrl = hostUrl + "/" + strings.ReplaceAll(dt.API(), "oc-", "")
|
url = url + "/" + dt.API()
|
||||||
//}
|
}
|
||||||
fmt.Println("Contacting", hostUrl)
|
return url
|
||||||
return hostUrl
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkPeerStatus checks the status of a peer
|
// checkPeerStatus checks the status of a peer
|
||||||
func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool) {
|
func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) {
|
||||||
api := tools.API{}
|
api := tools.API{}
|
||||||
access := NewShallowAccessor()
|
access := NewShallowAccessor()
|
||||||
res, code, _ := access.LoadOne(peerID) // Load the peer from db
|
res, code, _ := access.LoadOne(peerID) // Load the peer from db
|
||||||
if code != 200 { // no peer no party
|
if code != 200 { // no peer no party
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + "/status" // Format the URL
|
methods := caller.URLS[tools.PEER] // Get the methods url of the peer
|
||||||
|
if methods == nil {
|
||||||
|
return res.(*Peer), false
|
||||||
|
}
|
||||||
|
meth := methods[tools.POST] // Get the POST method to check status
|
||||||
|
if meth == "" {
|
||||||
|
return res.(*Peer), false
|
||||||
|
}
|
||||||
|
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + meth // Format the URL
|
||||||
state, services := api.CheckRemotePeer(url)
|
state, services := api.CheckRemotePeer(url)
|
||||||
res.(*Peer).ServicesState = services // Update the services states of the peer
|
res.(*Peer).ServicesState = services // Update the services states of the peer
|
||||||
access.UpdateOne(res, peerID) // Update the peer in the db
|
access.UpdateOne(res, peerID) // Update the peer in the db
|
||||||
@@ -69,37 +77,36 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// LaunchPeerExecution launches an execution on a peer
|
// LaunchPeerExecution launches an execution on a peer
|
||||||
// The method contacts the path described by : peer.Url + datatype path (from enums) + replacement of id by dataID
|
|
||||||
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
|
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
|
||||||
dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
||||||
fmt.Println("Launching peer execution on", caller.URLS, dt, method)
|
fmt.Println("Launching peer execution on", caller.URLS, dt, method)
|
||||||
methods := caller.URLS[dt] // Get the methods url of the data type
|
methods := caller.URLS[dt] // Get the methods url of the data type
|
||||||
if m, ok := methods[method]; !ok || m == "" {
|
if m, ok := methods[method]; !ok || m == "" {
|
||||||
return nil, errors.New("Requested method " + method.String() + " not declared in HTTPCaller")
|
return nil, errors.New("no path found")
|
||||||
}
|
}
|
||||||
path := methods[method] // Get the path corresponding to the action we want to execute
|
meth := methods[method] // Get the method url to execute
|
||||||
path = strings.ReplaceAll(path, ":id", dataID) // Replace the id in the path in case of a DELETE / UPDATE method (it's a standard naming in OC)
|
meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC)
|
||||||
url := ""
|
url := ""
|
||||||
|
|
||||||
// Check the status of the peer
|
// Check the status of the peer
|
||||||
if mypeer, ok := p.checkPeerStatus(peerID, dt.API()); !ok && mypeer != nil {
|
if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok && mypeer != nil {
|
||||||
// If the peer is not reachable, add the execution to the failed executions list
|
// If the peer is not reachable, add the execution to the failed executions list
|
||||||
pexec := &PeerExecution{
|
pexec := &PeerExecution{
|
||||||
Method: method.String(),
|
Method: method.String(),
|
||||||
Url: p.urlFormat((mypeer.Url), dt) + path, // the url is constitued of : host URL + resource path + action path (ex : mypeer.com/datacenter/resourcetype/path/to/action)
|
Url: p.urlFormat((mypeer.Url)+meth, dt),
|
||||||
Body: body,
|
Body: body,
|
||||||
DataType: dt.EnumIndex(),
|
DataType: dt.EnumIndex(),
|
||||||
DataID: dataID,
|
DataID: dataID,
|
||||||
}
|
}
|
||||||
mypeer.AddExecution(*pexec)
|
mypeer.AddExecution(*pexec)
|
||||||
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
||||||
return nil, errors.New("peer is " + peerID + " not reachable")
|
return nil, errors.New("peer is not reachable")
|
||||||
} else {
|
} else {
|
||||||
if mypeer == nil {
|
if mypeer == nil {
|
||||||
return nil, errors.New("peer " + peerID + " not found")
|
return nil, errors.New("peer not found")
|
||||||
}
|
}
|
||||||
// If the peer is reachable, launch the execution
|
// If the peer is reachable, launch the execution
|
||||||
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL
|
url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL
|
||||||
tmp := mypeer.FailedExecution // Get the failed executions list
|
tmp := mypeer.FailedExecution // Get the failed executions list
|
||||||
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
|
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
|
||||||
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
|
||||||
|
@@ -17,7 +17,7 @@ import (
|
|||||||
* it defines the resource compute
|
* it defines the resource compute
|
||||||
*/
|
*/
|
||||||
type ComputeResource struct {
|
type ComputeResource struct {
|
||||||
AbstractInstanciatedResource[*ComputeResourceInstance]
|
AbstractIntanciatedResource[*ComputeResourceInstance]
|
||||||
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
|
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
|
||||||
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
|
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
|
||||||
}
|
}
|
||||||
@@ -35,7 +35,7 @@ func (abs *ComputeResource) ConvertToPricedResource(
|
|||||||
if t != tools.COMPUTE_RESOURCE {
|
if t != tools.COMPUTE_RESOURCE {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request)
|
p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
|
||||||
priced := p.(*PricedResource)
|
priced := p.(*PricedResource)
|
||||||
return &PricedComputeResource{
|
return &PricedComputeResource{
|
||||||
PricedResource: *priced,
|
PricedResource: *priced,
|
||||||
@@ -52,7 +52,6 @@ type ComputeNode struct {
|
|||||||
|
|
||||||
type ComputeResourceInstance struct {
|
type ComputeResourceInstance struct {
|
||||||
ResourceInstance[*ComputeResourcePartnership]
|
ResourceInstance[*ComputeResourcePartnership]
|
||||||
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
|
|
||||||
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
|
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
|
||||||
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
|
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
|
||||||
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
|
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
|
||||||
|
@@ -16,7 +16,7 @@ import (
|
|||||||
* it defines the resource data
|
* it defines the resource data
|
||||||
*/
|
*/
|
||||||
type DataResource struct {
|
type DataResource struct {
|
||||||
AbstractInstanciatedResource[*DataInstance]
|
AbstractIntanciatedResource[*DataInstance]
|
||||||
Type string `bson:"type,omitempty" json:"type,omitempty"`
|
Type string `bson:"type,omitempty" json:"type,omitempty"`
|
||||||
Quality string `bson:"quality,omitempty" json:"quality,omitempty"`
|
Quality string `bson:"quality,omitempty" json:"quality,omitempty"`
|
||||||
OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage
|
OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage
|
||||||
@@ -42,7 +42,7 @@ func (abs *DataResource) ConvertToPricedResource(
|
|||||||
if t != tools.DATA_RESOURCE {
|
if t != tools.DATA_RESOURCE {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request)
|
p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
|
||||||
priced := p.(*PricedResource)
|
priced := p.(*PricedResource)
|
||||||
return &PricedDataResource{
|
return &PricedDataResource{
|
||||||
PricedResource: *priced,
|
PricedResource: *priced,
|
||||||
|
@@ -12,7 +12,6 @@ type ResourceInterface interface {
|
|||||||
ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF
|
ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF
|
||||||
GetType() string
|
GetType() string
|
||||||
GetSelectedInstance() utils.DBObject
|
GetSelectedInstance() utils.DBObject
|
||||||
ClearEnv() utils.DBObject
|
|
||||||
SetAllowedInstances(request *tools.APIRequest)
|
SetAllowedInstances(request *tools.APIRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -21,7 +20,6 @@ type ResourceInstanceITF interface {
|
|||||||
GetID() string
|
GetID() string
|
||||||
GetName() string
|
GetName() string
|
||||||
StoreDraftDefault()
|
StoreDraftDefault()
|
||||||
ClearEnv()
|
|
||||||
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
|
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
|
||||||
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
|
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
|
||||||
ClearPeerGroups()
|
ClearPeerGroups()
|
||||||
|
@@ -25,7 +25,7 @@ type ProcessingUsage struct {
|
|||||||
* it defines the resource processing
|
* it defines the resource processing
|
||||||
*/
|
*/
|
||||||
type ProcessingResource struct {
|
type ProcessingResource struct {
|
||||||
AbstractInstanciatedResource[*ProcessingInstance]
|
AbstractIntanciatedResource[*ProcessingInstance]
|
||||||
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
|
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
|
||||||
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
|
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
|
||||||
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing
|
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing
|
||||||
|
@@ -47,12 +47,12 @@ func (r *AbstractResource) CanDelete() bool {
|
|||||||
return r.IsDraft // only draft bookings can be deleted
|
return r.IsDraft // only draft bookings can be deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
|
type AbstractIntanciatedResource[T ResourceInstanceITF] struct {
|
||||||
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
|
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
|
||||||
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
|
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
|
||||||
}
|
}
|
||||||
|
|
||||||
func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(
|
func (abs *AbstractIntanciatedResource[T]) ConvertToPricedResource(
|
||||||
t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF {
|
t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF {
|
||||||
instances := map[string]string{}
|
instances := map[string]string{}
|
||||||
profiles := []pricing.PricingProfileITF{}
|
profiles := []pricing.PricingProfileITF{}
|
||||||
@@ -71,14 +71,7 @@ func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (abs *AbstractInstanciatedResource[T]) ClearEnv() utils.DBObject {
|
func (r *AbstractIntanciatedResource[T]) GetSelectedInstance() utils.DBObject {
|
||||||
for _, instance := range abs.Instances {
|
|
||||||
instance.ClearEnv()
|
|
||||||
}
|
|
||||||
return abs
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
|
|
||||||
if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex {
|
if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex {
|
||||||
return r.Instances[*r.SelectedInstanceIndex]
|
return r.Instances[*r.SelectedInstanceIndex]
|
||||||
}
|
}
|
||||||
@@ -88,14 +81,11 @@ func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) {
|
func (abs *AbstractIntanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) {
|
||||||
if request != nil && request.PeerID == abs.CreatorID && request.PeerID != "" {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
abs.Instances = verifyAuthAction[T](abs.Instances, request)
|
abs.Instances = verifyAuthAction[T](abs.Instances, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *AbstractInstanciatedResource[T]) Trim() {
|
func (d *AbstractIntanciatedResource[T]) Trim() {
|
||||||
d.Type = d.GetType()
|
d.Type = d.GetType()
|
||||||
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok {
|
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok {
|
||||||
for _, instance := range d.Instances {
|
for _, instance := range d.Instances {
|
||||||
@@ -104,7 +94,7 @@ func (d *AbstractInstanciatedResource[T]) Trim() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (abs *AbstractInstanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool {
|
func (abs *AbstractIntanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool {
|
||||||
return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request)
|
return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,11 +127,6 @@ type GeoPoint struct {
|
|||||||
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
|
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Credentials struct {
|
|
||||||
Login string `json:"login,omitempty" bson:"login,omitempty"`
|
|
||||||
Pass string `json:"password,omitempty" bson:"password,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type ResourceInstance[T ResourcePartnerITF] struct {
|
type ResourceInstance[T ResourcePartnerITF] struct {
|
||||||
utils.AbstractObject
|
utils.AbstractObject
|
||||||
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
|
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
|
||||||
@@ -153,12 +138,6 @@ type ResourceInstance[T ResourcePartnerITF] struct {
|
|||||||
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
|
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ri *ResourceInstance[T]) ClearEnv() {
|
|
||||||
ri.Env = []models.Param{}
|
|
||||||
ri.Inputs = []models.Param{}
|
|
||||||
ri.Outputs = []models.Param{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
|
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
|
||||||
pricings := []pricing.PricingProfileITF{}
|
pricings := []pricing.PricingProfileITF{}
|
||||||
for _, p := range ri.Partnerships {
|
for _, p := range ri.Partnerships {
|
||||||
|
@@ -65,12 +65,6 @@ func (wfa *resourceMongoAccessor[T]) LoadAll(isDraft bool) ([]utils.ShallowDBObj
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
|
func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
|
||||||
if filters == nil && search == "*" {
|
|
||||||
return utils.GenericLoadAll[T](func(d utils.DBObject) utils.ShallowDBObject {
|
|
||||||
d.(T).SetAllowedInstances(wfa.Request)
|
|
||||||
return d
|
|
||||||
}, isDraft, wfa)
|
|
||||||
}
|
|
||||||
return utils.GenericSearch[T](filters, search, wfa.getResourceFilter(search),
|
return utils.GenericSearch[T](filters, search, wfa.getResourceFilter(search),
|
||||||
func(d utils.DBObject) utils.ShallowDBObject {
|
func(d utils.DBObject) utils.ShallowDBObject {
|
||||||
d.(T).SetAllowedInstances(wfa.Request)
|
d.(T).SetAllowedInstances(wfa.Request)
|
||||||
@@ -79,6 +73,9 @@ func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (abs *resourceMongoAccessor[T]) getResourceFilter(search string) *dbs.Filters {
|
func (abs *resourceMongoAccessor[T]) getResourceFilter(search string) *dbs.Filters {
|
||||||
|
if search == "*" {
|
||||||
|
search = ""
|
||||||
|
}
|
||||||
return &dbs.Filters{
|
return &dbs.Filters{
|
||||||
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
|
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
|
||||||
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
|
||||||
|
@@ -17,7 +17,7 @@ import (
|
|||||||
* it defines the resource storage
|
* it defines the resource storage
|
||||||
*/
|
*/
|
||||||
type StorageResource struct {
|
type StorageResource struct {
|
||||||
AbstractInstanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name)
|
AbstractIntanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name)
|
||||||
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
|
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
|
||||||
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
|
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
|
||||||
}
|
}
|
||||||
@@ -35,7 +35,7 @@ func (abs *StorageResource) ConvertToPricedResource(
|
|||||||
if t != tools.STORAGE_RESOURCE {
|
if t != tools.STORAGE_RESOURCE {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request)
|
p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
|
||||||
priced := p.(*PricedResource)
|
priced := p.(*PricedResource)
|
||||||
return &PricedStorageResource{
|
return &PricedStorageResource{
|
||||||
PricedResource: *priced,
|
PricedResource: *priced,
|
||||||
@@ -44,7 +44,6 @@ func (abs *StorageResource) ConvertToPricedResource(
|
|||||||
|
|
||||||
type StorageResourceInstance struct {
|
type StorageResourceInstance struct {
|
||||||
ResourceInstance[*StorageResourcePartnership]
|
ResourceInstance[*StorageResourcePartnership]
|
||||||
Credentials *Credentials `json:"credentials,omitempty" bson:"credentials,omitempty"`
|
|
||||||
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
|
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
|
||||||
Local bool `bson:"local" json:"local"`
|
Local bool `bson:"local" json:"local"`
|
||||||
SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"`
|
SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"`
|
||||||
@@ -55,13 +54,6 @@ type StorageResourceInstance struct {
|
|||||||
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
|
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ri *StorageResourceInstance) ClearEnv() {
|
|
||||||
ri.Credentials = nil
|
|
||||||
ri.Env = []models.Param{}
|
|
||||||
ri.Inputs = []models.Param{}
|
|
||||||
ri.Outputs = []models.Param{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ri *StorageResourceInstance) StoreDraftDefault() {
|
func (ri *StorageResourceInstance) StoreDraftDefault() {
|
||||||
found := false
|
found := false
|
||||||
for _, p := range ri.ResourceInstance.Env {
|
for _, p := range ri.ResourceInstance.Env {
|
||||||
|
@@ -23,10 +23,6 @@ func (r *WorkflowResource) GetType() string {
|
|||||||
return tools.WORKFLOW_RESOURCE.String()
|
return tools.WORKFLOW_RESOURCE.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *WorkflowResource) ClearEnv() utils.DBObject {
|
|
||||||
return d
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *WorkflowResource) Trim() {
|
func (d *WorkflowResource) Trim() {
|
||||||
/* EMPTY */
|
/* EMPTY */
|
||||||
}
|
}
|
||||||
|
@@ -91,7 +91,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ao *AbstractObject) VerifyAuth(request *tools.APIRequest) bool {
|
func (ao *AbstractObject) VerifyAuth(request *tools.APIRequest) bool {
|
||||||
return ao.AccessMode == Public || (request != nil && ao.CreatorID == request.PeerID && request.PeerID != "")
|
return ao.AccessMode == Public || (request != nil && ao.CreatorID == request.PeerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ao *AbstractObject) GetObjectFilters(search string) *dbs.Filters {
|
func (ao *AbstractObject) GetObjectFilters(search string) *dbs.Filters {
|
||||||
|
@@ -66,6 +66,7 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
|
|||||||
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
|
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
a.GetLogger().Error().Msg("Could not retrieve " + id + " to db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) {
|
if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) {
|
||||||
@@ -113,6 +114,7 @@ func GenericLoadOne[T DBObject](id string, f func(DBObject) (DBObject, int, erro
|
|||||||
var data T
|
var data T
|
||||||
res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String())
|
res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
a.GetLogger().Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
res_mongo.Decode(&data)
|
res_mongo.Decode(&data)
|
||||||
@@ -126,6 +128,7 @@ func genericLoadAll[T DBObject](res *mgb.Cursor, code int, err error, onlyDraft
|
|||||||
objs := []ShallowDBObject{}
|
objs := []ShallowDBObject{}
|
||||||
var results []T
|
var results []T
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
a.GetLogger().Error().Msg("Could not retrieve any from db. Error: " + err.Error())
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
if err = res.All(mongo.MngoCtx, &results); err != nil {
|
if err = res.All(mongo.MngoCtx, &results); err != nil {
|
||||||
|
@@ -128,8 +128,8 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
|
|||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
|
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) {
|
||||||
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
|
priceds := map[tools.DataType][]pricing.PricedItemITF{}
|
||||||
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
|
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
|
||||||
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
|
||||||
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
|
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
|
||||||
@@ -189,12 +189,12 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
|
|||||||
return longest, priceds, wf, nil
|
return longest, priceds, wf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
|
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest,
|
||||||
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) {
|
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType][]pricing.PricedItemITF, error) {
|
||||||
resources := []T{}
|
resources := []T{}
|
||||||
for _, item := range wf.GetGraphItems(f) {
|
for _, item := range wf.GetGraphItems(f) {
|
||||||
if priceds[dt] == nil {
|
if priceds[dt] == nil {
|
||||||
priceds[dt] = map[string]pricing.PricedItemITF{}
|
priceds[dt] = []pricing.PricedItemITF{}
|
||||||
}
|
}
|
||||||
dt, realItem := item.GetResource()
|
dt, realItem := item.GetResource()
|
||||||
if realItem == nil {
|
if realItem == nil {
|
||||||
@@ -212,7 +212,7 @@ func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priced
|
|||||||
priced.SetLocationEnd(*e)
|
priced.SetLocationEnd(*e)
|
||||||
}
|
}
|
||||||
resources = append(resources, realItem.(T))
|
resources = append(resources, realItem.(T))
|
||||||
priceds[dt][item.ID] = priced
|
priceds[dt] = append(priceds[dt], priced)
|
||||||
}
|
}
|
||||||
return resources, priceds, nil
|
return resources, priceds, nil
|
||||||
}
|
}
|
||||||
|
@@ -95,12 +95,12 @@ func (a *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.
|
|||||||
if set.(*Workflow).Graph != nil && set.(*Workflow).Graph.Partial {
|
if set.(*Workflow).Graph != nil && set.(*Workflow).Graph.Partial {
|
||||||
return nil, 403, errors.New("you are not allowed to update a partial workflow")
|
return nil, 403, errors.New("you are not allowed to update a partial workflow")
|
||||||
}
|
}
|
||||||
res, code, err := utils.GenericUpdateOne(set, id, a, &Workflow{})
|
res, code, err := utils.GenericUpdateOne(a.verifyResource(set), id, a, &Workflow{})
|
||||||
if code != 200 {
|
if code != 200 {
|
||||||
return nil, code, err
|
return nil, code, err
|
||||||
}
|
}
|
||||||
workflow := res.(*Workflow)
|
workflow := res.(*Workflow)
|
||||||
a.execute(workflow, false, true) // update the workspace for the workflow
|
a.execute(workflow, false, false) // update the workspace for the workflow
|
||||||
a.share(workflow, false, a.GetCaller()) // share the update to the peers
|
a.share(workflow, false, a.GetCaller()) // share the update to the peers
|
||||||
return res, code, nil
|
return res, code, nil
|
||||||
}
|
}
|
||||||
@@ -119,19 +119,12 @@ func (a *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, i
|
|||||||
workflow := res.(*Workflow)
|
workflow := res.(*Workflow)
|
||||||
|
|
||||||
a.share(workflow, false, a.GetCaller()) // share the creation to the peers
|
a.share(workflow, false, a.GetCaller()) // share the creation to the peers
|
||||||
a.execute(workflow, false, true) // store the workspace for the workflow
|
a.execute(workflow, false, false) // store the workspace for the workflow
|
||||||
return res, code, nil
|
return res, code, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CopyOne copies a workflow in the database
|
// CopyOne copies a workflow in the database
|
||||||
func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
|
func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
wf := data.(*Workflow)
|
|
||||||
for _, item := range wf.Graph.Items {
|
|
||||||
_, obj := item.GetResource()
|
|
||||||
if obj != nil {
|
|
||||||
obj.ClearEnv()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return utils.GenericStoreOne(data, a)
|
return utils.GenericStoreOne(data, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -214,10 +207,10 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
|
|||||||
} else if t == tools.DATA_RESOURCE {
|
} else if t == tools.DATA_RESOURCE {
|
||||||
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} })
|
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} })
|
||||||
} else {
|
} else {
|
||||||
wf.Graph.Clear(resource.GetID())
|
wf.Graph.Clear(item.Data.GetID())
|
||||||
}
|
}
|
||||||
if error := utils.VerifyAccess(access, resource.GetID()); error != nil {
|
if error := utils.VerifyAccess(access, resource.GetID()); error != nil {
|
||||||
wf.Graph.Clear(resource.GetID())
|
wf.Graph.Clear(item.Data.GetID())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return wf
|
return wf
|
||||||
|
@@ -15,41 +15,39 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* WorkflowExecution is a struct that represents a list of workflow executions
|
* WorkflowExecutions is a struct that represents a list of workflow executions
|
||||||
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
|
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
|
||||||
* workflows generate their own executions
|
* workflows generate their own executions
|
||||||
*/
|
*/
|
||||||
type WorkflowExecution struct {
|
type WorkflowExecutions struct {
|
||||||
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
|
||||||
PeerBookByGraph map[string]map[string][]string `json:"peer_book_by_graph,omitempty" bson:"peer_book_by_graph,omitempty"` // BookByResource is a map of the resource id and the list of the booking id
|
|
||||||
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty"`
|
|
||||||
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
|
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
|
||||||
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
|
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
|
||||||
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
|
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
|
||||||
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
|
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WorkflowExecution) StoreDraftDefault() {
|
func (r *WorkflowExecutions) StoreDraftDefault() {
|
||||||
r.IsDraft = false // TODO: TEMPORARY
|
r.IsDraft = true // TODO: TEMPORARY
|
||||||
r.State = enum.SCHEDULED
|
r.State = enum.DRAFT
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
|
func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
|
||||||
if r.State != set.(*WorkflowExecution).State {
|
if r.State != set.(*WorkflowExecutions).State {
|
||||||
return true, &WorkflowExecution{State: set.(*WorkflowExecution).State} // only state can be updated
|
return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated
|
||||||
}
|
}
|
||||||
return !r.IsDraft, set // only draft buying can be updated
|
return !r.IsDraft, set // only draft buying can be updated
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WorkflowExecution) CanDelete() bool {
|
func (r *WorkflowExecutions) CanDelete() bool {
|
||||||
return r.IsDraft // only draft bookings can be deleted
|
return r.IsDraft // only draft bookings can be deleted
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
|
func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool {
|
||||||
return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID
|
return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
|
func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error {
|
||||||
if ws.EndDate == nil {
|
if ws.EndDate == nil {
|
||||||
// if no end... then Book like a savage
|
// if no end... then Book like a savage
|
||||||
e := ws.ExecDate.Add(time.Hour)
|
e := ws.ExecDate.Add(time.Hour)
|
||||||
@@ -76,7 +74,7 @@ func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// tool to transform the argo status to a state
|
// tool to transform the argo status to a state
|
||||||
func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution {
|
func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions {
|
||||||
status = strings.ToLower(status)
|
status = strings.ToLower(status)
|
||||||
switch status {
|
switch status {
|
||||||
case "succeeded": // Succeeded
|
case "succeeded": // Succeeded
|
||||||
@@ -91,56 +89,38 @@ func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecutio
|
|||||||
return wfa
|
return wfa
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *WorkflowExecution) GenerateID() {
|
func (r *WorkflowExecutions) GenerateID() {
|
||||||
if r.UUID == "" {
|
|
||||||
r.UUID = uuid.New().String()
|
r.UUID = uuid.New().String()
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *WorkflowExecution) GetName() string {
|
func (d *WorkflowExecutions) GetName() string {
|
||||||
return d.UUID + "_" + d.ExecDate.String()
|
return d.UUID + "_" + d.ExecDate.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *WorkflowExecution) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor {
|
||||||
return NewAccessor(request) // Create a new instance of the accessor
|
return NewAccessor(request) // Create a new instance of the accessor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool {
|
func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking {
|
func (d *WorkflowExecutions) Book(wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking {
|
||||||
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
|
booking := d.bookEach(wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
|
||||||
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
|
booking = append(booking, d.bookEach(wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
|
||||||
booking = append(booking,d.bookEach(executionsID, wfID, tools.COMPUTE_RESOURCE, priceds[tools.COMPUTE_RESOURCE])...)
|
|
||||||
booking = append(booking,d.bookEach(executionsID, wfID, tools.DATA_RESOURCE, priceds[tools.DATA_RESOURCE])...)
|
|
||||||
return booking
|
return booking
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*booking.Booking {
|
func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking {
|
||||||
items := []*booking.Booking{}
|
items := []*booking.Booking{}
|
||||||
for itemID, priced := range priceds {
|
for _, priced := range priceds {
|
||||||
if d.PeerBookByGraph == nil {
|
|
||||||
d.PeerBookByGraph = map[string]map[string][]string{}
|
|
||||||
}
|
|
||||||
if d.PeerBookByGraph[priced.GetCreatorID()] == nil {
|
|
||||||
d.PeerBookByGraph[priced.GetCreatorID()] = map[string][]string{}
|
|
||||||
}
|
|
||||||
if d.PeerBookByGraph[priced.GetCreatorID()][itemID] == nil {
|
|
||||||
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = []string{}
|
|
||||||
}
|
|
||||||
start := d.ExecDate
|
start := d.ExecDate
|
||||||
if s := priced.GetLocationStart(); s != nil {
|
if s := priced.GetLocationStart(); s != nil {
|
||||||
start = *s
|
start = *s
|
||||||
}
|
}
|
||||||
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
|
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
|
||||||
bookingItem := &booking.Booking{
|
bookingItem := &booking.Booking{
|
||||||
AbstractObject: utils.AbstractObject{
|
State: enum.DRAFT,
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Name: d.GetName() + "_" + executionsID + "_" + wfID,
|
|
||||||
},
|
|
||||||
ExecutionsID: executionsID,
|
|
||||||
State: enum.SCHEDULED,
|
|
||||||
ResourceID: priced.GetID(),
|
ResourceID: priced.GetID(),
|
||||||
ResourceType: dt,
|
ResourceType: dt,
|
||||||
DestPeerID: priced.GetCreatorID(),
|
DestPeerID: priced.GetCreatorID(),
|
||||||
@@ -150,8 +130,6 @@ func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.
|
|||||||
ExpectedEndDate: &end,
|
ExpectedEndDate: &end,
|
||||||
}
|
}
|
||||||
items = append(items, bookingItem)
|
items = append(items, bookingItem)
|
||||||
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(
|
|
||||||
d.PeerBookByGraph[priced.GetCreatorID()][itemID], bookingItem.GetID())
|
|
||||||
}
|
}
|
||||||
return items
|
return items
|
||||||
}
|
}
|
||||||
|
@@ -43,11 +43,11 @@ func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
|
||||||
if set.(*WorkflowExecution).State == 0 {
|
if set.(*WorkflowExecutions).State == 0 {
|
||||||
return nil, 400, errors.New("state is required")
|
return nil, 400, errors.New("state is required")
|
||||||
}
|
}
|
||||||
realSet := WorkflowExecution{State: set.(*WorkflowExecution).State}
|
realSet := WorkflowExecutions{State: set.(*WorkflowExecutions).State}
|
||||||
return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecution{})
|
return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecutions{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
|
||||||
@@ -59,15 +59,13 @@ func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.D
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
|
func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
|
||||||
return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) {
|
return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) {
|
||||||
now := time.Now()
|
if d.(*WorkflowExecutions).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
|
||||||
now = now.Add(time.Second * -60)
|
|
||||||
if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
|
|
||||||
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
|
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
|
||||||
return nil, 404, errors.New("not found")
|
return nil, 404, errors.New("not found")
|
||||||
}
|
}
|
||||||
if d.(*WorkflowExecution).State == enum.SCHEDULED && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
|
if d.(*WorkflowExecutions).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
|
||||||
d.(*WorkflowExecution).State = enum.FORGOTTEN
|
d.(*WorkflowExecutions).State = enum.FORGOTTEN
|
||||||
utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request))
|
utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request))
|
||||||
}
|
}
|
||||||
return d, 200, nil
|
return d, 200, nil
|
||||||
@@ -75,23 +73,21 @@ func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
|
func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
|
||||||
return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), isDraft, a)
|
return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
|
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
|
||||||
return utils.GenericSearch[*WorkflowExecution](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a)
|
return utils.GenericSearch[*WorkflowExecutions](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
|
func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
|
||||||
return func(d utils.DBObject) utils.ShallowDBObject {
|
return func(d utils.DBObject) utils.ShallowDBObject {
|
||||||
now := time.Now()
|
if d.(*WorkflowExecutions).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
|
||||||
now = now.Add(time.Second * -60)
|
|
||||||
if d.(*WorkflowExecution).State == enum.DRAFT && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
|
|
||||||
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
|
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if d.(*WorkflowExecution).State == enum.SCHEDULED && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
|
if d.(*WorkflowExecutions).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
|
||||||
d.(*WorkflowExecution).State = enum.FORGOTTEN
|
d.(*WorkflowExecutions).State = enum.FORGOTTEN
|
||||||
utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request))
|
utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request))
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
@@ -1,20 +1,16 @@
|
|||||||
package workflow_execution
|
package workflow_execution
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"cloud.o-forge.io/core/oc-lib/logs"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/booking"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/utils"
|
"cloud.o-forge.io/core/oc-lib/models/utils"
|
||||||
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
"cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||||
"cloud.o-forge.io/core/oc-lib/tools"
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||||||
"github.com/google/uuid"
|
|
||||||
"github.com/robfig/cron"
|
"github.com/robfig/cron"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -24,9 +20,8 @@ import (
|
|||||||
*/
|
*/
|
||||||
// it's a flying object only use in a session time. It's not stored in the database
|
// it's a flying object only use in a session time. It's not stored in the database
|
||||||
type WorkflowSchedule struct {
|
type WorkflowSchedule struct {
|
||||||
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
|
|
||||||
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
|
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
|
||||||
WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow
|
WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"` // WorkflowExecutions is the list of executions of the workflow
|
||||||
Message string `json:"message,omitempty"` // Message is the message of the schedule
|
Message string `json:"message,omitempty"` // Message is the message of the schedule
|
||||||
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
|
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
|
||||||
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
|
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
|
||||||
@@ -41,7 +36,6 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
ws := &WorkflowSchedule{
|
ws := &WorkflowSchedule{
|
||||||
UUID: uuid.New().String(),
|
|
||||||
Start: s,
|
Start: s,
|
||||||
DurationS: durationInS,
|
DurationS: durationInS,
|
||||||
Cron: cron,
|
Cron: cron,
|
||||||
@@ -53,19 +47,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
|
|||||||
return ws
|
return ws
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) {
|
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
|
||||||
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
|
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
|
||||||
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined")
|
return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
|
||||||
}
|
}
|
||||||
access := workflow.NewAccessor(request)
|
access := workflow.NewAccessor(request)
|
||||||
res, code, err := access.LoadOne(wfID)
|
res, code, err := access.LoadOne(wfID)
|
||||||
if code != 200 {
|
if code != 200 {
|
||||||
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
|
return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error())
|
||||||
}
|
}
|
||||||
wf := res.(*workflow.Workflow)
|
wf := res.(*workflow.Workflow)
|
||||||
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
|
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
|
return false, wf, []*WorkflowExecutions{}, err
|
||||||
}
|
}
|
||||||
ws.DurationS = longest
|
ws.DurationS = longest
|
||||||
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
|
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
|
||||||
@@ -74,11 +68,10 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
|
|||||||
}
|
}
|
||||||
execs, err := ws.getExecutions(wf)
|
execs, err := ws.getExecutions(wf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
|
return false, wf, []*WorkflowExecutions{}, err
|
||||||
}
|
}
|
||||||
bookings := []*booking.Booking{}
|
|
||||||
for _, exec := range execs {
|
for _, exec := range execs {
|
||||||
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
|
bookings := exec.Book(wfID, priceds)
|
||||||
for _, b := range bookings {
|
for _, b := range bookings {
|
||||||
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
|
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
|
||||||
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
|
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
|
||||||
@@ -87,51 +80,43 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
|
|||||||
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
|
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
|
||||||
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
|
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, wf, execs, bookings, err
|
return false, wf, execs, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return true, wf, execs, bookings, nil
|
return true, wf, execs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
|
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecutions, error) {
|
||||||
if request == nil {
|
if request == nil {
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
|
return ws, nil, []*WorkflowExecutions{}, errors.New("no request found")
|
||||||
}
|
}
|
||||||
c := request.Caller
|
c := request.Caller
|
||||||
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
|
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined")
|
return ws, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
|
||||||
}
|
}
|
||||||
methods := c.URLS[tools.BOOKING]
|
methods := c.URLS[tools.BOOKING]
|
||||||
if _, ok := methods[tools.GET]; !ok {
|
if _, ok := methods[tools.GET]; !ok {
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("no path found")
|
return ws, nil, []*WorkflowExecutions{}, errors.New("no path found")
|
||||||
}
|
}
|
||||||
ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request)
|
ok, wf, executions, err := ws.CheckBooking(wfID, request)
|
||||||
ws.WorkflowExecution = executions
|
|
||||||
if !ok || err != nil {
|
if !ok || err != nil {
|
||||||
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
|
return ws, nil, []*WorkflowExecutions{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
ws.Workflow = wf
|
ws.Workflow = wf
|
||||||
for _, booking := range bookings {
|
ws.WorkflowExecutions = executions
|
||||||
l := logs.GetLogger()
|
|
||||||
l.Info().Msg("Booking on " + booking.DestPeerID)
|
|
||||||
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
|
|
||||||
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
|
|
||||||
if err != nil {
|
|
||||||
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fmt.Println("Schedules")
|
|
||||||
for _, exec := range executions {
|
for _, exec := range executions {
|
||||||
err := exec.PurgeDraft(request)
|
err := exec.PurgeDraft(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
|
return ws, nil, []*WorkflowExecutions{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
|
||||||
}
|
}
|
||||||
|
exec.GenerateID()
|
||||||
exec.StoreDraftDefault()
|
exec.StoreDraftDefault()
|
||||||
|
// Should DELETE the previous execution2
|
||||||
utils.GenericStoreOne(exec, NewAccessor(request))
|
utils.GenericStoreOne(exec, NewAccessor(request))
|
||||||
}
|
}
|
||||||
fmt.Println("Schedules")
|
|
||||||
return ws, wf, executions, nil
|
return ws, wf, executions, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -147,22 +132,17 @@ VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
|
|||||||
* getExecutions is a function that returns the executions of a workflow
|
* getExecutions is a function that returns the executions of a workflow
|
||||||
* it returns an array of workflow_execution.WorkflowExecution
|
* it returns an array of workflow_execution.WorkflowExecution
|
||||||
*/
|
*/
|
||||||
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecution, error) {
|
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) {
|
||||||
workflows_executions := []*WorkflowExecution{}
|
workflows_executions := []*WorkflowExecutions{}
|
||||||
dates, err := ws.getDates()
|
dates, err := ws.getDates()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return workflows_executions, err
|
return workflows_executions, err
|
||||||
}
|
}
|
||||||
for _, date := range dates {
|
for _, date := range dates {
|
||||||
fmt.Println("============")
|
obj := &WorkflowExecutions{
|
||||||
fmt.Println("Date : " + fmt.Sprint(date))
|
|
||||||
fmt.Println("============")
|
|
||||||
obj := &WorkflowExecution{
|
|
||||||
AbstractObject: utils.AbstractObject{
|
AbstractObject: utils.AbstractObject{
|
||||||
UUID: uuid.New().String(), // set the uuid of the execution
|
|
||||||
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
|
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
|
||||||
},
|
},
|
||||||
ExecutionsID: ws.UUID,
|
|
||||||
ExecDate: date.Start, // set the execution date
|
ExecDate: date.Start, // set the execution date
|
||||||
EndDate: date.End, // set the end date
|
EndDate: date.End, // set the end date
|
||||||
State: enum.DRAFT, // set the state to 1 (scheduled)
|
State: enum.DRAFT, // set the state to 1 (scheduled)
|
||||||
@@ -193,17 +173,8 @@ func (ws *WorkflowSchedule) getDates() ([]Schedule, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return schedule, errors.New("Bad cron message: " + err.Error())
|
return schedule, errors.New("Bad cron message: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
toto, _ := json.MarshalIndent(sched,"","")
|
|
||||||
fmt.Println(string(toto))
|
|
||||||
s := sched.Next(ws.Start)
|
|
||||||
fmt.Println("s.IsZero() : " + fmt.Sprint(s.IsZero()) + "s.Before(*ws.End) : " + fmt.Sprint(s.Before(*ws.End)) )
|
|
||||||
tata, _ := json.MarshalIndent(s,"","")
|
|
||||||
fmt.Println("s = " + string(tata) + " et ws.End = " + fmt.Sprint(ws.End))
|
|
||||||
|
|
||||||
// loop through the cron schedule to set the executions
|
// loop through the cron schedule to set the executions
|
||||||
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
|
for s := sched.Next(ws.Start); !s.IsZero() && s.Before(*ws.End); s = sched.Next(s) {
|
||||||
fmt.Println("next execution :" + fmt.Sprint(s))
|
|
||||||
e := s.Add(time.Duration(ws.DurationS) * time.Second)
|
e := s.Add(time.Duration(ws.DurationS) * time.Second)
|
||||||
schedule = append(schedule, Schedule{
|
schedule = append(schedule, Schedule{
|
||||||
Start: s,
|
Start: s,
|
||||||
|
@@ -73,10 +73,8 @@ func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
|
|||||||
filters := &dbs.Filters{
|
filters := &dbs.Filters{
|
||||||
Or: map[string][]dbs.Filter{
|
Or: map[string][]dbs.Filter{
|
||||||
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: data.GetName() + "_workspace"}},
|
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: data.GetName() + "_workspace"}},
|
||||||
"abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: a.GetPeerID()}},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
// filters *dbs.Filters, word string, isDraft bool
|
|
||||||
res, _, err := a.Search(filters, "", true) // Search for the workspace
|
res, _, err := a.Search(filters, "", true) // Search for the workspace
|
||||||
if err == nil && len(res) > 0 { // If the workspace already exists, return an error
|
if err == nil && len(res) > 0 { // If the workspace already exists, return an error
|
||||||
return nil, 409, errors.New("a workspace with the same name already exists")
|
return nil, 409, errors.New("a workspace with the same name already exists")
|
||||||
|
@@ -115,8 +115,8 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
|
|||||||
// CheckRemotePeer checks the state of a remote peer
|
// CheckRemotePeer checks the state of a remote peer
|
||||||
func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
|
func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
|
||||||
// Check if the database is up
|
// Check if the database is up
|
||||||
var resp APIStatusResponse
|
|
||||||
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
|
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
|
||||||
|
var resp APIStatusResponse
|
||||||
b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer
|
b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return DEAD, map[string]int{} // If the peer is not reachable, return dead
|
return DEAD, map[string]int{} // If the peer is not reachable, return dead
|
||||||
|
@@ -21,11 +21,6 @@ const (
|
|||||||
WORKSPACE_HISTORY
|
WORKSPACE_HISTORY
|
||||||
ORDER
|
ORDER
|
||||||
PURCHASE_RESOURCE
|
PURCHASE_RESOURCE
|
||||||
ADMIRALTY_SOURCE
|
|
||||||
ADMIRALTY_TARGET
|
|
||||||
ADMIRALTY_SECRET
|
|
||||||
ADMIRALTY_KUBECONFIG
|
|
||||||
ADMIRALTY_NODES
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var NOAPI = ""
|
var NOAPI = ""
|
||||||
@@ -35,11 +30,6 @@ var WORKFLOWAPI = "oc-workflow"
|
|||||||
var WORKSPACEAPI = "oc-workspace"
|
var WORKSPACEAPI = "oc-workspace"
|
||||||
var PEERSAPI = "oc-peer"
|
var PEERSAPI = "oc-peer"
|
||||||
var DATACENTERAPI = "oc-datacenter"
|
var DATACENTERAPI = "oc-datacenter"
|
||||||
var ADMIRALTY_SOURCEAPI = DATACENTERAPI+"/admiralty/source"
|
|
||||||
var ADMIRALTY_TARGETAPI = DATACENTERAPI+"/admiralty/target"
|
|
||||||
var ADMIRALTY_SECRETAPI = DATACENTERAPI+"/admiralty/secret"
|
|
||||||
var ADMIRALTY_KUBECONFIGAPI = DATACENTERAPI+"/admiralty/kubeconfig"
|
|
||||||
var ADMIRALTY_NODESAPI = DATACENTERAPI+"/admiralty/node"
|
|
||||||
|
|
||||||
// Bind the standard API name to the data type
|
// Bind the standard API name to the data type
|
||||||
var DefaultAPI = [...]string{
|
var DefaultAPI = [...]string{
|
||||||
@@ -60,11 +50,6 @@ var DefaultAPI = [...]string{
|
|||||||
NOAPI,
|
NOAPI,
|
||||||
NOAPI,
|
NOAPI,
|
||||||
NOAPI,
|
NOAPI,
|
||||||
ADMIRALTY_SOURCEAPI,
|
|
||||||
ADMIRALTY_TARGETAPI,
|
|
||||||
ADMIRALTY_SECRETAPI,
|
|
||||||
ADMIRALTY_KUBECONFIGAPI,
|
|
||||||
ADMIRALTY_NODESAPI,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Bind the standard data name to the data type
|
// Bind the standard data name to the data type
|
||||||
@@ -86,11 +71,6 @@ var Str = [...]string{
|
|||||||
"workspace_history",
|
"workspace_history",
|
||||||
"order",
|
"order",
|
||||||
"purchase_resource",
|
"purchase_resource",
|
||||||
"admiralty_source",
|
|
||||||
"admiralty_target",
|
|
||||||
"admiralty_secret",
|
|
||||||
"admiralty_kubeconfig",
|
|
||||||
"admiralty_node",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func FromInt(i int) string {
|
func FromInt(i int) string {
|
||||||
@@ -111,5 +91,5 @@ func (d DataType) EnumIndex() int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func DataTypeList() []DataType {
|
func DataTypeList() []DataType {
|
||||||
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE,ADMIRALTY_SOURCE,ADMIRALTY_TARGET,ADMIRALTY_SECRET,ADMIRALTY_KUBECONFIG,ADMIRALTY_NODES}
|
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE}
|
||||||
}
|
}
|
||||||
|
@@ -3,7 +3,6 @@ package tools
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
@@ -52,7 +51,6 @@ var HTTPCallerInstance = &HTTPCaller{} // Singleton instance of the HTTPCaller
|
|||||||
type HTTPCaller struct {
|
type HTTPCaller struct {
|
||||||
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
|
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
|
||||||
Disabled bool // Disabled flag
|
Disabled bool // Disabled flag
|
||||||
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHTTPCaller creates a new instance of the HTTP Caller
|
// NewHTTPCaller creates a new instance of the HTTP Caller
|
||||||
@@ -78,33 +76,17 @@ func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) (
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
err = caller.StoreResp(resp)
|
return io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return caller.LastResults["body"].([]byte), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallPut calls the DELETE method on the HTTP server
|
// CallPut calls the DELETE method on the HTTP server
|
||||||
func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) {
|
func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) {
|
||||||
req, err := http.NewRequest("DELETE", url+subpath, nil)
|
resp, err := http.NewRequest("DELETE", url+subpath, nil)
|
||||||
if err != nil {
|
if err != nil || resp == nil || resp.Body == nil {
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
client := &http.Client{}
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil || req == nil || req.Body == nil {
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
return io.ReadAll(resp.Body)
|
||||||
err = caller.StoreResp(resp)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return caller.LastResults["body"].([]byte), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallPost calls the POST method on the HTTP server
|
// CallPost calls the POST method on the HTTP server
|
||||||
@@ -123,12 +105,7 @@ func (caller *HTTPCaller) CallPost(url string, subpath string, body interface{},
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
err = caller.StoreResp(resp)
|
return io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return caller.LastResults["body"].([]byte), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallPost calls the POST method on the HTTP server
|
// CallPost calls the POST method on the HTTP server
|
||||||
@@ -146,12 +123,7 @@ func (caller *HTTPCaller) CallPut(url string, subpath string, body map[string]in
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
err = caller.StoreResp(resp)
|
return io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return caller.LastResults["body"].([]byte), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallRaw calls the Raw method on the HTTP server
|
// CallRaw calls the Raw method on the HTTP server
|
||||||
@@ -171,12 +143,7 @@ func (caller *HTTPCaller) CallRaw(method string, url string, subpath string,
|
|||||||
req.AddCookie(c)
|
req.AddCookie(c)
|
||||||
}
|
}
|
||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
resp, err := client.Do(req)
|
return client.Do(req)
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallRaw calls the Raw method on the HTTP server
|
// CallRaw calls the Raw method on the HTTP server
|
||||||
@@ -196,17 +163,3 @@ func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
|
|||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
return client.Do(req)
|
return client.Do(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
|
|
||||||
caller.LastResults = make(map[string]interface{})
|
|
||||||
caller.LastResults["header"] = resp.Header
|
|
||||||
caller.LastResults["code"] = resp.StatusCode
|
|
||||||
data, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("Error reading the body of the last request")
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
caller.LastResults["body"] = data
|
|
||||||
return nil
|
|
||||||
}
|
|
Reference in New Issue
Block a user