initial commit

This commit is contained in:
ycc
2023-03-03 14:43:11 +01:00
parent 7229007847
commit 88c21d1828
142 changed files with 13975 additions and 22 deletions

164
models/computing.go Normal file
View File

@@ -0,0 +1,164 @@
package models
import (
"cloud.o-forge.io/core/oc-catalog/models/rtype"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type ExecutionRequirementsModel struct {
CPUs uint `json:"cpus" required:"true"`
GPUs uint `json:"gpus" description:"Amount of GPUs needed"`
RAM uint `json:"ram" required:"true" description:"Units in MB"`
// We should check closely how to deal with storage, since they are independent models
// but also part of a DataCenter
// Storage uint `json:"storage" description:"Units in MB"`
Parallel bool `json:"parallel"`
ScalingModel uint `json:"scaling_model"`
DiskIO string `json:"disk_io"`
}
type RepositoryModel struct {
Credentials string `json:"credentials"`
Url string `json:"url"`
}
type ComputingNEWModel struct {
Description string `json:"description" required:"true"`
Name string `json:"name,omitempty" required:"true" validate:"required" description:"Name of the computing"`
ShortDescription string `json:"short_description" required:"true" validate:"required"`
Logo string `json:"logo" required:"true" validate:"required"`
Type string `json:"type,omitempty" required:"true"`
Owner string `json:"owner"`
License string `json:"license"`
Price uint `json:"price"`
ExecutionRequirements ExecutionRequirementsModel `json:"execution_requirements"`
Dinputs []string
Doutputs []string
Repository RepositoryModel `json:"repository"`
}
type ComputingModel struct {
ID string `json:"ID" bson:"_id" required:"true" example:"5099803df3f4948bd2f98391"`
ComputingNEWModel `bson:",inline"`
}
func (model ComputingModel) getRtype() rtype.Rtype {
return rtype.COMPUTING
}
func (model ComputingModel) getName() string {
return model.Name
}
// A user can have multiple workload project with the same model. We must distinguish what is
// the model and what is the user object
type ComputingObject struct {
ReferenceID primitive.ObjectID `json:"referenceID" description:"Computing model ID"`
Inputs []string `json:"inputs"`
Outputs []string `json:"outputs"`
DataCenterID string `json:"datacenterID" description:"Datacenter where the computing will be executed"`
}
func (obj ComputingObject) getHost() *string {
return nil // Host is DC only attribute
}
func (obj *ComputingObject) setReference(rID primitive.ObjectID) {
obj.ReferenceID = rID
}
func (obj ComputingObject) getReference() primitive.ObjectID {
return obj.ReferenceID
}
func (obj ComputingObject) getRtype() rtype.Rtype {
return rtype.COMPUTING
}
func (obj ComputingObject) getModel() (ret ResourceModel, err error) {
var ret2 ComputingModel
res := services.MngoCollComputing.FindOne(services.MngoCtx,
primitive.M{"_id": obj.ReferenceID},
)
if err = res.Err(); err != nil {
return
}
err = res.Decode(&ret2)
return ret2, err
}
func (obj ComputingObject) getName() (name *string) {
aa, err := obj.getModel()
if err != nil {
logs.Warn(err)
return
}
name2 := aa.getName()
return &name2
}
func (obj ComputingObject) isLinked(rObjID string) LinkingState {
if contains(obj.Inputs, rObjID) {
return INPUT
}
if contains(obj.Outputs, rObjID) {
return OUTPUT
}
return NO_LINK
}
func (obj *ComputingObject) addLink(direction LinkingState, rID string) {
switch direction {
case INPUT:
obj.Inputs = append(obj.Inputs, rID)
case OUTPUT:
obj.Outputs = append(obj.Outputs, rID)
}
}
func GetOneComputing(ID string) (object *ComputingModel, err error) {
obj, err := getOneResourceByID(ID, rtype.COMPUTING)
if err != nil {
return object, err
}
object = obj.(*ComputingModel)
return object, err
}
func GetMultipleComputing(IDs []string) (object *[]ComputingModel, err error) {
objArray, err := getMultipleResourceByIDs(IDs, rtype.COMPUTING)
if err != nil {
return nil, err
}
object = objArray.(*[]ComputingModel)
return object, err
}
func PostOneComputing(obj ComputingNEWModel) (ID string, err error) {
return postOneResource(obj, rtype.COMPUTING)
}

125
models/data.go Normal file
View File

@@ -0,0 +1,125 @@
package models
import (
"cloud.o-forge.io/core/oc-catalog/models/rtype"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// TODO: Define better the different types of Data model with model herarchy
// TODO: review why swagger are not using the metadata when we do herarchy
type DataNEWModel struct {
Name string `json:"name,omitempty" required:"true" validate:"required" description:"Name of the data"`
ShortDescription string `json:"short_description" required:"true" validate:"required"`
Logo string `json:"logo" required:"true" validate:"required"`
Description string `json:"description" required:"true" validate:"required"`
Type string `json:"type,omitempty" required:"true" validate:"required" description:"Define type of data" example:"file"`
Example string `json:"example" required:"true" validate:"required" description:"base64 encoded data"`
Location string `json:"location" required:"true" validate:"required"`
Dtype string `json:"dtype"`
Protocol []string `json:"protocol"` //TODO Enum type
}
type DataModel struct {
ID string `json:"ID" bson:"_id" required:"true" validate:"required"`
DataNEWModel `bson:",inline"`
}
func (obj DataModel) getRtype() rtype.Rtype {
return rtype.DATA
}
func (model DataModel) getName() string {
return model.Name
}
type DataIO struct {
Counter uint `description:"Incremental number starting from 0"`
}
type DataObject struct {
ReferenceID primitive.ObjectID `json:"referenceID" description:"Data model ID"`
}
func (obj DataObject) getHost() *string {
return nil // Host is DC only attribute
}
func (obj DataObject) getModel() (ret ResourceModel, err error) {
var ret2 DataModel
res := services.MngoCollData.FindOne(services.MngoCtx,
primitive.M{"_id": obj.ReferenceID},
)
if err = res.Err(); err != nil {
return
}
err = res.Decode(&ret2)
return ret2, err
}
func (obj *DataObject) setReference(rID primitive.ObjectID) {
obj.ReferenceID = rID
}
func (obj DataObject) getReference() primitive.ObjectID {
return obj.ReferenceID
}
func (obj DataObject) getRtype() rtype.Rtype {
return rtype.DATA
}
func (obj DataObject) getName() (name *string) {
res := services.MngoCollData.FindOne(services.MngoCtx, primitive.M{"_id": obj.ReferenceID})
if res.Err() != nil {
logs.Error(res)
return
}
var ret DataModel
res.Decode(&ret)
return &ret.Name
}
func (obj DataObject) isLinked(rID string) LinkingState {
return NO_LINK
}
func (obj *DataObject) addLink(direction LinkingState, rObjID string) {
}
func PostOneData(obj DataNEWModel) (string, error) {
return postOneResource(obj, rtype.DATA)
}
func GetMultipleData(IDs []string) (object *[]DataModel, err error) {
objArray, err := getMultipleResourceByIDs(IDs, rtype.DATA)
if err != nil {
return nil, err
}
object = objArray.(*[]DataModel)
return object, err
}
func GetOneData(ID string) (object *DataModel, err error) {
obj, err := getOneResourceByID(ID, rtype.DATA)
if err != nil {
return nil, err
}
object = obj.(*DataModel)
return object, err
}

212
models/datacenter.go Normal file
View File

@@ -0,0 +1,212 @@
package models
import (
"net"
"time"
"cloud.o-forge.io/core/oc-catalog/models/rtype"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type DatacenterCpuModel struct {
Cores uint `json:"cores" required:"true"` //TODO: validate
Architecture string `json:"architecture"` //TOOD: enum
Shared bool `json:"shared"`
MinimumMemory uint `json:"minimum_memory"`
Platform string `json:"platform"`
}
type DatacenterMemoryModel struct {
Size uint `json:"size" description:"Units in MB"`
Ecc bool `json:"ecc"`
}
type DatacenterGpuModel struct {
CudaCores uint `json:"cuda_cores"`
Model string `json:"model"`
Memory uint `json:"memory" description:"Units in MB"`
TensorCores uint `json:"tensor_cores"`
}
type DatacenterNEWModel struct {
Name string `json:"name" required:"true"`
Type string `json:"type,omitempty" required:"true"`
Acronym string `json:"acronym" required:"true" description:"id of the DC"`
Hosts []string `json:"hosts" required:"true" description:"list of host:port"`
Description string `json:"description" required:"true"`
ShortDescription string `json:"short_description" required:"true" validate:"required"`
Logo string `json:"logo" required:"true" validate:"required"`
CPU DatacenterCpuModel `json:"cpu" required:"true"`
RAM DatacenterMemoryModel `json:"ram" required:"true"`
GPU []DatacenterGpuModel `json:"gpu" required:"true"`
Owner string `json:"owner" `
BookingPrice int `json:"bookingPrice" `
}
type DatacenterModel struct {
ID string `json:"ID" bson:"_id" required:"true"`
DatacenterNEWModel `bson:",inline"`
}
func GetDatacenterFromAcronym(DC_name string) (retObj *DatacenterModel) {
// TODO: This call should get the data from the peers, since it could be a different
// host in the future
res := services.MngoCollDatacenter.FindOne(services.MngoCtx, primitive.M{"acronym": DC_name})
if res.Err() != nil {
logs.Error(res)
return
}
var ret DatacenterModel
res.Decode(&ret)
return &ret
}
func (obj DatacenterModel) GetTotalCPUs() uint {
return obj.CPU.Cores
}
func (obj DatacenterModel) GetTotalGPUs() uint {
return uint(len(obj.GPU))
}
func (obj DatacenterModel) GetTotalRAM() uint {
return obj.RAM.Size
}
func (obj DatacenterModel) getRtype() rtype.Rtype {
return rtype.DATACENTER
}
func (model DatacenterModel) getName() string {
return model.Name
}
type DatacenterObject struct {
ReferenceID primitive.ObjectID `json:"referenceID" description:"Data model ID"`
}
func (obj *DatacenterObject) setReference(rID primitive.ObjectID) {
obj.ReferenceID = rID
}
func (obj DatacenterObject) getModel() (ret ResourceModel, err error) {
var ret2 DatacenterModel
res := services.MngoCollDatacenter.FindOne(services.MngoCtx,
primitive.M{"_id": obj.ReferenceID},
)
if err = res.Err(); err != nil {
return
}
err = res.Decode(&ret2)
return ret2, err
}
func (obj DatacenterObject) getReference() primitive.ObjectID {
return obj.ReferenceID
}
// Return a reachable host. If no one is reachable, return the first entry
func (obj DatacenterObject) getHost() (host *string) {
res := services.MngoCollDatacenter.FindOne(services.MngoCtx, primitive.M{"_id": obj.ReferenceID})
if res.Err() != nil {
logs.Error(res)
return nil
}
var ret DatacenterModel
err := res.Decode(&ret)
if err != nil {
logs.Error(res)
return nil
}
host = GetHost(ret.Hosts)
return
}
func GetHost(hosts []string) (host *string) {
// Return the first one if we can't reach any server
host = &hosts[0]
for _, singleHost := range hosts {
conn, err := net.DialTimeout("tcp", singleHost, time.Duration(3)*time.Second) //FIXME: longer wait for connection in the future?
if err != nil {
continue
}
if conn != nil {
//bingo
host = &singleHost
conn.Close()
return
}
}
return
}
func (obj DatacenterObject) getRtype() rtype.Rtype {
return rtype.DATACENTER
}
func (obj DatacenterObject) getName() (name *string) {
res := services.MngoCollDatacenter.FindOne(services.MngoCtx, primitive.M{"_id": obj.ReferenceID})
if res.Err() != nil {
logs.Error(res)
return
}
var ret DatacenterModel
res.Decode(&ret)
return &ret.Name
}
func (obj DatacenterObject) isLinked(rID string) LinkingState {
return NO_LINK
}
func (obj *DatacenterObject) addLink(direction LinkingState, rObjID string) {
}
func PostOneDatacenter(obj DatacenterNEWModel) (string, error) {
return postOneResource(obj, rtype.DATACENTER)
}
func GetMultipleDatacenter(IDs []string) (object *[]DatacenterModel, err error) {
objArray, err := getMultipleResourceByIDs(IDs, rtype.DATACENTER)
if err != nil {
return nil, err
}
object = objArray.(*[]DatacenterModel)
return object, err
}
func GetOneDatacenter(ID string) (object *DatacenterModel, err error) {
obj, err := getOneResourceByID(ID, rtype.DATACENTER)
if err != nil {
return object, err
}
object = obj.(*DatacenterModel) //TODO: fix a possible segfault in this model and the others
return object, err
}

118
models/generic.go Normal file
View File

@@ -0,0 +1,118 @@
package models
import (
"errors"
"cloud.o-forge.io/core/oc-catalog/models/rtype"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo/options"
)
func getObjIDFromString(id string) interface{} {
objectID, err := primitive.ObjectIDFromHex(id)
if err == nil {
return objectID
}
return id
}
func getMultipleObjIDFromArray(ids []string) []interface{} {
var ret []interface{}
for _, val := range ids {
ret = append(ret, getObjIDFromString(val))
}
return ret
}
func getOneResourceByID(ID string, rType rtype.Rtype) (obj interface{}, err error) {
targetDBCollection := rType.MongoCollection()
var retObj interface{}
// asd := rType.
switch rType {
case rtype.DATA:
retObj = &DataModel{}
case rtype.COMPUTING:
retObj = &ComputingModel{}
case rtype.STORAGE:
retObj = &StorageModel{}
case rtype.DATACENTER:
retObj = &DatacenterModel{}
default:
message := "Rtype " + rType.String() + " is not implemented"
logs.Error(message)
return nil, errors.New(message)
}
filter := bson.M{"_id": getObjIDFromString(ID)}
res := targetDBCollection.FindOne(services.MngoCtx, filter)
res.Decode(retObj)
if res.Err() != nil {
logs.Warn("Couldn't find resource: " + res.Err().Error())
}
return retObj, res.Err()
}
func getMultipleResourceByIDs(IDs []string, rType rtype.Rtype) (interface{}, error) {
targetDBCollection := rType.MongoCollection()
var retObj interface{}
// asd := rType.
switch rType {
case rtype.DATA:
retObj = &[]DataModel{}
case rtype.COMPUTING:
retObj = &[]ComputingModel{}
case rtype.STORAGE:
retObj = &[]StorageModel{}
case rtype.DATACENTER:
retObj = &[]DatacenterModel{}
default:
message := "Rtype " + rType.String() + " is not implemented"
logs.Error(message)
return nil, errors.New(message)
}
filter := bson.M{"_id": bson.M{"$in": getMultipleObjIDFromArray(IDs)}}
//FIXME: Limit of find
res, err := targetDBCollection.Find(services.MngoCtx,
filter,
options.Find().SetLimit(100),
)
if err != nil {
logs.Warn("Couldn't find multiple data: " + err.Error())
return nil, err
}
res.All(services.MngoCtx, retObj)
return retObj, res.Err()
}
func postOneResource(retObj interface{}, rType rtype.Rtype) (ID string, err error) {
targetDBCollection := rType.MongoCollection()
result, err := targetDBCollection.InsertOne(services.MngoCtx, retObj)
if err != nil {
logs.Warn("Couldn't insert resource: " + err.Error())
return "", err
}
return result.InsertedID.(primitive.ObjectID).Hex(), nil
}

35
models/mxgraph.go Normal file
View File

@@ -0,0 +1,35 @@
package models
import (
"encoding/xml"
)
type MxGraphModel struct {
XMLName xml.Name `xml:"mxGraphModel"`
Root struct {
XMLName xml.Name `xml:"root"`
MxCell []MxCell `xml:"mxCell"`
}
}
type MxCell struct {
XMLName xml.Name `xml:"mxCell"`
ID string `xml:"id,attr"`
Parent *string `xml:"parent,attr"`
RID *string `xml:"rID,attr"`
Source *string `xml:"source,attr"`
Target *string `xml:"target,attr"`
}
type mxissue struct {
msg string
}
func (m *mxissue) Error() string {
return m.msg
}
func newMxIssue(message string) error {
return &mxissue{message}
}

73
models/rtype/rtype.go Normal file
View File

@@ -0,0 +1,73 @@
package rtype
import (
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/mongo"
)
//http://www.inanzzz.com/index.php/post/wqbs/a-basic-usage-of-int-and-string-enum-types-in-golang
type Rtype int
const (
INVALID Rtype = iota
DATA
COMPUTING
STORAGE
DATACENTER
)
var extensions = [...]string{
"INVALID",
"data",
"computing",
"storage",
"datacenter",
}
func IsValidRtype(input string) bool {
for _, v := range extensions {
if v == input {
return true
}
}
return false
}
func NewRtype(rType string) Rtype {
switch rType {
case DATA.String():
return DATA
case COMPUTING.String():
return COMPUTING
case STORAGE.String():
return STORAGE
case DATACENTER.String():
return DATACENTER
default:
return INVALID
}
}
func (e Rtype) String() string {
return extensions[e]
}
func (e Rtype) MongoCollection() *mongo.Collection {
switch e {
case DATA:
return services.MngoCollData
case COMPUTING:
return services.MngoCollComputing
case STORAGE:
return services.MngoCollStorage
case DATACENTER:
return services.MngoCollDatacenter
default:
message := "Rtype " + e.String() + " is not implemented. Returning a nil"
logs.Error(message)
return nil
}
}

322
models/schedule.go Normal file
View File

@@ -0,0 +1,322 @@
package models
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"github.com/vk496/cron"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
const MAX_DATES uint = 1000 // Max number of dates to retrieve
const MAX_SCHEDULES int = 200
type ScheduleDB struct {
StartDate time.Time
StopDate time.Time
Workflow string
ResourceQty ExecutionRequirementsModel
}
type ScheduleInfo struct {
Total int
NextExecutions [5]string
}
type WorkflowSchedule struct {
IsService bool `description:"Service: true, Task: false"`
StartDate time.Time
StopDate time.Time
Cron string `json:"cron"`
Duration uint `json:"duration" description:"Durantion in seconds" example:"7200"`
Events string `json:"events"`
IsBooked bool `json:"isBooked"`
}
func timeBetween(base, t1, t2 time.Time) bool {
if t2.Before(t1) {
return false
}
if (t1.Before(base) || t1.Equal(base)) && (t2.After(base) || t2.Equal(base)) {
return true
} else {
return false
}
}
func CheckSchedule(cronString string, duration uint, cronFirstDate, cronLastDate time.Time, desiredToSchedule ExecutionRequirementsModel) (err error) {
// ########################################
// TODO: Redesign the whole flow for all possible cases. Meanwhile, use it like this
// ########################################
//
// Here we have already a timeslot filled by other cron Task. So we must check if there
// is still resources available
// However, we could have 3 possibilities here:
//
//
// ## Task with start and stop Date in the DB ##
//
// startDate stopDate
// .-----------------------------·
// | |
// ·-----------------------------·
//
//
// New tasks that have conflicts with what we have in the DB
//
// CASE1 (beggining):
// .--------·
// |########W|
// ·--------·
//
// CASE2 (end):
// .--------·
// |########|
// ·--------·
//
// CASE3 (middle):
// .--------·
// |########|
// ·--------·
//
// CASE4 (multiple):
// .-----· .-· .-----· .----------------·
// |#####| |#| |#####| |################|
// ·-----· ·-· ·-----· ·----------------·
//
//
// The first 3 cases are trivial. But in the 4th case, we must get the sum of the resources
// The same could happen in the opposite, where cases are DB entries
cron, err := cron.Parse(cronString)
if err != nil {
return errors.New("Bad cron message: " + err.Error())
}
dcModel := GetDatacenterFromAcronym(services.DC_NAME)
if dcModel == nil {
return errors.New("The DC " + services.DC_NAME + " doesn't have any DC model with that acronym")
}
if desiredToSchedule.CPUs > dcModel.GetTotalCPUs() {
return errors.New("Requested more CPUs than DC have")
}
if desiredToSchedule.GPUs > dcModel.GetTotalGPUs() {
return errors.New("Requested more GPUs than DC have")
}
if desiredToSchedule.RAM > dcModel.GetTotalRAM() {
return errors.New("Requested more RAM than DC have")
}
var lastMongoDBdate ScheduleDB
dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
primitive.M{},
options.FindOne().SetSort(primitive.D{{"stopdate", -1}}),
).Decode(&lastMongoDBdate)
if dberr != nil {
if dberr == mongo.ErrNoDocuments {
// The database is empty. We can book without problems
return
}
return dberr
}
// for cursor.Next(services.MngoCtx) {
// var item Workspace
// if err = cursor.Decode(&item); err != nil {
// logs.Error(err)
// close(ch)
// }
// }
// var cronScheduleStart, cronScheduleStop time.Time
// cronScheduleStart = cron.Next(cronFirstDate) // Get the first execution
for cronScheduleStart := cron.Next(cronFirstDate); !cronScheduleStart.IsZero() && cronScheduleStart.Before(lastMongoDBdate.StopDate); cronScheduleStart = cron.Next(cronScheduleStart) {
cronScheduleStop := cronScheduleStart.Add(time.Second * time.Duration(duration))
if cronScheduleStop.After(cronLastDate) || cronScheduleStart.Before(cronFirstDate) {
// We skip values that are in the middle of the limits
continue
}
// ###########################
cursor, err := services.MngoCollSchedule.Find(services.MngoCtx,
primitive.M{"$or": primitive.A{
primitive.M{"startdate": primitive.M{
"$gte": cronScheduleStart,
"$lte": cronScheduleStop,
}},
primitive.M{"stopdate": primitive.M{
"$gte": cronScheduleStart,
"$lte": cronScheduleStop,
}},
primitive.M{"$and": primitive.A{
primitive.M{"startdate": primitive.M{
"$lte": cronScheduleStart,
}},
primitive.M{"stopdate": primitive.M{
"$gte": cronScheduleStop,
}},
}},
}},
// options.Find().SetSort(primitive.D{{"startdate", 1}}),
)
if err != nil {
return err
}
var items []ScheduleDB
cursor.All(services.MngoCtx, &items)
if len(items) == 0 {
// A empty time slot. Available
continue
}
// There is some workflows booked here. We must check if there is remaining resources
var alreadScheduled ExecutionRequirementsModel
for _, scheduled := range items {
alreadScheduled.CPUs += scheduled.ResourceQty.CPUs
alreadScheduled.GPUs += scheduled.ResourceQty.GPUs
alreadScheduled.RAM += scheduled.ResourceQty.RAM
}
if alreadScheduled.CPUs+desiredToSchedule.CPUs > dcModel.GetTotalCPUs() {
return errors.New("Not enough CPU capacity from date " + cronScheduleStart.UTC().String() + " to " + cronScheduleStop.UTC().String())
}
if alreadScheduled.GPUs+desiredToSchedule.GPUs > dcModel.GetTotalGPUs() {
return errors.New("Not enough GPU capacity from date " + cronScheduleStart.UTC().String() + " to " + cronScheduleStop.UTC().String())
}
if alreadScheduled.RAM+desiredToSchedule.RAM > dcModel.GetTotalRAM() {
return errors.New("Not enough RAM capacity from date " + cronScheduleStart.UTC().String() + " to " + cronScheduleStop.UTC().String())
}
}
return
}
func CreateScheduleWorkflow(dcName, userID, workflowName, cronString string, duration uint, startDate, stopDate time.Time, requirements ExecutionRequirementsModel) (ret ScheduleInfo, err error) {
//TODO: Check that dcName is correct
err = CheckSchedule(cronString, duration, startDate, stopDate, requirements)
if err != nil {
return ret, err
}
// Already checked possible errors
myCron, _ := cron.Parse(cronString)
scheduledTimeStart := myCron.Next(time.Now().UTC()) // Get the first execution
counter := 0
var uploadSchedule []interface{}
for !scheduledTimeStart.IsZero() && counter < MAX_SCHEDULES {
scheduledTimeStop := scheduledTimeStart.Add(time.Second * time.Duration(duration))
if scheduledTimeStop.After(stopDate) || scheduledTimeStart.Before(startDate) {
// We skip values that are in the middle of the limits
scheduledTimeStart = myCron.Next(scheduledTimeStart)
counter++
continue
}
uploadSchedule = append(uploadSchedule, ScheduleDB{
StartDate: scheduledTimeStart,
StopDate: scheduledTimeStop,
Workflow: dcName + "." + userID + "." + workflowName,
ResourceQty: requirements, //stub
})
scheduledTimeStart = myCron.Next(scheduledTimeStart)
counter++
}
//FIXME: Consider doing something with the inserting result
_, err = services.MngoCollSchedule.InsertMany(services.MngoCtx, uploadSchedule)
if err != nil {
logs.Error(err)
}
ret.Total = len(uploadSchedule)
for i := 0; i < 5 && len(uploadSchedule) > i; i++ {
elem := uploadSchedule[i].(ScheduleDB)
ret.NextExecutions[i] = elem.StartDate.String()
}
return
}
func GetFarSchedules(baseDate time.Time, isNext bool) *time.Time {
operator := "$gt"
if !isNext {
// Previous to this date
operator = "$lt"
}
var res *ScheduleDB
dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
primitive.M{"startdate": primitive.M{
operator: baseDate,
}},
options.FindOne().SetSort(primitive.D{{"startdate", 1}})).Decode(&res)
if dberr != nil {
logs.Error(dberr)
return nil
}
return &res.StartDate
}
func GetSchedules(startDate, stopDate time.Time) (data []ScheduleDB, maxLimit bool, err error) {
if startDate.After(stopDate) {
return nil, false, errors.New("stopDate must be after startDate")
}
// Range of 35 days as max
if startDate.Add(24 * time.Hour * time.Duration(35)).Before(stopDate) {
return nil, false, errors.New("Must be less than 35 days between startDate and stopDate")
}
//FIXME: Discuss if we should check old schedules
// if startDate.Before(time.Now().UTC()) {
// return nil, false, errors.New("TimeServer is " + time.Now().UTC().String() + " but your startDate is " + startDate.String())
// }
firstDateCur, err := services.MngoCollSchedule.Find(services.MngoCtx,
primitive.M{"startdate": primitive.M{
"$gte": startDate,
"$lte": stopDate,
}},
options.Find().SetLimit(int64(MAX_DATES)),
)
if err != nil {
return
}
firstDateCur.All(services.MngoCtx, &data)
if len(data) == int(MAX_DATES) {
maxLimit = true
}
return
}

101
models/search.go Normal file
View File

@@ -0,0 +1,101 @@
package models
import (
"strings"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
type SearchResult struct {
Computing []ComputingModel `json:"computing" required:"true"`
Datacenter []DatacenterModel `json:"datacenter"`
Storage []StorageModel `json:"storage"`
Data []DataModel `json:"data"`
}
func FindByWord(word string) (object *SearchResult, err error) {
returnObject := SearchResult{}
opts := options.Find()
opts.SetLimit(100) //FIXME: Decide if limit and how
var cursor *mongo.Cursor
if strings.TrimSpace(word) == "*" {
word = ""
} else {
word = `(?i).*` + word + `.*`
}
// filter := bson.M{"$text": bson.M{"$search": word}}
if cursor, err = services.MngoCollComputing.Find(
services.MngoCtx,
bson.M{"$or": []bson.M{
{"description": bson.M{"$regex": word}},
{"owner": bson.M{"$regex": word}},
{"license": bson.M{"$regex": word}},
}},
opts,
); err != nil {
logs.Error(err)
return nil, err
}
if err = cursor.All(services.MngoCtx, &returnObject.Computing); err != nil {
logs.Error(err)
return nil, err
}
if cursor, err = services.MngoCollDatacenter.Find(
services.MngoCtx,
bson.M{"$or": []bson.M{
{"name": bson.M{"$regex": word}},
{"description": bson.M{"$regex": word}},
{"owner": bson.M{"$regex": word}},
}}, opts,
); err != nil {
logs.Error(err)
return nil, err
}
if err = cursor.All(services.MngoCtx, &returnObject.Datacenter); err != nil {
logs.Error(err)
return nil, err
}
if cursor, err = services.MngoCollStorage.Find(
services.MngoCtx,
bson.M{"$or": []bson.M{
{"name": bson.M{"$regex": word}},
{"description": bson.M{"$regex": word}},
}},
opts,
); err != nil {
logs.Error(err)
return nil, err
}
if err = cursor.All(services.MngoCtx, &returnObject.Storage); err != nil {
logs.Error(err)
return nil, err
}
if cursor, err = services.MngoCollData.Find(
services.MngoCtx,
bson.M{"$or": []bson.M{
{"description": bson.M{"$regex": word}},
{"example": bson.M{"$regex": word}},
}},
opts,
); err != nil {
logs.Error(err)
return nil, err
}
if err = cursor.All(services.MngoCtx, &returnObject.Data); err != nil {
logs.Error(err)
return nil, err
}
return &returnObject, nil
}

138
models/storage.go Normal file
View File

@@ -0,0 +1,138 @@
package models
import (
"cloud.o-forge.io/core/oc-catalog/models/rtype"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type StorageNEWModel struct {
Name string `json:"name" required:"true"`
Description string `json:"description" required:"true"`
ShortDescription string `json:"short_description" required:"true" validate:"required"`
Logo string `json:"logo" required:"true" validate:"required"`
Type string `json:"type,omitempty" required:"true"`
DCacronym string `json:"DCacronym" required:"true" description:"Unique ID of the DC where it is the storage"`
Size uint `json:"size" required:"true"`
Encryption bool `json:"encryption" `
Redundancy string `json:"redundancy" `
Throughput string `json:"throughput" `
BookingPrice uint `json:"bookingPrice" `
}
type StorageModel struct {
ID string `json:"ID" bson:"_id" required:"true"`
StorageNEWModel `bson:",inline"`
}
func (obj StorageModel) getRtype() rtype.Rtype {
return rtype.STORAGE
}
func (model StorageModel) getName() string {
return model.Name
}
type StorageObject struct {
ReferenceID primitive.ObjectID `json:"referenceID" description:"Storage model ID"`
Inputs []string `json:"inputs" `
Outputs []string `json:"outputs" `
}
func (obj StorageObject) getHost() *string {
return nil // Host is DC only attribute
}
func (obj StorageObject) getModel() (ret ResourceModel, err error) {
var ret2 StorageModel
res := services.MngoCollStorage.FindOne(services.MngoCtx,
primitive.M{"_id": obj.ReferenceID},
)
if err = res.Err(); err != nil {
return
}
err = res.Decode(&ret2)
return ret2, err
}
func (obj *StorageObject) setReference(rID primitive.ObjectID) {
obj.ReferenceID = rID
}
func (obj StorageObject) getReference() primitive.ObjectID {
return obj.ReferenceID
}
func (obj StorageObject) getRtype() rtype.Rtype {
return rtype.STORAGE
}
func (obj StorageObject) getName() (name *string) {
res := services.MngoCollStorage.FindOne(services.MngoCtx, primitive.M{"_id": obj.ReferenceID})
if res.Err() != nil {
logs.Error(res)
return
}
var ret StorageModel
res.Decode(&ret)
return &ret.Name
}
func (obj StorageObject) isLinked(rObjID string) LinkingState {
if contains(obj.Inputs, rObjID) {
return INPUT
}
if contains(obj.Outputs, rObjID) {
return OUTPUT
}
return NO_LINK
}
func (obj *StorageObject) addLink(direction LinkingState, rObjID string) {
switch direction {
case INPUT:
obj.Inputs = append(obj.Inputs, rObjID)
case OUTPUT:
obj.Outputs = append(obj.Outputs, rObjID)
}
}
func PostOneStorage(obj StorageNEWModel) (string, error) {
return postOneResource(obj, rtype.STORAGE)
}
func GetOneStorage(ID string) (object *StorageModel, err error) {
obj, err := getOneResourceByID(ID, rtype.STORAGE)
if err != nil {
return object, err
}
object = obj.(*StorageModel)
return object, err
}
func GetMultipleStorage(IDs []string) (object *[]StorageModel, err error) {
objArray, err := getMultipleResourceByIDs(IDs, rtype.STORAGE)
if err != nil {
return nil, err
}
object = objArray.(*[]StorageModel)
return object, err
}

12
models/user.go Normal file
View File

@@ -0,0 +1,12 @@
package models
type UserModel struct {
ID string `json:"id,omitempty",bson:"_id"`
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
Email string `json:"email,omitempty"`
}
func Login(username, password string) bool {
return true
}

997
models/workflow.go Normal file
View File

@@ -0,0 +1,997 @@
package models
import (
"context"
"encoding/xml"
"errors"
"sort"
"time"
"cloud.o-forge.io/core/oc-catalog/models/rtype"
swagger "cloud.o-forge.io/core/oc-catalog/selfapi"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"github.com/vk496/cron"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type LinkingState uint
// When we check the schedule of a workflow, we report with this
type DCstatus struct {
DCname string
DCobjID string //FIXME: Probably should be model ID
IsReachable bool
IsAvailable bool
Booked *ScheduleInfo
ErrorMessage string
}
const (
NO_LINK LinkingState = iota
INPUT
OUTPUT
)
func boolToLinkingState(boolState bool) LinkingState {
switch boolState {
case true:
return INPUT
case false:
return OUTPUT
default:
return NO_LINK
}
}
const SchedulesDB = "schedules"
type Workflow struct {
// The key of the map is the ID of the object itself
Data map[string]DataObject `json:"data"`
Computing map[string]ComputingObject `json:"computing"`
Storage map[string]StorageObject `json:"storage"`
Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter
Schedules WorkflowSchedule `json:"schedules"`
MxgraphXML string `description:"State of the mxgraph"`
}
type ResourceObject interface {
getHost() *string
getName() *string
getModel() (ResourceModel, error)
getRtype() rtype.Rtype
setReference(rObjID primitive.ObjectID)
getReference() primitive.ObjectID
isLinked(rObjID string) LinkingState
addLink(direction LinkingState, rObjID string)
}
// Get a sum of all execution requirements attached to a DC obj
func (w Workflow) GetExecutionRequirements(dcIDobj string) (ret ExecutionRequirementsModel, err error) {
// Find the id of the DC obj
if _, ok := w.Datacenter[dcIDobj]; !ok {
return ExecutionRequirementsModel{}, errors.New("DC obj" + dcIDobj + " doesn't exist in the Workflow")
}
// Get all elements that are attached to the DC
for _, computingObj := range w.Computing {
if computingObj.DataCenterID == dcIDobj {
mymodel, err := computingObj.getModel()
if err != nil {
return ExecutionRequirementsModel{}, err
}
compModel := mymodel.(ComputingModel)
//TODO a generic way to concatenate execution requirements
ret.CPUs += compModel.ExecutionRequirements.CPUs
ret.GPUs += compModel.ExecutionRequirements.GPUs
ret.RAM += compModel.ExecutionRequirements.RAM
}
}
return
}
func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) {
if rObjID == nil {
return nil
}
if storVal, ok := w.Data[*rObjID]; ok {
retObj = &storVal
return
}
if storVal, ok := w.Computing[*rObjID]; ok {
retObj = &storVal
return
}
if storVal, ok := w.Storage[*rObjID]; ok {
retObj = &storVal
return
}
if storVal, ok := w.Datacenter[*rObjID]; ok {
retObj = &storVal
return
}
return nil
}
func (w *Workflow) GetResourceMapByRtype(rt rtype.Rtype) interface{} {
switch rt {
case rtype.DATA:
return w.Data
case rtype.COMPUTING:
return w.Computing
case rtype.STORAGE:
return w.Storage
case rtype.DATACENTER:
return w.Datacenter
default:
return nil
}
}
func (w *Workflow) CreateResourceObject(rt rtype.Rtype) ResourceObject {
var res ResourceObject
switch rt {
case rtype.DATA:
res = &DataObject{}
case rtype.COMPUTING:
res = &ComputingObject{}
case rtype.STORAGE:
res = &StorageObject{}
case rtype.DATACENTER:
res = &DatacenterObject{}
default:
res = nil
}
return res
}
func (w *Workflow) AddObj(robj ResourceObject) *primitive.ObjectID {
outputID := primitive.NewObjectID()
w.UpdateObj(robj, outputID.Hex())
return &outputID
}
func (w *Workflow) UpdateDB(userID, workflowName string) error {
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName: w}},
)
return err
}
func (w *Workflow) UpdateObj(robj ResourceObject, objID string) {
switch robj.getRtype() {
case rtype.DATA:
var target DataObject
if w.Data == nil {
//init
w.Data = make(map[string]DataObject)
}
target = *robj.(*DataObject)
w.Data[objID] = target
case rtype.COMPUTING:
var target ComputingObject
if w.Computing == nil {
//init
w.Computing = make(map[string]ComputingObject)
}
target = *robj.(*ComputingObject)
w.Computing[objID] = target
case rtype.STORAGE:
var target StorageObject
if w.Storage == nil {
//init
w.Storage = make(map[string]StorageObject)
}
target = *robj.(*StorageObject)
w.Storage[objID] = target
case rtype.DATACENTER:
var target DatacenterObject
if w.Datacenter == nil {
//init
w.Datacenter = make(map[string]DatacenterObject)
}
target = *robj.(*DatacenterObject)
w.Datacenter[objID] = target
}
}
func GetWorkflow(userID, workflowName string) (workflow *Workflow, err error) {
userWorkspace := GetWorkspace(userID)
if userWorkspace != nil {
if theWorkflow, ok := userWorkspace.Workflows[workflowName]; ok {
return &theWorkflow, nil
}
logs.Debug("No workflow name")
return nil, errors.New("No workflow name")
}
logs.Debug("No workspace")
return nil, errors.New("No workspace")
}
func CreateWorkflow(userID, workflowName string) (err error) {
//TODO: Maybe operate directly in the DB instead retriving the full object?
userWorkspace := GetWorkspace(userID)
// Exist in the DB
if userWorkspace != nil {
if _, ok := userWorkspace.Workflows[workflowName]; ok {
message := "Workspace workflow " + workflowName +
" is already created for user " + userID
logs.Debug(message)
return errors.New(message)
}
userWP := &Workflow{}
// New element
addElem := primitive.M{WorkflowDB + "." + workflowName: userWP}
// If user doesn't have workflows, we must init at least one
if userWorkspace.Workflows == nil {
addElem = primitive.M{WorkflowDB: map[string]*Workflow{
workflowName: userWP,
}}
}
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": addElem},
)
if err != nil {
message := "Internal error when updating in DB"
logs.Debug(message + "; " + err.Error())
return errors.New(message)
}
return nil
}
return errors.New("Can't create a workflow without a workspace")
}
func CreateObjectInWorkflow(userID, workflowName, rID string) (rObjID2 *string, err error) {
userWorkspace := GetWorkspace(userID)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + userID)
}
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
}
rIDObj, err := primitive.ObjectIDFromHex(rID)
if err != nil {
return nil, errors.New("ID " + rID + " is not valid")
}
//TODO: We are replacing the entire array instead of pushing
// a new element. Probably will have problems with multithread/async
// operations and consistency in the future
for rtyp, resource := range userWorkspace.GetResources() {
if contains(resource, rID) {
wWorkflow := userWorkspace.Workflows[workflowName]
newObj := wWorkflow.CreateResourceObject(rtyp)
newObj.setReference(rIDObj)
outputID := wWorkflow.AddObj(newObj)
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName + "." + rtyp.String(): wWorkflow.GetResourceMapByRtype(rtyp)}},
)
if err != nil {
return nil, errors.New("Internal error when updating in DB: " + err.Error())
}
outStr := outputID.Hex()
return &outStr, nil
}
}
return nil, errors.New("rID " + rID + " doesn't exist in the user workspace")
}
func LinkObjectsInWorkspace(userID, workflowName, rObjIDsource string, isInput bool, rObjIDtarger string) (err error) {
userWorkspace := GetWorkspace(userID)
if userWorkspace == nil {
return errors.New("No workspace for user " + userID)
}
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
}
// Check rObjIDsource
if _, ok := userWorkspace.Workflows[workflowName].Data[rObjIDsource]; !ok {
return errors.New("rObjIDsource must be of type DATA for now")
}
// Check rObjIDtarger
wWorkflow := userWorkspace.Workflows[workflowName]
resObjTarget := wWorkflow.GetResource(&rObjIDtarger)
if resObjTarget == nil {
return errors.New("rObjIDtarger doesn't exist")
} else if resObjTarget.getRtype() == rtype.DATA {
return errors.New("rObjIDtarger of type Data doesn't have inputs/outputs")
}
if resObjTarget.isLinked(rObjIDsource) != NO_LINK {
return errors.New("rObjIDsource already exists in the inputs or outputs")
}
resObjTarget.addLink(boolToLinkingState(isInput), rObjIDsource)
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName + "." +
resObjTarget.getRtype().String() + "." +
rObjIDtarger: resObjTarget}},
)
if err != nil {
return errors.New("Internal error when updating in DB: " + err.Error())
}
return nil
}
func GetWorkflowSchedule(username, workflowName string) (Currentschedule *WorkflowSchedule, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
if workflow, ok := userWorkspace.Workflows[workflowName]; ok {
Currentschedule = &workflow.Schedules
}
return
}
func SetWorkflowSchedule(username, workflowName, cronString, events string, isService bool, startDate, stopDate time.Time, duration uint) (NextSchedules *ScheduleInfo, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
// Check if workflow exist
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return
}
// We mustn't modify a booked schedule
if userWorkspace.Workflows[workflowName].Schedules.IsBooked {
return nil, errors.New("A booked schedule can't be modified")
}
sch := WorkflowSchedule{}
NextSchedules = &ScheduleInfo{}
sch.StartDate = startDate
sch.StopDate = stopDate
sch.IsService = isService
if isService { // Service
NextSchedules.NextExecutions[0] = startDate.String()
NextSchedules.Total = 1
}
if !isService { // Task
sch.Cron = cronString
sch.Duration = duration
sch.Events = events
// Obtain next executions
counter := 0
myCron, _ := cron.Parse(cronString) // NOTE: already checked in the controller
scheduledStart := myCron.Next(startDate) // Get the first execution starting from startDate
for !scheduledStart.IsZero() && counter < MAX_SCHEDULES {
scheduleStop := scheduledStart.Add(time.Second * time.Duration(duration))
if scheduleStop.After(stopDate) || scheduledStart.Before(startDate) {
// If a task is longer than last possible date, we ignore it
scheduledStart = myCron.Next(scheduledStart)
counter++
continue
}
if counter < len(NextSchedules.NextExecutions) {
NextSchedules.NextExecutions[counter] = scheduledStart.String()
}
scheduledStart = myCron.Next(scheduledStart)
counter++
}
NextSchedules.Total = counter
if NextSchedules.Total == 0 {
return nil, errors.New("Current Task configuration will have 0 executions")
}
}
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": username},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName + "." +
SchedulesDB: &sch}},
)
if err != nil {
return nil, errors.New("Internal error when updating in DB: " + err.Error())
}
return
}
func GetMxGraph(username, workflowName string) (xmlData *string, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + username)
}
var data string = userWorkspace.Workflows[workflowName].MxgraphXML
if data == "" {
xmlData = nil
} else {
xmlData = &data
}
return
}
func ParseMxGraph(username, workflowName, xmlData string) (err error, mxissues []error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return errors.New("No workspace for user " + username), nil
}
currentWorkflow, ok := userWorkspace.Workflows[workflowName]
if !ok {
return errors.New("No workflow " + workflowName), nil
}
if currentWorkflow.Schedules.IsBooked {
return errors.New("Can't modify a booked workflow"), nil
}
var xmlModel MxGraphModel
// logs.Debug(xmlData)
err = xml.Unmarshal([]byte(xmlData), &xmlModel)
if err != nil {
return err, nil
}
targetWorkspaceWorkflow, err, mxissues := userWorkspace.ConsumeMxGraphModel(xmlModel)
if err != nil {
return err, nil
}
targetWorkspaceWorkflow.MxgraphXML = xmlData
targetWorkspaceWorkflow.Schedules = currentWorkflow.Schedules //TODO: Probably we should move schudles outside the workflow
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": username},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName: targetWorkspaceWorkflow}},
)
if err != nil {
return errors.New("Internal error when updating in DB: " + err.Error()), nil
}
return nil, mxissues
}
// FindInSlice takes a slice and looks for an element in it. If found it will
// return it's key, otherwise it will return -1 and a bool of false.
func FindInSlice(slice []string, val string) (int, bool) {
for i, item := range slice {
if item == val {
return i, true
}
}
return -1, false
}
// At least one element exist in both slices
// Return index1, index2 and if exist
func FindSliceInSlice(slice1 []string, slice2 []string) (int, int, bool) {
for i1, item1 := range slice1 {
for i2, item2 := range slice2 {
if item1 == item2 {
return i1, i2, true
}
}
}
return -1, -1, false
}
func (w Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (ret *Workflow, err error, issues []error) {
ret = &Workflow{}
// When we will iterate over the full array of cells, we first will register the resources
// and after the linkage between them
sort.Slice(xmlmodel.Root.MxCell, func(i, j int) bool {
return xmlmodel.Root.MxCell[i].RID != nil
})
for _, cell := range xmlmodel.Root.MxCell {
switch {
case cell.RID != nil:
// Case of a Resource
rType := w.getRtype(*cell.RID)
if rType == rtype.INVALID {
return nil,
errors.New("Refering to a rID that is not in the workflow"),
nil
}
// Generate ObjectID for the reference ID
rIDObj, err := primitive.ObjectIDFromHex(*cell.RID)
if err != nil {
return nil,
errors.New("Bad ID format: " + *cell.RID),
nil
}
resObj := ret.CreateResourceObject(rType)
resObj.setReference(rIDObj)
ret.UpdateObj(resObj, cell.ID)
case cell.ID == "0" || cell.ID == "1":
// ID 0 and 1 are special cases of mxeditor
continue
// issues = append(issues, errors.New("MxCell with ID "+cell.ID+" doesn't have a valid link"))
default:
// Not root nor resource. Should be only links
sourceObj := ret.GetResource(cell.Source)
targetObj := ret.GetResource(cell.Target)
if sourceObj == nil || targetObj == nil {
if sourceObj == nil && targetObj == nil {
issues = append(issues, errors.New("Arrow "+cell.ID+" is alone"))
} else if sourceObj == nil {
issues = append(issues, errors.New("Arrow ("+cell.ID+") to "+*targetObj.getName()+" without parent"))
} else {
issues = append(issues, errors.New("Arrow "+cell.ID+" from "+*sourceObj.getName()+" without target"))
}
// If is a invalid link, we can't save it in the DB
continue
}
if sourceObj.getRtype() == rtype.DATACENTER || targetObj.getRtype() == rtype.DATACENTER {
var datacenter, datacenterLinked *string
if sourceObj.getRtype() == rtype.DATACENTER {
datacenter = cell.Source
datacenterLinked = cell.Target
} else {
datacenter = cell.Target
datacenterLinked = cell.Source
}
switch ret.GetResource(datacenterLinked).getRtype() {
case rtype.COMPUTING:
computingObj := ret.GetResource(datacenterLinked).(*ComputingObject)
// We should always get a ID because we already registered resources and discarded which doesn't correspond to existent models
computingObj.DataCenterID = *datacenter
ret.UpdateObj(computingObj, *datacenterLinked)
}
} else {
targetObj.addLink(INPUT, *cell.Source)
ret.UpdateObj(targetObj, *cell.Target) // save back
// If we have a relationship of:
// Source ----> Target
//
// The Source will be in the INPUTs of the Target.
// But we also must make sure that the Target will be in the OUTPUTs of the Source
sourceObj.addLink(OUTPUT, *cell.Target)
ret.UpdateObj(sourceObj, *cell.Source)
}
}
}
dcslist := make(map[string]bool)
dataslist := make(map[string]bool)
// datalist := make(map[string]bool)
for _, comp := range ret.Computing {
if comp.DataCenterID == "" {
issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter"))
} else {
// If doesn't exist in the list, means is new element to register as used
dcslist[comp.DataCenterID] = true
}
for _, dcin := range comp.Inputs {
switch ret.GetResource(&dcin).getRtype() {
case rtype.DATA:
dataslist[dcin] = true
}
}
for _, dcout := range comp.Outputs {
switch ret.GetResource(&dcout).getRtype() {
case rtype.DATA:
dataslist[dcout] = true
}
}
}
for _, va := range ret.Storage {
if va.Inputs == nil && va.Outputs == nil {
issues = append(issues, errors.New("Storage "+*va.getName()+" without compatible inputs and outputs"))
}
}
for dcID, va := range ret.Datacenter {
// if rID doesn't exist in the list, it means that it's not used
if _, ok := dcslist[dcID]; !ok {
issues = append(issues, errors.New("DC "+*va.getName()+" not atached to any Computing"))
}
}
for dcID, va := range ret.Data {
// if rID doesn't exist in the list, it means that it's not used
if _, ok := dataslist[dcID]; !ok {
issues = append(issues, errors.New("Data "+*va.getName()+" not atached to any Computing"))
}
}
//////////////////////////////////////////////////////////
// //
// Starting from here, we check the type of resources //
// //
//////////////////////////////////////////////////////////
// FIXME: Avoid checking twice the same cases (cycles). Ex:
//
// Comp1 ----> Comp2
//
// In this case, we will check Comp1 outputs with Comp2
// inputs AND Comp2 inputs with Comp1 outputs, since we are
// iterating over all existent Computing models in the Graph
for _, comp := range ret.Computing {
compModel, err2 := comp.getModel()
if err = err2; err != nil {
return
}
currentCompModel := compModel.(ComputingModel)
// Come computings may not allow inputs or outputs
if len(currentCompModel.Dinputs) == 0 && len(comp.Inputs) > 0 {
issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any input"))
continue
}
if len(currentCompModel.Doutputs) == 0 && len(comp.Outputs) > 0 {
issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any output"))
continue
}
//TODO: We should allow heterogenous inputs?
for _, objIn := range comp.Inputs {
resIn := ret.GetResource(&objIn)
resInType := resIn.getRtype()
switch resInType {
case rtype.DATA:
dataModel, err2 := resIn.getModel()
if err = err2; err != nil {
return
}
myDataModel := dataModel.(DataModel)
if _, ok := FindInSlice(currentCompModel.Dinputs, myDataModel.Dtype); !ok {
issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle inputs of type "+myDataModel.Dtype+" from Data "+dataModel.getName()))
}
case rtype.COMPUTING:
inCompModel, err2 := resIn.getModel()
if err = err2; err != nil {
return
}
myInComputingModel := inCompModel.(ComputingModel)
if _, _, ok := FindSliceInSlice(myInComputingModel.Doutputs, currentCompModel.Dinputs); !ok {
issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle any input from "+inCompModel.getName()))
}
case rtype.STORAGE:
// Storage can give use anything, so we always accept it's input for now
continue
default:
issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resInType.String()+" (behaviour not defined)"))
}
}
//TODO: We should allow heterogenous outputs?
for _, objOut := range comp.Outputs {
resOut := ret.GetResource(&objOut)
resOutType := resOut.getRtype()
switch resOutType {
case rtype.COMPUTING:
outCompModel, err2 := resOut.getModel()
if err = err2; err != nil {
return
}
myOutComputingModel := outCompModel.(ComputingModel)
if _, _, ok := FindSliceInSlice(currentCompModel.Doutputs, myOutComputingModel.Dinputs); !ok {
issues = append(issues, errors.New("Computing "+compModel.getName()+" doesn't have output data compatible with "+outCompModel.getName()))
}
case rtype.STORAGE:
// Storage can save anything, so we always accept store it for now
continue
default:
issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resOutType.String()+" (behaviour not defined)"))
}
}
}
return
}
func sumExecutionReqs(exeqReq ...ExecutionRequirementsModel) (ret ExecutionRequirementsModel) {
for _, v := range exeqReq {
ret.CPUs += v.CPUs
ret.GPUs += v.GPUs
ret.RAM += v.RAM
}
return
}
func CheckAndBookWorkflowSchedule(username, workflowName string, book bool) (myRet []DCstatus, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
currentWorkflow, ok := userWorkspace.Workflows[workflowName]
if !ok {
return nil, errors.New("No workflow " + workflowName + " for user " + username)
}
if currentWorkflow.Schedules.IsBooked {
return nil, errors.New("Can't operate DCs for a already booked schedule")
}
// dd := &currentWorkflow
// We can have multiple DCobjs pointing to the same DCmodel. We must sum all req of the same DCmodel
totalDCs := make(map[primitive.ObjectID]ExecutionRequirementsModel)
for dcIDobj, dcObj := range currentWorkflow.Datacenter {
modelID := dcObj.getReference()
var totalsModel ExecutionRequirementsModel
totalsModel, err = currentWorkflow.GetExecutionRequirements(dcIDobj)
if err != nil {
return
}
if _, ok := totalDCs[modelID]; ok {
totalDCs[modelID] = sumExecutionReqs(totalDCs[modelID], totalsModel)
} else {
totalDCs[modelID] = totalsModel
}
}
myRet = make([]DCstatus, len(totalDCs))
var i int
i = -1
for modelID, execReq := range totalDCs {
i++
var dcModel *DatacenterModel
dcModel, err = GetOneDatacenter(modelID.Hex())
if err != nil {
return
}
myRet[i].DCname = dcModel.Name
myRet[i].DCobjID = modelID.Hex()
// retrieve the host of the DC
host := GetHost(dcModel.Hosts)
if host == nil {
myRet[i].ErrorMessage = "Datacenter " + myRet[i].DCname + " doesn't have a host property"
continue
}
cli := services.GetSelfAPI(*host)
data, err := cli.ScheduleApi.ScheduleControllerCheckIfScheduleCanBeCreatedInThisDC(context.Background(),
currentWorkflow.Schedules.Cron,
int32(currentWorkflow.Schedules.Duration),
currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
swagger.ModelsExecutionRequirementsModel{
Cpus: int32(execReq.CPUs),
Gpus: int32(execReq.GPUs),
Ram: int32(execReq.RAM),
},
)
if err != nil {
myRet[i].ErrorMessage = err.Error()
swErr, ok := err.(swagger.GenericSwaggerError)
if ok {
myRet[i].IsReachable = true
myRet[i].ErrorMessage += ": " + string(swErr.Body())
}
continue
}
myRet[i].IsReachable = true
if data.StatusCode == 200 {
myRet[i].IsAvailable = true
}
}
// If we only check, we should exit here
if !book {
return
}
for _, v := range myRet {
if !v.IsAvailable {
return
}
}
i = -1
allBooked := true
for modelID, execReq := range totalDCs {
i++
// _ = v
var dcModel *DatacenterModel
dcModel, err = GetOneDatacenter(modelID.Hex())
if err != nil {
myRet[i].ErrorMessage = err.Error()
continue
}
cli := services.GetSelfAPI(*GetHost(dcModel.Hosts)) // If we are here, we already check that host exists and is reachable
data, resp, err := cli.ScheduleApi.ScheduleControllerCreateSchedule(context.Background(),
services.DC_NAME,
workflowName,
currentWorkflow.Schedules.Cron,
int32(currentWorkflow.Schedules.Duration),
currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
swagger.ModelsExecutionRequirementsModel{
Cpus: int32(execReq.CPUs),
Gpus: int32(execReq.GPUs),
Ram: int32(execReq.RAM),
},
)
if err != nil {
allBooked = false
myRet[i].ErrorMessage = err.Error()
swErr, ok := err.(swagger.GenericSwaggerError)
if ok {
myRet[i].IsReachable = true
myRet[i].ErrorMessage += ": " + string(swErr.Body())
}
continue
}
if resp.StatusCode == 200 {
//FIXME: Maybe some better way of casting?
var nextExec [5]string
for counter := 0; counter < 5; counter++ {
nextExec[counter] = data.NextExecutions[counter]
}
myRet[i].Booked = &ScheduleInfo{
Total: int(data.Total),
NextExecutions: nextExec,
}
}
}
// If some DC fail, we must not mark the workflow as booked
if !allBooked {
return
}
currentWorkflow.Schedules.IsBooked = true
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": username},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName + "." +
SchedulesDB: &currentWorkflow.Schedules}},
)
if err != nil {
logs.Critical("Internal error when updating in DB: " + err.Error())
}
return myRet, nil
}

418
models/workspace.go Normal file
View File

@@ -0,0 +1,418 @@
package models
import (
"context"
"errors"
"cloud.o-forge.io/core/oc-catalog/models/rtype"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
)
// Assure consistency by using a const which refers to the MongoDB entry name
// Workspace.Projects
const WorkflowDB = "workflows"
type Workspace struct {
UserID string `bson:"_id" json:"user_id"`
Workflows map[string]Workflow //WorkflowDB
// ID: rtype
Data []string `json:"data"`
Computing []string `json:"computing"`
Datacenter []string `json:"datacenter"`
Storage []string `json:"storage"`
}
type ResourceModel interface {
getRtype() rtype.Rtype
getName() string
}
func (w Workspace) getRtype(rID string) (resModel rtype.Rtype) {
for _, compVal := range w.Computing {
if compVal == rID {
return rtype.COMPUTING
}
}
for _, datVal := range w.Data {
if datVal == rID {
return rtype.DATA
}
}
for _, storVal := range w.Storage {
if storVal == rID {
return rtype.STORAGE
}
}
for _, datcentVal := range w.Datacenter {
if datcentVal == rID {
return rtype.DATACENTER
}
}
return rtype.INVALID
}
func (w *Workspace) GetResources() map[rtype.Rtype][]string {
return map[rtype.Rtype][]string{
rtype.DATA: w.Data,
rtype.COMPUTING: w.Computing,
rtype.STORAGE: w.Storage,
rtype.DATACENTER: w.Datacenter,
}
}
func (w *Workspace) GetWorkflow(workflowName string) *Workflow {
var proj Workflow
proj = w.Workflows[workflowName]
return &proj
}
func (w *Workspace) GetWorkflows() []string {
if len(w.Workflows) == 0 {
return nil
}
workflowNames := make([]string, len(w.Workflows))
i := 0
for k := range w.Workflows {
workflowNames[i] = k
i++
}
return workflowNames
}
type WorkspaceModel struct {
UserID string `bson:"_id" json:"user_id"`
Data []DataModel `json:"data"`
Computing []ComputingModel `json:"computing"`
Datacenter []DatacenterModel `json:"datacenter"`
Storage []StorageModel `json:"storage"`
}
func ListFullWorkspace(userID string) (*WorkspaceModel, error) {
ws := GetWorkspace(userID)
if ws == nil {
return nil, errors.New("Internal error")
}
fws := &WorkspaceModel{
UserID: ws.UserID,
Data: []DataModel{},
Computing: []ComputingModel{},
Datacenter: []DatacenterModel{},
Storage: []StorageModel{},
}
pipeline := []primitive.M{
{"$match": primitive.M{"_id": userID}},
{"$lookup": primitive.M{
"localField": "data",
"from": services.MngoNamesCollection.DATA,
"foreignField": "_id",
"as": "data",
}},
{"$lookup": primitive.M{
"localField": "computing",
"from": services.MngoNamesCollection.COMPUTING,
"foreignField": "_id",
"as": "computing",
}},
{"$lookup": primitive.M{
"localField": "datacenter",
"from": services.MngoNamesCollection.DATACENTER,
"foreignField": "_id",
"as": "datacenter",
}},
{"$lookup": primitive.M{
"localField": "storage",
"from": services.MngoNamesCollection.STORAGE,
"foreignField": "_id",
"as": "storage",
}},
}
ret, err := services.MngoCollWorkspace.Aggregate(services.MngoCtx, pipeline)
if err != nil {
message := "Couldn't obtain subobjects"
logs.Debug(message + "; " + err.Error())
return nil, errors.New(message)
}
if ret.RemainingBatchLength() == 1 {
ret.Next(context.Background())
ret.Decode(&fws)
}
return fws, nil
}
// Contains tells whether a contains x.
func contains(a []string, x string) bool {
for _, n := range a {
if x == n {
return true
}
}
return false
}
func RemoveResource(userID, rID, rType string) error {
rIDObj, err := primitive.ObjectIDFromHex(rID)
if err != nil {
message := "ID " + rID + " is not valid"
logs.Debug(message + "; " + err.Error())
return errors.New(message)
}
result, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$pull": primitive.M{rType: rIDObj}},
)
if err != nil {
message := err.Error()
logs.Debug(message)
return errors.New(message)
}
if result.MatchedCount == 0 {
message := "No user " + userID + " in workspace"
logs.Debug(message)
return errors.New(message)
}
if result.ModifiedCount == 0 {
message := "No rID " + rID + " in rtype " + rType
logs.Debug(message)
return errors.New(message)
}
return nil
}
func (w *Workspace) updateDB() (err error) {
_, err = services.MngoCollWorkspace.ReplaceOne(services.MngoCtx,
primitive.M{"_id": w.UserID},
w,
)
return
}
func (w *Workspace) NewResource(rID string, rType string) (err error) {
var targetArray *[]string
switch rType {
case rtype.DATA.String():
targetArray = &w.Data
case rtype.COMPUTING.String():
targetArray = &w.Computing
case rtype.STORAGE.String():
targetArray = &w.Storage
case rtype.DATACENTER.String():
targetArray = &w.Datacenter
default:
return errors.New("Rtype " + rType + " is not valid")
}
for _, models := range *targetArray {
if models == rID {
return errors.New("Resource " + rID + " of type " + rType +
" is already registered for user " + w.UserID)
}
}
*targetArray = append(*targetArray, rID)
w.updateDB()
return
}
func AddResource(userID, rID, rType string) (err error) {
var rIDObj *primitive.ObjectID
if rIDObj, err = IsValidResource(rID, rType); err != nil {
return err
}
//TODO: Maybe operate directly in the DB instead retriving the full object?
userWorkspace := GetWorkspace(userID)
// Exist in the DB
if userWorkspace != nil {
var targetArray []string
switch rType {
case rtype.DATA.String():
targetArray = userWorkspace.Data
case rtype.COMPUTING.String():
targetArray = userWorkspace.Computing
case rtype.STORAGE.String():
targetArray = userWorkspace.Storage
case rtype.DATACENTER.String():
targetArray = userWorkspace.Datacenter
default:
message := "Rtype " + rType + " is not valid"
logs.Debug(message)
return errors.New(message)
}
if ok := contains(targetArray, rID); ok {
// Element already registered
message := "Resource " + rID + " of type " + rType +
" is already registered for user " + userID
logs.Debug(message)
return errors.New(message)
}
// New element
// userWorkspace.ResourceList[rID] = rType
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$push": primitive.M{rType: rIDObj}},
)
if err != nil {
message := "Internal error when updating in DB"
logs.Debug(message + "; " + err.Error())
return errors.New(message)
}
return nil
}
return errors.New("Internal error")
}
func rTypeToCollection(rType string) (*mongo.Collection, error) {
switch rType {
case rtype.DATA.String():
return services.MngoCollData, nil
case rtype.COMPUTING.String():
return services.MngoCollComputing, nil
case rtype.DATACENTER.String():
return services.MngoCollDatacenter, nil
case rtype.STORAGE.String():
return services.MngoCollStorage, nil
}
message := rType + " is not a valid resource type"
logs.Debug(message)
return nil, errors.New(message)
}
func IsValidResource(rID, rType string) (*primitive.ObjectID, error) {
targetColl, err := rTypeToCollection(rType)
if err != nil {
return nil, err
}
rIDObj, err := primitive.ObjectIDFromHex(rID)
if err != nil {
message := "ID " + rID + " is not valid"
logs.Debug(message + "; " + err.Error())
return nil, errors.New(message)
}
result := targetColl.FindOne(services.MngoCtx, primitive.M{"_id": rIDObj})
if result.Err() != nil {
message := "ID " + rID + " doesn't exist for resource type " + rType
logs.Debug(message + "; " + result.Err().Error())
return nil, errors.New(message)
}
return &rIDObj, nil
}
func GetAllWorkspaces() <-chan *Workspace {
ch := make(chan *Workspace)
go func() {
cursor, err := services.MngoCollWorkspace.Find(services.MngoCtx, primitive.M{})
if err != nil {
logs.Error(cursor.Err())
close(ch)
}
for cursor.Next(services.MngoCtx) {
var item Workspace
if err = cursor.Decode(&item); err != nil {
logs.Error(err)
close(ch)
}
ch <- &item
}
close(ch) // Remember to close or the loop never ends!
}()
return ch
}
func (w *Workspace) GetAllWorkspacesProjects() <-chan *Workflow {
ch := make(chan *Workflow)
go func() {
for _, wproj := range w.Workflows {
ch <- &wproj
}
close(ch)
}()
return ch
}
func GetWorkspace(userID string) (retObj *Workspace) {
if err := services.MngoCollWorkspace.FindOne(services.MngoCtx, primitive.M{"_id": userID}).Decode(&retObj); err != nil {
logs.Error(err.Error())
return nil
}
return
}
func NewWorkspace(userID string) (*Workspace, error) {
newWsp := &Workspace{
UserID: userID,
Data: []string{},
Computing: []string{},
Datacenter: []string{},
Storage: []string{},
}
_, err := services.MngoCollWorkspace.InsertOne(services.MngoCtx, newWsp)
if err != nil {
logs.Warning(err.Error())
return nil, err
}
return newWsp, nil
}