Compare commits

...

71 Commits

Author SHA1 Message Date
pb
88c88cac5b testing simplyfied urlFormat() method which works thanks to traefik 2025-03-13 16:57:27 +01:00
pb
1ae38c98ad correct path for ADMIRALTY_NODESAPI 2025-03-13 11:58:57 +01:00
pb
2d517cc594 Correcting an error in CallGet() 2025-03-12 15:41:10 +01:00
pb
a9c82bd261 Replaced the return of Call[Method]() by the stored value of the resp.Body 2025-03-12 15:37:03 +01:00
pb
79aec86f5f Replaced the return of Call[Method]() by the stored value of the resp.Body 2025-03-12 15:26:00 +01:00
pb
9b3dfc7576 fixing how last result in stored in httpcaller 2025-03-12 12:13:55 +01:00
pb
037ae74782 modified the way HTTPCaller store last resposne 2025-03-12 12:09:55 +01:00
pb
b81c60a3ce modified the way HTTPCaller store last resposne 2025-03-12 12:00:32 +01:00
pb
363ac94c47 debug instructions 2025-03-12 11:35:25 +01:00
pb
378f9e5095 Added a new http.Response field to HTTPCaller to store results for each call 2025-03-12 10:39:20 +01:00
pb
659b494ee4 Added a new field to HTTPCaller to store results for each call 2025-03-12 09:45:17 +01:00
pb
92965c6af2 Added more information in error when LaunchPeerExecution method doesn't match caller's 2025-03-11 16:48:05 +01:00
pb
70cb5aec9f changed some variable name for better understanding of process in LaunchPeerExecution 2025-03-11 12:03:35 +01:00
pb
d59e77d5a2 changed how url is consructed in LaunchPeerExecution by placing meth after peer url + dt 2025-03-05 16:42:22 +01:00
pb
ff1b857ab0 removed caller from checkPeerStatus() parameters by adding the path to url 2025-03-05 11:54:14 +01:00
pb
dbdccdb920 Corrected urlFormat() from peer_cache which constructed url with API's name at the end 2025-03-05 11:01:46 +01:00
pb
fd3fef72d3 correct DefaultAPI list 2025-03-03 10:55:00 +01:00
pb
1890fd4f71 added the resources used for admiralty in datacenter API 2025-03-03 10:33:37 +01:00
pb
95af3cb515 removed caller from checkPeerStatus() parameters by adding the path to url 2025-03-03 10:32:52 +01:00
pb
3acebc451e Added the required tag to ExecutionsID 2025-02-26 11:00:37 +01:00
mr
5111c9c8be discovery clear view 2025-02-19 15:29:42 +01:00
mr
3ecb0e9d96 set up auth for workspace 2025-02-19 11:41:52 +01:00
mr
b4a1766677 better load & peear cache for traefik 2025-02-19 11:03:12 +01:00
mr
241c6a5a08 casual debug, time added before change of state bookin & exec 2025-02-19 08:55:11 +01:00
mr
7c30633bde verifyAuthAction for instance 2025-02-18 14:02:29 +01:00
mr
81d3406305 verifyAuthAction for instance 2025-02-18 12:55:49 +01:00
mr
04f7537066 save 2025-02-18 12:39:16 +01:00
mr
6bf058ab5c save 2025-02-18 11:11:40 +01:00
mr
b771b5d25e save 2025-02-18 10:25:08 +01:00
mr
6e6ed4ea2c debug 2025-02-18 09:53:55 +01:00
mr
a098f0a672 conf 2025-02-18 09:01:21 +01:00
mr
cafadec146 missing res access 2025-02-17 08:25:19 +01:00
mr
0940b63961 correct 2025-02-14 16:16:25 +01:00
mr
a2dca94dca correct 2025-02-14 15:01:49 +01:00
mr
085a8718e0 correct 2025-02-13 15:11:23 +01:00
mr
271cc2caa0 workflow scheduler create booking with a booking execution lot id 2025-02-13 09:50:18 +01:00
mr
42b60ca5cd workflow scheduler create booking with a booking execution lot id 2025-02-13 09:10:24 +01:00
mr
4920322d0a workflow scheduler create booking with a booking execution lot id 2025-02-13 08:26:26 +01:00
mr
c7c1535ba9 workflow scheduler create booking with a booking execution lot id 2025-02-12 16:08:15 +01:00
mr
576f53f81b workflow scheduler create booking with a booking execution lot id 2025-02-12 15:45:03 +01:00
mr
c0e6247fb8 workflow scheduler create booking with a booking execution lot id 2025-02-12 15:27:05 +01:00
mr
3e85fdc779 workflow scheduler create booking with a booking execution lot id 2025-02-12 14:14:28 +01:00
mr
4833bcb710 workflow scheduler create booking with a booking execution lot id 2025-02-12 14:08:57 +01:00
mr
7d69d65dd2 workflow scheduler create booking with a booking execution lot id 2025-02-12 11:41:34 +01:00
mr
a098b3797a workflow scheduler create booking with a booking execution lot id 2025-02-11 15:33:01 +01:00
mr
7d03676ac2 workflow scheduler create booking with a booking execution lot id 2025-02-11 14:11:12 +01:00
mr
945b7a893e workflow scheduler create booking with a booking execution lot id 2025-02-11 13:58:24 +01:00
mr
ef028cb2b9 workflow scheduler create booking with a booking execution lot id 2025-02-11 13:54:06 +01:00
mr
4cfd0a1789 workflow scheduler create booking with a booking execution lot id 2025-02-11 12:28:04 +01:00
mr
7c57cf34a8 workflow scheduler create booking with a booking execution lot id 2025-02-11 12:13:34 +01:00
mr
019b590b4f workflow scheduler create booking with a booking execution lot id 2025-02-11 11:26:02 +01:00
mr
d82ae166a1 add purchase resource in model catalog 2025-02-11 09:16:18 +01:00
mr
ffaa67fb5d add purchase resource in model catalog 2025-02-11 08:30:38 +01:00
mr
a573a4ce71 add purchase resource in model catalog 2025-02-11 07:55:15 +01:00
mr
52d5a1fbf9 add purchase resource in model catalog 2025-02-10 13:10:42 +01:00
mr
4ad32401fd add purchase resource in model catalog 2025-02-10 13:04:13 +01:00
mr
f663ec80f5 add purchase resource in model catalog 2025-02-10 11:32:55 +01:00
mr
e55727d9e2 add purchase resource in model catalog 2025-02-10 10:42:37 +01:00
mr
4a178d01e3 add purchase resource in model catalog 2025-02-10 09:58:46 +01:00
mr
3d13833572 workflow execution evolved 2025-02-07 11:41:12 +01:00
mr
31ec352b57 workflow execution evolved 2025-02-07 08:29:57 +01:00
mr
940ef17f7b workflow execution evolved 2025-02-06 12:56:51 +01:00
mr
ad3293da9d workflow execution evolved 2025-02-06 11:13:06 +01:00
mr
3ffff7d32c workflow execution evolved 2025-02-06 09:56:00 +01:00
mr
e646cfef0b workflow execution evolved 2025-02-06 09:08:35 +01:00
plm
5255ffc2f7 Merging issue#4 2025-01-10 17:43:31 +01:00
plm
fd1c579ec4 Removing required field on PeerId, see #7 2025-01-10 17:39:58 +01:00
plm
0f4adeea86 Same prefix for all builtin microservices in opencloud 2025-01-08 16:55:42 +01:00
plm
245f3adea3 Merge branch 'issue#4' 2024-12-16 09:18:58 +01:00
plm
21d08204b5 Fixing env based layer; not using onion builtin mechanism to preserve opencloud conf key/value format 2024-12-16 09:17:54 +01:00
plm
1de4888599 Remove extra underscore character; it breaks the env var loading 2024-12-10 14:01:47 +01:00
30 changed files with 435 additions and 213 deletions

View File

@ -26,12 +26,12 @@ import (
func GetConfLoader() *onion.Onion {
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
AppName := GetAppName()
EnvPrefix := strings.ToUpper(AppName[0:2]+AppName[3:]) + "_"
EnvPrefix := "OC_"
defaultConfigFile := "/etc/oc/" + AppName[3:] + ".json"
localConfigFile := "./" + AppName[3:] + ".json"
var configFile string
var o *onion.Onion
l3 := onion.NewEnvLayerPrefix("_", EnvPrefix)
l3 := GetEnvVarLayer(EnvPrefix)
l2, err := onion.NewFileLayer(localConfigFile, nil)
if err == nil {
logger.Info().Msg("Local config file found " + localConfigFile + ", overriding default file")
@ -54,3 +54,17 @@ func GetConfLoader() *onion.Onion {
}
return o
}
func GetEnvVarLayer(prefix string) onion.Layer {
envVars := make(map[string]interface{})
for _, e := range os.Environ() {
pair := strings.SplitN(e, "=", 2)
key := pair[0]
if strings.HasPrefix(key, prefix) {
envVars[strings.TrimPrefix(key, prefix)] = pair[1]
}
}
return onion.NewMapLayer(envVars)
}

View File

@ -247,19 +247,21 @@ func ToScheduler(m interface{}) (n *workflow_execution.WorkflowSchedule) {
}
func (r *Request) Schedule(wfID string, scheduler *workflow_execution.WorkflowSchedule) (*workflow_execution.WorkflowSchedule, error) {
if _, _, err := scheduler.Schedules(wfID, &tools.APIRequest{
ws, _, _, err := scheduler.Schedules(wfID, &tools.APIRequest{
Caller: r.caller,
Username: r.user,
PeerID: r.peerID,
Groups: r.groups,
}); err != nil {
})
if err != nil {
return nil, err
}
return scheduler, nil
fmt.Println("BAM", ws)
return ws, nil
}
func (r *Request) CheckBooking(wfID string, start string, end string, durationInS float64, cron string) bool {
ok, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
ok, _, _, _, err := workflow_execution.NewScheduler(start, end, durationInS, cron).CheckBooking(wfID, &tools.APIRequest{
Caller: r.caller,
Username: r.user,
PeerID: r.peerID,
@ -561,9 +563,9 @@ func (l *LibData) ToRule() *rule.Rule {
return nil
}
func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecutions {
func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution {
if l.Data.GetAccessor(nil).GetType() == tools.WORKFLOW_EXECUTION {
return l.Data.(*workflow_execution.WorkflowExecutions)
return l.Data.(*workflow_execution.WorkflowExecution)
}
return nil
}

View File

@ -15,8 +15,9 @@ import (
*/
type Booking struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty" validate:"required"` // ExecutionsID is the ID of the executions
DestPeerID string `json:"dest_peer_id,omitempty"` // DestPeerID is the ID of the destination peer
WorkflowID string `json:"workflow_id,omitempty" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
ExecutionID string `json:"execution_id,omitempty" bson:"execution_id,omitempty" validate:"required"`
State enum.BookingStatus `json:"state,omitempty" bson:"state,omitempty" validate:"required"` // State is the state of the booking
ExpectedStartDate time.Time `json:"expected_start_date,omitempty" bson:"expected_start_date,omitempty" validate:"required"` // ExpectedStartDate is the expected start date of the booking
@ -80,10 +81,6 @@ func (d *Booking) GetDelayOnDuration() time.Duration {
return d.GetRealDuration() - d.GetUsualDuration()
}
func (d *Booking) GetName() string {
return d.GetID() + "_" + d.ExpectedStartDate.String()
}
func (d *Booking) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}
@ -93,7 +90,7 @@ func (d *Booking) VerifyAuth(request *tools.APIRequest) bool {
}
func (r *Booking) StoreDraftDefault() {
r.IsDraft = true
r.IsDraft = false
}
func (r *Booking) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {

View File

@ -1,6 +1,7 @@
package booking
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
@ -33,7 +34,11 @@ func (a *bookingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error)
}
func (a *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
return utils.GenericUpdateOne(set, id, a, &Booking{})
if set.(*Booking).State == 0 {
return nil, 400, errors.New("state is required")
}
realSet := &Booking{State: set.(*Booking).State}
return utils.GenericUpdateOne(realSet, id, a, &Booking{})
}
func (a *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
@ -46,10 +51,15 @@ func (a *bookingMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int
func (a *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*Booking](id, func(d utils.DBObject) (utils.DBObject, int, error) {
now := time.Now()
now = now.Add(time.Second * -60)
if d.(*Booking).State == enum.DRAFT && now.UTC().After(d.(*Booking).ExpectedStartDate) {
return utils.GenericDeleteOne(d.GetID(), a)
}
if (d.(*Booking).ExpectedEndDate) == nil {
d.(*Booking).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, id, a)
} else if d.(*Booking).State == enum.SCHEDULED && time.Now().UTC().After(*&d.(*Booking).ExpectedStartDate) {
} else if d.(*Booking).State == enum.SCHEDULED && now.UTC().After(d.(*Booking).ExpectedStartDate) {
d.(*Booking).State = enum.DELAYED
utils.GenericRawUpdateOne(d, id, a)
}
@ -67,7 +77,13 @@ func (a *bookingMongoAccessor) Search(filters *dbs.Filters, search string, isDra
func (a *bookingMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
if d.(*Booking).State == enum.SCHEDULED && time.Now().UTC().After(*&d.(*Booking).ExpectedStartDate) {
now := time.Now()
now = now.Add(time.Second * -60)
if d.(*Booking).State == enum.DRAFT && now.UTC().After(d.(*Booking).ExpectedStartDate) {
utils.GenericDeleteOne(d.GetID(), a)
return nil
}
if d.(*Booking).State == enum.SCHEDULED && now.UTC().After(d.(*Booking).ExpectedStartDate) {
d.(*Booking).State = enum.DELAYED
utils.GenericRawUpdateOne(d, d.GetID(), a)
}

View File

@ -42,13 +42,15 @@ const (
S3
MEMORY
HARDWARE
AZURE
GCS
)
// String() - Returns the string representation of the storage type
func (t StorageType) String() string {
return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE"}[t]
return [...]string{"FILE", "STREAM", "API", "DATABASE", "S3", "MEMORY", "HARDWARE", "AZURE", "GCS"}[t]
}
func TypeList() []StorageType {
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE}
return []StorageType{FILE, STREAM, API, DATABASE, S3, MEMORY, HARDWARE, AZURE, GCS}
}

View File

@ -7,7 +7,7 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
func GetPlannerNearestStart(start time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
near := float64(10000000000) // set a high value
for _, items := range planned { // loop through the planned items
for _, priced := range items { // loop through the priced items
@ -23,7 +23,7 @@ func GetPlannerNearestStart(start time.Time, planned map[tools.DataType][]pricin
return near
}
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 {
func GetPlannerLongestTime(end *time.Time, planned map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest) float64 {
if end == nil {
return -1
}

View File

@ -3,6 +3,7 @@ package models
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/tools"
"cloud.o-forge.io/core/oc-lib/models/booking"
@ -27,7 +28,7 @@ var models = map[string]func() utils.DBObject{
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &resource.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &resource.ProcessingResource{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecutions{} },
tools.WORKFLOW_EXECUTION.String(): func() utils.DBObject { return &workflow_execution.WorkflowExecution{} },
tools.WORKSPACE.String(): func() utils.DBObject { return &w3.Workspace{} },
tools.PEER.String(): func() utils.DBObject { return &peer.Peer{} },
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },
@ -36,6 +37,7 @@ var models = map[string]func() utils.DBObject{
tools.WORKFLOW_HISTORY.String(): func() utils.DBObject { return &w2.WorkflowHistory{} },
tools.WORKSPACE_HISTORY.String(): func() utils.DBObject { return &w3.WorkspaceHistory{} },
tools.ORDER.String(): func() utils.DBObject { return &order.Order{} },
tools.PURCHASE_RESOURCE.String(): func() utils.DBObject { return &purchase_resource.PurchaseResource{} },
}
// Model returns the model object based on the model type

View File

@ -24,6 +24,7 @@ import (
type Order struct {
utils.AbstractObject
OrderBy string `json:"order_by" bson:"order_by" validate:"required"`
WorkflowID string `json:"workflow_id" bson:"workflow_id" validate:"required"`
WorkflowExecutionIDs []string `json:"workflow_execution_ids" bson:"workflow_execution_ids" validate:"required"`
Status enum.CompletionStatus `json:"status" bson:"status" default:"0"`
SubOrders map[string]*PeerOrder `json:"sub_orders" bson:"sub_orders"`
@ -67,10 +68,10 @@ func (o *Order) Pay(scheduler *workflow_execution.WorkflowSchedule, request *too
} else {
o.IsDraft = false
}
for _, exec := range scheduler.WorkflowExecutions {
for _, exec := range scheduler.WorkflowExecution {
exec.IsDraft = false
_, code, err := utils.GenericUpdateOne(exec, exec.GetID(),
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecutions{})
workflow_execution.NewAccessor(request), &workflow_execution.WorkflowExecution{})
if code != 200 || err != nil {
return errors.New("could not update the workflow execution" + fmt.Sprintf("%v", err))
}
@ -90,14 +91,16 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu
if request == nil {
return errors.New("no request found")
}
if scheduler.Workflow.Graph == nil { // if the workflow has no graph, return an error
fmt.Println("Drafting order", scheduler.Workflow)
if scheduler.Workflow == nil || scheduler.Workflow.Graph == nil { // if the workflow has no graph, return an error
return errors.New("no graph found")
}
o.SetName()
o.WorkflowID = scheduler.Workflow.GetID()
o.IsDraft = true
o.OrderBy = request.Username
o.OrderBy = request.PeerID
o.WorkflowExecutionIDs = []string{} // create an array of ids
for _, exec := range scheduler.WorkflowExecutions {
for _, exec := range scheduler.WorkflowExecution {
o.WorkflowExecutionIDs = append(o.WorkflowExecutionIDs, exec.GetID())
}
// set the name of the order
@ -124,6 +127,9 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu
for _, resource := range resources {
peerOrder.AddItem(resource, len(resources)) // TODO SPECIALS REF ADDITIONALS NOTES
}
if o.SubOrders == nil {
o.SubOrders = map[string]*PeerOrder{}
}
o.SubOrders[peerOrder.GetID()] = peerOrder
}
// search an order with same user name and same session id
@ -134,7 +140,8 @@ func (o *Order) draftStoreFromModel(scheduler *workflow_execution.WorkflowSchedu
// should store the order
res, code, err := o.GetAccessor(request).Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"order_by": {{Operator: dbs.EQUAL.String(), Value: request.Username}},
"workflow_id": {{Operator: dbs.EQUAL.String(), Value: o.WorkflowID}},
"order_by": {{Operator: dbs.EQUAL.String(), Value: request.PeerID}},
},
}, "", o.IsDraft)
if code != 200 || err != nil {
@ -159,12 +166,12 @@ func (o *Order) draftBookOrder(scheduler *workflow_execution.WorkflowSchedule, r
if request == nil {
return draftedBookings, errors.New("no request found")
}
for _, exec := range scheduler.WorkflowExecutions {
for _, exec := range scheduler.WorkflowExecution {
_, priceds, _, err := scheduler.Workflow.Planify(exec.ExecDate, exec.EndDate, request)
if err != nil {
return draftedBookings, errors.New("could not planify the workflow" + fmt.Sprintf("%v", err))
}
bookings := exec.Book(scheduler.Workflow.UUID, priceds)
bookings := exec.Book(scheduler.UUID, scheduler.Workflow.UUID, priceds)
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
@ -321,4 +328,5 @@ func (d *PeerItemOrder) GetPrice(request *tools.APIRequest) (float64, error) {
return p * float64(d.Quantity), nil
}
// WTF HOW TO SELECT THE RIGHT PRICE ???
// SHOULD SET A BUYING STATUS WHEN PAYMENT IS VALIDATED

View File

@ -4,7 +4,6 @@ import (
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"cloud.o-forge.io/core/oc-lib/tools"
@ -29,47 +28,40 @@ type PeerCache struct {
}
// urlFormat formats the URL of the peer with the data type API function
func (p *PeerCache) urlFormat(url string, dt tools.DataType) string {
func (p *PeerCache) urlFormat(hostUrl string, dt tools.DataType) string {
// localhost is replaced by the local peer URL
// because localhost must collide on a web request security protocol
localhost := ""
if strings.Contains(url, "localhost") {
/*localhost := ""
if strings.Contains(hostUrl, "localhost") {
localhost = "localhost"
}
if strings.Contains(url, "127.0.0.1") {
if strings.Contains(hostUrl, "127.0.0.1") {
localhost = "127.0.0.1"
}
if localhost != "" {
r := regexp.MustCompile("(" + localhost + ":[0-9]+)")
t := r.FindString(url)
t := r.FindString(hostUrl)
if t != "" {
url = strings.Replace(url, t, dt.API()+":8080/oc", -1)
hostUrl = strings.Replace(hostUrl, t, dt.API()+":8080/oc", -1)
} else {
url = strings.ReplaceAll(url, localhost, dt.API()+":8080/oc")
hostUrl = strings.ReplaceAll(hostUrl, localhost, dt.API()+":8080/oc")
}
} else {
url = url + "/" + dt.API()
}
return url
} else {*/
hostUrl = hostUrl + "/" + strings.ReplaceAll(dt.API(), "oc-", "")
//}
fmt.Println("Contacting", hostUrl)
return hostUrl
}
// checkPeerStatus checks the status of a peer
func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) {
func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool) {
api := tools.API{}
access := NewShallowAccessor()
res, code, _ := access.LoadOne(peerID) // Load the peer from db
if code != 200 { // no peer no party
return nil, false
}
methods := caller.URLS[tools.PEER] // Get the methods url of the peer
if methods == nil {
return res.(*Peer), false
}
meth := methods[tools.POST] // Get the POST method to check status
if meth == "" {
return res.(*Peer), false
}
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + meth // Format the URL
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + "/status" // Format the URL
state, services := api.CheckRemotePeer(url)
res.(*Peer).ServicesState = services // Update the services states of the peer
access.UpdateOne(res, peerID) // Update the peer in the db
@ -77,23 +69,24 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools
}
// LaunchPeerExecution launches an execution on a peer
// The method contacts the path described by : peer.Url + datatype path (from enums) + replacement of id by dataID
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
fmt.Println("Launching peer execution on", caller.URLS, dt, method)
methods := caller.URLS[dt] // Get the methods url of the data type
if m, ok := methods[method]; !ok || m == "" {
return nil, errors.New("no path found")
return nil, errors.New("Requested method " + method.String() + " not declared in HTTPCaller")
}
meth := methods[method] // Get the method url to execute
meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC)
path := methods[method] // Get the path corresponding to the action we want to execute
path = strings.ReplaceAll(path, ":id", dataID) // Replace the id in the path in case of a DELETE / UPDATE method (it's a standard naming in OC)
url := ""
// Check the status of the peer
if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok && mypeer != nil {
if mypeer, ok := p.checkPeerStatus(peerID, dt.API()); !ok && mypeer != nil {
// If the peer is not reachable, add the execution to the failed executions list
pexec := &PeerExecution{
Method: method.String(),
Url: p.urlFormat((mypeer.Url)+meth, dt),
Url: p.urlFormat((mypeer.Url), dt) + path, // the url is constitued of : host URL + resource path + action path (ex : mypeer.com/datacenter/resourcetype/path/to/action)
Body: body,
DataType: dt.EnumIndex(),
DataID: dataID,
@ -106,7 +99,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
return nil, errors.New("peer not found")
}
// If the peer is reachable, launch the execution
url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL
tmp := mypeer.FailedExecution // Get the failed executions list
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db

View File

@ -17,7 +17,7 @@ import (
* it defines the resource compute
*/
type ComputeResource struct {
AbstractIntanciatedResource[*ComputeResourceInstance]
AbstractInstanciatedResource[*ComputeResourceInstance]
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
}
@ -35,7 +35,7 @@ func (abs *ComputeResource) ConvertToPricedResource(
if t != tools.COMPUTE_RESOURCE {
return nil
}
p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource)
return &PricedComputeResource{
PricedResource: *priced,
@ -52,6 +52,7 @@ type ComputeNode struct {
type ComputeResourceInstance struct {
ResourceInstance[*ComputeResourcePartnership]
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
@ -138,11 +139,19 @@ func (r *PricedComputeResource) GetType() tools.DataType {
}
func (r *PricedComputeResource) GetPrice() (float64, error) {
if r.UsageStart == nil || r.UsageEnd == nil {
return 0, errors.New("usage start and end must be set")
now := time.Now()
if r.UsageStart == nil {
r.UsageStart = &now
}
if r.UsageEnd == nil {
add := r.UsageStart.Add(time.Duration(1 * time.Hour))
r.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("selected pricing must be set")
if len(r.PricingProfiles) == 0 {
return 0, errors.New("pricing profile must be set on Priced Compute" + r.ResourceID)
}
r.SelectedPricing = &r.PricingProfiles[0]
}
pricing := *r.SelectedPricing
price := float64(0)

View File

@ -2,6 +2,7 @@ package resources
import (
"errors"
"fmt"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/models"
@ -15,7 +16,7 @@ import (
* it defines the resource data
*/
type DataResource struct {
AbstractIntanciatedResource[*DataInstance]
AbstractInstanciatedResource[*DataInstance]
Type string `bson:"type,omitempty" json:"type,omitempty"`
Quality string `bson:"quality,omitempty" json:"quality,omitempty"`
OpenData bool `bson:"open_data" json:"open_data" default:"false"` // Type is the type of the storage
@ -41,7 +42,7 @@ func (abs *DataResource) ConvertToPricedResource(
if t != tools.DATA_RESOURCE {
return nil
}
p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource)
return &PricedDataResource{
PricedResource: *priced,
@ -150,11 +151,20 @@ func (r *PricedDataResource) GetType() tools.DataType {
}
func (r *PricedDataResource) GetPrice() (float64, error) {
if r.UsageStart == nil || r.UsageEnd == nil {
return 0, errors.New("usage start and end must be set")
fmt.Println("GetPrice", r.UsageStart, r.UsageEnd)
now := time.Now()
if r.UsageStart == nil {
r.UsageStart = &now
}
if r.UsageEnd == nil {
add := r.UsageStart.Add(time.Duration(1 * time.Hour))
r.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("selected pricing must be set")
if len(r.PricingProfiles) == 0 {
return 0, errors.New("pricing profile must be set on Priced Data" + r.ResourceID)
}
r.SelectedPricing = &r.PricingProfiles[0]
}
pricing := *r.SelectedPricing
var err error

View File

@ -12,6 +12,7 @@ type ResourceInterface interface {
ConvertToPricedResource(t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF
GetType() string
GetSelectedInstance() utils.DBObject
ClearEnv() utils.DBObject
SetAllowedInstances(request *tools.APIRequest)
}
@ -20,6 +21,7 @@ type ResourceInstanceITF interface {
GetID() string
GetName() string
StoreDraftDefault()
ClearEnv()
GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF
GetPeerGroups() ([]ResourcePartnerITF, []map[string][]string)
ClearPeerGroups()

View File

@ -2,6 +2,7 @@ package resources
import (
"errors"
"fmt"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/pricing"
@ -34,18 +35,6 @@ func (abs *PricedResource) GetCreatorID() string {
return abs.CreatorID
}
func (abs *PricedResource) SetStartUsage(start time.Time) {
if abs.UsageStart == nil {
abs.UsageStart = &start
}
}
func (abs *PricedResource) SetEndUsage(end time.Time) {
if abs.UsageEnd == nil {
abs.UsageEnd = &end
}
}
func (abs *PricedResource) IsPurchased() bool {
if abs.SelectedPricing == nil {
return false
@ -71,20 +60,34 @@ func (abs *PricedResource) SetLocationEnd(end time.Time) {
func (abs *PricedResource) GetExplicitDurationInS() float64 {
if abs.ExplicitBookingDurationS == 0 {
if abs.UsageEnd == nil || abs.UsageStart == nil {
if abs.UsageEnd == nil && abs.UsageStart == nil {
return time.Duration(1 * time.Hour).Seconds()
}
if abs.UsageEnd == nil {
add := abs.UsageStart.Add(time.Duration(1 * time.Hour))
abs.UsageEnd = &add
}
return abs.UsageEnd.Sub(*abs.UsageStart).Seconds()
}
return abs.ExplicitBookingDurationS
}
func (r *PricedResource) GetPrice() (float64, error) {
if r.UsageStart == nil || r.UsageEnd == nil {
return 0, errors.New("usage start and end must be set")
fmt.Println("GetPrice", r.UsageStart, r.UsageEnd)
now := time.Now()
if r.UsageStart == nil {
r.UsageStart = &now
}
if r.UsageEnd == nil {
add := r.UsageStart.Add(time.Duration(1 * time.Hour))
r.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("selected pricing must be set")
if len(r.PricingProfiles) == 0 {
return 0, errors.New("pricing profile must be set on Priced Resource " + r.ResourceID)
}
r.SelectedPricing = &r.PricingProfiles[0]
}
return (*r.SelectedPricing).GetPrice(1, 0, *r.UsageStart, *r.UsageEnd)
pricing := *r.SelectedPricing
return pricing.GetPrice(1, 0, *r.UsageStart, *r.UsageEnd)
}

View File

@ -25,7 +25,7 @@ type ProcessingUsage struct {
* it defines the resource processing
*/
type ProcessingResource struct {
AbstractIntanciatedResource[*ProcessingInstance]
AbstractInstanciatedResource[*ProcessingInstance]
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
Usage *ProcessingUsage `bson:"usage,omitempty" json:"usage,omitempty"` // Usage is the usage of the processing

View File

@ -17,9 +17,9 @@ type purchaseResourceMongoAccessor struct {
func NewAccessor(request *tools.APIRequest) *purchaseResourceMongoAccessor {
return &purchaseResourceMongoAccessor{
AbstractAccessor: utils.AbstractAccessor{
Logger: logs.CreateLogger(tools.BUYING_STATUS.String()), // Create a logger with the data type
Logger: logs.CreateLogger(tools.PURCHASE_RESOURCE.String()), // Create a logger with the data type
Request: request,
Type: tools.BUYING_STATUS,
Type: tools.PURCHASE_RESOURCE,
},
}
}

View File

@ -47,12 +47,12 @@ func (r *AbstractResource) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
}
type AbstractIntanciatedResource[T ResourceInstanceITF] struct {
type AbstractInstanciatedResource[T ResourceInstanceITF] struct {
AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Instances []T `json:"instances,omitempty" bson:"instances,omitempty"` // Bill is the bill of the resource // Bill is the bill of the resource
}
func (abs *AbstractIntanciatedResource[T]) ConvertToPricedResource(
func (abs *AbstractInstanciatedResource[T]) ConvertToPricedResource(
t tools.DataType, request *tools.APIRequest) pricing.PricedItemITF {
instances := map[string]string{}
profiles := []pricing.PricingProfileITF{}
@ -71,7 +71,14 @@ func (abs *AbstractIntanciatedResource[T]) ConvertToPricedResource(
}
}
func (r *AbstractIntanciatedResource[T]) GetSelectedInstance() utils.DBObject {
func (abs *AbstractInstanciatedResource[T]) ClearEnv() utils.DBObject {
for _, instance := range abs.Instances {
instance.ClearEnv()
}
return abs
}
func (r *AbstractInstanciatedResource[T]) GetSelectedInstance() utils.DBObject {
if r.SelectedInstanceIndex != nil && len(r.Instances) > *r.SelectedInstanceIndex {
return r.Instances[*r.SelectedInstanceIndex]
}
@ -81,11 +88,14 @@ func (r *AbstractIntanciatedResource[T]) GetSelectedInstance() utils.DBObject {
return nil
}
func (abs *AbstractIntanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) {
func (abs *AbstractInstanciatedResource[T]) SetAllowedInstances(request *tools.APIRequest) {
if request != nil && request.PeerID == abs.CreatorID && request.PeerID != "" {
return
}
abs.Instances = verifyAuthAction[T](abs.Instances, request)
}
func (d *AbstractIntanciatedResource[T]) Trim() {
func (d *AbstractInstanciatedResource[T]) Trim() {
d.Type = d.GetType()
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: d.CreatorID}}).IsMySelf(); !ok {
for _, instance := range d.Instances {
@ -94,7 +104,7 @@ func (d *AbstractIntanciatedResource[T]) Trim() {
}
}
func (abs *AbstractIntanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool {
func (abs *AbstractInstanciatedResource[T]) VerifyAuth(request *tools.APIRequest) bool {
return len(verifyAuthAction[T](abs.Instances, request)) > 0 || abs.AbstractObject.VerifyAuth(request)
}
@ -127,6 +137,11 @@ type GeoPoint struct {
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
}
type Credentials struct {
Login string `json:"login,omitempty" bson:"login,omitempty"`
Pass string `json:"password,omitempty" bson:"password,omitempty"`
}
type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
@ -138,6 +153,12 @@ type ResourceInstance[T ResourcePartnerITF] struct {
Partnerships []T `json:"partnerships,omitempty" bson:"partnerships,omitempty"`
}
func (ri *ResourceInstance[T]) ClearEnv() {
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *ResourceInstance[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
pricings := []pricing.PricingProfileITF{}
for _, p := range ri.Partnerships {
@ -169,10 +190,19 @@ type ResourcePartnerShip[T pricing.PricingProfileITF] struct {
}
func (ri *ResourcePartnerShip[T]) GetPricingsProfiles(peerID string, groups []string) []pricing.PricingProfileITF {
profiles := []pricing.PricingProfileITF{}
if ri.PeerGroups[peerID] != nil {
for _, ri := range ri.PricingProfiles {
profiles = append(profiles, ri)
}
if slices.Contains(groups, "*") {
for _, ri := range ri.PricingProfiles {
profiles = append(profiles, ri)
}
return profiles
}
for _, p := range ri.PeerGroups[peerID] {
if slices.Contains(groups, p) {
profiles := []pricing.PricingProfileITF{}
for _, ri := range ri.PricingProfiles {
profiles = append(profiles, ri)
}
@ -180,7 +210,7 @@ func (ri *ResourcePartnerShip[T]) GetPricingsProfiles(peerID string, groups []st
}
}
}
return []pricing.PricingProfileITF{}
return profiles
}
func (rp *ResourcePartnerShip[T]) GetPeerGroups() map[string][]string {

View File

@ -65,6 +65,12 @@ func (wfa *resourceMongoAccessor[T]) LoadAll(isDraft bool) ([]utils.ShallowDBObj
}
func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
if filters == nil && search == "*" {
return utils.GenericLoadAll[T](func(d utils.DBObject) utils.ShallowDBObject {
d.(T).SetAllowedInstances(wfa.Request)
return d
}, isDraft, wfa)
}
return utils.GenericSearch[T](filters, search, wfa.getResourceFilter(search),
func(d utils.DBObject) utils.ShallowDBObject {
d.(T).SetAllowedInstances(wfa.Request)
@ -73,9 +79,6 @@ func (wfa *resourceMongoAccessor[T]) Search(filters *dbs.Filters, search string,
}
func (abs *resourceMongoAccessor[T]) getResourceFilter(search string) *dbs.Filters {
if search == "*" {
search = ""
}
return &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided
"abstractintanciatedresource.abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},

View File

@ -2,6 +2,7 @@ package resources
import (
"errors"
"fmt"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
@ -16,9 +17,9 @@ import (
* it defines the resource storage
*/
type StorageResource struct {
AbstractIntanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name)
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
AbstractInstanciatedResource[*StorageResourceInstance] // AbstractResource contains the basic fields of an object (id, name)
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
}
func (d *StorageResource) GetAccessor(request *tools.APIRequest) utils.Accessor {
@ -34,7 +35,7 @@ func (abs *StorageResource) ConvertToPricedResource(
if t != tools.STORAGE_RESOURCE {
return nil
}
p := abs.AbstractIntanciatedResource.ConvertToPricedResource(t, request)
p := abs.AbstractInstanciatedResource.ConvertToPricedResource(t, request)
priced := p.(*PricedResource)
return &PricedStorageResource{
PricedResource: *priced,
@ -43,6 +44,7 @@ func (abs *StorageResource) ConvertToPricedResource(
type StorageResourceInstance struct {
ResourceInstance[*StorageResourcePartnership]
Credentials *Credentials `json:"credentials,omitempty" bson:"credentials,omitempty"`
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
Local bool `bson:"local" json:"local"`
SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"`
@ -53,6 +55,13 @@ type StorageResourceInstance struct {
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
}
func (ri *StorageResourceInstance) ClearEnv() {
ri.Credentials = nil
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}
}
func (ri *StorageResourceInstance) StoreDraftDefault() {
found := false
for _, p := range ri.ResourceInstance.Env {
@ -161,11 +170,20 @@ func (r *PricedStorageResource) GetType() tools.DataType {
}
func (r *PricedStorageResource) GetPrice() (float64, error) {
if r.UsageStart == nil || r.UsageEnd == nil {
return 0, errors.New("usage start and end must be set")
fmt.Println("GetPrice", r.UsageStart, r.UsageEnd)
now := time.Now()
if r.UsageStart == nil {
r.UsageStart = &now
}
if r.UsageEnd == nil {
add := r.UsageStart.Add(time.Duration(1 * time.Hour))
r.UsageEnd = &add
}
if r.SelectedPricing == nil {
return 0, errors.New("selected pricing must be set")
if len(r.PricingProfiles) == 0 {
return 0, errors.New("pricing profile must be set on Priced Storage" + r.ResourceID)
}
r.SelectedPricing = &r.PricingProfiles[0]
}
pricing := *r.SelectedPricing
var err error

View File

@ -23,6 +23,10 @@ func (r *WorkflowResource) GetType() string {
return tools.WORKFLOW_RESOURCE.String()
}
func (d *WorkflowResource) ClearEnv() utils.DBObject {
return d
}
func (d *WorkflowResource) Trim() {
/* EMPTY */
}

View File

@ -91,7 +91,7 @@ func (ao *AbstractObject) UpToDate(user string, peer string, create bool) {
}
func (ao *AbstractObject) VerifyAuth(request *tools.APIRequest) bool {
return ao.AccessMode == Public || (request != nil && ao.CreatorID == request.PeerID)
return ao.AccessMode == Public || (request != nil && ao.CreatorID == request.PeerID && request.PeerID != "")
}
func (ao *AbstractObject) GetObjectFilters(search string) *dbs.Filters {

View File

@ -66,7 +66,6 @@ func GenericDeleteOne(id string, a Accessor) (DBObject, int, error) {
return nil, 403, errors.New("you are not allowed to delete :" + a.GetType().String())
}
if err != nil {
a.GetLogger().Error().Msg("Could not retrieve " + id + " to db. Error: " + err.Error())
return nil, code, err
}
if a.ShouldVerifyAuth() && !res.VerifyAuth(a.GetRequest()) {
@ -114,7 +113,6 @@ func GenericLoadOne[T DBObject](id string, f func(DBObject) (DBObject, int, erro
var data T
res_mongo, code, err := mongo.MONGOService.LoadOne(id, a.GetType().String())
if err != nil {
a.GetLogger().Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error())
return nil, code, err
}
res_mongo.Decode(&data)
@ -128,7 +126,6 @@ func genericLoadAll[T DBObject](res *mgb.Cursor, code int, err error, onlyDraft
objs := []ShallowDBObject{}
var results []T
if err != nil {
a.GetLogger().Error().Msg("Could not retrieve any from db. Error: " + err.Error())
return nil, code, err
}
if err = res.All(mongo.MngoCtx, &results); err != nil {

View File

@ -128,8 +128,8 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
return true, nil
}
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) {
priceds := map[tools.DataType][]pricing.PricedItemITF{}
func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType]map[string]pricing.PricedItemITF, *Workflow, error) {
priceds := map[tools.DataType]map[string]pricing.PricedItemITF{}
ps, priceds, err := plan[*resources.ProcessingResource](tools.PROCESSING_RESOURCE, wf, priceds, request, wf.Graph.IsProcessing,
func(res resources.ResourceInterface, priced pricing.PricedItemITF) (time.Time, float64) {
return start.Add(time.Duration(wf.Graph.GetAverageTimeProcessingBeforeStart(0, res.GetID(), request)) * time.Second), priced.GetExplicitDurationInS()
@ -189,12 +189,12 @@ func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIR
return longest, priceds, wf, nil
}
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest,
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType][]pricing.PricedItemITF, error) {
func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priceds map[tools.DataType]map[string]pricing.PricedItemITF, request *tools.APIRequest,
f func(graph.GraphItem) bool, start func(resources.ResourceInterface, pricing.PricedItemITF) (time.Time, float64), end func(time.Time, float64) *time.Time) ([]T, map[tools.DataType]map[string]pricing.PricedItemITF, error) {
resources := []T{}
for _, item := range wf.GetGraphItems(f) {
if priceds[dt] == nil {
priceds[dt] = []pricing.PricedItemITF{}
priceds[dt] = map[string]pricing.PricedItemITF{}
}
dt, realItem := item.GetResource()
if realItem == nil {
@ -212,7 +212,7 @@ func plan[T resources.ResourceInterface](dt tools.DataType, wf *Workflow, priced
priced.SetLocationEnd(*e)
}
resources = append(resources, realItem.(T))
priceds[dt] = append(priceds[dt], priced)
priceds[dt][item.ID] = priced
}
return resources, priceds, nil
}

View File

@ -92,15 +92,15 @@ func (a *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *t
func (a *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
// avoid the update if the schedule is the same
set = a.verifyResource(set)
if set.(*Workflow).Graph.Partial {
if set.(*Workflow).Graph != nil && set.(*Workflow).Graph.Partial {
return nil, 403, errors.New("you are not allowed to update a partial workflow")
}
res, code, err := utils.GenericUpdateOne(a.verifyResource(set), id, a, &Workflow{})
res, code, err := utils.GenericUpdateOne(set, id, a, &Workflow{})
if code != 200 {
return nil, code, err
}
workflow := res.(*Workflow)
a.execute(workflow, false, false) // update the workspace for the workflow
a.execute(workflow, false, true) // update the workspace for the workflow
a.share(workflow, false, a.GetCaller()) // share the update to the peers
return res, code, nil
}
@ -109,7 +109,7 @@ func (a *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.
func (a *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
data = a.verifyResource(data)
d := data.(*Workflow)
if d.Graph.Partial {
if d.Graph != nil && d.Graph.Partial {
return nil, 403, errors.New("you are not allowed to update a partial workflow")
}
res, code, err := utils.GenericStoreOne(d, a)
@ -119,12 +119,19 @@ func (a *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, i
workflow := res.(*Workflow)
a.share(workflow, false, a.GetCaller()) // share the creation to the peers
a.execute(workflow, false, false) // store the workspace for the workflow
a.execute(workflow, false, true) // store the workspace for the workflow
return res, code, nil
}
// CopyOne copies a workflow in the database
func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
wf := data.(*Workflow)
for _, item := range wf.Graph.Items {
_, obj := item.GetResource()
if obj != nil {
obj.ClearEnv()
}
}
return utils.GenericStoreOne(data, a)
}
@ -207,10 +214,10 @@ func (a *workflowMongoAccessor) verifyResource(obj utils.DBObject) utils.DBObjec
} else if t == tools.DATA_RESOURCE {
access = resources.NewAccessor[*resources.DataResource](t, a.GetRequest(), func() utils.DBObject { return &resources.DataResource{} })
} else {
wf.Graph.Clear(item.Data.GetID())
wf.Graph.Clear(resource.GetID())
}
if error := utils.VerifyAccess(access, resource.GetID()); error != nil {
wf.Graph.Clear(item.Data.GetID())
wf.Graph.Clear(resource.GetID())
}
}
return wf

View File

@ -15,40 +15,41 @@ import (
)
/*
* WorkflowExecutions is a struct that represents a list of workflow executions
* WorkflowExecution is a struct that represents a list of workflow executions
* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system
* workflows generate their own executions
*/
type WorkflowExecutions struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
type WorkflowExecution struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
PeerBookByGraph map[string]map[string][]string `json:"peer_book_by_graph,omitempty" bson:"peer_book_by_graph,omitempty"` // BookByResource is a map of the resource id and the list of the booking id
ExecutionsID string `json:"executions_id,omitempty" bson:"executions_id,omitempty"`
ExecDate time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State enum.BookingStatus `json:"state" bson:"state" default:"0"` // TEMPORARY TODO DEFAULT 1 -> 0 State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
}
func (r *WorkflowExecutions) StoreDraftDefault() {
// r.IsDraft = true
func (r *WorkflowExecution) StoreDraftDefault() {
r.IsDraft = false // TODO: TEMPORARY
r.State = enum.SCHEDULED
}
func (r *WorkflowExecutions) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if r.State != set.(*WorkflowExecutions).State {
return true, &WorkflowExecutions{State: set.(*WorkflowExecutions).State} // only state can be updated
func (r *WorkflowExecution) CanUpdate(set utils.DBObject) (bool, utils.DBObject) {
if r.State != set.(*WorkflowExecution).State {
return true, &WorkflowExecution{State: set.(*WorkflowExecution).State} // only state can be updated
}
return !r.IsDraft, set // only draft buying can be updated
}
func (r *WorkflowExecutions) CanDelete() bool {
func (r *WorkflowExecution) CanDelete() bool {
return r.IsDraft // only draft bookings can be deleted
}
func (wfa *WorkflowExecutions) Equals(we *WorkflowExecutions) bool {
func (wfa *WorkflowExecution) Equals(we *WorkflowExecution) bool {
return wfa.ExecDate.Equal(we.ExecDate) && wfa.WorkflowID == we.WorkflowID
}
func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error {
func (ws *WorkflowExecution) PurgeDraft(request *tools.APIRequest) error {
if ws.EndDate == nil {
// if no end... then Book like a savage
e := ws.ExecDate.Add(time.Hour)
@ -75,7 +76,7 @@ func (ws *WorkflowExecutions) PurgeDraft(request *tools.APIRequest) error {
}
// tool to transform the argo status to a state
func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecutions {
func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution {
status = strings.ToLower(status)
switch status {
case "succeeded": // Succeeded
@ -90,38 +91,54 @@ func (wfa *WorkflowExecutions) ArgoStatusToState(status string) *WorkflowExecuti
return wfa
}
func (r *WorkflowExecutions) GenerateID() {
r.UUID = uuid.New().String()
func (r *WorkflowExecution) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String()
}
}
func (d *WorkflowExecutions) GetName() string {
func (d *WorkflowExecution) GetName() string {
return d.UUID + "_" + d.ExecDate.String()
}
func (d *WorkflowExecutions) GetAccessor(request *tools.APIRequest) utils.Accessor {
func (d *WorkflowExecution) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}
func (d *WorkflowExecutions) VerifyAuth(request *tools.APIRequest) bool {
func (d *WorkflowExecution) VerifyAuth(request *tools.APIRequest) bool {
return true
}
func (d *WorkflowExecutions) Book(wfID string, priceds map[tools.DataType][]pricing.PricedItemITF) []*booking.Booking {
booking := d.bookEach(wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
booking = append(booking, d.bookEach(wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
func (d *WorkflowExecution) Book(executionsID string, wfID string, priceds map[tools.DataType]map[string]pricing.PricedItemITF) []*booking.Booking {
booking := d.bookEach(executionsID, wfID, tools.STORAGE_RESOURCE, priceds[tools.STORAGE_RESOURCE])
booking = append(booking, d.bookEach(executionsID, wfID, tools.PROCESSING_RESOURCE, priceds[tools.PROCESSING_RESOURCE])...)
return booking
}
func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []pricing.PricedItemITF) []*booking.Booking {
func (d *WorkflowExecution) bookEach(executionsID string, wfID string, dt tools.DataType, priceds map[string]pricing.PricedItemITF) []*booking.Booking {
items := []*booking.Booking{}
for _, priced := range priceds {
for itemID, priced := range priceds {
if d.PeerBookByGraph == nil {
d.PeerBookByGraph = map[string]map[string][]string{}
}
if d.PeerBookByGraph[priced.GetCreatorID()] == nil {
d.PeerBookByGraph[priced.GetCreatorID()] = map[string][]string{}
}
if d.PeerBookByGraph[priced.GetCreatorID()][itemID] == nil {
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = []string{}
}
start := d.ExecDate
if s := priced.GetLocationStart(); s != nil {
start = *s
}
end := start.Add(time.Duration(priced.GetExplicitDurationInS()) * time.Second)
bookingItem := &booking.Booking{
State: enum.DRAFT,
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(),
Name: d.GetName() + "_" + executionsID + "_" + wfID,
},
ExecutionsID: executionsID,
State: enum.SCHEDULED,
ResourceID: priced.GetID(),
ResourceType: dt,
DestPeerID: priced.GetCreatorID(),
@ -131,6 +148,8 @@ func (d *WorkflowExecutions) bookEach(wfID string, dt tools.DataType, priceds []
ExpectedEndDate: &end,
}
items = append(items, bookingItem)
d.PeerBookByGraph[priced.GetCreatorID()][itemID] = append(
d.PeerBookByGraph[priced.GetCreatorID()][itemID], bookingItem.GetID())
}
return items
}

View File

@ -43,11 +43,11 @@ func (wfa *workflowExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject,
}
func (wfa *workflowExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
if set.(*WorkflowExecutions).State == 0 {
if set.(*WorkflowExecution).State == 0 {
return nil, 400, errors.New("state is required")
}
realSet := WorkflowExecutions{State: set.(*WorkflowExecutions).State}
return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecutions{})
realSet := WorkflowExecution{State: set.(*WorkflowExecution).State}
return utils.GenericUpdateOne(&realSet, id, wfa, &WorkflowExecution{})
}
func (wfa *workflowExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
@ -59,13 +59,15 @@ func (wfa *workflowExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.D
}
func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*WorkflowExecutions](id, func(d utils.DBObject) (utils.DBObject, int, error) {
if d.(*WorkflowExecutions).State == enum.DRAFT && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
return utils.GenericLoadOne[*WorkflowExecution](id, func(d utils.DBObject) (utils.DBObject, int, error) {
now := time.Now()
now = now.Add(time.Second * -60)
if d.(*WorkflowExecution).State == enum.DRAFT && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
return nil, 404, errors.New("not found")
}
if d.(*WorkflowExecutions).State == enum.SCHEDULED && !a.shallow && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecutions).State = enum.FORGOTTEN
if d.(*WorkflowExecution).State == enum.SCHEDULED && !a.shallow && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
d.(*WorkflowExecution).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, id, newShallowAccessor(a.Request))
}
return d, 200, nil
@ -73,21 +75,23 @@ func (a *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int
}
func (a *workflowExecutionMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*WorkflowExecutions](a.getExec(), isDraft, a)
return utils.GenericLoadAll[*WorkflowExecution](a.getExec(), isDraft, a)
}
func (a *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*WorkflowExecutions](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a)
return utils.GenericSearch[*WorkflowExecution](filters, search, a.GetExecFilters(search), a.getExec(), isDraft, a)
}
func (a *workflowExecutionMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
if d.(*WorkflowExecutions).State == enum.DRAFT && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
now := time.Now()
now = now.Add(time.Second * -60)
if d.(*WorkflowExecution).State == enum.DRAFT && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
utils.GenericDeleteOne(d.GetID(), newShallowAccessor(a.Request))
return nil
}
if d.(*WorkflowExecutions).State == enum.SCHEDULED && time.Now().UTC().After(d.(*WorkflowExecutions).ExecDate) {
d.(*WorkflowExecutions).State = enum.FORGOTTEN
if d.(*WorkflowExecution).State == enum.SCHEDULED && now.UTC().After(d.(*WorkflowExecution).ExecDate) {
d.(*WorkflowExecution).State = enum.FORGOTTEN
utils.GenericRawUpdateOne(d, d.GetID(), newShallowAccessor(a.Request))
return d
}

View File

@ -6,11 +6,13 @@ import (
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
"github.com/robfig/cron"
)
@ -20,14 +22,15 @@ import (
*/
// it's a flying object only use in a session time. It's not stored in the database
type WorkflowSchedule struct {
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
WorkflowExecutions []*WorkflowExecutions `json:"workflow_executions,omitempty"` // WorkflowExecutions is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time
DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule
Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task
UUID string `json:"id" validate:"required"` // ExecutionsID is the list of the executions id of the workflow
Workflow *workflow.Workflow `json:"workflow,omitempty"` // Workflow is the workflow dependancy of the schedule
WorkflowExecution []*WorkflowExecution `json:"workflow_executions,omitempty"` // WorkflowExecution is the list of executions of the workflow
Message string `json:"message,omitempty"` // Message is the message of the schedule
Warning string `json:"warning,omitempty"` // Warning is the warning message of the schedule
Start time.Time `json:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time
End *time.Time `json:"end,omitempty"` // End is the end time of the schedule, is required and must be greater than the Start time
DurationS float64 `json:"duration_s" default:"-1"` // End is the end time of the schedule
Cron string `json:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task
}
func NewScheduler(start string, end string, durationInS float64, cron string) *WorkflowSchedule {
@ -36,6 +39,7 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return nil
}
ws := &WorkflowSchedule{
UUID: uuid.New().String(),
Start: s,
DurationS: durationInS,
Cron: cron,
@ -47,19 +51,19 @@ func NewScheduler(start string, end string, durationInS float64, cron string) *W
return ws
}
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecutions, error) {
func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest) (bool, *workflow.Workflow, []*WorkflowExecution, []*booking.Booking, error) {
if request.Caller == nil && request.Caller.URLS == nil && request.Caller.URLS[tools.BOOKING] == nil || request.Caller.URLS[tools.BOOKING][tools.GET] == "" {
return false, nil, []*WorkflowExecutions{}, errors.New("no caller defined")
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("no caller defined")
}
access := workflow.NewAccessor(request)
res, code, err := access.LoadOne(wfID)
if code != 200 {
return false, nil, []*WorkflowExecutions{}, errors.New("could not load the workflow with id: " + err.Error())
return false, nil, []*WorkflowExecution{}, []*booking.Booking{}, errors.New("could not load the workflow with id: " + err.Error())
}
wf := res.(*workflow.Workflow)
longest, priceds, wf, err := wf.Planify(ws.Start, ws.End, request)
if err != nil {
return false, wf, []*WorkflowExecutions{}, err
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
}
ws.DurationS = longest
ws.Message = "We estimate that the workflow will start at " + ws.Start.String() + " and last " + fmt.Sprintf("%v", ws.DurationS) + " seconds."
@ -68,10 +72,11 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
}
execs, err := ws.getExecutions(wf)
if err != nil {
return false, wf, []*WorkflowExecutions{}, err
return false, wf, []*WorkflowExecution{}, []*booking.Booking{}, err
}
bookings := []*booking.Booking{}
for _, exec := range execs {
bookings := exec.Book(wfID, priceds)
bookings = append(bookings, exec.Book(ws.UUID, wfID, priceds)...)
for _, b := range bookings {
meth := request.Caller.URLS[tools.BOOKING][tools.GET]
meth = strings.ReplaceAll(meth, ":id", b.ResourceID)
@ -80,44 +85,50 @@ func (ws *WorkflowSchedule) CheckBooking(wfID string, request *tools.APIRequest)
request.Caller.URLS[tools.BOOKING][tools.GET] = meth
_, err := (&peer.Peer{}).LaunchPeerExecution(b.DestPeerID, b.ResourceID, tools.BOOKING, tools.GET, nil, request.Caller)
if err != nil {
return false, wf, execs, err
return false, wf, execs, bookings, err
}
}
}
return true, wf, execs, nil
return true, wf, execs, bookings, nil
}
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*workflow.Workflow, []*WorkflowExecutions, error) {
func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*WorkflowSchedule, *workflow.Workflow, []*WorkflowExecution, error) {
if request == nil {
return nil, []*WorkflowExecutions{}, errors.New("no request found")
return ws, nil, []*WorkflowExecution{}, errors.New("no request found")
}
c := request.Caller
if c == nil || c.URLS == nil || c.URLS[tools.BOOKING] == nil {
return nil, []*WorkflowExecutions{}, errors.New("no caller defined")
return ws, nil, []*WorkflowExecution{}, errors.New("no caller defined")
}
methods := c.URLS[tools.BOOKING]
if _, ok := methods[tools.GET]; !ok {
return nil, []*WorkflowExecutions{}, errors.New("no path found")
return ws, nil, []*WorkflowExecution{}, errors.New("no path found")
}
ok, wf, executions, err := ws.CheckBooking(wfID, request)
ok, wf, executions, bookings, err := ws.CheckBooking(wfID, request)
ws.WorkflowExecution = executions
if !ok || err != nil {
return nil, []*WorkflowExecutions{}, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
return ws, nil, executions, errors.New("could not book the workflow : " + fmt.Sprintf("%v", err))
}
ws.Workflow = wf
ws.WorkflowExecutions = executions
for _, booking := range bookings {
_, err := (&peer.Peer{}).LaunchPeerExecution(booking.DestPeerID, "",
tools.BOOKING, tools.POST, booking.Serialize(booking), request.Caller)
if err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
fmt.Println("Schedules")
for _, exec := range executions {
err := exec.PurgeDraft(request)
if err != nil {
return nil, []*WorkflowExecutions{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
return ws, nil, []*WorkflowExecution{}, errors.New("purge draft" + fmt.Sprintf("%v", err))
}
exec.GenerateID()
exec.StoreDraftDefault()
// Should DELETE the previous execution2
utils.GenericStoreOne(exec, NewAccessor(request))
}
return wf, executions, nil
fmt.Println("Schedules")
return ws, wf, executions, nil
}
/*
@ -132,21 +143,23 @@ VERIFY THAT WE HANDLE DIFFERENCE BETWEEN LOCATION TIME && BOOKING
* getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution.WorkflowExecution
*/
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecutions, error) {
workflows_executions := []*WorkflowExecutions{}
func (ws *WorkflowSchedule) getExecutions(workflow *workflow.Workflow) ([]*WorkflowExecution, error) {
workflows_executions := []*WorkflowExecution{}
dates, err := ws.getDates()
if err != nil {
return workflows_executions, err
}
for _, date := range dates {
obj := &WorkflowExecutions{
obj := &WorkflowExecution{
AbstractObject: utils.AbstractObject{
UUID: uuid.New().String(), // set the uuid of the execution
Name: workflow.Name + "_execution_" + date.Start.String(), // set the name of the execution
},
ExecDate: date.Start, // set the execution date
EndDate: date.End, // set the end date
State: enum.DRAFT, // set the state to 1 (scheduled)
WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution
ExecutionsID: ws.UUID,
ExecDate: date.Start, // set the execution date
EndDate: date.End, // set the end date
State: enum.DRAFT, // set the state to 1 (scheduled)
WorkflowID: workflow.GetID(), // set the workflow id dependancy of the execution
}
workflows_executions = append(workflows_executions, obj)
}

View File

@ -72,9 +72,11 @@ func (a *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils
func (a *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
filters := &dbs.Filters{
Or: map[string][]dbs.Filter{
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: data.GetName() + "_workspace"}},
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: data.GetName() + "_workspace"}},
"abstractobject.creator_id": {{Operator: dbs.EQUAL.String(), Value: a.GetPeerID()}},
},
}
// filters *dbs.Filters, word string, isDraft bool
res, _, err := a.Search(filters, "", true) // Search for the workspace
if err == nil && len(res) > 0 { // If the workspace already exists, return an error
return nil, 409, errors.New("a workspace with the same name already exists")

View File

@ -115,8 +115,8 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
// CheckRemotePeer checks the state of a remote peer
func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// Check if the database is up
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
var resp APIStatusResponse
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer
if err != nil {
return DEAD, map[string]int{} // If the peer is not reachable, return dead

View File

@ -20,7 +20,12 @@ const (
WORKFLOW_HISTORY
WORKSPACE_HISTORY
ORDER
BUYING_STATUS
PURCHASE_RESOURCE
ADMIRALTY_SOURCE
ADMIRALTY_TARGET
ADMIRALTY_SECRET
ADMIRALTY_KUBECONFIG
ADMIRALTY_NODES
)
var NOAPI = ""
@ -30,6 +35,11 @@ var WORKFLOWAPI = "oc-workflow"
var WORKSPACEAPI = "oc-workspace"
var PEERSAPI = "oc-peer"
var DATACENTERAPI = "oc-datacenter"
var ADMIRALTY_SOURCEAPI = DATACENTERAPI+"/admiralty/source"
var ADMIRALTY_TARGETAPI = DATACENTERAPI+"/admiralty/target"
var ADMIRALTY_SECRETAPI = DATACENTERAPI+"/admiralty/secret"
var ADMIRALTY_KUBECONFIGAPI = DATACENTERAPI+"/admiralty/kubeconfig"
var ADMIRALTY_NODESAPI = DATACENTERAPI+"/admiralty/node"
// Bind the standard API name to the data type
var DefaultAPI = [...]string{
@ -50,6 +60,11 @@ var DefaultAPI = [...]string{
NOAPI,
NOAPI,
NOAPI,
ADMIRALTY_SOURCEAPI,
ADMIRALTY_TARGETAPI,
ADMIRALTY_SECRETAPI,
ADMIRALTY_KUBECONFIGAPI,
ADMIRALTY_NODESAPI,
}
// Bind the standard data name to the data type
@ -70,7 +85,12 @@ var Str = [...]string{
"workflow_history",
"workspace_history",
"order",
"buying_status",
"purchase_resource",
"admiralty_source",
"admiralty_target",
"admiralty_secret",
"admiralty_kubeconfig",
"admiralty_node",
}
func FromInt(i int) string {
@ -91,5 +111,5 @@ func (d DataType) EnumIndex() int {
}
func DataTypeList() []DataType {
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, BUYING_STATUS}
return []DataType{DATA_RESOURCE, PROCESSING_RESOURCE, STORAGE_RESOURCE, COMPUTE_RESOURCE, WORKFLOW_RESOURCE, WORKFLOW, WORKFLOW_EXECUTION, WORKSPACE, PEER, COLLABORATIVE_AREA, RULE, BOOKING, WORKFLOW_HISTORY, WORKSPACE_HISTORY, ORDER, PURCHASE_RESOURCE,ADMIRALTY_SOURCE,ADMIRALTY_TARGET,ADMIRALTY_SECRET,ADMIRALTY_KUBECONFIG,ADMIRALTY_NODES}
}

View File

@ -3,6 +3,7 @@ package tools
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
@ -49,8 +50,9 @@ func ToMethod(str string) METHOD {
var HTTPCallerInstance = &HTTPCaller{} // Singleton instance of the HTTPCaller
type HTTPCaller struct {
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
Disabled bool // Disabled flag
URLS map[DataType]map[METHOD]string // Map of the different methods and their urls
Disabled bool // Disabled flag
LastResults map[string]interface{} // Used to store information regarding the last execution of a given method on a given data type
}
// NewHTTPCaller creates a new instance of the HTTP Caller
@ -76,17 +78,33 @@ func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) (
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
err = caller.StoreResp(resp)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
}
// CallPut calls the DELETE method on the HTTP server
func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) {
resp, err := http.NewRequest("DELETE", url+subpath, nil)
if err != nil || resp == nil || resp.Body == nil {
req, err := http.NewRequest("DELETE", url+subpath, nil)
if err != nil {
return nil, err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil || req == nil || req.Body == nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
err = caller.StoreResp(resp)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
}
// CallPost calls the POST method on the HTTP server
@ -105,7 +123,12 @@ func (caller *HTTPCaller) CallPost(url string, subpath string, body interface{},
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
err = caller.StoreResp(resp)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
}
// CallPost calls the POST method on the HTTP server
@ -123,7 +146,12 @@ func (caller *HTTPCaller) CallPut(url string, subpath string, body map[string]in
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
err = caller.StoreResp(resp)
if err != nil {
return nil, err
}
return caller.LastResults["body"].([]byte), nil
}
// CallRaw calls the Raw method on the HTTP server
@ -143,7 +171,12 @@ func (caller *HTTPCaller) CallRaw(method string, url string, subpath string,
req.AddCookie(c)
}
client := &http.Client{}
return client.Do(req)
resp, err := client.Do(req)
if err != nil {
return nil, err
}
return resp, nil
}
// CallRaw calls the Raw method on the HTTP server
@ -163,3 +196,17 @@ func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
client := &http.Client{}
return client.Do(req)
}
func (caller *HTTPCaller) StoreResp(resp *http.Response) error {
caller.LastResults = make(map[string]interface{})
caller.LastResults["header"] = resp.Header
caller.LastResults["code"] = resp.StatusCode
data, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println("Error reading the body of the last request")
return err
}
caller.LastResults["body"] = data
return nil
}