Live Structure

This commit is contained in:
mr
2025-06-24 11:29:04 +02:00
parent 2c8dcbe93d
commit cd177bd779
21 changed files with 350 additions and 209 deletions

View File

@@ -15,9 +15,9 @@ type billMongoAccessor struct {
func NewAccessor(request *tools.APIRequest) *billMongoAccessor {
return &billMongoAccessor{
AbstractAccessor: utils.AbstractAccessor{
Logger: logs.CreateLogger(tools.COMPUTE_UNITS.String()), // Create a logger with the data type
Logger: logs.CreateLogger(tools.LIVE_DATACENTER.String()), // Create a logger with the data type
Request: request,
Type: tools.COMPUTE_UNITS,
Type: tools.LIVE_DATACENTER,
},
}
}

View File

@@ -1,58 +0,0 @@
package compute_units
import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
)
/*
* ComputeUnits is a struct that represents a compute units in your datacenters
*/
type ComputeUnitsCerts struct {
Host string `json:"host,omitempty" bson:"host,omitempty"`
Port string `json:"port,omitempty" bson:"port,omitempty"`
// for now only Kubernetes
CAData string `json:"ca_data,omitempty" bson:"ca_data,omitempty"`
CertData string `json:"cert_data,omitempty" bson:"cert_data,omitempty"`
KeyData string `json:"key_data,omitempty" bson:"key_data,omitempty"`
}
// TODO in the future multiple type of certs depending of infra type
type ComputeUnits struct {
utils.AbstractObject
Certs ComputeUnitsCerts `json:"certs,omitempty" bson:"certs,omitempty"`
MonitorPath string `json:"monitor_path,omitempty" bson:"monitor_path,omitempty"`
Location resources.GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
Nodes []*resources.ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
ResourceID string `json:"resource_id" bson:"resource_id"`
}
func (r *ComputeUnits) StoreDraftDefault() {
r.IsDraft = true
}
func (r *ComputeUnits) CanDelete() bool {
return r.IsDraft // only draft ComputeUnits can be deleted
}
func (d *ComputeUnits) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor(request) // Create a new instance of the accessor
}

View File

@@ -1,108 +0,0 @@
package compute_units
import (
"encoding/json"
"errors"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type computeUnitsMongoAccessor struct {
utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
// New creates a new instance of the computeUnitsMongoAccessor
func NewAccessor(request *tools.APIRequest) *computeUnitsMongoAccessor {
return &computeUnitsMongoAccessor{
AbstractAccessor: utils.AbstractAccessor{
Logger: logs.CreateLogger(tools.COMPUTE_UNITS.String()), // Create a logger with the data type
Request: request,
Type: tools.COMPUTE_UNITS,
},
}
}
/*
* Nothing special here, just the basic CRUD operations
*/
func (a *computeUnitsMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
return utils.GenericDeleteOne(id, a)
}
func (a *computeUnitsMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
// should verify if a source is existing...
return utils.GenericUpdateOne(set, id, a, &ComputeUnits{})
}
func (a *computeUnitsMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericStoreOne(data.(*ComputeUnits), a)
}
func (a *computeUnitsMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
// is a publisher... that become a resources.
if data.IsDrafted() {
return nil, 422, errors.New("can't publish a drafted compute units")
}
computeUnits := data.(*ComputeUnits)
if computeUnits.MonitorPath == "" || computeUnits.GetID() != "" {
return nil, 422, errors.New("publishing is only allowed is it can be monitored and be accessible")
}
if res, code, err := a.LoadOne(computeUnits.GetID()); err != nil {
return nil, code, err
} else {
computeUnits = res.(*ComputeUnits)
}
resAccess := resources.NewAccessor[*resources.ComputeResource](tools.COMPUTE_RESOURCE, a.Request, func() utils.DBObject { return &resources.ComputeResource{} })
var instance *resources.ComputeResourceInstance
b, _ := json.Marshal(computeUnits)
json.Unmarshal(b, instance)
if computeUnits.ResourceID != "" {
// TODO dependent of a existing resource
res, code, err := resAccess.LoadOne(computeUnits.ResourceID)
if err == nil {
return nil, code, err
}
existingComputeResource := res.(*resources.ComputeResource)
if existingComputeResource.Architecture != computeUnits.Architecture || existingComputeResource.Infrastructure != computeUnits.Infrastructure {
return nil, 422, errors.New("should be same architecture & infrastructure from the resource")
}
existingComputeResource.Instances = append(existingComputeResource.Instances, instance)
return resAccess.UpdateOne(existingComputeResource, existingComputeResource.UUID)
} else {
var r resources.ComputeResource
b, _ := json.Marshal(computeUnits)
json.Unmarshal(b, &r)
r.Instances = append(r.Instances, instance)
res, code, err := resAccess.StoreOne(&r)
if err != nil {
return nil, code, err
}
computeUnits.ResourceID = res.GetID()
return a.UpdateOne(computeUnits, computeUnits.GetID())
}
}
func (a *computeUnitsMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*ComputeUnits](id, func(d utils.DBObject) (utils.DBObject, int, error) {
return d, 200, nil
}, a)
}
func (a *computeUnitsMongoAccessor) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*ComputeUnits](a.getExec(), isDraft, a)
}
func (a *computeUnitsMongoAccessor) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*ComputeUnits](filters, search, (&ComputeUnits{}).GetObjectFilters(search), a.getExec(), isDraft, a)
}
func (a *computeUnitsMongoAccessor) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
return d
}
}

14
models/live/interfaces.go Executable file
View File

@@ -0,0 +1,14 @@
package live
import (
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type LiveInterface interface {
GetResourceAccessor(request *tools.APIRequest) utils.Accessor
GetResource() resources.ResourceInterface
GetResourceInstance() resources.ResourceInstanceITF
SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface
}

55
models/live/live.go Normal file
View File

@@ -0,0 +1,55 @@
package live
import (
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/biter777/countries"
)
/*
* LiveDatacenter is a struct that represents a compute units in your datacenters
*/
type Credentials struct {
Login string `json:"login,omitempty" bson:"login,omitempty"`
Pass string `json:"password,omitempty" bson:"password,omitempty"`
Token string `json:"token,omitempty" bson:"token,omitempty"`
}
type Certs struct {
AuthorityCertificate string `json:"authority_certificate,omitempty" bson:"authority_certificate,omitempty"`
ClientCertificate string `json:"client_certificate,omitempty" bson:"client_certificate,omitempty"`
}
type LiveCerts struct {
Host string `json:"host,omitempty" bson:"host,omitempty"`
Port string `json:"port,omitempty" bson:"port,omitempty"`
Certificates *Certs `json:"certs,omitempty" bson:"certs,omitempty"`
Credentials *Credentials `json:"creds,omitempty" bson:"creds,omitempty"`
}
// TODO in the future multiple type of certs depending of infra type
type AbstractLive struct {
utils.AbstractObject
Certs LiveCerts `json:"certs,omitempty" bson:"certs,omitempty"`
MonitorPath string `json:"monitor_path,omitempty" bson:"monitor_path,omitempty"`
Location resources.GeoPoint `json:"location,omitempty" bson:"location,omitempty"`
Country countries.CountryCode `json:"country,omitempty" bson:"country,omitempty"`
AccessProtocol string `json:"access_protocol,omitempty" bson:"access_protocol,omitempty"`
ResourcesID []string `json:"resources_id" bson:"resources_id"`
}
func (r *AbstractLive) GetResourceType() tools.DataType {
return tools.INVALID
}
func (r *AbstractLive) StoreDraftDefault() {
r.IsDraft = true
}
func (r *AbstractLive) CanDelete() bool {
return r.IsDraft // only draft ComputeUnits can be deleted
}

View File

@@ -0,0 +1,50 @@
package live
import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
* LiveDatacenter is a struct that represents a compute units in your datacenters
*/
type LiveDatacenter struct {
AbstractLive
StorageType enum.StorageType `bson:"storage_type" json:"storage_type" default:"-1"` // Type is the type of the storage
Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage
Architecture string `json:"architecture,omitempty" bson:"architecture,omitempty"` // Architecture is the architecture
Infrastructure enum.InfrastructureType `json:"infrastructure" bson:"infrastructure" default:"-1"` // Infrastructure is the infrastructure
Source string `json:"source,omitempty" bson:"source,omitempty"` // Source is the source of the resource
SecurityLevel string `json:"security_level,omitempty" bson:"security_level,omitempty"`
PowerSources []string `json:"power_sources,omitempty" bson:"power_sources,omitempty"`
AnnualCO2Emissions float64 `json:"annual_co2_emissions,omitempty" bson:"co2_emissions,omitempty"`
CPUs map[string]*models.CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs key is model
GPUs map[string]*models.GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs key is model
Nodes []*resources.ComputeNode `json:"nodes,omitempty" bson:"nodes,omitempty"`
}
func (d *LiveDatacenter) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*LiveDatacenter](request) // Create a new instance of the accessor
}
func (d *LiveDatacenter) GetResourceAccessor(request *tools.APIRequest) utils.Accessor {
return resources.NewAccessor[*resources.ComputeResource](tools.COMPUTE_RESOURCE, request, func() utils.DBObject { return &resources.ComputeResource{} })
}
func (d *LiveDatacenter) GetResource() resources.ResourceInterface {
return &resources.ComputeResource{}
}
func (d *LiveDatacenter) GetResourceInstance() resources.ResourceInstanceITF {
return &resources.ComputeResourceInstance{}
}
func (d *LiveDatacenter) SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface {
r := res.(*resources.ComputeResource)
r.Instances = append(r.Instances, i.(*resources.ComputeResourceInstance))
return r
}

View File

@@ -0,0 +1,122 @@
package live
import (
"encoding/json"
"errors"
"slices"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
type computeUnitsMongoAccessor[T LiveInterface] struct {
utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
// New creates a new instance of the computeUnitsMongoAccessor
func NewAccessor[T LiveInterface](request *tools.APIRequest) *computeUnitsMongoAccessor[T] {
return &computeUnitsMongoAccessor[T]{
AbstractAccessor: utils.AbstractAccessor{
Logger: logs.CreateLogger(tools.LIVE_DATACENTER.String()), // Create a logger with the data type
Request: request,
Type: tools.LIVE_DATACENTER,
},
}
}
/*
* Nothing special here, just the basic CRUD operations
*/
func (a *computeUnitsMongoAccessor[T]) DeleteOne(id string) (utils.DBObject, int, error) {
return utils.GenericDeleteOne(id, a)
}
func (a *computeUnitsMongoAccessor[T]) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
// should verify if a source is existing...
return utils.GenericUpdateOne(set, id, a, &LiveDatacenter{})
}
func (a *computeUnitsMongoAccessor[T]) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericStoreOne(data.(*LiveDatacenter), a)
}
func (a *computeUnitsMongoAccessor[T]) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
// is a publisher... that become a resources.
if data.IsDrafted() {
return nil, 422, errors.New("can't publish a drafted compute units")
}
datacenterUnits := data.(*LiveDatacenter)
if datacenterUnits.MonitorPath == "" || datacenterUnits.GetID() != "" {
return nil, 422, errors.New("publishing is only allowed is it can be monitored and be accessible")
}
if res, code, err := a.LoadOne(datacenterUnits.GetID()); err != nil {
return nil, code, err
} else {
datacenterUnits = res.(*LiveDatacenter)
}
resAccess := datacenterUnits.GetResourceAccessor(a.Request)
instance := datacenterUnits.GetResourceInstance()
b, _ := json.Marshal(datacenterUnits)
json.Unmarshal(b, instance)
if len(datacenterUnits.ResourcesID) > 0 {
for _, r := range datacenterUnits.ResourcesID {
// TODO dependent of a existing resource
res, code, err := resAccess.LoadOne(r)
if err == nil {
return nil, code, err
}
existingComputeResource := res.(*resources.ComputeResource)
if existingComputeResource.Architecture != datacenterUnits.Architecture || existingComputeResource.Infrastructure != datacenterUnits.Infrastructure {
return nil, 422, errors.New("should be same architecture & infrastructure from the resource")
}
datacenterUnits.SetResourceInstance(existingComputeResource, instance)
resAccess.UpdateOne(existingComputeResource, existingComputeResource.UUID)
}
if datacenterUnits.GetID() != "" {
return a.LoadOne(datacenterUnits.GetID())
} else {
return a.StoreOne(datacenterUnits)
}
} else {
r := datacenterUnits.GetResource()
b, _ := json.Marshal(datacenterUnits)
json.Unmarshal(b, &r)
datacenterUnits.SetResourceInstance(r, instance)
res, code, err := resAccess.StoreOne(r)
if err != nil {
return nil, code, err
}
if !slices.Contains(datacenterUnits.ResourcesID, res.GetID()) {
datacenterUnits.ResourcesID = append(datacenterUnits.ResourcesID, res.GetID())
}
if datacenterUnits.GetID() != "" {
return a.UpdateOne(datacenterUnits, datacenterUnits.GetID())
} else {
return a.StoreOne(datacenterUnits)
}
}
}
func (a *computeUnitsMongoAccessor[T]) LoadOne(id string) (utils.DBObject, int, error) {
return utils.GenericLoadOne[*LiveDatacenter](id, func(d utils.DBObject) (utils.DBObject, int, error) {
return d, 200, nil
}, a)
}
func (a *computeUnitsMongoAccessor[T]) LoadAll(isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericLoadAll[*LiveDatacenter](a.getExec(), isDraft, a)
}
func (a *computeUnitsMongoAccessor[T]) Search(filters *dbs.Filters, search string, isDraft bool) ([]utils.ShallowDBObject, int, error) {
return utils.GenericSearch[*LiveDatacenter](filters, search, (&LiveDatacenter{}).GetObjectFilters(search), a.getExec(), isDraft, a)
}
func (a *computeUnitsMongoAccessor[T]) getExec() func(utils.DBObject) utils.ShallowDBObject {
return func(d utils.DBObject) utils.ShallowDBObject {
return d
}
}

View File

@@ -0,0 +1,46 @@
package live
import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
* LiveStorage is a struct that represents a compute units in your datacenters
*/
type LiveStorage struct {
AbstractLive
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
Path string `bson:"path,omitempty" json:"path,omitempty"` // Path is the store folders in the source
Local bool `bson:"local" json:"local"`
SecurityLevel string `bson:"security_level,omitempty" json:"security_level,omitempty"`
SizeType enum.StorageSize `bson:"size_type" json:"size_type" default:"0"` // SizeType is the type of the storage size
SizeGB int64 `bson:"size,omitempty" json:"size,omitempty"` // Size is the size of the storage
Encryption bool `bson:"encryption,omitempty" json:"encryption,omitempty"` // Encryption is a flag that indicates if the storage is encrypted
Redundancy string `bson:"redundancy,omitempty" json:"redundancy,omitempty"` // Redundancy is the redundancy of the storage
Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage
}
func (d *LiveStorage) GetAccessor(request *tools.APIRequest) utils.Accessor {
return NewAccessor[*LiveStorage](request) // Create a new instance of the accessor
}
func (d *LiveStorage) GetResourceAccessor(request *tools.APIRequest) utils.Accessor {
return resources.NewAccessor[*resources.ComputeResource](tools.STORAGE_RESOURCE, request, func() utils.DBObject { return &resources.StorageResource{} })
}
func (d *LiveStorage) GetResource() resources.ResourceInterface {
return &resources.StorageResource{}
}
func (d *LiveStorage) GetResourceInstance() resources.ResourceInstanceITF {
return &resources.StorageResourceInstance{}
}
func (d *LiveStorage) SetResourceInstance(res resources.ResourceInterface, i resources.ResourceInstanceITF) resources.ResourceInterface {
r := res.(*resources.StorageResource)
r.Instances = append(r.Instances, i.(*resources.StorageResourceInstance))
return r
}

View File

@@ -3,7 +3,7 @@ package models
import (
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/bill"
"cloud.o-forge.io/core/oc-lib/models/compute_units"
"cloud.o-forge.io/core/oc-lib/models/live/live_datacenter"
"cloud.o-forge.io/core/oc-lib/models/order"
"cloud.o-forge.io/core/oc-lib/models/resources/purchase_resource"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -40,7 +40,7 @@ var ModelsCatalog = map[string]func() utils.DBObject{
tools.WORKSPACE_HISTORY.String(): func() utils.DBObject { return &w3.WorkspaceHistory{} },
tools.ORDER.String(): func() utils.DBObject { return &order.Order{} },
tools.PURCHASE_RESOURCE.String(): func() utils.DBObject { return &purchase_resource.PurchaseResource{} },
tools.COMPUTE_UNITS.String(): func() utils.DBObject { return &compute_units.ComputeUnits{} },
tools.LIVE_DATACENTER.String(): func() utils.DBObject { return &live_datacenter.LiveDatacenter{} },
tools.BILL.String(): func() utils.DBObject { return &bill.Bill{} },
}

View File

@@ -77,7 +77,7 @@ func (p *Peer) IsMySelf() (bool, string) {
}
// LaunchPeerExecution launches an execution on a peer
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, caller *tools.HTTPCaller) (map[string]interface{}, error) {
p.UUID = peerID
return cache.LaunchPeerExecution(peerID, dataID, dt, method, body, caller) // Launch the execution on the peer through the cache
}

View File

@@ -50,11 +50,11 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string) (*Peer, bool)
// LaunchPeerExecution launches an execution on a peer
// The method contacts the path described by : peer.Url + datatype path (from enums) + replacement of id by dataID
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
dt tools.DataType, method tools.METHOD, body interface{}, caller tools.HTTPCallerITF) (*PeerExecution, error) {
dt tools.DataType, method tools.METHOD, body interface{}, caller tools.HTTPCallerITF) (map[string]interface{}, error) {
fmt.Println("Launching peer execution on", caller.GetUrls(), dt, method)
methods := caller.GetUrls()[dt] // Get the methods url of the data type
if m, ok := methods[method]; !ok || m == "" {
return nil, errors.New("Requested method " + method.String() + " not declared in HTTPCaller")
return map[string]interface{}{}, errors.New("Requested method " + method.String() + " not declared in HTTPCaller")
}
path := methods[method] // Get the path corresponding to the action we want to execute
path = strings.ReplaceAll(path, ":id", dataID) // Replace the id in the path in case of a DELETE / UPDATE method (it's a standard naming in OC)
@@ -72,10 +72,10 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
}
mypeer.AddExecution(*pexec)
NewShallowAccessor().UpdateOne(mypeer, peerID) // Update the peer in the db
return nil, errors.New("peer is " + peerID + " not reachable")
return map[string]interface{}{}, errors.New("peer is " + peerID + " not reachable")
} else {
if mypeer == nil {
return nil, errors.New("peer " + peerID + " not found")
return map[string]interface{}{}, errors.New("peer " + peerID + " not found")
}
// If the peer is reachable, launch the execution
url = p.urlFormat((mypeer.Url), dt) + path // Format the URL
@@ -86,11 +86,11 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
go p.Exec(v.Url, tools.ToMethod(v.Method), v.Body, caller)
}
}
return nil, p.Exec(url, method, body, caller) // Execute the method
return p.Exec(url, method, body, caller) // Execute the method
}
// exec executes the method on the peer
func (p *PeerCache) Exec(url string, method tools.METHOD, body interface{}, caller tools.HTTPCallerITF) error {
func (p *PeerCache) Exec(url string, method tools.METHOD, body interface{}, caller tools.HTTPCallerITF) (map[string]interface{}, error) {
var b []byte
var err error
if method == tools.POST { // Execute the POST method if it's a POST method
@@ -102,16 +102,16 @@ func (p *PeerCache) Exec(url string, method tools.METHOD, body interface{}, call
if method == tools.DELETE { // Execute the DELETE method if it's a DELETE method
b, err = caller.CallDelete(url, "")
}
if err != nil {
return err
}
var m map[string]interface{}
if err != nil {
return m, err
}
err = json.Unmarshal(b, &m)
if err != nil {
return err
return m, err
}
if e, ok := m["error"]; ok && e != "<nil>" && e != "" { // Check if there is an error in the response
return errors.New(fmt.Sprintf("%v", m["error"]))
return m, errors.New(fmt.Sprintf("%v", m["error"]))
}
return nil
return m, nil
}

View File

@@ -58,7 +58,7 @@ func TestExecSuccess(t *testing.T) {
data, _ := json.Marshal(response)
caller.On("CallPost", url, "", mock.Anything).Return(data, nil)
err := cache.Exec(url, tools.POST, map[string]string{"key": "value"}, caller)
_, err := cache.Exec(url, tools.POST, map[string]string{"key": "value"}, caller)
assert.NoError(t, err)
caller.AssertExpectations(t)
}
@@ -71,7 +71,7 @@ func TestExecReturnsErrorField(t *testing.T) {
data, _ := json.Marshal(response)
caller.On("CallPost", url, "", mock.Anything).Return(data, nil)
err := cache.Exec(url, tools.POST, map[string]string{"key": "value"}, caller)
_, err := cache.Exec(url, tools.POST, map[string]string{"key": "value"}, caller)
assert.Error(t, err)
assert.Equal(t, "something failed", err.Error())
}
@@ -81,7 +81,7 @@ func TestExecInvalidJSON(t *testing.T) {
caller := &MockHTTPCaller{}
url := "http://mockpeer/resource"
caller.On("CallPost", url, "", mock.Anything).Return([]byte("{invalid json}"), nil)
err := cache.Exec(url, tools.POST, map[string]string{"key": "value"}, caller)
_, err := cache.Exec(url, tools.POST, map[string]string{"key": "value"}, caller)
assert.Error(t, err)
assert.Contains(t, err.Error(), "invalid character")
}

View File

@@ -143,11 +143,6 @@ type GeoPoint struct {
Longitude float64 `json:"longitude,omitempty" bson:"longitude,omitempty"`
}
type Credentials struct {
Login string `json:"login,omitempty" bson:"login,omitempty"`
Pass string `json:"password,omitempty" bson:"password,omitempty"`
}
type ResourceInstance[T ResourcePartnerITF] struct {
utils.AbstractObject
Location GeoPoint `json:"location,omitempty" bson:"location,omitempty"`

View File

@@ -17,7 +17,11 @@ type ResourceMongoAccessor[T ResourceInterface] struct {
// New creates a new instance of the computeMongoAccessor
func NewAccessor[T ResourceInterface](t tools.DataType, request *tools.APIRequest, g func() utils.DBObject) *ResourceMongoAccessor[T] {
if !slices.Contains([]tools.DataType{tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE, tools.PROCESSING_RESOURCE, tools.WORKFLOW_RESOURCE, tools.DATA_RESOURCE}, t) {
if !slices.Contains([]tools.DataType{
tools.COMPUTE_RESOURCE, tools.STORAGE_RESOURCE,
tools.PROCESSING_RESOURCE, tools.WORKFLOW_RESOURCE,
tools.DATA_RESOURCE,
}, t) {
return nil
}
return &ResourceMongoAccessor[T]{

View File

@@ -43,7 +43,6 @@ func (abs *StorageResource) ConvertToPricedResource(t tools.DataType, request *t
type StorageResourceInstance struct {
ResourceInstance[*StorageResourcePartnership]
Credentials *Credentials `json:"credentials,omitempty" bson:"credentials,omitempty"`
Source string `bson:"source,omitempty" json:"source,omitempty"` // Source is the source of the storage
Path string `bson:"path,omitempty" json:"path,omitempty"` // Path is the store folders in the source
Local bool `bson:"local" json:"local"`
@@ -56,7 +55,6 @@ type StorageResourceInstance struct {
}
func (ri *StorageResourceInstance) ClearEnv() {
ri.Credentials = nil
ri.Env = []models.Param{}
ri.Inputs = []models.Param{}
ri.Outputs = []models.Param{}

View File

@@ -39,7 +39,6 @@ func TestStorageResource_ConvertToPricedResource_InvalidType(t *testing.T) {
func TestStorageResourceInstance_ClearEnv(t *testing.T) {
inst := &resources.StorageResourceInstance{
Credentials: &resources.Credentials{Login: "test"},
ResourceInstance: resources.ResourceInstance[*resources.StorageResourcePartnership]{
Env: []models.Param{{Attr: "A"}},
Inputs: []models.Param{{Attr: "B"}},
@@ -48,7 +47,6 @@ func TestStorageResourceInstance_ClearEnv(t *testing.T) {
}
inst.ClearEnv()
assert.Nil(t, inst.Credentials)
assert.Empty(t, inst.Env)
assert.Empty(t, inst.Inputs)
assert.Empty(t, inst.Outputs)

View File

@@ -43,6 +43,10 @@ func (ri *AbstractObject) GetAccessor(request *tools.APIRequest) Accessor {
return nil
}
func (r *AbstractObject) SetID(id string) {
r.UUID = id
}
func (r *AbstractObject) GenerateID() {
if r.UUID == "" {
r.UUID = uuid.New().String()

View File

@@ -18,6 +18,7 @@ type ShallowDBObject interface {
// DBObject is an interface that defines the basic methods for a DBObject
type DBObject interface {
GenerateID()
SetID(id string)
GetID() string
GetName() string
IsDrafted() bool

View File

@@ -101,10 +101,6 @@ func (ws *WorkflowSchedule) GetBuyAndBook(wfID string, request *tools.APIRequest
}
}
if err := ws.GenerateOrder(purchased, bookings, request); err != nil {
return false, wf, execs, purchased, bookings, err
}
return true, wf, execs, purchased, bookings, nil
}
@@ -189,6 +185,12 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
for _, purchase := range purchases {
go ws.CallDatacenter(purchase, purchase.DestPeerID, tools.PURCHASE_RESOURCE, request, errCh, &m)
}
for i := 0; i < len(purchases); i++ {
if err := <-errCh; err != nil {
return ws, wf, executions, errors.New("could not launch the peer execution : " + fmt.Sprintf("%v", err))
}
}
errCh = make(chan error, len(bookings))
for _, booking := range bookings {
@@ -201,6 +203,10 @@ func (ws *WorkflowSchedule) Schedules(wfID string, request *tools.APIRequest) (*
}
}
if err := ws.GenerateOrder(purchases, bookings, request); err != nil {
return ws, wf, executions, err
}
fmt.Println("Schedules")
for _, exec := range executions {
err := exec.PurgeDraft(request)
@@ -222,9 +228,12 @@ func (ws *WorkflowSchedule) CallDatacenter(purchase utils.DBObject, destPeerID s
return
}
m.Unlock()
if _, err = (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
if res, err := (&peer.Peer{}).LaunchPeerExecution(destPeerID, "", dt, tools.POST, purchase.Serialize(purchase), &c); err != nil {
errCh <- err
return
} else {
data := res["data"].(map[string]interface{})
purchase.SetID(fmt.Sprintf("%v", data["id"]))
}
errCh <- nil
}