// oc-catalog/models/schedule.go
//
// 323 lines, 8.9 KiB, Go

package models
import (
"errors"
"time"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"github.com/vk496/cron"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Limits applied when reading and creating schedule entries.
const (
	// MAX_DATES is the maximum number of schedule entries fetched by a
	// single GetSchedules query.
	MAX_DATES uint = 1000

	// MAX_SCHEDULES bounds how many cron occurrences
	// CreateScheduleWorkflow walks through for one request.
	MAX_SCHEDULES int = 200
)
// ScheduleDB is one booked execution as it is written to, and read back
// from, the schedule collection. No bson tags are declared; the queries in
// this file address the date fields as "startdate" and "stopdate".
type ScheduleDB struct {
	// StartDate and StopDate delimit the time slot of this execution.
	StartDate, StopDate time.Time
	// Workflow identifies the booked workflow. CreateScheduleWorkflow
	// builds it as "<dcName>.<userID>.<workflowName>".
	Workflow string
	// ResourceQty holds the resources reserved during the time slot.
	ResourceQty ExecutionRequirementsModel
}
// ScheduleInfo summarises the outcome of a scheduling request.
type ScheduleInfo struct {
	// Total is the number of executions that were scheduled.
	Total int
	// NextExecutions holds the start dates (formatted with
	// time.Time.String) of at most the first five scheduled executions.
	// Unused slots stay empty.
	NextExecutions [5]string
}
// WorkflowSchedule describes how a workflow is meant to be scheduled.
type WorkflowSchedule struct {
	// IsService tells services (true) apart from tasks (false).
	IsService bool `description:"Service: true, Task: false"`
	// StartDate and StopDate bound the scheduling window.
	// NOTE(review): presumably the same window that is handed to
	// CreateScheduleWorkflow - confirm against the callers.
	StartDate, StopDate time.Time
	// Cron is the cron expression of the schedule.
	Cron string `json:"cron"`
	// Duration is the length of one execution, in seconds.
	Duration uint `json:"duration" description:"Duration in seconds" example:"7200"`
	// Events is not interpreted anywhere in this file.
	// NOTE(review): document its format once the consumer is known.
	Events string `json:"events"`
	// IsBooked presumably records whether the schedule has been booked
	// already - TODO confirm, it is not written in this file.
	IsBooked bool `json:"isBooked"`
}
// timeBetween reports whether base lies inside the closed interval
// [t1, t2]. An inverted interval (t2 earlier than t1) never contains base.
func timeBetween(base, t1, t2 time.Time) bool {
	inverted := t2.Before(t1)
	startsAfterBase := t1.After(base)
	endsBeforeBase := t2.Before(base)
	return !inverted && !startsAfterBase && !endsBeforeBase
}
// CheckSchedule verifies that the datacenter named by services.DC_NAME can
// host every execution of cronString that fits between cronFirstDate and
// cronLastDate, each execution lasting duration seconds and needing the
// resources in desiredToSchedule.
//
// It returns nil when all executions can be booked, and an error when:
//   - cronString cannot be parsed,
//   - no datacenter model is found for services.DC_NAME,
//   - the request alone exceeds the total CPUs, GPUs or RAM of the datacenter,
//   - an execution overlaps stored schedules whose summed resources plus the
//     request exceed the datacenter totals,
//   - the database cannot be queried or its documents cannot be decoded.
func CheckSchedule(cronString string, duration uint, cronFirstDate, cronLastDate time.Time, desiredToSchedule ExecutionRequirementsModel) (err error) {
	// TODO: Redesign the whole flow for all possible cases. Meanwhile, use it like this.
	//
	// A time slot may already be used by other scheduled tasks, so we must
	// check whether enough resources remain. A stored task looks like this:
	//
	//          startDate                     stopDate
	//              +-----------------------------+
	//              |                             |
	//              +-----------------------------+
	//
	// A new task can conflict with it in several ways:
	//
	// CASE1 (beginning):
	//        +--------+
	//        |########|
	//        +--------+
	//
	// CASE2 (end):
	//                                       +--------+
	//                                       |########|
	//                                       +--------+
	//
	// CASE3 (middle):
	//                       +--------+
	//                       |########|
	//                       +--------+
	//
	// CASE4 (multiple):
	//        +-----+   +-+   +-----+   +----------------+
	//        |#####|   |#|   |#####|   |################|
	//        +-----+   +-+   +-----+   +----------------+
	//
	// The first three cases are trivial. In the fourth one the resources of
	// all the overlapping tasks must be added up. The same applies the other
	// way round, with the cases being the DB entries.

	// Named schedule rather than cron so the cron package is not shadowed.
	schedule, err := cron.Parse(cronString)
	if err != nil {
		return errors.New("Bad cron message: " + err.Error())
	}

	dcModel := GetDatacenterFromAcronym(services.DC_NAME)
	if dcModel == nil {
		return errors.New("The DC " + services.DC_NAME + " doesn't have any DC model with that acronym")
	}

	// A request bigger than the whole datacenter can never be satisfied.
	if desiredToSchedule.CPUs > dcModel.GetTotalCPUs() {
		return errors.New("Requested more CPUs than DC have")
	}
	if desiredToSchedule.GPUs > dcModel.GetTotalGPUs() {
		return errors.New("Requested more GPUs than DC have")
	}
	if desiredToSchedule.RAM > dcModel.GetTotalRAM() {
		return errors.New("Requested more RAM than DC have")
	}

	// The stored schedule that ends last tells us how far into the future
	// conflicts are possible at all.
	var lastMongoDBdate ScheduleDB
	dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
		primitive.M{},
		options.FindOne().SetSort(primitive.D{{Key: "stopdate", Value: -1}}),
	).Decode(&lastMongoDBdate)
	if dberr != nil {
		if errors.Is(dberr, mongo.ErrNoDocuments) {
			// The database is empty. We can book without problems.
			return nil
		}
		return dberr
	}

	execDuration := time.Second * time.Duration(duration)
	for start := schedule.Next(cronFirstDate); !start.IsZero() && start.Before(lastMongoDBdate.StopDate); start = schedule.Next(start) {
		stop := start.Add(execDuration)
		if stop.After(cronLastDate) {
			// Occurrences only move forward in time, so every later one
			// would end after cronLastDate as well.
			break
		}
		if start.Before(cronFirstDate) {
			// Outside the requested window.
			continue
		}

		// A stored schedule overlaps [start, stop] when it starts inside
		// it, ends inside it, or spans it completely.
		overlapping := primitive.M{"$or": primitive.A{
			primitive.M{"startdate": primitive.M{
				"$gte": start,
				"$lte": stop,
			}},
			primitive.M{"stopdate": primitive.M{
				"$gte": start,
				"$lte": stop,
			}},
			primitive.M{"$and": primitive.A{
				primitive.M{"startdate": primitive.M{
					"$lte": start,
				}},
				primitive.M{"stopdate": primitive.M{
					"$gte": stop,
				}},
			}},
		}}
		cursor, err := services.MngoCollSchedule.Find(services.MngoCtx, overlapping)
		if err != nil {
			return err
		}

		var items []ScheduleDB
		// A failed read must not be mistaken for an empty (free) slot.
		if err := cursor.All(services.MngoCtx, &items); err != nil {
			return err
		}
		if len(items) == 0 {
			// An empty time slot. Available.
			continue
		}

		// Some workflows are booked here. Add up everything that overlaps
		// the slot and check whether the request still fits.
		var alreadyScheduled ExecutionRequirementsModel
		for _, scheduled := range items {
			alreadyScheduled.CPUs += scheduled.ResourceQty.CPUs
			alreadyScheduled.GPUs += scheduled.ResourceQty.GPUs
			alreadyScheduled.RAM += scheduled.ResourceQty.RAM
		}

		if alreadyScheduled.CPUs+desiredToSchedule.CPUs > dcModel.GetTotalCPUs() {
			return errors.New("Not enough CPU capacity from date " + start.UTC().String() + " to " + stop.UTC().String())
		}
		if alreadyScheduled.GPUs+desiredToSchedule.GPUs > dcModel.GetTotalGPUs() {
			return errors.New("Not enough GPU capacity from date " + start.UTC().String() + " to " + stop.UTC().String())
		}
		if alreadyScheduled.RAM+desiredToSchedule.RAM > dcModel.GetTotalRAM() {
			return errors.New("Not enough RAM capacity from date " + start.UTC().String() + " to " + stop.UTC().String())
		}
	}

	return nil
}
// CreateScheduleWorkflow books the executions of cronString that fit between
// startDate and stopDate, each one lasting duration seconds and reserving
// requirements, after CheckSchedule has accepted the request.
//
// Every stored execution is labelled "<dcName>.<userID>.<workflowName>".
// At most MAX_SCHEDULES cron occurrences, counted from the current time,
// are examined.
//
// On success it returns the number of stored executions together with the
// start dates of the first five. On failure it returns a zero ScheduleInfo
// and the error: the request was rejected by CheckSchedule, the cron
// expression is invalid, no execution fits the window, or the insert failed.
//
// NOTE(review): occurrences are generated from time.Now() whereas
// CheckSchedule validates from startDate, and occurrences before startDate
// still count towards MAX_SCHEDULES. A startDate far in the future may
// therefore yield no executions - confirm whether that is intended.
func CreateScheduleWorkflow(dcName, userID, workflowName, cronString string, duration uint, startDate, stopDate time.Time, requirements ExecutionRequirementsModel) (ret ScheduleInfo, err error) {
	//TODO: Check that dcName is correct
	if err = CheckSchedule(cronString, duration, startDate, stopDate, requirements); err != nil {
		return ret, err
	}

	// CheckSchedule already parsed the expression, so this is not expected
	// to fail; the error is still handled rather than dropped.
	myCron, err := cron.Parse(cronString)
	if err != nil {
		return ret, errors.New("Bad cron message: " + err.Error())
	}

	execDuration := time.Second * time.Duration(duration)
	workflowID := dcName + "." + userID + "." + workflowName

	var uploadSchedule []interface{}
	scheduledTimeStart := myCron.Next(time.Now().UTC()) // Get the first execution
	for counter := 0; !scheduledTimeStart.IsZero() && counter < MAX_SCHEDULES; counter++ {
		scheduledTimeStop := scheduledTimeStart.Add(execDuration)
		if scheduledTimeStop.After(stopDate) {
			// Occurrences only move forward in time, so nothing later
			// can fit before stopDate either.
			break
		}
		if !scheduledTimeStart.Before(startDate) {
			uploadSchedule = append(uploadSchedule, ScheduleDB{
				StartDate:   scheduledTimeStart,
				StopDate:    scheduledTimeStop,
				Workflow:    workflowID,
				ResourceQty: requirements, //stub
			})
		}
		scheduledTimeStart = myCron.Next(scheduledTimeStart)
	}

	if len(uploadSchedule) == 0 {
		// InsertMany rejects an empty batch; report the real cause instead.
		return ret, errors.New("No execution could be scheduled between startDate and stopDate")
	}

	//FIXME: Consider doing something with the inserting result
	if _, err = services.MngoCollSchedule.InsertMany(services.MngoCtx, uploadSchedule); err != nil {
		logs.Error(err)
		// Do not report executions that may not have been stored.
		return ret, err
	}

	ret.Total = len(uploadSchedule)
	for i := 0; i < len(ret.NextExecutions) && i < len(uploadSchedule); i++ {
		ret.NextExecutions[i] = uploadSchedule[i].(ScheduleDB).StartDate.String()
	}

	return ret, nil
}
// GetFarSchedules returns the start date of the stored schedule closest to
// baseDate in the requested direction: the first one starting after
// baseDate when isNext is true, the last one starting before baseDate
// otherwise. Schedules starting exactly at baseDate are not considered.
//
// It returns nil when no such schedule exists or when the lookup fails; the
// error is logged in both cases.
func GetFarSchedules(baseDate time.Time, isNext bool) *time.Time {
	// Looking forward, ascending order puts the closest schedule first.
	operator, order := "$gt", 1
	if !isNext {
		// Previous to this date: the closest schedule is the latest one,
		// so the order has to be reversed.
		operator, order = "$lt", -1
	}

	var res ScheduleDB
	dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
		primitive.M{"startdate": primitive.M{
			operator: baseDate,
		}},
		options.FindOne().SetSort(primitive.D{{Key: "startdate", Value: order}}),
	).Decode(&res)
	if dberr != nil {
		logs.Error(dberr)
		return nil
	}

	return &res.StartDate
}
// GetSchedules returns the stored schedules whose start date lies between
// startDate and stopDate, both inclusive.
//
// The window must not be inverted and must not span more than 35 days. At
// most MAX_DATES entries are returned; maxLimit is true when that limit was
// reached, meaning the result may be incomplete.
//
// An error is returned for an invalid window, when the query fails or when
// the stored documents cannot be decoded.
func GetSchedules(startDate, stopDate time.Time) (data []ScheduleDB, maxLimit bool, err error) {
	if startDate.After(stopDate) {
		return nil, false, errors.New("stopDate must be after startDate")
	}

	// Range of 35 days as max
	const maxRange = 35 * 24 * time.Hour
	if startDate.Add(maxRange).Before(stopDate) {
		return nil, false, errors.New("Must be less than 35 days between startDate and stopDate")
	}

	//FIXME: Discuss if we should reject a startDate that lies in the past

	cur, err := services.MngoCollSchedule.Find(services.MngoCtx,
		primitive.M{"startdate": primitive.M{
			"$gte": startDate,
			"$lte": stopDate,
		}},
		options.Find().SetLimit(int64(MAX_DATES)),
	)
	if err != nil {
		return nil, false, err
	}

	// A failed read must not be reported as an empty but valid result.
	if err = cur.All(services.MngoCtx, &data); err != nil {
		return nil, false, err
	}

	return data, len(data) == int(MAX_DATES), nil
}