323 lines
8.9 KiB
Go
323 lines
8.9 KiB
Go
package models
|
|
|
|
import (
|
|
"errors"
|
|
"time"
|
|
|
|
"cloud.o-forge.io/core/deprecated-oc-catalog/services"
|
|
"github.com/beego/beego/v2/core/logs"
|
|
"github.com/vk496/cron"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
"go.mongodb.org/mongo-driver/mongo"
|
|
"go.mongodb.org/mongo-driver/mongo/options"
|
|
)
|
|
|
|
const MAX_DATES uint = 1000 // Max number of dates to retrieve
|
|
const MAX_SCHEDULES int = 200
|
|
|
|
type ScheduleDB struct {
|
|
StartDate time.Time
|
|
StopDate time.Time
|
|
Workflow string
|
|
ResourceQty ExecutionRequirementsModel
|
|
}
|
|
|
|
type ScheduleInfo struct {
|
|
Total int
|
|
NextExecutions [5]string
|
|
}
|
|
|
|
type WorkflowSchedule struct {
|
|
IsService bool `description:"Service: true, Task: false"`
|
|
StartDate time.Time
|
|
StopDate time.Time
|
|
Cron string `json:"cron"`
|
|
Duration uint `json:"duration" description:"Durantion in seconds" example:"7200"`
|
|
Events string `json:"events"`
|
|
IsBooked bool `json:"isBooked"`
|
|
}
|
|
|
|
func timeBetween(base, t1, t2 time.Time) bool {
|
|
if t2.Before(t1) {
|
|
return false
|
|
}
|
|
|
|
if (t1.Before(base) || t1.Equal(base)) && (t2.After(base) || t2.Equal(base)) {
|
|
return true
|
|
} else {
|
|
return false
|
|
}
|
|
}
|
|
|
|
func CheckSchedule(cronString string, duration uint, cronFirstDate, cronLastDate time.Time, desiredToSchedule ExecutionRequirementsModel) (err error) {
|
|
// ########################################
|
|
// TODO: Redesign the whole flow for all possible cases. Meanwhile, use it like this
|
|
// ########################################
|
|
//
|
|
// Here we have already a timeslot filled by other cron Task. So we must check if there
|
|
// is still resources available
|
|
|
|
// However, we could have 3 possibilities here:
|
|
//
|
|
//
|
|
// ## Task with start and stop Date in the DB ##
|
|
//
|
|
// startDate stopDate
|
|
// .-----------------------------·
|
|
// | |
|
|
// ·-----------------------------·
|
|
//
|
|
//
|
|
// New tasks that have conflicts with what we have in the DB
|
|
//
|
|
// CASE1 (beggining):
|
|
// .--------·
|
|
// |########W|
|
|
// ·--------·
|
|
//
|
|
// CASE2 (end):
|
|
// .--------·
|
|
// |########|
|
|
// ·--------·
|
|
//
|
|
// CASE3 (middle):
|
|
// .--------·
|
|
// |########|
|
|
// ·--------·
|
|
//
|
|
// CASE4 (multiple):
|
|
// .-----· .-· .-----· .----------------·
|
|
// |#####| |#| |#####| |################|
|
|
// ·-----· ·-· ·-----· ·----------------·
|
|
//
|
|
//
|
|
// The first 3 cases are trivial. But in the 4th case, we must get the sum of the resources
|
|
// The same could happen in the opposite, where cases are DB entries
|
|
|
|
cron, err := cron.Parse(cronString)
|
|
if err != nil {
|
|
return errors.New("Bad cron message: " + err.Error())
|
|
}
|
|
|
|
dcModel := GetDatacenterFromAcronym(services.DC_NAME)
|
|
if dcModel == nil {
|
|
return errors.New("The DC " + services.DC_NAME + " doesn't have any DC model with that acronym")
|
|
}
|
|
|
|
if desiredToSchedule.CPUs > dcModel.GetTotalCPUs() {
|
|
return errors.New("Requested more CPUs than DC have")
|
|
}
|
|
if desiredToSchedule.GPUs > dcModel.GetTotalGPUs() {
|
|
return errors.New("Requested more GPUs than DC have")
|
|
}
|
|
if desiredToSchedule.RAM > dcModel.GetTotalRAM() {
|
|
return errors.New("Requested more RAM than DC have")
|
|
}
|
|
|
|
var lastMongoDBdate ScheduleDB
|
|
dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
|
|
primitive.M{},
|
|
options.FindOne().SetSort(primitive.D{{"stopdate", -1}}),
|
|
).Decode(&lastMongoDBdate)
|
|
|
|
if dberr != nil {
|
|
if dberr == mongo.ErrNoDocuments {
|
|
// The database is empty. We can book without problems
|
|
return
|
|
}
|
|
return dberr
|
|
}
|
|
|
|
// for cursor.Next(services.MngoCtx) {
|
|
// var item Workspace
|
|
// if err = cursor.Decode(&item); err != nil {
|
|
// logs.Error(err)
|
|
// close(ch)
|
|
// }
|
|
// }
|
|
// var cronScheduleStart, cronScheduleStop time.Time
|
|
|
|
// cronScheduleStart = cron.Next(cronFirstDate) // Get the first execution
|
|
|
|
for cronScheduleStart := cron.Next(cronFirstDate); !cronScheduleStart.IsZero() && cronScheduleStart.Before(lastMongoDBdate.StopDate); cronScheduleStart = cron.Next(cronScheduleStart) {
|
|
cronScheduleStop := cronScheduleStart.Add(time.Second * time.Duration(duration))
|
|
if cronScheduleStop.After(cronLastDate) || cronScheduleStart.Before(cronFirstDate) {
|
|
// We skip values that are in the middle of the limits
|
|
continue
|
|
}
|
|
// ###########################
|
|
|
|
cursor, err := services.MngoCollSchedule.Find(services.MngoCtx,
|
|
primitive.M{"$or": primitive.A{
|
|
primitive.M{"startdate": primitive.M{
|
|
"$gte": cronScheduleStart,
|
|
"$lte": cronScheduleStop,
|
|
}},
|
|
primitive.M{"stopdate": primitive.M{
|
|
"$gte": cronScheduleStart,
|
|
"$lte": cronScheduleStop,
|
|
}},
|
|
primitive.M{"$and": primitive.A{
|
|
primitive.M{"startdate": primitive.M{
|
|
"$lte": cronScheduleStart,
|
|
}},
|
|
primitive.M{"stopdate": primitive.M{
|
|
"$gte": cronScheduleStop,
|
|
}},
|
|
}},
|
|
}},
|
|
// options.Find().SetSort(primitive.D{{"startdate", 1}}),
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var items []ScheduleDB
|
|
cursor.All(services.MngoCtx, &items)
|
|
|
|
if len(items) == 0 {
|
|
// A empty time slot. Available
|
|
continue
|
|
}
|
|
|
|
// There is some workflows booked here. We must check if there is remaining resources
|
|
var alreadScheduled ExecutionRequirementsModel
|
|
|
|
for _, scheduled := range items {
|
|
alreadScheduled.CPUs += scheduled.ResourceQty.CPUs
|
|
alreadScheduled.GPUs += scheduled.ResourceQty.GPUs
|
|
alreadScheduled.RAM += scheduled.ResourceQty.RAM
|
|
}
|
|
|
|
if alreadScheduled.CPUs+desiredToSchedule.CPUs > dcModel.GetTotalCPUs() {
|
|
return errors.New("Not enough CPU capacity from date " + cronScheduleStart.UTC().String() + " to " + cronScheduleStop.UTC().String())
|
|
}
|
|
|
|
if alreadScheduled.GPUs+desiredToSchedule.GPUs > dcModel.GetTotalGPUs() {
|
|
return errors.New("Not enough GPU capacity from date " + cronScheduleStart.UTC().String() + " to " + cronScheduleStop.UTC().String())
|
|
}
|
|
|
|
if alreadScheduled.RAM+desiredToSchedule.RAM > dcModel.GetTotalRAM() {
|
|
return errors.New("Not enough RAM capacity from date " + cronScheduleStart.UTC().String() + " to " + cronScheduleStop.UTC().String())
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func CreateScheduleWorkflow(dcName, userID, workflowName, cronString string, duration uint, startDate, stopDate time.Time, requirements ExecutionRequirementsModel) (ret ScheduleInfo, err error) {
|
|
|
|
//TODO: Check that dcName is correct
|
|
|
|
err = CheckSchedule(cronString, duration, startDate, stopDate, requirements)
|
|
if err != nil {
|
|
return ret, err
|
|
}
|
|
|
|
// Already checked possible errors
|
|
myCron, _ := cron.Parse(cronString)
|
|
|
|
scheduledTimeStart := myCron.Next(time.Now().UTC()) // Get the first execution
|
|
|
|
counter := 0
|
|
|
|
var uploadSchedule []interface{}
|
|
|
|
for !scheduledTimeStart.IsZero() && counter < MAX_SCHEDULES {
|
|
scheduledTimeStop := scheduledTimeStart.Add(time.Second * time.Duration(duration))
|
|
if scheduledTimeStop.After(stopDate) || scheduledTimeStart.Before(startDate) {
|
|
// We skip values that are in the middle of the limits
|
|
scheduledTimeStart = myCron.Next(scheduledTimeStart)
|
|
counter++
|
|
continue
|
|
}
|
|
|
|
uploadSchedule = append(uploadSchedule, ScheduleDB{
|
|
StartDate: scheduledTimeStart,
|
|
StopDate: scheduledTimeStop,
|
|
Workflow: dcName + "." + userID + "." + workflowName,
|
|
ResourceQty: requirements, //stub
|
|
})
|
|
|
|
scheduledTimeStart = myCron.Next(scheduledTimeStart)
|
|
counter++
|
|
}
|
|
|
|
//FIXME: Consider doing something with the inserting result
|
|
_, err = services.MngoCollSchedule.InsertMany(services.MngoCtx, uploadSchedule)
|
|
if err != nil {
|
|
logs.Error(err)
|
|
}
|
|
|
|
ret.Total = len(uploadSchedule)
|
|
for i := 0; i < 5 && len(uploadSchedule) > i; i++ {
|
|
elem := uploadSchedule[i].(ScheduleDB)
|
|
ret.NextExecutions[i] = elem.StartDate.String()
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
func GetFarSchedules(baseDate time.Time, isNext bool) *time.Time {
|
|
|
|
operator := "$gt"
|
|
if !isNext {
|
|
// Previous to this date
|
|
operator = "$lt"
|
|
}
|
|
|
|
var res *ScheduleDB
|
|
dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
|
|
primitive.M{"startdate": primitive.M{
|
|
operator: baseDate,
|
|
}},
|
|
options.FindOne().SetSort(primitive.D{{"startdate", 1}})).Decode(&res)
|
|
|
|
if dberr != nil {
|
|
logs.Error(dberr)
|
|
return nil
|
|
}
|
|
|
|
return &res.StartDate
|
|
}
|
|
|
|
func GetSchedules(startDate, stopDate time.Time) (data []ScheduleDB, maxLimit bool, err error) {
|
|
|
|
if startDate.After(stopDate) {
|
|
return nil, false, errors.New("stopDate must be after startDate")
|
|
}
|
|
|
|
// Range of 35 days as max
|
|
if startDate.Add(24 * time.Hour * time.Duration(35)).Before(stopDate) {
|
|
return nil, false, errors.New("Must be less than 35 days between startDate and stopDate")
|
|
}
|
|
|
|
//FIXME: Discuss if we should check old schedules
|
|
// if startDate.Before(time.Now().UTC()) {
|
|
// return nil, false, errors.New("TimeServer is " + time.Now().UTC().String() + " but your startDate is " + startDate.String())
|
|
// }
|
|
|
|
firstDateCur, err := services.MngoCollSchedule.Find(services.MngoCtx,
|
|
primitive.M{"startdate": primitive.M{
|
|
"$gte": startDate,
|
|
"$lte": stopDate,
|
|
}},
|
|
options.Find().SetLimit(int64(MAX_DATES)),
|
|
)
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
firstDateCur.All(services.MngoCtx, &data)
|
|
|
|
if len(data) == int(MAX_DATES) {
|
|
maxLimit = true
|
|
}
|
|
|
|
return
|
|
|
|
}
|