96 Commits

Author SHA1 Message Date
plm
fd1c579ec4 Removing required field on PeerId, see #7 2025-01-10 17:39:58 +01:00
plm
0f4adeea86 Same prefix for all builtin microservices in opencloud 2025-01-08 16:55:42 +01:00
plm
21d08204b5 Fixing env based layer; not using onion builtin mechanism to preserve opencloud conf key/value format 2024-12-16 09:17:54 +01:00
plm
1de4888599 Remove extra underscore character; it breaks the env var loading 2024-12-10 14:01:47 +01:00
mr
15ca06aba8 test 2024-11-21 08:45:03 +01:00
mr
e9b3a65a0e test 2024-11-21 08:15:46 +01:00
mr
d8fac883d2 test 2024-11-21 07:51:59 +01:00
mr
ac49d3324d test 2024-11-20 17:05:21 +01:00
mr
3b77c0da83 test 2024-11-20 16:38:07 +01:00
mr
57f18b2244 test 2024-11-20 16:08:54 +01:00
mr
b49685aa82 tools 2024-11-20 10:39:20 +01:00
mr
08e9ee67fe discovery is bugged 2024-11-20 09:53:09 +01:00
mr
94803f820a hard stuff 2024-11-19 14:28:23 +01:00
mr
898700d127 hard stuff 2024-11-19 14:03:41 +01:00
mr
cfcad2343d hard stuff 2024-11-19 13:41:14 +01:00
mr
aab0d54dfa hard stuff 2024-11-19 13:28:01 +01:00
mr
603641a589 hard stuff 2024-11-19 12:01:14 +01:00
mr
35d90f9c4e hard stuff 2024-11-19 12:00:10 +01:00
mr
3b8079ca87 hard stuff 2024-11-19 10:08:46 +01:00
mr
902ad0ea71 multiple booking for one 2024-11-18 15:49:40 +01:00
mr
9a8625f8b4 no need of a required url base 64 2024-11-15 09:07:52 +01:00
mr
c24f2f26c4 state execution^C 2024-11-14 11:39:36 +01:00
mr
14fe694fd3 state execution 2024-11-14 10:02:18 +01:00
mr
06c3af5d4d state execution 2024-11-12 15:10:26 +01:00
mr
8a21a7c803 state execution 2024-11-12 14:44:41 +01:00
mr
6497e7dbdd state execution 2024-11-12 13:36:18 +01:00
mr
b5dba2458a state execution 2024-11-12 12:45:03 +01:00
mr
b2a6ac19cb state execution 2024-11-12 12:17:30 +01:00
mr
d6dba8e1f1 state execution 2024-11-12 11:41:57 +01:00
mr
bc79d54284 set up 2024-11-12 11:01:47 +01:00
mr
7fd44a55cb test 2024-11-08 11:44:23 +01:00
mr
f3df1e42b9 debug 2024-11-07 13:25:26 +01:00
mr
4c0c75be91 debug 2024-11-07 12:46:00 +01:00
mr
e6ca520a88 debug 2024-11-07 12:39:28 +01:00
mr
2d249f38ff rebrand oc-datacenter in oc-compute 2024-11-07 11:36:31 +01:00
mr
4a17d0c07d rebrand oc-datacenter in oc-compute 2024-11-07 11:11:33 +01:00
mr
1d6b9db5f9 rebrand oc-datacenter in oc-compute 2024-11-07 11:05:24 +01:00
mr
d249bcdf94 tooling 2024-11-06 16:09:56 +01:00
mr
66413e15bb tooling 2024-11-06 15:42:06 +01:00
mr
828efb4c98 test 2024-11-06 15:32:55 +01:00
mr
06e048165e enums 2024-11-06 10:36:58 +01:00
mr
c47ba64d49 go 2024-11-06 10:04:08 +01:00
mr
e4860ff67e zblah 2024-11-06 09:41:46 +01:00
mr
5f05b73366 neo local peer draft 2024-10-30 11:58:14 +01:00
mr
20ce1f5ef3 neo local peer draft 2024-10-30 11:19:41 +01:00
mr
4de43a301c neo local peer draft 2024-10-30 11:17:52 +01:00
mr
1a55212378 peeeers 2024-10-30 10:16:13 +01:00
mr
c34e5579fc test 2024-10-25 14:58:32 +02:00
mr
cc8fc2df21 method 2024-10-23 11:22:34 +02:00
mr
cd902c6688 method 2024-10-23 10:51:15 +02:00
mr
84deb17e37 method 2024-10-23 09:55:08 +02:00
mr
158d3aacc8 method 2024-10-23 09:07:36 +02:00
mr
55a25aba83 method 2024-10-22 14:27:52 +02:00
mr
719fa239e0 method 2024-10-22 14:23:39 +02:00
mr
72d5c64c2d get with custom types 2024-10-22 09:28:18 +02:00
mr
e45fefe883 get with custom types 2024-10-21 11:39:52 +02:00
mr
8e82b87fb3 init api in oc-lib because same on every api 2024-10-21 09:48:05 +02:00
mr
59a1b52242 nats discovery 2024-10-18 08:51:12 +02:00
mr
3c54f3d39e nats discovery 2024-10-17 17:04:09 +02:00
mr
56f81fb30e nats discovery 2024-10-17 16:51:37 +02:00
mr
73fce1d8fb nats discovery 2024-10-17 16:05:00 +02:00
mr
3e7f8513eb nats discovery 2024-10-17 16:04:07 +02:00
mr
1561d0c81e nats discovery 2024-10-17 14:16:18 +02:00
mr
847ef2e95c nats discovery 2024-10-17 14:14:04 +02:00
mr
dbcd9cf004 nats discovery 2024-10-17 13:53:57 +02:00
mr
9654d59fc0 exclude dest 2024-10-16 17:15:21 +02:00
mr
b432266486 callPut 2024-10-16 13:50:09 +02:00
mr
73602b6c3d add peer state 2024-10-15 16:37:18 +02:00
mr
9f5e6d6018 include oc in racine path for remote calling 2024-10-15 10:35:38 +02:00
mr
62705cc9b9 include oc in racine path for remote calling 2024-10-15 10:23:07 +02:00
mr
69fe3f8d76 utils func for storage 2024-10-10 09:40:19 +02:00
mr
556d711ab6 utils func for storage 2024-10-10 09:38:27 +02:00
mr
17749c6c0b resource are lightest enough 2024-10-10 08:55:22 +02:00
mr
84024a143e resource are lightest enough 2024-10-09 13:19:31 +02:00
mr
ae49c40ea5 add usable variable in refs 2024-10-09 09:29:16 +02:00
mr
b90ffbc4f0 add a web resource concept for data + storage 2024-10-09 09:05:49 +02:00
mr
ba6ac86bff url format improved 2024-10-07 17:38:21 +02:00
mr
5d9c922b26 url format improved 2024-10-07 16:47:03 +02:00
mr
208f69ae7e url format improved 2024-10-07 16:02:03 +02:00
mr
3c14c64f4a url format improved 2024-10-07 16:01:11 +02:00
mr
3bbde50f96 url format improved 2024-10-07 15:12:44 +02:00
mr
65668f5bee url format improved 2024-10-07 15:07:06 +02:00
mr
03e066c297 missing datacenter resource 2024-10-07 14:39:02 +02:00
mr
e69d5b3351 missing datacenter resource 2024-10-07 14:07:05 +02:00
mr
a39f539426 missing datacenter resource 2024-10-07 12:33:46 +02:00
mr
6724ff38fe missing datacenter resource 2024-10-07 12:02:33 +02:00
mr
c09994fd84 missing datacenter resource 2024-10-07 11:37:37 +02:00
mr
1d0b06ac4a missing datacenter resource 2024-10-07 11:06:12 +02:00
mr
7b2da3ba94 missing datacenter resource 2024-10-07 10:20:21 +02:00
mr
c5bdc96542 debug model collection add history 2024-10-07 09:54:14 +02:00
mr
e28b87ce7b debug model collection add history 2024-10-07 09:32:04 +02:00
mr
0efb852839 debug model collection add history 2024-10-07 09:08:53 +02:00
mr
6dfd6058ef test 2024-10-07 08:44:34 +02:00
mr
c083ce748c add workflow func tools 2024-10-04 10:42:30 +02:00
mr
3388fcc6f3 add workflow func tools 2024-10-03 17:25:54 +02:00
mr
91128313fc add workflow func tools 2024-10-03 17:13:50 +02:00
42 changed files with 742 additions and 362 deletions

View File

@@ -26,12 +26,12 @@ import (
func GetConfLoader() *onion.Onion {
logger := zerolog.New(os.Stdout).With().Timestamp().Logger()
AppName := GetAppName()
EnvPrefix := strings.ToUpper(AppName[0:2]+AppName[3:]) + "_"
EnvPrefix := "OC_"
defaultConfigFile := "/etc/oc/" + AppName[3:] + ".json"
localConfigFile := "./" + AppName[3:] + ".json"
var configFile string
var o *onion.Onion
l3 := onion.NewEnvLayerPrefix("_", EnvPrefix)
l3 := GetEnvVarLayer(EnvPrefix)
l2, err := onion.NewFileLayer(localConfigFile, nil)
if err == nil {
logger.Info().Msg("Local config file found " + localConfigFile + ", overriding default file")
@@ -54,3 +54,17 @@ func GetConfLoader() *onion.Onion {
}
return o
}
func GetEnvVarLayer(prefix string) onion.Layer {
envVars := make(map[string]interface{})
for _, e := range os.Environ() {
pair := strings.SplitN(e, "=", 2)
key := pair[0]
if strings.HasPrefix(key, prefix) {
envVars[strings.TrimPrefix(key, prefix)] = pair[1]
}
}
return onion.NewMapLayer(envVars)
}

View File

@@ -8,7 +8,6 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/static"
"github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson"
@@ -132,10 +131,6 @@ func (m *MongoDB) prepareDB(list_collection []string, config MongoConf) {
new_collection := mngoDB.Collection(collection_name)
if _, exists := collectionMap[collection_name]; !exists {
m.createCollection(collection_name, new_collection)
if collection_name == "peer" {
id, p := static.GetMyLocalBsonPeer()
m.StoreOne(p, id, collection_name)
}
} else {
CollectionMap[collection_name] = new_collection
}
@@ -275,7 +270,6 @@ func (m *MongoDB) LoadOne(id string, collection_name string) (*mongo.SingleResul
}
filter := bson.M{"_id": id}
targetDBCollection := CollectionMap[collection_name]
MngoCtx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

View File

@@ -19,7 +19,7 @@ func init() {
{Key: "example", Value: "text"}},
})
IndexesMap["datacenter"] = append(IndexesMap["datacenter"], mongo.IndexModel{Keys: bson.D{
IndexesMap["compute"] = append(IndexesMap["compute"], mongo.IndexModel{Keys: bson.D{
{Key: "description", Value: "text"},
{Key: "example", Value: "text"},
{Key: "owner", Value: "text"}},

View File

@@ -16,8 +16,8 @@ import (
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/rules/rule"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resource_model"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/resources/data"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/processing"
"cloud.o-forge.io/core/oc-lib/models/resources/storage"
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
@@ -26,6 +26,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/models/workspace"
"cloud.o-forge.io/core/oc-lib/tools"
beego "github.com/beego/beego/v2/server/web"
"github.com/beego/beego/v2/server/web/context"
"github.com/goraz/onion"
"github.com/rs/zerolog"
@@ -41,7 +42,7 @@ const (
DATA_RESOURCE = tools.DATA_RESOURCE
PROCESSING_RESOURCE = tools.PROCESSING_RESOURCE
STORAGE_RESOURCE = tools.STORAGE_RESOURCE
DATACENTER_RESOURCE = tools.DATACENTER_RESOURCE
COMPUTE_RESOURCE = tools.COMPUTE_RESOURCE
WORKFLOW_RESOURCE = tools.WORKFLOW_RESOURCE
WORKFLOW = tools.WORKFLOW
WORKSPACE = tools.WORKSPACE
@@ -95,11 +96,32 @@ type LibData struct {
Err string `bson:"error" json:"error"`
}
func Init(appName string) {
func InitDaemon(appName string) {
config.SetAppName(appName) // set the app name to the logger to define the main log chan
// create a temporary console logger for init
logs.SetLogger(logs.CreateLogger("main"))
// Load the right config file
o := GetConfLoader()
// feed the library with the loaded config
SetConfig(
o.GetStringDefault("MONGO_URL", "mongodb://127.0.0.1:27017"),
o.GetStringDefault("MONGO_DATABASE", "DC_myDC"),
o.GetStringDefault("NATS_URL", "nats://localhost:4222"),
o.GetStringDefault("LOKI_URL", ""),
o.GetStringDefault("LOG_LEVEL", "info"),
)
// Beego init
beego.BConfig.AppName = appName
beego.BConfig.Listen.HTTPPort = o.GetIntDefault("port", 8080)
beego.BConfig.WebConfig.DirectoryIndex = true
beego.BConfig.WebConfig.StaticDir["/swagger"] = "swagger"
}
func Init(appName string) {
InitDaemon(appName)
api := &tools.API{}
api.Discovered(beego.BeeApp.Handlers.GetAllControllerInfo())
}
//
@@ -137,12 +159,16 @@ func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string,
Resource model is the model that will define the structure of the resources
*/
accessor := (&resource_model.ResourceModel{}).GetAccessor(nil)
for _, model := range []string{tools.DATA_RESOURCE.String(), tools.PROCESSING_RESOURCE.String(), tools.STORAGE_RESOURCE.String(), tools.DATACENTER_RESOURCE.String(), tools.WORKFLOW_RESOURCE.String()} {
for _, model := range []string{tools.DATA_RESOURCE.String(), tools.PROCESSING_RESOURCE.String(), tools.STORAGE_RESOURCE.String(), tools.COMPUTE_RESOURCE.String(), tools.WORKFLOW_RESOURCE.String()} {
data, code, _ := accessor.Search(nil, model)
if code == 404 || len(data) == 0 {
refs := map[string]string{}
m := map[string]resource_model.Model{}
// TODO Specify the model for each resource
// for now only processing is specified here (not an elegant way)
if model == tools.DATA_RESOURCE.String() || model == tools.STORAGE_RESOURCE.String() {
refs["path"] = "string"
}
if model == tools.PROCESSING_RESOURCE.String() {
m["command"] = resource_model.Model{
Type: "string",
@@ -163,6 +189,7 @@ func SetConfig(mongoUrl string, database string, natsUrl string, lokiUrl string,
}
accessor.StoreOne(&resource_model.ResourceModel{
ResourceType: model,
VarRefs: refs,
Model: map[string]map[string]resource_model.Model{
"container": m,
},
@@ -400,9 +427,9 @@ func (l *LibData) ToDataResource() *data.DataResource {
return nil
}
func (l *LibData) ToDatacenterResource() *datacenter.DatacenterResource {
if l.Data != nil && l.Data.GetAccessor(nil).GetType() == tools.DATACENTER_RESOURCE.String() {
return l.Data.(*datacenter.DatacenterResource)
func (l *LibData) ToComputeResource() *compute.ComputeResource {
if l.Data != nil && l.Data.GetAccessor(nil).GetType() == tools.COMPUTE_RESOURCE.String() {
return l.Data.(*compute.ComputeResource)
}
return nil
}

14
go.mod
View File

@@ -3,6 +3,7 @@ module cloud.o-forge.io/core/oc-lib
go 1.22.0
require (
github.com/beego/beego/v2 v2.3.1
github.com/go-playground/validator/v10 v10.22.0
github.com/google/uuid v1.6.0
github.com/goraz/onion v0.1.3
@@ -25,17 +26,25 @@ require (
)
require (
github.com/beego/beego/v2 v2.3.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gabriel-vasile/mimetype v1.4.4 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kr/text v0.1.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
@@ -45,6 +54,5 @@ require (
golang.org/x/sync v0.7.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

30
go.sum
View File

@@ -1,12 +1,18 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/beego/beego/v2 v2.3.1 h1:7MUKMpJYzOXtCUsTEoXOxsDV/UcHw6CPbaWMlthVNsc=
github.com/beego/beego/v2 v2.3.1/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/coreos/etcd v3.3.17+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/elazarl/go-bindata-assetfs v1.0.1 h1:m0kkaHRKEu7tUIUFVwhGGGYClXvyl4RE03qmvRTNfbw=
github.com/elazarl/go-bindata-assetfs v1.0.1/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/etcd-io/etcd v3.3.17+incompatible/go.mod h1:cdZ77EstHBwVtD6iTgzgvogwcjo9m4iOqoijouPJ4bs=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gabriel-vasile/mimetype v1.4.4 h1:QjV6pZ7/XZ7ryI2KuyeEDE8wnh7fHP9YnQy+R0LnH8I=
@@ -31,12 +37,16 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/goraz/onion v0.1.3 h1:KhyvbDA2b70gcz/d5izfwTiOH8SmrvV43AsVzpng3n0=
github.com/goraz/onion v0.1.3/go.mod h1:XEmz1XoBz+wxTgWB8NwuvRm4RAu3vKxvrmYtzK+XCuQ=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -50,6 +60,8 @@ github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@@ -62,18 +74,28 @@ github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/ogier/pflag v0.0.1/go.mod h1:zkFki7tvTa0tafRvTBIZTvzYyAu6kQhPZFnshFFPE+g=
github.com/pelletier/go-toml v1.6.0/go.mod h1:5N711Q9dKgbdkxHL+MEfF31hpT7l0S0s/t2kKREewys=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 h1:DAYUYH5869yV94zvCES9F51oYtN5oGlwjxJJz7ZCnik=
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18/go.mod h1:nkxAfR/5quYxwPZhyDxgasBMnRtBZd0FCEpawpjMUFg=
github.com/skarademir/naturalsort v0.0.0-20150715044055-69a5d87bef62/go.mod h1:oIdVclZaltY1Nf7OQUkg1/2jImBJ+ZfKZuDIRSwk3p0=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
@@ -139,8 +161,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

View File

@@ -17,10 +17,10 @@ import (
*/
type Booking struct {
workflow_execution.WorkflowExecution // WorkflowExecution contains the workflow execution data
DatacenterResourceID string `json:"datacenter_resource_id,omitempty" bson:"datacenter_resource_id,omitempty" validate:"required"` // DatacenterResourceID is the ID of the datacenter resource specified in the booking
ComputeResourceID string `json:"compute_resource_id,omitempty" bson:"compute_resource_id,omitempty" validate:"required"` // ComputeResourceID is the ID of the compute resource specified in the booking
}
// CheckBooking checks if a booking is possible on a specific datacenter resource
// CheckBooking checks if a booking is possible on a specific compute resource
func (wfa *Booking) CheckBooking(id string, start time.Time, end *time.Time) (bool, error) {
// check if
if end == nil {
@@ -30,8 +30,8 @@ func (wfa *Booking) CheckBooking(id string, start time.Time, end *time.Time) (bo
e := *end
accessor := wfa.GetAccessor(nil)
res, code, err := accessor.Search(&dbs.Filters{
And: map[string][]dbs.Filter{ // check if there is a booking on the same datacenter resource by filtering on the datacenter_resource_id, the state and the execution date
"datacenter_resource_id": {{Operator: dbs.EQUAL.String(), Value: id}},
And: map[string][]dbs.Filter{ // check if there is a booking on the same compute resource by filtering on the compute_resource_id, the state and the execution date
"compute_resource_id": {{Operator: dbs.EQUAL.String(), Value: id}},
"workflowexecution.state": {{Operator: dbs.EQUAL.String(), Value: workflow_execution.SCHEDULED.EnumIndex()}},
"workflowexecution.execution_date": {
{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(e)},
@@ -56,8 +56,10 @@ func (ao *Booking) GetID() string {
}
func (r *Booking) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String()
}
}
func (d *Booking) GetName() string {
return d.UUID + "_" + d.ExecDate.String()

View File

@@ -1,9 +1,12 @@
package booking
import (
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
)
type bookingMongoAccessor struct {
@@ -42,10 +45,14 @@ func (wfa *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error)
return nil, code, err
}
res_mongo.Decode(&workflow)
if workflow.State == workflow_execution.SCHEDULED && time.Now().UTC().After(*workflow.ExecDate) {
workflow.State = workflow_execution.FORGOTTEN
wfa.GenericRawUpdateOne(&workflow, id, wfa)
}
return &workflow, 200, nil
}
func (wfa bookingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
func (wfa *bookingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType())
if err != nil {
@@ -57,6 +64,10 @@ func (wfa bookingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error)
return nil, 404, err
}
for _, r := range results {
if r.State == workflow_execution.SCHEDULED && time.Now().UTC().After(*r.ExecDate) {
r.State = workflow_execution.FORGOTTEN
wfa.GenericRawUpdateOne(&r, r.UUID, wfa)
}
objs = append(objs, &r.AbstractObject) // Warning only AbstractObject is returned
}
return objs, 200, nil
@@ -82,6 +93,10 @@ func (wfa *bookingMongoAccessor) Search(filters *dbs.Filters, search string) ([]
return nil, 404, err
}
for _, r := range results {
if r.State == workflow_execution.SCHEDULED && time.Now().UTC().After(*r.ExecDate) {
r.State = workflow_execution.FORGOTTEN
wfa.GenericRawUpdateOne(&r, r.UUID, wfa)
}
objs = append(objs, &r)
}
return objs, 200, nil

View File

@@ -2,6 +2,7 @@ package collaborative_area
import (
"encoding/json"
"time"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/rules/rule"
"cloud.o-forge.io/core/oc-lib/models/peer"
@@ -12,6 +13,13 @@ import (
"github.com/google/uuid"
)
type CollaborativeAreaRule struct {
ShareMode string `json:"share_mode,omitempty" bson:"share_mode,omitempty"` // Share is the share of the rule
CreatedAt time.Time `json:"created_at,omitempty" bson:"created_at,omitempty"` // CreatedAt is the time the rule was created
Creator string `json:"creator,omitempty" bson:"creator,omitempty"` // Creator is the creator of the rule
ExploitedBy string `json:"exploited_by,omitempty" bson:"exploited_by,omitempty"` // ExploitedBy is the exploited by of the rule
}
// SharedWorkspace is a struct that represents a shared workspace
// WARNING : it got a shallow object version, this one is the full version
// full version is the one used by API
@@ -22,11 +30,12 @@ type CollaborativeArea struct {
CreatorID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` // CreatorID is the ID of the creator
Version string `json:"version,omitempty" bson:"version,omitempty"` // Version is the version of the workspace
Description string `json:"description,omitempty" bson:"description,omitempty" validate:"required"` // Description is the description of the workspace
CollaborativeAreaRule *CollaborativeAreaRule `json:"collaborative_area,omitempty" bson:"collaborative_area,omitempty"` // CollaborativeArea is the collaborative area of the workspace
Attributes map[string]interface{} `json:"attributes,omitempty" bson:"attributes,omitempty"` // Attributes is the attributes of the workspace (TODO)
Workspaces []string `json:"workspaces,omitempty" bson:"workspaces,omitempty"` // Workspaces is the workspaces of the workspace
Workflows []string `json:"workflows,omitempty" bson:"workflows,omitempty"` // Workflows is the workflows of the workspace
Peers []string `json:"peers,omitempty" bson:"peers,omitempty"` // Peers is the peers of the workspace
Rules []string `json:"rules,omitempty" bson:"rules,omitempty"` // Rules is the rules of the workspace
Workspaces []string `json:"workspaces" bson:"workspaces"` // Workspaces is the workspaces of the workspace
Workflows []string `json:"workflows" bson:"workflows"` // Workflows is the workflows of the workspace
Peers []string `json:"peers" bson:"peers"` // Peers is the peers of the workspace
Rules []string `json:"rules" bson:"rules,omitempty"` // Rules is the rules of the workspace
SharedRules []*rule.Rule `json:"shared_rules,omitempty" bson:"-"` // SharedRules is the shared rules of the workspace
SharedWorkspaces []*workspace.Workspace `json:"shared_workspaces,omitempty" bson:"-"` // SharedWorkspaces is the shared workspaces of the workspace

View File

@@ -1,8 +1,10 @@
package collaborative_area
import (
"errors"
"fmt"
"slices"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
@@ -11,7 +13,6 @@ import (
"cloud.o-forge.io/core/oc-lib/models/utils"
w "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workspace"
"cloud.o-forge.io/core/oc-lib/static"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -155,7 +156,7 @@ func (wfa *collaborativeAreaMongoAccessor) deleteToPeer(shared *CollaborativeAre
}
paccess := (&peer.Peer{})
for _, v := range shared.Peers {
if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() {
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf(); ok {
continue
}
b, err := paccess.LaunchPeerExecution(v, shared.UUID, tools.COLLABORATIVE_AREA, tools.DELETE, nil, wfa.Caller)
@@ -173,7 +174,7 @@ func (wfa *collaborativeAreaMongoAccessor) sendToPeer(shared *CollaborativeArea)
paccess := (&peer.Peer{})
for _, v := range shared.Peers {
if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() || shared.IsSent {
if ok, _ := (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf(); ok || shared.IsSent {
continue
}
shared.IsSent = true
@@ -187,16 +188,17 @@ func (wfa *collaborativeAreaMongoAccessor) sendToPeer(shared *CollaborativeArea)
// UpdateOne updates a collaborative area in the database, given its ID and the new data, it automatically share to peers if the workspace is shared
func (wfa *collaborativeAreaMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
res, code, err := wfa.GenericUpdateOne(set.(*CollaborativeArea), id, wfa, &CollaborativeArea{})
wfa.deleteToPeer(res.(*CollaborativeArea)) // delete the collaborative area on the peer
fmt.Println("UpdateOne", set, res, code, err)
// wfa.deleteToPeer(res.(*CollaborativeArea)) // delete the collaborative area on the peer
wfa.sharedWorkflow(res.(*CollaborativeArea), id) // replace all shared workflows
wfa.sharedWorkspace(res.(*CollaborativeArea), id) // replace all collaborative areas (not shared worspace obj but workspace one)
wfa.sendToPeer(res.(*CollaborativeArea)) // send the collaborative area (collaborative area object) to the peers
// wfa.sendToPeer(res.(*CollaborativeArea)) // send the collaborative area (collaborative area object) to the peers
return res, code, err
}
// StoreOne stores a collaborative area in the database, it automatically share to peers if the workspace is shared
func (wfa *collaborativeAreaMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
id, _ := static.GetMyLocalJsonPeer() // get the local peer
_, id := (&peer.Peer{}).IsMySelf() // get the local peer
data.(*CollaborativeArea).CreatorID = id // set the creator id
data.(*CollaborativeArea).Peers = append(data.(*CollaborativeArea).Peers, id) // add the creator id to the peers
// then reset the shared fields
@@ -209,9 +211,20 @@ func (wfa *collaborativeAreaMongoAccessor) StoreOne(data utils.DBObject) (utils.
if data.(*CollaborativeArea).Rules == nil {
data.(*CollaborativeArea).Rules = []string{}
}
if data.(*CollaborativeArea).CollaborativeAreaRule == nil {
data.(*CollaborativeArea).CollaborativeAreaRule = &CollaborativeAreaRule{
ShareMode: "private",
ExploitedBy: "collaborators only",
}
}
data.(*CollaborativeArea).CollaborativeAreaRule.CreatedAt = time.Now().UTC()
// retrieve or proper peer
dd, code, err := (&peer.Peer{}).GetAccessor(nil).Search(nil, "0")
if code != 200 || len(dd) == 0 {
return nil, code, errors.New("Could not retrieve the peer" + err.Error())
}
data.(*CollaborativeArea).CollaborativeAreaRule.Creator = dd[0].GetID()
d, code, err := wfa.GenericStoreOne(data.(*CollaborativeArea), wfa)
if code == 200 {
wfa.sharedWorkflow(d.(*CollaborativeArea), d.GetID()) // create all shared workflows
wfa.sharedWorkspace(d.(*CollaborativeArea), d.GetID()) // create all collaborative areas

View File

@@ -15,9 +15,9 @@ type ShallowCollaborativeArea struct {
Version string `json:"version,omitempty" bson:"version,omitempty"`
Description string `json:"description,omitempty" bson:"description,omitempty" validate:"required"`
Attributes map[string]interface{} `json:"attributes,omitempty" bson:"attributes,omitempty"`
Workspaces []string `json:"workspaces,omitempty" bson:"workspaces,omitempty"`
Workflows []string `json:"workflows,omitempty" bson:"workflows,omitempty"`
Peers []string `json:"peers,omitempty" bson:"peers,omitempty"`
Workspaces []string `json:"workspaces" bson:"workspaces"`
Workflows []string `json:"workflows" bson:"workflows"`
Peers []string `json:"peers" bson:"peers"`
Rules []string `json:"rules,omitempty" bson:"rules,omitempty"`
}

View File

@@ -10,6 +10,7 @@ import (
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resource_model"
d "cloud.o-forge.io/core/oc-lib/models/resources/data"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
p "cloud.o-forge.io/core/oc-lib/models/resources/processing"
s "cloud.o-forge.io/core/oc-lib/models/resources/storage"
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
@@ -26,6 +27,7 @@ It's used to create the models dynamically
var models = map[string]func() utils.DBObject{
tools.WORKFLOW_RESOURCE.String(): func() utils.DBObject { return &w.WorkflowResource{} },
tools.DATA_RESOURCE.String(): func() utils.DBObject { return &d.DataResource{} },
tools.COMPUTE_RESOURCE.String(): func() utils.DBObject { return &compute.ComputeResource{} },
tools.STORAGE_RESOURCE.String(): func() utils.DBObject { return &s.StorageResource{} },
tools.PROCESSING_RESOURCE.String(): func() utils.DBObject { return &p.ProcessingResource{} },
tools.WORKFLOW.String(): func() utils.DBObject { return &w2.Workflow{} },
@@ -36,6 +38,8 @@ var models = map[string]func() utils.DBObject{
tools.COLLABORATIVE_AREA.String(): func() utils.DBObject { return &collaborative_area.CollaborativeArea{} },
tools.RULE.String(): func() utils.DBObject { return &rule.Rule{} },
tools.BOOKING.String(): func() utils.DBObject { return &booking.Booking{} },
tools.WORKFLOW_HISTORY.String(): func() utils.DBObject { return &w2.WorkflowHistory{} },
tools.WORKSPACE_HISTORY.String(): func() utils.DBObject { return &w3.WorkspaceHistory{} },
}
// Model returns the model object based on the model type

View File

@@ -5,17 +5,36 @@ import (
"fmt"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/static"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/google/uuid"
)
// now write a go enum for the state partner with self, blacklist, partner
type PeerState int
const (
NONE PeerState = iota
SELF
PARTNER
BLACKLIST
)
func (m PeerState) String() string {
return [...]string{"NONE", "SELF", "PARTNER", "BLACKLIST"}[m]
}
func (m PeerState) EnumIndex() int {
return int(m)
}
// Peer is a struct that represents a peer
type Peer struct {
utils.AbstractObject
Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` // Url is the URL of the peer (base64url)
Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required"` // Url is the URL of the peer (base64url)
PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` // PublicKey is the public key of the peer
Services map[string]int `json:"services,omitempty" bson:"services,omitempty"` // Services is the services of the peer
Services map[string]int `json:"services,omitempty" bson:"services,omitempty"`
State PeerState `json:"state" bson:"state" default:"0"`
FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried
}
@@ -45,9 +64,13 @@ func (ao *Peer) RemoveExecution(exec PeerExecution) {
}
// IsMySelf checks if the peer is the local peer
func (ao *Peer) IsMySelf() bool {
id, _ := static.GetMyLocalJsonPeer()
return ao.UUID == id
func (ao *Peer) IsMySelf() (bool, string) {
d, code, err := ao.GetAccessor(nil).Search(nil, SELF.String())
if code != 200 || err != nil || len(d) == 0 {
return false, ""
}
id := d[0].GetID()
return ao.UUID == id, id
}
// LaunchPeerExecution launches an execution on a peer
@@ -61,8 +84,10 @@ func (ao *Peer) GetID() string {
}
func (r *Peer) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String()
}
}
func (d *Peer) GetName() string {
return d.Name

View File

@@ -32,14 +32,23 @@ type PeerCache struct {
func (p *PeerCache) urlFormat(url string, dt tools.DataType) string {
// localhost is replaced by the local peer URL
// because localhost must collide on a web request security protocol
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
r := regexp.MustCompile("(:[0-9]+)")
localhost := ""
if strings.Contains(url, "localhost") {
localhost = "localhost"
}
if strings.Contains(url, "127.0.0.1") {
localhost = "127.0.0.1"
}
if localhost != "" {
r := regexp.MustCompile("(" + localhost + ":[0-9]+)")
t := r.FindString(url)
if t != "" {
url = strings.Replace(url, t, "", -1)
url = strings.Replace(url, t, dt.API()+":8080/oc", -1)
} else {
url = strings.ReplaceAll(url, localhost, dt.API()+":8080/oc")
}
url = strings.ReplaceAll(url, "localhost", dt.API()+":8080")
url = strings.ReplaceAll(url, "127.0.0.1", dt.API()+":8080")
} else {
url = url + "/" + dt.API()
}
return url
}
@@ -60,8 +69,10 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools
if meth == "" {
return res.(*Peer), false
}
url := p.urlFormat(res.(*Peer).Url+meth, tools.PEER) // Format the URL
state, services := api.CheckRemotePeer(url) // Check the status of the peer
url := p.urlFormat(res.(*Peer).Url, tools.PEER) + meth // Format the URL
fmt.Println("Checking peer status on", url, "...")
state, services := api.CheckRemotePeer(url)
fmt.Println("Checking peer status on", url, state, services) // Check the status of the peer
res.(*Peer).Services = services // Update the services states of the peer
access.UpdateOne(res, peerID) // Update the peer in the db
return res.(*Peer), state != tools.DEAD && services[appName] == 0 // Return the peer and its status
@@ -70,23 +81,21 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools
// LaunchPeerExecution launches an execution on a peer
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
dt tools.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
fmt.Println("Launching peer execution on", caller.URLS, dt, method)
methods := caller.URLS[dt] // Get the methods url of the data type
if _, ok := methods[method]; !ok {
if m, ok := methods[method]; !ok || m == "" {
return nil, errors.New("no path found")
}
meth := methods[method] // Get the method url to execute
if meth == "" {
return nil, errors.New("no path found")
} else {
meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC)
}
url := ""
// Check the status of the peer
if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok {
if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok && mypeer != nil {
// If the peer is not reachable, add the execution to the failed executions list
pexec := &PeerExecution{
Method: method.String(),
Url: p.urlFormat((mypeer.Url)+"/"+dt.API()+meth, dt),
Url: p.urlFormat((mypeer.Url)+meth, dt),
Body: body,
DataType: dt.EnumIndex(),
DataID: dataID,
@@ -95,8 +104,11 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db
return nil, errors.New("peer is not reachable")
} else {
if mypeer == nil {
return nil, errors.New("peer not found")
}
// If the peer is reachable, launch the execution
url = p.urlFormat((mypeer.Url)+"/"+dt.API()+meth, dt) // Format the URL
url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL
tmp := mypeer.FailedExecution // Get the failed executions list
mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list
mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db
@@ -104,6 +116,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
go p.exec(v.Url, tools.ToMethod(v.Method), v.Body, caller)
}
}
fmt.Println("URL exec", url)
return nil, p.exec(url, method, body, caller) // Execute the method
}
@@ -125,7 +138,7 @@ func (p *PeerCache) exec(url string, method tools.METHOD, body map[string]interf
if err != nil {
return err
}
if e, ok := m["error"]; ok && e != "<nil>" { // Check if there is an error in the response
if e, ok := m["error"]; ok && e != "<nil>" && e != "" { // Check if there is an error in the response
return errors.New(fmt.Sprintf("%v", m["error"]))
}
return nil

View File

@@ -1,6 +1,8 @@
package peer
import (
"strconv"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/utils"
@@ -67,12 +69,22 @@ func (wfa peerMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
func (wfa *peerMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" {
s, err := strconv.Atoi(search)
if err == nil {
filters = &dbs.Filters{
Or: map[string][]dbs.Filter{ // search by name if no filters are provided
"state": {{Operator: dbs.EQUAL.String(), Value: s}},
},
}
} else {
filters = &dbs.Filters{
Or: map[string][]dbs.Filter{ // search by name if no filters are provided
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
"url": {{Operator: dbs.LIKE.String(), Value: search}},
},
}
}
}
res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType())
if err != nil {
wfa.Logger.Error().Msg("Could not store to db. Error: " + err.Error())

View File

@@ -8,6 +8,11 @@ import (
"github.com/google/uuid"
)
type WebResource struct {
Protocol string `bson:"protocol,omitempty" json:"protocol,omitempty"` // Protocol is the protocol of the URL
Path string `bson:"path,omitempty" json:"path,omitempty"` // Path is the path of the URL
}
/*
* AbstractResource is a struct that represents a resource
* it defines the resource data
@@ -20,7 +25,7 @@ type AbstractResource struct {
Owner string `json:"owner,omitempty" bson:"owner,omitempty" validate:"required"` // Owner is the owner of the resource
OwnerLogo string `json:"owner_logo,omitempty" bson:"owner_logo,omitempty"` // OwnerLogo is the owner logo of the resource
SourceUrl string `json:"source_url,omitempty" bson:"source_url,omitempty" validate:"required"` // SourceUrl is the source URL of the resource
PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` // PeerID is the ID of the peer getting this resource
PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty"` // PeerID is the ID of the peer getting this resource
Price string `json:"price,omitempty" bson:"price,omitempty"` // Price is the price of access to the resource
License string `json:"license,omitempty" bson:"license,omitempty"` // License is the license of the resource
ResourceModel *ResourceModel `json:"resource_model,omitempty" bson:"resource_model,omitempty"` // ResourceModel is the model of the resource
@@ -76,6 +81,7 @@ type Model struct {
type ResourceModel struct {
UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"`
ResourceType string `json:"resource_type,omitempty" bson:"resource_type,omitempty" validate:"required"`
VarRefs map[string]string `json:"var_refs,omitempty" bson:"var_refs,omitempty"` // VarRefs is the variable references of the model
Model map[string]map[string]Model `json:"model,omitempty" bson:"model,omitempty"`
}

View File

@@ -1,4 +1,4 @@
package datacenter
package compute
import (
"encoding/json"
@@ -8,18 +8,53 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
type TechnologyEnum int
const (
DOCKER TechnologyEnum = iota
KUBERNETES
SLURM
HW
CONDOR
)
func (t TechnologyEnum) String() string {
return [...]string{"DOCKER", "KUBERNETES", "SLURM", "HW", "CONDOR"}[t]
}
type AccessEnum int
const (
SSH AccessEnum = iota
SSH_KUBE_API
SSH_SLURM
SSH_DOCKER
OPENCLOUD
VPN
)
func (a AccessEnum) String() string {
return [...]string{"SSH", "SSH_KUBE_API", "SSH_SLURM", "SSH_DOCKER", "OPENCLOUD", "VPN"}[a]
}
/*
* DatacenterResource is a struct that represents a datacenter resource
* it defines the resource datacenter
* ComputeResource is a struct that represents a compute resource
* it defines the resource compute
*/
type DatacenterResource struct {
type ComputeResource struct {
resource_model.AbstractResource
Technology TechnologyEnum `json:"technology" bson:"technology" default:"0"` // Technology is the technology
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Access AccessEnum `json:"access" bson:"access default:"0"` // Access is the access
Localisation string `json:"localisation,omitempty" bson:"localisation,omitempty"` // Localisation is the localisation
CPUs []*CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs
RAM *RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM
GPUs []*GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs
}
func (dma *DatacenterResource) Deserialize(j map[string]interface{}) utils.DBObject {
func (dma *ComputeResource) Deserialize(j map[string]interface{}) utils.DBObject {
b, err := json.Marshal(j)
if err != nil {
return nil
@@ -28,7 +63,7 @@ func (dma *DatacenterResource) Deserialize(j map[string]interface{}) utils.DBObj
return dma
}
func (dma *DatacenterResource) Serialize() map[string]interface{} {
func (dma *ComputeResource) Serialize() map[string]interface{} {
var m map[string]interface{}
b, err := json.Marshal(dma)
if err != nil {
@@ -38,9 +73,9 @@ func (dma *DatacenterResource) Serialize() map[string]interface{} {
return m
}
func (d *DatacenterResource) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
func (d *ComputeResource) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
data := New()
data.Init(tools.DATACENTER_RESOURCE, caller)
data.Init(tools.COMPUTE_RESOURCE, caller)
return data
}

View File

@@ -1,4 +1,4 @@
package datacenter
package compute
import (
"cloud.o-forge.io/core/oc-lib/dbs"
@@ -7,39 +7,39 @@ import (
"cloud.o-forge.io/core/oc-lib/models/utils"
)
type datacenterMongoAccessor struct {
type computeMongoAccessor struct {
utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
// New creates a new instance of the datacenterMongoAccessor
func New() *datacenterMongoAccessor {
return &datacenterMongoAccessor{}
// New creates a new instance of the computeMongoAccessor
func New() *computeMongoAccessor {
return &computeMongoAccessor{}
}
/*
* Nothing special here, just the basic CRUD operations
*/
func (dca *datacenterMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
func (dca *computeMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
return dca.GenericDeleteOne(id, dca)
}
func (dca *datacenterMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
set.(*DatacenterResource).ResourceModel = nil
return dca.GenericUpdateOne(set, id, dca, &DatacenterResource{})
func (dca *computeMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
set.(*ComputeResource).ResourceModel = nil
return dca.GenericUpdateOne(set, id, dca, &ComputeResource{})
}
func (dca *datacenterMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
data.(*DatacenterResource).ResourceModel = nil
func (dca *computeMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
data.(*ComputeResource).ResourceModel = nil
return dca.GenericStoreOne(data, dca)
}
func (dca *datacenterMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
func (dca *computeMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return dca.GenericStoreOne(data, dca)
}
func (dca *datacenterMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
var datacenter DatacenterResource
func (dca *computeMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
var compute ComputeResource
res_mongo, code, err := mongo.MONGOService.LoadOne(id, dca.GetType())
if err != nil {
@@ -47,23 +47,23 @@ func (dca *datacenterMongoAccessor) LoadOne(id string) (utils.DBObject, int, err
return nil, code, err
}
res_mongo.Decode(&datacenter)
res_mongo.Decode(&compute)
accessor := (&resource_model.ResourceModel{}).GetAccessor(nil)
resources, _, err := accessor.Search(nil, dca.GetType())
if err == nil && len(resources) > 0 {
datacenter.ResourceModel = resources[0].(*resource_model.ResourceModel)
compute.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
return &datacenter, 200, nil
return &compute, 200, nil
}
func (wfa datacenterMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
func (wfa computeMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType())
if err != nil {
wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error())
return nil, code, err
}
var results []DatacenterResource
var results []ComputeResource
if err = res_mongo.All(mongo.MngoCtx, &results); err != nil {
return nil, 404, err
}
@@ -73,12 +73,12 @@ func (wfa datacenterMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, erro
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}
func (wfa *datacenterMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) {
func (wfa *computeMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" {
filters = &dbs.Filters{
@@ -96,7 +96,7 @@ func (wfa *datacenterMongoAccessor) Search(filters *dbs.Filters, search string)
wfa.Logger.Error().Msg("Could not store to db. Error: " + err.Error())
return nil, code, err
}
var results []DatacenterResource
var results []ComputeResource
if err = res_mongo.All(mongo.MngoCtx, &results); err != nil {
return nil, 404, err
}
@@ -106,7 +106,7 @@ func (wfa *datacenterMongoAccessor) Search(filters *dbs.Filters, search string)
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}

View File

@@ -1,4 +1,4 @@
package datacenter
package compute
import (
"testing"
@@ -9,10 +9,10 @@ import (
"github.com/stretchr/testify/assert"
)
func TestStoreOneDatacenter(t *testing.T) {
dc := DatacenterResource{
func TestStoreOneCompute(t *testing.T) {
dc := ComputeResource{
AbstractResource: resource_model.AbstractResource{
AbstractObject: utils.AbstractObject{Name: "testDatacenter"},
AbstractObject: utils.AbstractObject{Name: "testCompute"},
Description: "Lorem Ipsum",
Logo: "azerty.com",
Owner: "toto",
@@ -27,10 +27,10 @@ func TestStoreOneDatacenter(t *testing.T) {
assert.NotEmpty(t, id)
}
func TestLoadOneDatacenter(t *testing.T) {
dc := DatacenterResource{
func TestLoadOneCompute(t *testing.T) {
dc := ComputeResource{
AbstractResource: resource_model.AbstractResource{
AbstractObject: utils.AbstractObject{Name: "testDatacenter"},
AbstractObject: utils.AbstractObject{Name: "testCompute"},
Description: "Lorem Ipsum",
Logo: "azerty.com",
Owner: "toto",

View File

@@ -8,14 +8,34 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
// enum of public private or licenced data
type DataLicense int
const (
PUBLIC DataLicense = iota
PRIVATE
LICENCED
)
/*
* Struct of Usage Conditions
*/
type UsageConditions struct {
Usage string `json:"usage,omitempty" bson:"usage,omitempty" description:"usage of the data"` // Usage is the usage of the data
Actors []string `json:"actors,omitempty" bson:"actors,omitempty" description:"actors of the data"` // Actors is the actors of the data
}
/*
* DataResource is a struct that represents a data resource
* it defines the resource data
*/
type DataResource struct {
resource_model.AbstractResource // AbstractResource contains the basic fields of an object (id, name)
Protocols []string `json:"protocol,omitempty" bson:"protocol,omitempty"` //TODO Enum type
DataType string `json:"datatype,omitempty" bson:"datatype,omitempty"` // DataType is the type of the data
resource_model.WebResource
Type string `bson:"type,omitempty" json:"type,omitempty"` // Type is the type of the storage
UsageConditions UsageConditions `json:"usage_conditions,omitempty" bson:"usage_conditions,omitempty" description:"usage conditions of the data"` // UsageConditions is the usage conditions of the data
License DataLicense `json:"license" bson:"license" description:"license of the data" default:"0"` // License is the license of the data
Interest DataLicense `json:"interest" bson:"interest" description:"interest of the data" default:"0"` // Interest is the interest of the data
Example string `json:"example,omitempty" bson:"example,omitempty" description:"base64 encoded data"` // Example is an example of the data
}

View File

@@ -71,7 +71,7 @@ func (wfa dataMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}
@@ -104,7 +104,7 @@ func (wfa *dataMongoAccessor) Search(filters *dbs.Filters, search string) ([]uti
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}

View File

@@ -10,7 +10,11 @@ import (
)
func TestStoreOneData(t *testing.T) {
d := DataResource{DataType: "jpeg", Example: "123456",
d := DataResource{
WebResource: resource_model.WebResource{
Protocol: "http", Path: "azerty.fr",
},
Example: "123456",
AbstractResource: resource_model.AbstractResource{
AbstractObject: utils.AbstractObject{Name: "testData"},
Description: "Lorem Ipsum",
@@ -28,7 +32,11 @@ func TestStoreOneData(t *testing.T) {
}
func TestLoadOneDate(t *testing.T) {
d := DataResource{DataType: "jpeg", Example: "123456",
d := DataResource{
WebResource: resource_model.WebResource{
Protocol: "http", Path: "azerty.fr",
},
Example: "123456",
AbstractResource: resource_model.AbstractResource{
AbstractObject: utils.AbstractObject{Name: "testData"},
Description: "Lorem Ipsum",

View File

@@ -4,7 +4,7 @@ import (
"encoding/json"
"cloud.o-forge.io/core/oc-lib/models/resource_model"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -29,9 +29,10 @@ type Expose struct {
*/
type ProcessingResource struct {
resource_model.AbstractResource
CPUs []*datacenter.CPU `bson:"cpus,omitempty" json:"cp_us,omitempty"` // CPUs is the list of CPUs
GPUs []*datacenter.GPU `bson:"gpus,omitempty" json:"gp_us,omitempty"` // GPUs is the list of GPUs
RAM *datacenter.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM
IsService bool `json:"is_service,omitempty" bson:"is_service,omitempty"` // IsService is a flag that indicates if the processing is a service
CPUs []*compute.CPU `bson:"cpus,omitempty" json:"cp_us,omitempty"` // CPUs is the list of CPUs
GPUs []*compute.GPU `bson:"gpus,omitempty" json:"gp_us,omitempty"` // GPUs is the list of GPUs
RAM *compute.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM
Storage uint `bson:"storage,omitempty" json:"storage,omitempty"` // Storage is the storage
Parallel bool `bson:"parallel,omitempty" json:"parallel,omitempty"` // Parallel is a flag that indicates if the processing is parallel
ScalingModel uint `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` // ScalingModel is the scaling model

View File

@@ -74,7 +74,7 @@ func (wfa processingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, erro
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}
@@ -108,7 +108,7 @@ func (wfa *processingMongoAccessor) Search(filters *dbs.Filters, search string)
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}

View File

@@ -3,7 +3,7 @@ package resources
import (
"cloud.o-forge.io/core/oc-lib/models/resource_model"
"cloud.o-forge.io/core/oc-lib/models/resources/data"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/resources/processing"
"cloud.o-forge.io/core/oc-lib/models/resources/storage"
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
@@ -18,13 +18,13 @@ type ResourceSet struct {
Datas []string `bson:"datas,omitempty" json:"datas,omitempty"`
Storages []string `bson:"storages,omitempty" json:"storages,omitempty"`
Processings []string `bson:"processings,omitempty" json:"processings,omitempty"`
Datacenters []string `bson:"datacenters,omitempty" json:"datacenters,omitempty"`
Computes []string `bson:"computes,omitempty" json:"computes,omitempty"`
Workflows []string `bson:"workflows,omitempty" json:"workflows,omitempty"`
DataResources []*data.DataResource `bson:"-" json:"data_resources,omitempty"`
StorageResources []*storage.StorageResource `bson:"-" json:"storage_resources,omitempty"`
ProcessingResources []*processing.ProcessingResource `bson:"-" json:"processing_resources,omitempty"`
DatacenterResources []*datacenter.DatacenterResource `bson:"-" json:"datacenter_resources,omitempty"`
ComputeResources []*compute.ComputeResource `bson:"-" json:"compute_resources,omitempty"`
WorkflowResources []*w.WorkflowResource `bson:"-" json:"workflow_resources,omitempty"`
}
@@ -32,7 +32,7 @@ type ItemResource struct {
Data *data.DataResource `bson:"data,omitempty" json:"data,omitempty"`
Processing *processing.ProcessingResource `bson:"processing,omitempty" json:"processing,omitempty"`
Storage *storage.StorageResource `bson:"storage,omitempty" json:"storage,omitempty"`
Datacenter *datacenter.DatacenterResource `bson:"datacenter,omitempty" json:"datacenter,omitempty"`
Compute *compute.ComputeResource `bson:"compute,omitempty" json:"compute,omitempty"`
Workflow *w.WorkflowResource `bson:"workflow,omitempty" json:"workflow,omitempty"`
}
@@ -47,12 +47,11 @@ func (i *ItemResource) GetAbstractRessource() *resource_model.AbstractResource {
if(i.Storage != nil){
return &i.Storage.AbstractResource
}
if(i.Datacenter != nil){
return &i.Datacenter.AbstractResource
if(i.Compute != nil){
return &i.Compute.AbstractResource
}
if(i.Workflow != nil){
return &i.Workflow.AbstractResource
}
return nil
}

View File

@@ -8,11 +8,11 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
type StorageType int
type StorageSize int
// StorageType - Enum that defines the type of storage
const (
GB StorageType = iota
GB StorageSize = iota
MB
KB
)
@@ -24,14 +24,22 @@ var argoType = [...]string{
}
// New creates a new instance of the StorageResource struct
func (dma StorageType) ToArgo() string {
func (dma StorageSize) ToArgo() string {
return argoType[dma]
}
type URL struct {
Protocol string `bson:"protocol,omitempty" json:"protocol,omitempty"` // Protocol is the protocol of the URL
Path string `bson:"path,omitempty" json:"path,omitempty"` // Path is the path of the URL
}
// enum of a data type
type StorageType int
const (
FILE = iota
STREAM
API
DATABASE
S3
MEMORY
HARDWARE
)
/*
* StorageResource is a struct that represents a storage resource
@@ -39,11 +47,11 @@ type URL struct {
*/
type StorageResource struct {
resource_model.AbstractResource // AbstractResource contains the basic fields of an object (id, name)
resource_model.WebResource
Type StorageType `bson:"type,omitempty" json:"type,omitempty"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
Type string `bson:"type,omitempty" json:"type,omitempty"` // Type is the type of the storage
SizeType StorageType `bson:"size_type" json:"size_type" default:"0"` // SizeType is the type of the storage size
SizeType StorageSize `bson:"size_type" json:"size_type" default:"0"` // SizeType is the type of the storage size
Size uint `bson:"size,omitempty" json:"size,omitempty"` // Size is the size of the storage
Url *URL `bson:"url,omitempty" json:"url,omitempty"` // Will allow to select between several protocols
Local bool `bson:"local" json:"local"` // Local is a flag that indicates if the storage is local
Encryption bool `bson:"encryption,omitempty" json:"encryption,omitempty"` // Encryption is a flag that indicates if the storage is encrypted
Redundancy string `bson:"redundancy,omitempty" json:"redundancy,omitempty"` // Redundancy is the redundancy of the storage

View File

@@ -74,7 +74,7 @@ func (wfa storageMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error)
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}
@@ -108,7 +108,7 @@ func (wfa *storageMongoAccessor) Search(filters *dbs.Filters, search string) ([]
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource) // only get the abstract resource !
objs = append(objs, &r) // only get the abstract resource !
}
return objs, 200, nil
}

View File

@@ -10,7 +10,7 @@ import (
)
func TestStoreOneStorage(t *testing.T) {
s := StorageResource{Size: 123, Url: &URL{Protocol: "http", Path: "azerty.fr"},
s := StorageResource{Size: 123, WebResource: resource_model.WebResource{Protocol: "http", Path: "azerty.fr"},
AbstractResource: resource_model.AbstractResource{
AbstractObject: utils.AbstractObject{Name: "testData"},
Description: "Lorem Ipsum",
@@ -28,7 +28,7 @@ func TestStoreOneStorage(t *testing.T) {
}
func TestLoadOneStorage(t *testing.T) {
s := StorageResource{Size: 123, Url: &URL{Protocol: "http", Path: "azerty.fr"},
s := StorageResource{Size: 123, WebResource: resource_model.WebResource{Protocol: "http", Path: "azerty.fr"},
AbstractResource: resource_model.AbstractResource{
AbstractObject: utils.AbstractObject{Name: "testData"},
Description: "Lorem Ipsum",

View File

@@ -1,6 +1,10 @@
package graph
import "cloud.o-forge.io/core/oc-lib/models/resources"
import (
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
// Graph is a struct that represents a graph
type Graph struct {
@@ -9,6 +13,23 @@ type Graph struct {
Links []GraphLink `bson:"links" json:"links" default:"{}" validate:"required"` // Links is the list of links between elements in the graph
}
func (g *Graph) GetResource(id string) (string, utils.DBObject) {
if item, ok := g.Items[id]; ok {
if item.Data != nil {
return tools.DATA_RESOURCE.String(), item.Data
} else if item.Compute != nil {
return tools.COMPUTE_RESOURCE.String(), item.Compute
} else if item.Workflow != nil {
return tools.WORKFLOW_RESOURCE.String(), item.Workflow
} else if item.Processing != nil {
return tools.PROCESSING_RESOURCE.String(), item.Processing
} else if item.Storage != nil {
return tools.STORAGE_RESOURCE.String(), item.Storage
}
}
return "", nil
}
// GraphItem is a struct that represents an item in a graph
type GraphItem struct {
ID string `bson:"id" json:"id" validate:"required"` // ID is the unique identifier of the item

View File

@@ -73,7 +73,7 @@ func (wfa workflowResourceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource)
objs = append(objs, &r)
}
return objs, 200, nil
}
@@ -107,7 +107,7 @@ func (wfa *workflowResourceMongoAccessor) Search(filters *dbs.Filters, search st
if err == nil && len(resources) > 0 {
r.ResourceModel = resources[0].(*resource_model.ResourceModel)
}
objs = append(objs, &r.AbstractResource)
objs = append(objs, &r)
}
return objs, 200, nil
}

View File

@@ -8,7 +8,6 @@ import (
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/static"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/go-playground/validator/v10"
"github.com/google/uuid"
@@ -42,7 +41,7 @@ func (ao *AbstractObject) GetName() string {
func (ao *AbstractObject) UpToDate() {
ao.UpdateDate = time.Now()
ao.LastPeerWriter, _ = static.GetMyLocalJsonPeer()
// ao.LastPeerWriter, _ = static.GetMyLocalJsonPeer()
}
// GetAccessor returns the accessor of the object (abstract)
@@ -161,3 +160,14 @@ func (dma *AbstractAccessor) GenericUpdateOne(set DBObject, id string, accessor
}
return accessor.LoadOne(id)
}
// GenericLoadOne loads one object from the database (generic)
// json expected in entry is a flatted object no need to respect the inheritance hierarchy
func (dma *AbstractAccessor) GenericRawUpdateOne(set DBObject, id string, accessor Accessor) (DBObject, int, error) {
id, code, err := mongo.MONGOService.UpdateOne(set, id, accessor.GetType())
if err != nil {
dma.Logger.Error().Msg("Could not update " + id + " to db. Error: " + err.Error())
return nil, code, err
}
return accessor.LoadOne(id)
}

View File

@@ -6,7 +6,8 @@ import (
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/resources/storage"
"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -26,92 +27,48 @@ type AbstractWorkflow struct {
Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow
}
func (w AbstractWorkflow) isADependancy(id string) (bool, []string) {
dependancyOfIDs := []string{}
isDeps := false
for _, link := range w.Graph.Links {
source := w.Graph.Items[link.Destination.ID].Processing
if id == link.Source.ID && source != nil {
isDeps = true
dependancyOfIDs = append(dependancyOfIDs, link.Destination.ID)
}
wourceWF := w.Graph.Items[link.Destination.ID].Workflow
if id == link.Source.ID && wourceWF != nil {
isDeps = true
dependancyOfIDs = append(dependancyOfIDs, link.Destination.ID)
func (w *AbstractWorkflow) GetWorkflows() (list_computings []graph.GraphItem) {
for _, item := range w.Graph.Items {
if item.Workflow != nil {
list_computings = append(list_computings, item)
}
}
return isDeps, dependancyOfIDs
return
}
func (w *AbstractWorkflow) GetStoragesByRelatedProcessing(processingID string, relatedToData bool, ignoreRelation bool) (map[string][]utils.DBObject, map[string]map[string][]utils.DBObject) {
storages := make(map[string][]utils.DBObject)
datasRelatedToStorage := make(map[string]map[string][]utils.DBObject)
func (w *AbstractWorkflow) GetComputeByRelatedProcessing(processingID string) []*compute.ComputeResource {
storages := []*compute.ComputeResource{}
for _, link := range w.Graph.Links {
nodeID := link.Destination.ID // we considers that the processing is the destination
node := w.Graph.Items[link.Source.ID].Compute // we are looking for the storage as source
if node == nil { // if the source is not a storage, we consider that the destination is the storage
nodeID = link.Source.ID // and the processing is the source
node = w.Graph.Items[link.Destination.ID].Compute // we are looking for the storage as destination
}
if processingID == nodeID && node != nil { // if the storage is linked to the processing
storages = append(storages, node)
}
}
return storages
}
func (w *AbstractWorkflow) GetStoragesByRelatedProcessing(processingID string) []*storage.StorageResource {
storages := []*storage.StorageResource{}
for _, link := range w.Graph.Links {
inout := "in"
storageID := link.Source.ID // Default value because we are looking for the input storage cause processing is destination
nodeID := link.Destination.ID // we considers that the processing is the destination
node := w.Graph.Items[link.Source.ID].Storage // we are looking for the storage as source
if node == nil { // if the source is not a storage, we consider that the destination is the storage
inout = "out"
storageID = link.Destination.ID // then we are looking for the output storage
nodeID = link.Source.ID // and the processing is the source
node = w.Graph.Items[link.Destination.ID].Storage // we are looking for the storage as destination
}
if processingID == nodeID && node != nil { // if the storage is linked to the processing
if storages[inout] == nil {
storages[inout] = []utils.DBObject{}
}
if !ignoreRelation {
datasRelatedToStorage[storageID], _ = w.GetDatasByRelatedProcessing(processingID, false, true)
if relatedToData && len(datasRelatedToStorage[storageID]) > 0 {
storages[inout] = append(storages[inout], node)
} else if !relatedToData && len(datasRelatedToStorage[storageID]) == 0 {
storages[inout] = append(storages[inout], node)
}
} else {
storages[inout] = append(storages[inout], node)
storages = append(storages, node)
}
}
}
return storages, datasRelatedToStorage
return storages
}
func (w *AbstractWorkflow) GetDatasByRelatedProcessing(dataID string, relatedToStorage bool, ignoreRelation bool) (map[string][]utils.DBObject, map[string]map[string][]utils.DBObject) {
datas := make(map[string][]utils.DBObject)
datasRelatedToData := make(map[string]map[string][]utils.DBObject)
for _, link := range w.Graph.Links {
inout := "in"
dataID := link.Source.ID // Default value because we are looking for the input storage cause processing is destination
nodeID := link.Destination.ID // we considers that the processing is the destination
node := w.Graph.Items[link.Source.ID].Data // we are looking for the storage as source
if node == nil { // if the source is not a storage, we consider that the destination is the storage
inout = "out"
dataID = link.Destination.ID // then we are looking for the output storage
nodeID = link.Source.ID // and the processing is the source
node = w.Graph.Items[link.Destination.ID].Data // we are looking for the storage as destination
}
if dataID == nodeID && node != nil { // if the storage is linked to the processing
if datas[inout] == nil {
datas[inout] = []utils.DBObject{}
}
datas[inout] = append(datas[inout], node)
if !ignoreRelation {
datasRelatedToData[dataID], _ = w.GetStoragesByRelatedProcessing(dataID, false, true)
if relatedToStorage && len(datasRelatedToData[dataID]) > 0 {
datas[inout] = append(datas[inout], node)
} else if !relatedToStorage && len(datasRelatedToData[dataID]) == 0 {
datas[inout] = append(datas[inout], node)
}
} else {
datas[inout] = append(datas[inout], node)
}
}
}
return datas, datasRelatedToData
}
func (w *AbstractWorkflow) getProcessingsByRelatedProcessing() (list_computings []graph.GraphItem) {
func (w *AbstractWorkflow) GetProcessings() (list_computings []graph.GraphItem) {
for _, item := range w.Graph.Items {
if item.Processing != nil {
list_computings = append(list_computings, item)
@@ -120,16 +77,16 @@ func (w *AbstractWorkflow) getProcessingsByRelatedProcessing() (list_computings
return
}
// tool function to check if a link is a link between a datacenter and a resource
// tool function to check if a link is a link between a compute and a resource
func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) {
if w.Graph == nil || w.Graph.Items == nil {
return false, ""
}
if d, ok := w.Graph.Items[link.Source.ID]; ok && d.Datacenter != nil {
return true, d.Datacenter.UUID
if d, ok := w.Graph.Items[link.Source.ID]; ok && d.Compute != nil {
return true, d.Compute.UUID
}
if d, ok := w.Graph.Items[link.Destination.ID]; ok && d.Datacenter != nil {
return true, d.Datacenter.UUID
if d, ok := w.Graph.Items[link.Destination.ID]; ok && d.Compute != nil {
return true, d.Compute.UUID
}
return false, ""
}
@@ -151,15 +108,15 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
if wfa.Graph == nil { // no graph no booking
return false, nil
}
accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil)
accessor := (&compute.ComputeResource{}).GetAccessor(nil)
for _, link := range wfa.Graph.Links {
if ok, dc_id := wfa.isDCLink(link); ok { // check if the link is a link between a datacenter and a resource
if ok, dc_id := wfa.isDCLink(link); ok { // check if the link is a link between a compute and a resource
dc, code, _ := accessor.LoadOne(dc_id)
if code != 200 {
continue
}
// CHECK BOOKING ON PEER, datacenter could be a remote one
peerID := dc.(*datacenter.DatacenterResource).PeerID
// CHECK BOOKING ON PEER, compute could be a remote one
peerID := dc.(*compute.ComputeResource).PeerID
if peerID == "" {
return false, errors.New("no peer id")
} // no peer id no booking, we need to know where to book

View File

@@ -3,14 +3,16 @@ package workflow
import (
"errors"
"fmt"
"slices"
"strings"
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/models/workspace"
@@ -125,15 +127,20 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
g = realData.Graph
}
if g != nil && g.Links != nil && len(g.Links) > 0 { // if the graph is set and has links then book the workflow (even on ourselves)
accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil)
accessor := (&compute.ComputeResource{}).GetAccessor(nil)
isDCFound := []string{}
for _, link := range g.Links {
if ok, dc_id := realData.isDCLink(link); ok { // check if the link is a link between a datacenter and a resource booking is only on datacenter
if ok, dc_id := realData.isDCLink(link); ok { // check if the link is a link between a compute and a resource booking is only on compute
if slices.Contains(isDCFound, dc_id) {
continue
} // if the compute is already found, skip it
isDCFound = append(isDCFound, dc_id)
dc, code, _ := accessor.LoadOne(dc_id)
if code != 200 {
continue
}
// CHECK BOOKING
peerID := dc.(*datacenter.DatacenterResource).PeerID
peerID := dc.(*compute.ComputeResource).PeerID
if peerID == "" { // no peer id no booking
continue
}
@@ -141,7 +148,7 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, "", tools.BOOKING, tools.POST,
(&workflow_execution.WorkflowExecutions{ // it's the standard model for booking see OC-PEER
WorkflowID: id, // set the workflow id "WHO"
ResourceID: dc_id, // set the datacenter id "WHERE"
ResourceID: dc_id, // set the compute id "WHERE"
Executions: execs, // set the executions to book "WHAT"
}).Serialize(), wfa.Caller)
if err != nil {
@@ -158,7 +165,7 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
* share is a function that shares a workflow to the peers if the workflow is shared
*/
func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) {
if realData.Shared == nil || len(realData.Shared) == 0 || caller.Disabled { // no shared no sharing
if realData == nil || realData.Shared == nil || len(realData.Shared) == 0 || caller == nil || caller.Disabled { // no shared no sharing
return
}
for _, sharedID := range realData.Shared { // loop through the shared ids
@@ -171,7 +178,7 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
paccess := &peer.Peer{}
for _, p := range res.(*shallow_collaborative_area.ShallowCollaborativeArea).Peers {
paccess.UUID = p
if paccess.IsMySelf() { // if the peer is the current peer, never share because it will create a loop
if ok, _ := paccess.IsMySelf(); ok { // if the peer is the current peer, never share because it will create a loop
continue
}
if delete { // if the workflow is deleted, share the deletion
@@ -192,44 +199,40 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
* execution is a create or delete function for the workflow executions depending on the schedule of the workflow
*/
func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
var err error
nats := tools.NewNATSCaller() // create a new nats caller because executions are sent to the nats for daemons
if !realData.ScheduleActive { // if the schedule is not active, delete the executions
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
"state": 1, // only delete the scheduled executions only scheduled if executions are in progress or ended, they should not be deleted for registration
"workflow_id": id,
}, tools.WORKFLOW_EXECUTION.String())
err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) // delete the booking of the workflow on the peers
fmt.Println("DELETE BOOKING", err)
nats.SetNATSPub(tools.WORKFLOW.String(), tools.REMOVE, realData) // send the deletion to the nats
if err != nil {
return 409, err
}
return 200, nil
}
accessor := (&workflow_execution.WorkflowExecution{}).GetAccessor(nil)
execs, err := wfa.getExecutions(id, realData) // get the executions of the workflow
if err != nil {
return 422, err
}
if !realData.ScheduleActive || delete { // if the schedule is not active, delete the executions
execs = []*workflow_execution.WorkflowExecution{}
}
err = wfa.book(id, realData, execs) // book the workflow on the peers
fmt.Println("BOOKING", err)
if err != nil {
return 409, err // if the booking fails, return an error for integrity between peers
}
if delete { // if delete is set to true, delete the executions
return 200, nil
}
if len(execs) > 0 { // if the executions are set, store them
fmt.Println("BOOKING", delete)
for _, obj := range execs {
_, code, err := accessor.StoreOne(obj)
fmt.Println("EXEC", code, err)
if code != 200 {
return code, err
}
}
nats.SetNATSPub(tools.WORKFLOW.String(), tools.CREATE, realData) // send the creation to the nats
} else {
return 422, err
}
return 200, nil
}
@@ -239,34 +242,53 @@ func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (util
if code != 200 {
return nil, 409, err
}
// avoid the update if the schedule is the same
avoid := set.(*Workflow).Schedule == nil || (res.(*Workflow).Schedule != nil && res.(*Workflow).ScheduleActive == set.(*Workflow).ScheduleActive && res.(*Workflow).Schedule.Start == set.(*Workflow).Schedule.Start && res.(*Workflow).Schedule.End == set.(*Workflow).Schedule.End && res.(*Workflow).Schedule.Cron == set.(*Workflow).Schedule.Cron)
res, code, err = wfa.GenericUpdateOne(set, id, wfa, &Workflow{})
if code != 200 {
return nil, code, err
}
workflow := res.(*Workflow)
if !avoid { // if the schedule is not avoided, update the executions
if code, err := wfa.execution(id, res.(*Workflow), false); code != 200 {
if code, err := wfa.execution(id, workflow, false); code != 200 {
return nil, code, errors.New("could not update the executions : " + err.Error())
}
}
wfa.execute(res.(*Workflow), false, false) // update the workspace for the workflow
wfa.share(res.(*Workflow), false, wfa.Caller) // share the update to the peers
fmt.Println("UPDATE", workflow.ScheduleActive, workflow.Schedule)
if workflow.ScheduleActive && workflow.Schedule != nil { // if the workflow is scheduled, update the executions
now := time.Now().UTC()
if (workflow.Schedule.End != nil && now.After(*workflow.Schedule.End)) || (workflow.Schedule.End == nil && workflow.Schedule.Start != nil && now.After(*workflow.Schedule.Start)) { // if the start date is passed, then you can book
workflow.ScheduleActive = false
wfa.GenericRawUpdateOne(workflow, id, wfa)
} // if the start date is passed, update the executions
}
wfa.execute(workflow, false, false) // update the workspace for the workflow
wfa.share(workflow, false, wfa.Caller) // share the update to the peers
return res, code, nil
}
// StoreOne stores a workflow in the database
func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
res, code, err := wfa.GenericStoreOne(data, wfa)
d := data.(*Workflow)
if d.ScheduleActive && d.Schedule != nil { // if the workflow is scheduled, update the executions
now := time.Now().UTC()
if (d.Schedule.End != nil && now.After(*d.Schedule.End)) || (d.Schedule.End == nil && d.Schedule.Start != nil && now.After(*d.Schedule.Start)) { // if the start date is passed, then you can book
d.ScheduleActive = false
} // if the start date is passed, update the executions
}
res, code, err := wfa.GenericStoreOne(d, wfa)
if err != nil || code != 200 {
return nil, code, err
}
wfa.share(res.(*Workflow), false, wfa.Caller) // share the creation to the peers
workflow := res.(*Workflow)
wfa.share(workflow, false, wfa.Caller) // share the creation to the peers
//store the executions
if code, err := wfa.execution(res.GetID(), res.(*Workflow), false); err != nil {
if code, err := wfa.execution(res.GetID(), workflow, false); err != nil {
return nil, code, err
}
wfa.execute(res.(*Workflow), false, false) // store the workspace for the workflow
wfa.execute(workflow, false, false) // store the workspace for the workflow
return res, code, nil
}
@@ -300,7 +322,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool, activ
Processings: workflow.Processings,
Storages: workflow.Storages,
Workflows: workflow.Workflows,
Datacenters: workflow.Datacenters,
Computes: workflow.Computes,
},
}, resource[0].GetID())
} else { // if the workspace does not exist, create it
@@ -312,7 +334,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool, activ
Processings: workflow.Processings,
Storages: workflow.Storages,
Workflows: workflow.Workflows,
Datacenters: workflow.Datacenters,
Computes: workflow.Computes,
},
})
}
@@ -327,8 +349,15 @@ func (wfa *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error
return nil, code, err
}
res_mongo.Decode(&workflow)
wfa.execute(&workflow, false, true) // if no workspace is attached to the workflow, create it
if workflow.ScheduleActive && workflow.Schedule != nil { // if the workflow is scheduled, update the executions
now := time.Now().UTC()
if (workflow.Schedule.End != nil && now.After(*workflow.Schedule.End)) || (workflow.Schedule.End == nil && workflow.Schedule.Start != nil && now.After(*workflow.Schedule.Start)) { // if the start date is passed, then you can book
workflow.ScheduleActive = false
wfa.GenericRawUpdateOne(&workflow, id, wfa)
} // if the start date is passed, update the executions
}
wfa.execute(&workflow, false, true) // if no workspace is attached to the workflow, create it
return &workflow, 200, nil
}

View File

@@ -18,6 +18,7 @@ const (
STARTED
FAILURE
SUCCESS
FORGOTTEN
)
var str = [...]string{
@@ -25,6 +26,7 @@ var str = [...]string{
"started",
"failure",
"success",
"forgotten",
}
func FromInt(i int) string {
@@ -81,7 +83,7 @@ type WorkflowExecution struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
ExecDate *time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required
EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow
State int64 `json:"state,omitempty" bson:"state,omitempty"` // State is the state of the workflow
State ScheduledType `json:"state" bson:"state" default:"0"` // State is the state of the workflow
WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow
}
@@ -94,13 +96,13 @@ func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecutio
status = strings.ToLower(status)
switch status {
case "succeeded": // Succeeded
wfa.State = int64(SUCCESS.EnumIndex())
wfa.State = SUCCESS
case "pending": // Pending
wfa.State = int64(SCHEDULED.EnumIndex())
wfa.State = SCHEDULED
case "running": // Running
wfa.State = int64(STARTED.EnumIndex())
wfa.State = STARTED
default: // Failed
wfa.State = int64(FAILURE.EnumIndex())
wfa.State = FAILURE
}
return wfa
}

View File

@@ -1,6 +1,8 @@
package workflow_execution
import (
"time"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/utils"
@@ -38,10 +40,14 @@ func (wfa *workflowExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, i
return nil, code, err
}
res_mongo.Decode(&workflow)
if workflow.State == SCHEDULED && time.Now().UTC().After(*workflow.ExecDate) {
workflow.State = FORGOTTEN
wfa.GenericRawUpdateOne(&workflow, id, wfa)
}
return &workflow, 200, nil
}
func (wfa workflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
func (wfa *workflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType())
if err != nil {
@@ -53,6 +59,10 @@ func (wfa workflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, in
return nil, 404, err
}
for _, r := range results {
if r.State == SCHEDULED && time.Now().UTC().After(*r.ExecDate) {
r.State = FORGOTTEN
wfa.GenericRawUpdateOne(&r, r.UUID, wfa)
}
objs = append(objs, &r.AbstractObject)
}
return objs, 200, nil
@@ -78,6 +88,10 @@ func (wfa *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search s
return nil, 404, err
}
for _, r := range results {
if r.State == SCHEDULED && time.Now().UTC().After(*r.ExecDate) {
r.State = FORGOTTEN
wfa.GenericRawUpdateOne(&r, r.UUID, wfa)
}
objs = append(objs, &r)
}
return objs, 200, nil

View File

@@ -12,7 +12,7 @@ import (
// Workspace is a struct that represents a workspace
type Workspace struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, datacenter, processing, storage, workflow)
resources.ResourceSet // ResourceSet contains the resources of the workspace (data, compute, processing, storage, workflow)
IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual
Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active
Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace

View File

@@ -2,13 +2,14 @@ package workspace
import (
"errors"
"fmt"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/resources/data"
"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
"cloud.o-forge.io/core/oc-lib/models/resources/processing"
"cloud.o-forge.io/core/oc-lib/models/resources/storage"
w "cloud.o-forge.io/core/oc-lib/models/resources/workflow"
@@ -40,7 +41,7 @@ func (wfa *workspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, er
func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
d := set.(*Workspace) // Get the workspace from the set
d.DataResources = nil // Reset the resources
d.DatacenterResources = nil
d.ComputeResources = nil
d.StorageResources = nil
d.ProcessingResources = nil
d.WorkflowResources = nil
@@ -76,7 +77,7 @@ func (wfa *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject
// reset the resources
d := data.(*Workspace)
d.DataResources = nil
d.DatacenterResources = nil
d.ComputeResources = nil
d.StorageResources = nil
d.ProcessingResources = nil
d.WorkflowResources = nil
@@ -102,13 +103,13 @@ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace {
}
}
}
// Fill the workspace with the datacenters
if workflow.Datacenters != nil && len(workflow.Datacenters) > 0 {
dataAccessor := (&datacenter.DatacenterResource{}).GetAccessor(nil)
for _, id := range workflow.Datacenters {
// Fill the workspace with the computes
if workflow.Computes != nil && len(workflow.Computes) > 0 {
dataAccessor := (&compute.ComputeResource{}).GetAccessor(nil)
for _, id := range workflow.Computes {
d, _, e := dataAccessor.LoadOne(id)
if e == nil {
workflow.DatacenterResources = append(workflow.DatacenterResources, d.(*datacenter.DatacenterResource))
workflow.ComputeResources = append(workflow.ComputeResources, d.(*compute.ComputeResource))
}
}
}
@@ -205,7 +206,8 @@ func (wfa *workspaceMongoAccessor) Search(filters *dbs.Filters, search string) (
This function is used to share the workspace with the peers
*/
func (wfa *workspaceMongoAccessor) share(realData *Workspace, method tools.METHOD, caller *tools.HTTPCaller) {
if realData.Shared == "" || caller.Disabled {
fmt.Println("Sharing workspace", realData, caller)
if realData == nil || realData.Shared == "" || caller == nil || caller.Disabled {
return
}
access := (&shallow_collaborative_area.ShallowCollaborativeArea{}).GetAccessor(nil)
@@ -217,7 +219,7 @@ func (wfa *workspaceMongoAccessor) share(realData *Workspace, method tools.METHO
paccess := &peer.Peer{}
for _, p := range res.(*shallow_collaborative_area.ShallowCollaborativeArea).Peers {
paccess.UUID = p
if paccess.IsMySelf() { // If the peer is the current peer, never share because it will create a loop
if ok, _ := paccess.IsMySelf(); ok { // If the peer is the current peer, never share because it will create a loop
continue
}
if method == tools.DELETE { // If the workspace is deleted, share the deletion

View File

@@ -1,30 +0,0 @@
package static
/*
This package contains static data for the peer model
It's used to test the peer model
Temporary version, will be removed in the future and replaced with a more dynamic solution
to generate the data
*/
// GetMyLocalBsonPeer returns a tuple with the peer ID and the peer data in BSON format
func GetMyLocalBsonPeer() (string, map[string]interface{}) {
return "6fd0134c-fefc-427e-94c2-e01365fc5fb0", map[string]interface{}{
"abstractobject": map[string]interface{}{
"id": "6fd0134c-fefc-427e-94c2-e01365fc5fb0",
"name": "local_peer",
},
"url": "http://localhost",
"public_key": "public_key_lulz",
}
}
// GetMyLocalJsonPeer returns a tuple with the peer ID and the peer data in JSON format
func GetMyLocalJsonPeer() (string, map[string]interface{}) {
return "6fd0134c-fefc-427e-94c2-e01365fc5fb0", map[string]interface{}{
"id": "6fd0134c-fefc-427e-94c2-e01365fc5fb0",
"name": "local_peer",
"url": "http://localhost",
"public_key": "public_key_lulz",
}
}

View File

@@ -3,9 +3,12 @@ package tools
import (
"encoding/json"
"errors"
"fmt"
"strings"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/dbs/mongo"
beego "github.com/beego/beego/v2/server/web"
)
/*
@@ -49,6 +52,16 @@ func (s State) String() string {
type API struct{}
func (a *API) Discovered(infos []*beego.ControllerInfo) {
respondToDiscovery := func(m map[string]interface{}) {
if len(m) == 0 {
a.SubscribeRouter(infos)
}
}
a.ListenRouter(respondToDiscovery)
a.SubscribeRouter(infos)
}
// GetState returns the state of the API
func (a *API) GetState() (State, int, error) {
// Check if the database is up
@@ -70,12 +83,35 @@ func (a *API) GetState() (State, int, error) {
return ALIVE, 200, nil // If everything is up, return alive
}
func (a *API) ListenRouter(exec func(msg map[string]interface{})) {
go NewNATSCaller().ListenNats(DISCOVERY.GenerateKey("api"), exec)
}
func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
nats := NewNATSCaller()
discovery := map[string][]string{}
for _, info := range infos {
path := strings.ReplaceAll(info.GetPattern(), "/oc/", "/"+strings.ReplaceAll(config.GetAppName(), "oc-", ""))
for k, v := range info.GetMethod() {
if discovery[path] == nil {
discovery[path] = []string{}
}
if strings.Contains(strings.ToLower(v), "internal") {
discovery[path] = append(discovery[path], "INTERNAL"+k)
} else {
discovery[path] = append(discovery[path], k)
}
}
}
nats.SetNATSPub("api", DISCOVERY, discovery)
}
// CheckRemotePeer checks the state of a remote peer
func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
// Check if the database is up
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
var resp APIStatusResponse
b, err := caller.CallPost(url, "/status", map[string]interface{}{}) // Call the status endpoint of the peer
b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer
if err != nil {
return DEAD, map[string]int{} // If the peer is not reachable, return dead
}
@@ -91,7 +127,7 @@ func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
}
// CheckRemoteAPIs checks the state of remote APIs from your proper OC
func (a *API) CheckRemoteAPIs(urls map[string]string) (State, map[string]string, error) {
func (a *API) CheckRemoteAPIs(apis []DataType) (State, map[string]string, error) {
// Check if the database is up
new := map[string]string{}
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
@@ -99,19 +135,20 @@ func (a *API) CheckRemoteAPIs(urls map[string]string) (State, map[string]string,
e := ""
state := ALIVE
reachable := false
for appName, url := range urls { // Check the state of each remote API in the list
for _, api := range apis { // Check the state of each remote API in the list
var resp APIStatusResponse
b, err := caller.CallGet(url, "/version/status") // Call the status endpoint of the remote API (standard OC status endpoint)
b, err := caller.CallGet("http://"+api.API()+":8080", "/oc/version/status") // Call the status endpoint of the remote API (standard OC status endpoint)
if err != nil {
state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service
continue
}
json.Unmarshal(b, &resp)
fmt.Println(string(b))
if resp.Data == nil { //
state = REDUCED_SERVICE // If the response is empty, return reduced service
continue
}
new[appName] = resp.Data.State
new[api.String()] = resp.Data.State
if resp.Data.Code > code {
code = resp.Data.Code
e += resp.Error

View File

@@ -8,7 +8,7 @@ const (
DATA_RESOURCE
PROCESSING_RESOURCE
STORAGE_RESOURCE
DATACENTER_RESOURCE
COMPUTE_RESOURCE
WORKFLOW_RESOURCE
WORKFLOW
WORKFLOW_EXECUTION
@@ -27,7 +27,7 @@ var CATALOGAPI = "oc-catalog"
var SHAREDAPI = "oc-shared"
var WORKFLOWAPI = "oc-workflow"
var WORKSPACEAPI = "oc-workspace"
var PEERSAPI = "oc-peers"
var PEERSAPI = "oc-peer"
var DATACENTERAPI = "oc-datacenter"
// Bind the standard API name to the data type
@@ -41,7 +41,7 @@ var DefaultAPI = [...]string{
WORKFLOWAPI,
NOAPI,
WORKSPACEAPI,
NOAPI,
CATALOGAPI,
PEERSAPI,
SHAREDAPI,
SHAREDAPI,
@@ -56,14 +56,14 @@ var Str = [...]string{
"data_resource",
"processing_resource",
"storage_resource",
"datacenter_resource",
"compute_resource",
"workflow_resource",
"workflow",
"workflow_execution",
"workspace",
"resource_model",
"peer",
"shared_workspace",
"collaborative_area",
"rule",
"booking",
"workflow_history",

View File

@@ -5,6 +5,7 @@ import (
"strings"
"cloud.o-forge.io/core/oc-lib/config"
"cloud.o-forge.io/core/oc-lib/logs"
"github.com/nats-io/nats.go"
)
@@ -14,6 +15,7 @@ type NATSMethod int
const (
REMOVE NATSMethod = iota
CREATE
DISCOVERY
)
// NameToMethod returns the NATSMethod enum value from a string
@@ -43,6 +45,33 @@ func NewNATSCaller() *natsCaller {
return &natsCaller{}
}
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
func (s *natsCaller) ListenNats(chanName string, exec func(msg map[string]interface{})) {
log := logs.GetLogger()
if config.GetConfig().NATSUrl == "" {
log.Error().Msg(" -> NATS_SERVER is not set")
return
}
nc, err := nats.Connect(config.GetConfig().NATSUrl)
if err != nil {
log.Error().Msg(" -> Could not reach NATS server : " + err.Error())
return
}
ch := make(chan *nats.Msg, 64)
subs, err := nc.ChanSubscribe(chanName, ch)
if err != nil {
log.Error().Msg("Error listening to NATS : " + err.Error())
}
defer subs.Unsubscribe()
for msg := range ch {
map_mess := map[string]interface{}{}
json.Unmarshal(msg.Data, &map_mess)
exec(map_mess)
}
}
// SetNATSPub sets a message to the NATS server
func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interface{}) string {
if config.GetConfig().NATSUrl == "" {

View File

@@ -5,6 +5,8 @@ import (
"encoding/json"
"io"
"net/http"
"net/url"
"strings"
)
// HTTP Method Enum defines the different methods that can be used to interact with the HTTP server
@@ -15,11 +17,16 @@ const (
PUT
POST
DELETE
STRICT_INTERNAL_GET
STRICT_INTERNAL_PUT
STRICT_INTERNAL_POST
STRICT_INTERNAL_DELETE
)
// String returns the string of the enum
func (m METHOD) String() string {
return [...]string{"GET", "PUT", "POST", "DELETE"}[m]
return [...]string{"GET", "PUT", "POST", "DELETE", "INTERNALGET", "INTERNALPUT", "INTERNALPOST", "INTERNALDELETE"}[m]
}
// EnumIndex returns the index of the enum
@@ -29,7 +36,8 @@ func (m METHOD) EnumIndex() int {
// ToMethod returns the method from a string
func ToMethod(str string) METHOD {
for _, s := range []METHOD{GET, PUT, POST, DELETE} {
for _, s := range []METHOD{GET, PUT, POST, DELETE,
STRICT_INTERNAL_GET, STRICT_INTERNAL_PUT, STRICT_INTERNAL_POST, STRICT_INTERNAL_DELETE} {
if s.String() == str {
return s
}
@@ -53,8 +61,16 @@ func NewHTTPCaller(urls map[DataType]map[METHOD]string) *HTTPCaller {
}
// CallGet calls the GET method on the HTTP server
func (caller *HTTPCaller) CallGet(url string, subpath string) ([]byte, error) {
resp, err := http.Get(url + subpath)
func (caller *HTTPCaller) CallGet(url string, subpath string, types ...string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, url+subpath, bytes.NewBuffer([]byte("")))
if err != nil {
return nil, err
}
for _, t := range types {
req.Header.Set("Content-Type", t)
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -65,7 +81,7 @@ func (caller *HTTPCaller) CallGet(url string, subpath string) ([]byte, error) {
// CallPut calls the DELETE method on the HTTP server
func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) {
resp, err := http.NewRequest("DELETE", url+subpath, nil)
if err != nil {
if err != nil || resp == nil || resp.Body == nil {
return nil, err
}
defer resp.Body.Close()
@@ -73,10 +89,32 @@ func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error)
}
// CallPost calls the POST method on the HTTP server
func (caller *HTTPCaller) CallPost(url string, subpath string, body map[string]interface{}) ([]byte, error) {
func (caller *HTTPCaller) CallPost(url string, subpath string, body map[string]interface{}, types ...string) ([]byte, error) {
postBody, _ := json.Marshal(body)
responseBody := bytes.NewBuffer(postBody)
resp, err := http.Post(url+subpath, "application/json", responseBody)
contentType := "application/json"
if len(types) > 0 {
contentType = types[0]
}
resp, err := http.Post(url+subpath, contentType, responseBody)
if err != nil || resp == nil || resp.Body == nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
// CallPost calls the POST method on the HTTP server
func (caller *HTTPCaller) CallPut(url string, subpath string, body map[string]interface{}) ([]byte, error) {
postBody, _ := json.Marshal(body)
responseBody := bytes.NewBuffer(postBody)
req, err := http.NewRequest(http.MethodPut, url+subpath, responseBody)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, err
}
@@ -84,4 +122,40 @@ func (caller *HTTPCaller) CallPost(url string, subpath string, body map[string]i
return io.ReadAll(resp.Body)
}
// NO PUT IN HERE TO DANGEROUS TO USE ON A REMOTE SERVER, MAYBE NEEDED IN THE FUTURE
// CallRaw calls the Raw method on the HTTP server
func (caller *HTTPCaller) CallRaw(method string, url string, subpath string,
body map[string]interface{}, content_type string, fakeTLSTermination bool, cookies ...*http.Cookie) (*http.Response, error) {
postBody, _ := json.Marshal(body)
responseBody := bytes.NewBuffer(postBody)
req, err := http.NewRequest(method, url+subpath, responseBody)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", content_type)
if fakeTLSTermination {
req.Header.Add("X-Forwarded-Proto", "https")
}
for _, c := range cookies {
req.AddCookie(c)
}
client := &http.Client{}
return client.Do(req)
}
// CallRaw calls the Raw method on the HTTP server
func (caller *HTTPCaller) CallForm(method string, url string, subpath string,
body url.Values, content_type string, fakeTLSTermination bool, cookies ...*http.Cookie) (*http.Response, error) {
req, err := http.NewRequest(method, url+subpath, strings.NewReader(body.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", content_type)
if fakeTLSTermination {
req.Header.Add("X-Forwarded-Proto", "https")
}
for _, c := range cookies {
req.AddCookie(c)
}
client := &http.Client{}
return client.Do(req)
}