package models

import (
	"errors"
	"time"

	"cloud.o-forge.io/core/oc-catalog/services"
	"github.com/beego/beego/v2/core/logs"
	"github.com/vk496/cron"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	// MAX_DATES is the maximum number of schedule entries GetSchedules
	// retrieves in a single call.
	MAX_DATES uint = 1000

	// MAX_SCHEDULES is the maximum number of cron occurrences
	// CreateScheduleWorkflow walks through (skipped occurrences included).
	MAX_SCHEDULES int = 200
)

// ScheduleDB is one booked execution slot as stored in the schedule
// collection. The queries in this file address its fields as "startdate" and
// "stopdate".
type ScheduleDB struct {
	StartDate   time.Time
	StopDate    time.Time
	Workflow    string
	ResourceQty ExecutionRequirementsModel
}

// ScheduleInfo summarises the result of CreateScheduleWorkflow: how many
// executions were booked and the start dates of the first ones.
type ScheduleInfo struct {
	Total          int
	NextExecutions [5]string
}

// WorkflowSchedule describes how a workflow is meant to be scheduled.
type WorkflowSchedule struct {
	IsService bool `description:"Service: true, Task: false"`
	StartDate time.Time
	StopDate  time.Time
	Cron      string `json:"cron"`
	Duration  uint   `json:"duration" description:"Durantion in seconds" example:"7200"`
	Events    string `json:"events"`
	IsBooked  bool   `json:"isBooked"`
}

// timeBetween reports whether base lies in the closed interval [t1, t2].
// It returns false when t2 is before t1.
func timeBetween(base, t1, t2 time.Time) bool {
	if t2.Before(t1) {
		return false
	}
	return !t1.After(base) && !t2.Before(base)
}

// CheckSchedule verifies that the resources in desiredToSchedule can be booked
// for every execution of cronString that starts at or after cronFirstDate and
// ends (start + duration seconds) at or before cronLastDate.
//
// It returns nil when every such slot has enough remaining capacity, and an
// error when the cron expression is invalid, the datacenter model is missing,
// the request exceeds the datacenter totals, a database call fails, or a slot
// does not have enough CPU, GPU or RAM left.
func CheckSchedule(cronString string, duration uint, cronFirstDate, cronLastDate time.Time, desiredToSchedule ExecutionRequirementsModel) (err error) {
	// ########################################
	// TODO: Redesign the whole flow for all possible cases. Meanwhile, use it like this
	// ########################################
	//
	// Here we have already a timeslot filled by other cron Task. So we must check if there
	// is still resources available
	// However, we could have 3 possibilities here:
	//
	//
	// ## Task with start and stop Date in the DB ##
	//
	//          startDate                     stopDate
	//              .-----------------------------·
	//              |                             |
	//              ·-----------------------------·
	//
	//
	// New tasks that have conflicts with what we have in the DB
	//
	// CASE1 (beginning):
	//         .--------·
	//         |########|
	//         ·--------·
	//
	// CASE2 (end):
	//                                       .--------·
	//                                       |########|
	//                                       ·--------·
	//
	// CASE3 (middle):
	//                        .--------·
	//                        |########|
	//                        ·--------·
	//
	// CASE4 (multiple):
	//         .-----·   .-·   .-----·     .----------------·
	//         |#####|   |#|   |#####|     |################|
	//         ·-----·   ·-·   ·-----·     ·----------------·
	//
	//
	// The first 3 cases are trivial. But in the 4th case, we must get the sum of the resources
	// The same could happen in the opposite, where cases are DB entries

	// Named "schedule" so the imported cron package is not shadowed.
	schedule, err := cron.Parse(cronString)
	if err != nil {
		return errors.New("Bad cron message: " + err.Error())
	}

	dcModel := GetDatacenterFromAcronym(services.DC_NAME)
	if dcModel == nil {
		return errors.New("The DC " + services.DC_NAME + " doesn't have any DC model with that acronym")
	}

	// The datacenter totals do not depend on the slot being checked, so they
	// are read once. NOTE(review): assumes the getters are side-effect free.
	totalCPUs := dcModel.GetTotalCPUs()
	totalGPUs := dcModel.GetTotalGPUs()
	totalRAM := dcModel.GetTotalRAM()

	if desiredToSchedule.CPUs > totalCPUs {
		return errors.New("Requested more CPUs than DC have")
	}
	if desiredToSchedule.GPUs > totalGPUs {
		return errors.New("Requested more GPUs than DC have")
	}
	if desiredToSchedule.RAM > totalRAM {
		return errors.New("Requested more RAM than DC have")
	}

	// The booking with the latest stop date bounds the search: no slot that
	// starts after it can collide with anything already stored.
	var lastBooked ScheduleDB
	dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
		primitive.M{},
		options.FindOne().SetSort(primitive.D{{Key: "stopdate", Value: -1}}),
	).Decode(&lastBooked)
	if dberr != nil {
		if errors.Is(dberr, mongo.ErrNoDocuments) {
			// The database is empty. We can book without problems
			return nil
		}
		return dberr
	}

	slotLength := time.Second * time.Duration(duration)

	for slotStart := schedule.Next(cronFirstDate); !slotStart.IsZero() && slotStart.Before(lastBooked.StopDate); slotStart = schedule.Next(slotStart) {
		if slotStart.Before(cronFirstDate) {
			// Outside the requested window
			continue
		}

		slotStop := slotStart.Add(slotLength)
		if slotStop.After(cronLastDate) {
			// Occurrences only move forward in time, so every later slot
			// would also end after cronLastDate: nothing left to check.
			break
		}

		// Every stored booking that overlaps [slotStart, slotStop]: it starts
		// inside the slot, ends inside the slot, or fully covers it.
		cursor, findErr := services.MngoCollSchedule.Find(services.MngoCtx,
			primitive.M{"$or": primitive.A{
				primitive.M{"startdate": primitive.M{
					"$gte": slotStart,
					"$lte": slotStop,
				}},
				primitive.M{"stopdate": primitive.M{
					"$gte": slotStart,
					"$lte": slotStop,
				}},
				primitive.M{"$and": primitive.A{
					primitive.M{"startdate": primitive.M{
						"$lte": slotStart,
					}},
					primitive.M{"stopdate": primitive.M{
						"$gte": slotStop,
					}},
				}},
			}},
		)
		if findErr != nil {
			return findErr
		}

		var booked []ScheduleDB
		if allErr := cursor.All(services.MngoCtx, &booked); allErr != nil {
			// A failed read must not be mistaken for an empty (free) slot.
			return allErr
		}

		if len(booked) == 0 {
			// A empty time slot. Available
			continue
		}

		// There is some workflows booked here. We must check if there is remaining resources.
		// All overlapping bookings are summed, even those that do not overlap
		// each other (CASE4), so the check errs on the side of refusing.
		var alreadScheduled ExecutionRequirementsModel
		for _, scheduled := range booked {
			alreadScheduled.CPUs += scheduled.ResourceQty.CPUs
			alreadScheduled.GPUs += scheduled.ResourceQty.GPUs
			alreadScheduled.RAM += scheduled.ResourceQty.RAM
		}

		if alreadScheduled.CPUs+desiredToSchedule.CPUs > totalCPUs {
			return errors.New("Not enough CPU capacity from date " + slotStart.UTC().String() + " to " + slotStop.UTC().String())
		}
		if alreadScheduled.GPUs+desiredToSchedule.GPUs > totalGPUs {
			return errors.New("Not enough GPU capacity from date " + slotStart.UTC().String() + " to " + slotStop.UTC().String())
		}
		if alreadScheduled.RAM+desiredToSchedule.RAM > totalRAM {
			return errors.New("Not enough RAM capacity from date " + slotStart.UTC().String() + " to " + slotStop.UTC().String())
		}
	}

	return nil
}

// CreateScheduleWorkflow books the executions of cronString for the workflow
// identified by dcName, userID and workflowName.
//
// After CheckSchedule accepts the request, it walks at most MAX_SCHEDULES cron
// occurrences starting from the current UTC time and stores every occurrence
// that starts at or after startDate and ends (start + duration seconds) at or
// before stopDate. Occurrences skipped for being before startDate still count
// towards MAX_SCHEDULES.
//
// On success it returns the number of stored executions and the start dates of
// up to the first five. On any error the returned ScheduleInfo is the zero
// value.
func CreateScheduleWorkflow(dcName, userID, workflowName, cronString string, duration uint, startDate, stopDate time.Time, requirements ExecutionRequirementsModel) (ret ScheduleInfo, err error) {
	//TODO: Check that dcName is correct

	if err = CheckSchedule(cronString, duration, startDate, stopDate, requirements); err != nil {
		return ScheduleInfo{}, err
	}

	// CheckSchedule already parsed the expression, so this is not expected to
	// fail; the error is still checked rather than risking a nil schedule.
	myCron, err := cron.Parse(cronString)
	if err != nil {
		return ScheduleInfo{}, errors.New("Bad cron message: " + err.Error())
	}

	slotLength := time.Second * time.Duration(duration)
	workflowID := dcName + "." + userID + "." + workflowName

	var uploadSchedule []interface{}

	scheduledTimeStart := myCron.Next(time.Now().UTC()) // Get the first execution
	for counter := 0; !scheduledTimeStart.IsZero() && counter < MAX_SCHEDULES; counter++ {
		scheduledTimeStop := scheduledTimeStart.Add(slotLength)
		if scheduledTimeStop.After(stopDate) {
			// Occurrences only move forward in time, so nothing later can
			// fit before stopDate either.
			break
		}

		if !scheduledTimeStart.Before(startDate) {
			uploadSchedule = append(uploadSchedule, ScheduleDB{
				StartDate:   scheduledTimeStart,
				StopDate:    scheduledTimeStop,
				Workflow:    workflowID,
				ResourceQty: requirements, //stub
			})
		}

		scheduledTimeStart = myCron.Next(scheduledTimeStart)
	}

	if len(uploadSchedule) == 0 {
		// InsertMany rejects an empty slice; report the real cause instead.
		return ScheduleInfo{}, errors.New("No execution of the cron expression fits between startDate and stopDate")
	}

	//FIXME: Consider doing something with the inserting result
	if _, err = services.MngoCollSchedule.InsertMany(services.MngoCtx, uploadSchedule); err != nil {
		logs.Error(err)
		return ScheduleInfo{}, err
	}

	ret.Total = len(uploadSchedule)
	for i := 0; i < len(ret.NextExecutions) && i < len(uploadSchedule); i++ {
		ret.NextExecutions[i] = uploadSchedule[i].(ScheduleDB).StartDate.String()
	}

	return ret, nil
}

// GetFarSchedules returns the start date of the stored schedule closest to
// baseDate in the requested direction: the first one starting after baseDate
// when isNext is true, otherwise the last one starting before it.
//
// It returns nil when no such schedule exists or when the lookup fails.
func GetFarSchedules(baseDate time.Time, isNext bool) *time.Time {
	operator, order := "$gt", 1
	if !isNext {
		// Previous to this date: sort descending so the closest earlier
		// schedule comes first instead of the oldest one in the collection.
		// NOTE(review): nearest-previous is inferred from the symmetry with
		// the "next" case — confirm against the callers.
		operator, order = "$lt", -1
	}

	var res ScheduleDB
	dberr := services.MngoCollSchedule.FindOne(services.MngoCtx,
		primitive.M{"startdate": primitive.M{
			operator: baseDate,
		}},
		options.FindOne().SetSort(primitive.D{{Key: "startdate", Value: order}}),
	).Decode(&res)
	if dberr != nil {
		// Having no schedule in that direction is an expected outcome, not a
		// failure worth an error log.
		if !errors.Is(dberr, mongo.ErrNoDocuments) {
			logs.Error(dberr)
		}
		return nil
	}

	return &res.StartDate
}

// GetSchedules returns the stored schedules whose start date lies in
// [startDate, stopDate], limited to MAX_DATES entries. maxLimit is true when
// exactly MAX_DATES entries were returned, i.e. the result may be truncated.
//
// It returns an error when startDate is after stopDate, when the range spans
// more than 35 days, or when the database query or read fails.
func GetSchedules(startDate, stopDate time.Time) (data []ScheduleDB, maxLimit bool, err error) {
	if startDate.After(stopDate) {
		return nil, false, errors.New("stopDate must be after startDate")
	}

	// Range of 35 days as max
	if startDate.Add(24 * time.Hour * time.Duration(35)).Before(stopDate) {
		return nil, false, errors.New("Must be less than 35 days between startDate and stopDate")
	}

	//FIXME: Discuss if we should check old schedules
	// if startDate.Before(time.Now().UTC()) {
	// 	return nil, false, errors.New("TimeServer is " + time.Now().UTC().String() + " but your startDate is " + startDate.String())
	// }

	firstDateCur, err := services.MngoCollSchedule.Find(services.MngoCtx,
		primitive.M{"startdate": primitive.M{
			"$gte": startDate,
			"$lte": stopDate,
		}},
		options.Find().SetLimit(int64(MAX_DATES)),
	)
	if err != nil {
		return nil, false, err
	}

	if err = firstDateCur.All(services.MngoCtx, &data); err != nil {
		// Do not hand back a partially decoded result as if it were complete.
		return nil, false, err
	}

	return data, len(data) == int(MAX_DATES), nil
}