oc-catalog/models/workflow.go

998 lines
26 KiB
Go

package models
import (
"context"
"encoding/xml"
"errors"
"sort"
"time"
"cloud.o-forge.io/core/oc-catalog/models/rtype"
swagger "cloud.o-forge.io/core/oc-catalog/selfapi"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"github.com/vk496/cron"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// LinkingState describes how one workflow object is linked to another one:
// not linked, linked as an input, or linked as an output (see the NO_LINK,
// INPUT and OUTPUT constants).
type LinkingState uint
// DCstatus is the per-datacenter report produced when the schedule of a
// workflow is checked (and optionally booked) by CheckAndBookWorkflowSchedule.
type DCstatus struct {
// DCname is the name of the datacenter model.
DCname string
// DCobjID is filled with the hex form of the referenced datacenter model ID.
DCobjID string //FIXME: Probably should be model ID
// IsReachable is true when the datacenter answered the request, even if the
// answer was an API error.
IsReachable bool
// IsAvailable is true when the datacenter answered the availability check
// with status code 200.
IsAvailable bool
// Booked holds the schedule returned by the datacenter after a successful
// booking; it stays nil otherwise.
Booked *ScheduleInfo
// ErrorMessage describes why the check or the booking failed, if it did.
ErrorMessage string
}
// Possible values of LinkingState.
const (
// NO_LINK means that no link exists.
NO_LINK LinkingState = iota
// INPUT marks a link stored in the inputs of an object.
INPUT
// OUTPUT marks a link stored in the outputs of an object.
OUTPUT
)
// boolToLinkingState converts an "is input" flag into a LinkingState:
// true yields INPUT and false yields OUTPUT.
func boolToLinkingState(boolState bool) LinkingState {
	if boolState {
		return INPUT
	}
	return OUTPUT
}
// SchedulesDB is the key of the schedule inside a stored workflow. It is
// appended to the workflow path when the schedule is written to the workspace
// collection.
const SchedulesDB = "schedules"
// Workflow groups the resource objects placed in a workflow (indexed by
// object ID and split by resource type), its schedule and the mxgraph XML it
// was built from.
type Workflow struct {
// The key of the map is the ID of the object itself
Data map[string]DataObject `json:"data"`
Computing map[string]ComputingObject `json:"computing"`
Storage map[string]StorageObject `json:"storage"`
Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter
// Schedules is the schedule configuration of the workflow, including
// whether it has already been booked.
Schedules WorkflowSchedule `json:"schedules"`
// MxgraphXML is the raw XML of the graph, as received by ParseMxGraph.
MxgraphXML string `description:"State of the mxgraph"`
}
// ResourceObject is the behaviour shared by every object stored in a
// Workflow. GetResource hands out *DataObject, *ComputingObject,
// *StorageObject and *DatacenterObject values through this interface.
type ResourceObject interface {
// getHost returns a pointer to the host of the object.
// NOTE(review): presumably nil when there is no host; confirm in the implementations.
getHost() *string
// getName returns a pointer to the name of the object. Callers in this
// file dereference it without a nil check.
getName() *string
// getModel returns the model behind the object, or an error when it can't
// be obtained.
getModel() (ResourceModel, error)
// getRtype returns the resource type of the object.
getRtype() rtype.Rtype
// setReference stores the ObjectID of the model the object refers to.
setReference(rObjID primitive.ObjectID)
// getReference returns the ObjectID stored with setReference.
getReference() primitive.ObjectID
// isLinked reports how rObjID is linked to this object; callers compare
// the result against NO_LINK.
isLinked(rObjID string) LinkingState
// addLink registers rObjID as a link of this object in the given direction.
addLink(direction LinkingState, rObjID string)
}
// GetExecutionRequirements returns the sum of the execution requirements
// (CPUs, GPUs and RAM) of every Computing object attached to the Datacenter
// object dcIDobj.
//
// An error is returned when dcIDobj is not a Datacenter object of the
// workflow, when the model of an attached Computing object can't be
// retrieved, or when that model is not a ComputingModel.
func (w Workflow) GetExecutionRequirements(dcIDobj string) (ret ExecutionRequirementsModel, err error) {
	// The DC object must be part of this workflow
	if _, ok := w.Datacenter[dcIDobj]; !ok {
		return ExecutionRequirementsModel{}, errors.New("DC obj " + dcIDobj + " doesn't exist in the Workflow")
	}

	// Accumulate the requirements of all elements attached to the DC
	for _, computingObj := range w.Computing {
		if computingObj.DataCenterID != dcIDobj {
			continue
		}

		mymodel, modelErr := computingObj.getModel()
		if modelErr != nil {
			return ExecutionRequirementsModel{}, modelErr
		}

		// A checked assertion avoids a panic if the model has an unexpected type
		compModel, isComputing := mymodel.(ComputingModel)
		if !isComputing {
			return ExecutionRequirementsModel{}, errors.New("Computing obj attached to DC obj " + dcIDobj + " doesn't refer to a Computing model")
		}

		//TODO a generic way to concatenate execution requirements
		ret.CPUs += compModel.ExecutionRequirements.CPUs
		ret.GPUs += compModel.ExecutionRequirements.GPUs
		ret.RAM += compModel.ExecutionRequirements.RAM
	}

	return ret, nil
}
// GetResource looks for the object with ID rObjID in the Data, Computing,
// Storage and Datacenter maps, in that order.
//
// It returns nil when rObjID is nil or when no object has that ID. The
// returned value points to a copy of the stored object, so modifications
// must be written back with UpdateObj.
func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) {
	if rObjID == nil {
		return nil
	}
	id := *rObjID

	if data, found := w.Data[id]; found {
		return &data
	}
	if computing, found := w.Computing[id]; found {
		return &computing
	}
	if storage, found := w.Storage[id]; found {
		return &storage
	}
	if datacenter, found := w.Datacenter[id]; found {
		return &datacenter
	}

	return nil
}
// GetResourceMapByRtype returns the map of the workflow that stores objects
// of the resource type rt, or nil when rt is not one of DATA, COMPUTING,
// STORAGE or DATACENTER.
func (w *Workflow) GetResourceMapByRtype(rt rtype.Rtype) interface{} {
	var resources interface{}

	switch rt {
	case rtype.DATA:
		resources = w.Data
	case rtype.COMPUTING:
		resources = w.Computing
	case rtype.STORAGE:
		resources = w.Storage
	case rtype.DATACENTER:
		resources = w.Datacenter
	}

	return resources
}
// CreateResourceObject returns a new, empty object matching the resource
// type rt. It returns nil for any type other than DATA, COMPUTING, STORAGE
// or DATACENTER. The object is not added to the workflow.
func (w *Workflow) CreateResourceObject(rt rtype.Rtype) ResourceObject {
	switch rt {
	case rtype.DATA:
		return &DataObject{}
	case rtype.COMPUTING:
		return &ComputingObject{}
	case rtype.STORAGE:
		return &StorageObject{}
	case rtype.DATACENTER:
		return &DatacenterObject{}
	}
	return nil
}
// AddObj stores robj in the workflow under a freshly generated ObjectID and
// returns a pointer to that ID.
func (w *Workflow) AddObj(robj ResourceObject) *primitive.ObjectID {
	newID := primitive.NewObjectID()
	hexID := newID.Hex()
	w.UpdateObj(robj, hexID)
	return &newID
}
// UpdateDB writes the whole workflow into the workspace document of userID,
// under the name workflowName, and returns the error reported by the update.
func (w *Workflow) UpdateDB(userID, workflowName string) error {
	filter := primitive.M{"_id": userID}
	update := primitive.M{
		"$set": primitive.M{WorkflowDB + "." + workflowName: w},
	}

	_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx, filter, update)
	return err
}
// UpdateObj stores a copy of robj under objID in the map that matches the
// resource type reported by robj, creating that map on first use. Objects of
// any other resource type are ignored.
func (w *Workflow) UpdateObj(robj ResourceObject, objID string) {
	switch robj.getRtype() {
	case rtype.DATA:
		if w.Data == nil {
			w.Data = make(map[string]DataObject)
		}
		w.Data[objID] = *robj.(*DataObject)

	case rtype.COMPUTING:
		if w.Computing == nil {
			w.Computing = make(map[string]ComputingObject)
		}
		w.Computing[objID] = *robj.(*ComputingObject)

	case rtype.STORAGE:
		if w.Storage == nil {
			w.Storage = make(map[string]StorageObject)
		}
		w.Storage[objID] = *robj.(*StorageObject)

	case rtype.DATACENTER:
		if w.Datacenter == nil {
			w.Datacenter = make(map[string]DatacenterObject)
		}
		w.Datacenter[objID] = *robj.(*DatacenterObject)
	}
}
// GetWorkflow returns a copy of the workflow workflowName stored in the
// workspace of userID. An error is returned when the user has no workspace
// or when the workspace has no workflow with that name.
func GetWorkflow(userID, workflowName string) (workflow *Workflow, err error) {
	workspace := GetWorkspace(userID)
	if workspace == nil {
		logs.Debug("No workspace")
		return nil, errors.New("No workspace")
	}

	found, exists := workspace.Workflows[workflowName]
	if !exists {
		logs.Debug("No workflow name")
		return nil, errors.New("No workflow name")
	}

	return &found, nil
}
// CreateWorkflow adds an empty workflow named workflowName to the workspace
// of userID. It fails when the user has no workspace, when a workflow with
// that name already exists, or when the database update fails.
func CreateWorkflow(userID, workflowName string) (err error) {
	//TODO: Maybe operate directly in the DB instead of retrieving the full object?
	workspace := GetWorkspace(userID)
	if workspace == nil {
		return errors.New("Can't create a workflow without a workspace")
	}

	if _, exists := workspace.Workflows[workflowName]; exists {
		msg := "Workspace workflow " + workflowName +
			" is already created for user " + userID
		logs.Debug(msg)
		return errors.New(msg)
	}

	emptyWorkflow := &Workflow{}

	var setDoc primitive.M
	if workspace.Workflows == nil {
		// The user has no workflows yet: the whole map has to be created
		setDoc = primitive.M{WorkflowDB: map[string]*Workflow{
			workflowName: emptyWorkflow,
		}}
	} else {
		// Only the new element has to be set
		setDoc = primitive.M{WorkflowDB + "." + workflowName: emptyWorkflow}
	}

	_, updateErr := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
		primitive.M{"_id": userID},
		primitive.M{"$set": setDoc},
	)
	if updateErr != nil {
		msg := "Internal error when updating in DB"
		logs.Debug(msg + "; " + updateErr.Error())
		return errors.New(msg)
	}

	return nil
}
// CreateObjectInWorkflow creates, inside the workflow workflowName of
// userID, a new object referring to the workspace resource rID. The map of
// the matching resource type is then written to the database.
//
// It returns the ID of the new object, or an error when the workspace or the
// workflow doesn't exist, when rID is not a valid ObjectID hex string, when
// rID is not a resource of the workspace, or when the database update fails.
func CreateObjectInWorkflow(userID, workflowName, rID string) (rObjID2 *string, err error) {
	workspace := GetWorkspace(userID)
	if workspace == nil {
		return nil, errors.New("No workspace for user " + userID)
	}

	targetWorkflow, exists := workspace.Workflows[workflowName]
	if !exists {
		return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
	}

	referenceID, parseErr := primitive.ObjectIDFromHex(rID)
	if parseErr != nil {
		return nil, errors.New("ID " + rID + " is not valid")
	}

	//TODO: We are replacing the entire map instead of pushing
	// a new element. Probably will have problems with multithread/async
	// operations and consistency in the future
	for rtyp, resourceIDs := range workspace.GetResources() {
		if !contains(resourceIDs, rID) {
			continue
		}

		created := targetWorkflow.CreateResourceObject(rtyp)
		created.setReference(referenceID)
		createdID := targetWorkflow.AddObj(created)

		fieldPath := WorkflowDB + "." + workflowName + "." + rtyp.String()
		_, updateErr := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
			primitive.M{"_id": userID},
			primitive.M{"$set": primitive.M{fieldPath: targetWorkflow.GetResourceMapByRtype(rtyp)}},
		)
		if updateErr != nil {
			return nil, errors.New("Internal error when updating in DB: " + updateErr.Error())
		}

		hexID := createdID.Hex()
		return &hexID, nil
	}

	return nil, errors.New("rID " + rID + " doesn't exist in the user workspace")
}
// LinkObjectsInWorkspace links the Data object rObjIDsource to the object
// rObjIDtarger of the workflow workflowName, as an input when isInput is true
// and as an output otherwise, and stores the updated target in the database.
//
// It fails when the workspace or workflow doesn't exist, when the source is
// not a Data object, when the target doesn't exist or is a Data object, when
// the source is already linked to the target, or when the update fails.
func LinkObjectsInWorkspace(userID, workflowName, rObjIDsource string, isInput bool, rObjIDtarger string) (err error) {
	workspace := GetWorkspace(userID)
	if workspace == nil {
		return errors.New("No workspace for user " + userID)
	}

	targetWorkflow, exists := workspace.Workflows[workflowName]
	if !exists {
		return errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
	}

	// The source has to be a Data object
	if _, isData := targetWorkflow.Data[rObjIDsource]; !isData {
		return errors.New("rObjIDsource must be of type DATA for now")
	}

	// The target has to exist, accept links and not be linked yet
	target := targetWorkflow.GetResource(&rObjIDtarger)
	switch {
	case target == nil:
		return errors.New("rObjIDtarger doesn't exist")
	case target.getRtype() == rtype.DATA:
		return errors.New("rObjIDtarger of type Data doesn't have inputs/outputs")
	case target.isLinked(rObjIDsource) != NO_LINK:
		return errors.New("rObjIDsource already exists in the inputs or outputs")
	}

	target.addLink(boolToLinkingState(isInput), rObjIDsource)

	fieldPath := WorkflowDB + "." +
		workflowName + "." +
		target.getRtype().String() + "." +
		rObjIDtarger
	_, updateErr := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
		primitive.M{"_id": userID},
		primitive.M{"$set": primitive.M{fieldPath: target}},
	)
	if updateErr != nil {
		return errors.New("Internal error when updating in DB: " + updateErr.Error())
	}

	return nil
}
// GetWorkflowSchedule returns the schedule of the workflow workflowName of
// username. It returns an error when the user has no workspace, and
// (nil, nil) when the workspace has no workflow with that name.
func GetWorkflowSchedule(username, workflowName string) (Currentschedule *WorkflowSchedule, err error) {
	workspace := GetWorkspace(username)
	if workspace == nil {
		return nil, errors.New("No workspace for user " + username)
	}

	found, exists := workspace.Workflows[workflowName]
	if !exists {
		return nil, nil
	}

	return &found.Schedules, nil
}
// SetWorkflowSchedule replaces the schedule of the workflow workflowName of
// username and stores it in the database.
//
// For a service (isService true) only the start and stop dates are stored,
// and the returned info contains a single execution at startDate. For a task
// the cron expression, duration (in seconds) and events are stored too, and
// the returned info lists the first executions that start after startDate
// and finish (start + duration) no later than stopDate. Total is the number
// of such executions, looking at most MAX_SCHEDULES executions ahead.
//
// An error is returned when the workspace or the workflow doesn't exist,
// when the current schedule is already booked, when cronString can't be
// parsed, when a task would have no execution at all, or when the database
// update fails.
func SetWorkflowSchedule(username, workflowName, cronString, events string, isService bool, startDate, stopDate time.Time, duration uint) (NextSchedules *ScheduleInfo, err error) {
	userWorkspace := GetWorkspace(username)
	if userWorkspace == nil {
		return nil, errors.New("No workspace for user " + username)
	}

	// Check if workflow exist
	currentWorkflow, ok := userWorkspace.Workflows[workflowName]
	if !ok {
		return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + username)
	}

	// We mustn't modify a booked schedule
	if currentWorkflow.Schedules.IsBooked {
		return nil, errors.New("A booked schedule can't be modified")
	}

	sch := WorkflowSchedule{}
	NextSchedules = &ScheduleInfo{}

	sch.StartDate = startDate
	sch.StopDate = stopDate
	sch.IsService = isService

	if isService { // Service
		NextSchedules.NextExecutions[0] = startDate.String()
		NextSchedules.Total = 1
	} else { // Task
		sch.Cron = cronString
		sch.Duration = duration
		sch.Events = events

		// The controller is expected to validate the expression, but a
		// parse failure here must not end in a nil dereference
		myCron, parseErr := cron.Parse(cronString)
		if parseErr != nil {
			return nil, errors.New("Invalid cron expression: " + parseErr.Error())
		}

		// Obtain next executions
		valid := 0
		scheduledStart := myCron.Next(startDate) // Get the first execution starting from startDate
		for checked := 0; !scheduledStart.IsZero() && checked < MAX_SCHEDULES; checked++ {
			scheduleStop := scheduledStart.Add(time.Second * time.Duration(duration))

			// Executions come in increasing order, so once one of them
			// finishes after stopDate every later one does too
			if scheduleStop.After(stopDate) {
				break
			}

			// Only executions that fit in the window are counted
			if !scheduledStart.Before(startDate) {
				if valid < len(NextSchedules.NextExecutions) {
					NextSchedules.NextExecutions[valid] = scheduledStart.String()
				}
				valid++
			}

			scheduledStart = myCron.Next(scheduledStart)
		}

		NextSchedules.Total = valid

		if NextSchedules.Total == 0 {
			return nil, errors.New("Current Task configuration will have 0 executions")
		}
	}

	_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
		primitive.M{"_id": username},
		primitive.M{"$set": primitive.M{
			WorkflowDB + "." +
				workflowName + "." +
				SchedulesDB: &sch}},
	)
	if err != nil {
		return nil, errors.New("Internal error when updating in DB: " + err.Error())
	}

	return NextSchedules, nil
}
// GetMxGraph returns the mxgraph XML stored in the workflow workflowName of
// username, or nil when nothing has been stored yet. An error is returned
// when the workspace or the workflow doesn't exist.
func GetMxGraph(username, workflowName string) (xmlData *string, err error) {
	workspace := GetWorkspace(username)
	if workspace == nil {
		return nil, errors.New("No workspace for user " + username)
	}

	found, exists := workspace.Workflows[workflowName]
	if !exists {
		return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + username)
	}

	if graph := found.MxgraphXML; graph != "" {
		return &graph, nil
	}
	return nil, nil
}
// ParseMxGraph parses the mxgraph XML xmlData, builds the matching workflow
// and stores it as workflowName in the workspace of username, keeping the
// schedule of the existing workflow.
//
// err reports a failure that prevented the workflow from being stored (no
// workspace, no workflow, booked workflow, invalid XML, invalid graph or
// database error). mxissues lists the non-fatal problems found in the graph;
// it is only returned when err is nil.
func ParseMxGraph(username, workflowName, xmlData string) (err error, mxissues []error) {
	workspace := GetWorkspace(username)
	if workspace == nil {
		return errors.New("No workspace for user " + username), nil
	}

	existing, found := workspace.Workflows[workflowName]
	if !found {
		return errors.New("No workflow " + workflowName), nil
	}
	if existing.Schedules.IsBooked {
		return errors.New("Can't modify a booked workflow"), nil
	}

	var graph MxGraphModel
	if decodeErr := xml.Unmarshal([]byte(xmlData), &graph); decodeErr != nil {
		return decodeErr, nil
	}

	parsed, consumeErr, graphIssues := workspace.ConsumeMxGraphModel(graph)
	if consumeErr != nil {
		return consumeErr, nil
	}

	parsed.MxgraphXML = xmlData
	parsed.Schedules = existing.Schedules //TODO: Probably we should move schedules outside the workflow

	fieldPath := WorkflowDB + "." + workflowName
	_, updateErr := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
		primitive.M{"_id": username},
		primitive.M{"$set": primitive.M{fieldPath: parsed}},
	)
	if updateErr != nil {
		return errors.New("Internal error when updating in DB: " + updateErr.Error()), nil
	}

	return nil, graphIssues
}
// FindInSlice searches slice for val. It returns the index of the first
// matching element and true, or -1 and false when val is not present.
func FindInSlice(slice []string, val string) (int, bool) {
	for idx := 0; idx < len(slice); idx++ {
		if slice[idx] == val {
			return idx, true
		}
	}
	return -1, false
}
// FindSliceInSlice reports whether slice1 and slice2 share at least one
// element. For the first element of slice1 that also appears in slice2, it
// returns its index in slice1, the index of its first occurrence in slice2
// and true. It returns -1, -1 and false when nothing is shared.
func FindSliceInSlice(slice1 []string, slice2 []string) (int, int, bool) {
	for left := 0; left < len(slice1); left++ {
		for right := 0; right < len(slice2); right++ {
			if slice1[left] == slice2[right] {
				return left, right, true
			}
		}
	}
	return -1, -1, false
}
// ConsumeMxGraphModel builds a Workflow from the cells of an mxgraph model.
//
// Cells that carry a resource ID become objects of the workflow, keyed by
// the cell ID. The cells with ID "0" and "1" are skipped. Every other cell
// is treated as a link between two objects: a link touching a Datacenter
// attaches a Computing object to it, any other link is registered as an
// input of its target and an output of its source.
//
// err is set when the graph can't be converted at all (unknown resource,
// malformed ID, model that can't be loaded). issues collects the problems
// that don't prevent the conversion: dangling links, unattached objects and
// incompatible data types between linked objects.
//
// Note that the cells of xmlmodel are reordered in place.
func (w Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (ret *Workflow, err error, issues []error) {
	ret = &Workflow{}

	// Resources have to be registered before the links between them, so the
	// cells carrying a resource ID are moved to the front. The comparison
	// must be a strict ordering (a resource cell is never "less" than
	// another resource cell), and the stable sort keeps the document order
	// inside each group.
	sort.SliceStable(xmlmodel.Root.MxCell, func(i, j int) bool {
		return xmlmodel.Root.MxCell[i].RID != nil && xmlmodel.Root.MxCell[j].RID == nil
	})

	for _, cell := range xmlmodel.Root.MxCell {
		switch {
		case cell.RID != nil:
			// Case of a Resource
			rType := w.getRtype(*cell.RID)
			if rType == rtype.INVALID {
				return nil,
					errors.New("Refering to a rID that is not in the workflow"),
					nil
			}

			// Generate ObjectID for the reference ID
			rIDObj, idErr := primitive.ObjectIDFromHex(*cell.RID)
			if idErr != nil {
				return nil,
					errors.New("Bad ID format: " + *cell.RID),
					nil
			}

			resObj := ret.CreateResourceObject(rType)
			if resObj == nil {
				// No object exists for this resource type; calling a
				// method on the nil interface would panic
				return nil,
					errors.New("Resource type " + rType.String() + " can't be used in a workflow"),
					nil
			}
			resObj.setReference(rIDObj)
			ret.UpdateObj(resObj, cell.ID)

		case cell.ID == "0" || cell.ID == "1":
			// ID 0 and 1 are special cases of mxeditor
			continue

		default:
			// Not root nor resource. Should be only links
			sourceObj := ret.GetResource(cell.Source)
			targetObj := ret.GetResource(cell.Target)

			if sourceObj == nil || targetObj == nil {
				if sourceObj == nil && targetObj == nil {
					issues = append(issues, errors.New("Arrow "+cell.ID+" is alone"))
				} else if sourceObj == nil {
					issues = append(issues, errors.New("Arrow ("+cell.ID+") to "+*targetObj.getName()+" without parent"))
				} else {
					issues = append(issues, errors.New("Arrow "+cell.ID+" from "+*sourceObj.getName()+" without target"))
				}

				// If is a invalid link, we can't save it in the DB
				continue
			}

			if sourceObj.getRtype() == rtype.DATACENTER || targetObj.getRtype() == rtype.DATACENTER {
				var datacenter, datacenterLinked *string

				if sourceObj.getRtype() == rtype.DATACENTER {
					datacenter = cell.Source
					datacenterLinked = cell.Target
				} else {
					datacenter = cell.Target
					datacenterLinked = cell.Source
				}

				switch ret.GetResource(datacenterLinked).getRtype() {
				case rtype.COMPUTING:
					computingObj := ret.GetResource(datacenterLinked).(*ComputingObject)

					// We should always get a ID because we already registered resources and discarded which doesn't correspond to existent models
					computingObj.DataCenterID = *datacenter
					ret.UpdateObj(computingObj, *datacenterLinked)
				}
			} else {
				targetObj.addLink(INPUT, *cell.Source)
				ret.UpdateObj(targetObj, *cell.Target) // save back

				// If we have a relationship of:
				// Source ----> Target
				//
				// The Source will be in the INPUTs of the Target.
				// But we also must make sure that the Target will be in the OUTPUTs of the Source
				sourceObj.addLink(OUTPUT, *cell.Target)
				ret.UpdateObj(sourceObj, *cell.Source)
			}
		}
	}

	// Datacenter and Data objects that are used by some Computing object
	dcslist := make(map[string]bool)
	dataslist := make(map[string]bool)

	for _, comp := range ret.Computing {
		if comp.DataCenterID == "" {
			issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter"))
		} else {
			// If doesn't exist in the list, means is new element to register as used
			dcslist[comp.DataCenterID] = true
		}

		for _, dcin := range comp.Inputs {
			switch ret.GetResource(&dcin).getRtype() {
			case rtype.DATA:
				dataslist[dcin] = true
			}
		}

		for _, dcout := range comp.Outputs {
			switch ret.GetResource(&dcout).getRtype() {
			case rtype.DATA:
				dataslist[dcout] = true
			}
		}
	}

	for _, va := range ret.Storage {
		if va.Inputs == nil && va.Outputs == nil {
			issues = append(issues, errors.New("Storage "+*va.getName()+" without compatible inputs and outputs"))
		}
	}

	for dcID, va := range ret.Datacenter {
		// if rID doesn't exist in the list, it means that it's not used
		if _, ok := dcslist[dcID]; !ok {
			issues = append(issues, errors.New("DC "+*va.getName()+" not atached to any Computing"))
		}
	}

	for dcID, va := range ret.Data {
		// if rID doesn't exist in the list, it means that it's not used
		if _, ok := dataslist[dcID]; !ok {
			issues = append(issues, errors.New("Data "+*va.getName()+" not atached to any Computing"))
		}
	}

	//////////////////////////////////////////////////////////
	//                                                      //
	//  Starting from here, we check the type of resources  //
	//                                                      //
	//////////////////////////////////////////////////////////

	// FIXME: Avoid checking twice the same cases (cycles). Ex:
	//
	// Comp1 ----> Comp2
	//
	// In this case, we will check Comp1 outputs with Comp2
	// inputs AND Comp2 inputs with Comp1 outputs, since we are
	// iterating over all existent Computing models in the Graph
	for _, comp := range ret.Computing {
		compModel, err2 := comp.getModel()
		if err = err2; err != nil {
			return
		}

		currentCompModel := compModel.(ComputingModel)

		// Some computings may not allow inputs or outputs
		if len(currentCompModel.Dinputs) == 0 && len(comp.Inputs) > 0 {
			issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any input"))
			continue
		}

		if len(currentCompModel.Doutputs) == 0 && len(comp.Outputs) > 0 {
			issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any output"))
			continue
		}

		//TODO: We should allow heterogenous inputs?
		for _, objIn := range comp.Inputs {
			resIn := ret.GetResource(&objIn)
			resInType := resIn.getRtype()

			switch resInType {
			case rtype.DATA:
				dataModel, err2 := resIn.getModel()
				if err = err2; err != nil {
					return
				}
				myDataModel := dataModel.(DataModel)

				if _, ok := FindInSlice(currentCompModel.Dinputs, myDataModel.Dtype); !ok {
					issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle inputs of type "+myDataModel.Dtype+" from Data "+dataModel.getName()))
				}

			case rtype.COMPUTING:
				inCompModel, err2 := resIn.getModel()
				if err = err2; err != nil {
					return
				}
				myInComputingModel := inCompModel.(ComputingModel)

				if _, _, ok := FindSliceInSlice(myInComputingModel.Doutputs, currentCompModel.Dinputs); !ok {
					issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle any input from "+inCompModel.getName()))
				}

			case rtype.STORAGE:
				// Storage can give us anything, so we always accept its input for now
				continue

			default:
				issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resInType.String()+" (behaviour not defined)"))
			}
		}

		//TODO: We should allow heterogenous outputs?
		for _, objOut := range comp.Outputs {
			resOut := ret.GetResource(&objOut)
			resOutType := resOut.getRtype()

			switch resOutType {
			case rtype.COMPUTING:
				outCompModel, err2 := resOut.getModel()
				if err = err2; err != nil {
					return
				}
				myOutComputingModel := outCompModel.(ComputingModel)

				if _, _, ok := FindSliceInSlice(currentCompModel.Doutputs, myOutComputingModel.Dinputs); !ok {
					issues = append(issues, errors.New("Computing "+compModel.getName()+" doesn't have output data compatible with "+outCompModel.getName()))
				}

			case rtype.STORAGE:
				// Storage can save anything, so we always accept to store it for now
				continue

			default:
				issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resOutType.String()+" (behaviour not defined)"))
			}
		}
	}

	return
}
// sumExecutionReqs adds up the CPUs, GPUs and RAM of all the given execution
// requirements. Without arguments the zero value is returned.
func sumExecutionReqs(exeqReq ...ExecutionRequirementsModel) (ret ExecutionRequirementsModel) {
	for idx := range exeqReq {
		ret.CPUs += exeqReq[idx].CPUs
		ret.GPUs += exeqReq[idx].GPUs
		ret.RAM += exeqReq[idx].RAM
	}
	return ret
}
// CheckAndBookWorkflowSchedule asks every datacenter used by the workflow
// workflowName of username whether the schedule of the workflow can be
// created there and, when book is true and all of them are available, books
// the schedule in each one.
//
// The execution requirements of all Datacenter objects that refer to the
// same datacenter model are summed, so the result holds one DCstatus per
// datacenter model. Failures of a single datacenter are reported in its
// DCstatus (ErrorMessage) and not through err.
//
// The workflow is marked as booked, and the flag stored in the database,
// only when no booking failed. err is set when the workspace or workflow
// doesn't exist, when the schedule is already booked, or when the
// requirements or a datacenter model can't be loaded during the check.
func CheckAndBookWorkflowSchedule(username, workflowName string, book bool) (myRet []DCstatus, err error) {
	userWorkspace := GetWorkspace(username)
	if userWorkspace == nil {
		return nil, errors.New("No workspace for user " + username)
	}

	currentWorkflow, ok := userWorkspace.Workflows[workflowName]
	if !ok {
		return nil, errors.New("No workflow " + workflowName + " for user " + username)
	}

	if currentWorkflow.Schedules.IsBooked {
		return nil, errors.New("Can't operate DCs for a already booked schedule")
	}

	// We can have multiple DCobjs pointing to the same DCmodel. We must sum all req of the same DCmodel
	totalDCs := make(map[primitive.ObjectID]ExecutionRequirementsModel)

	// Map iteration order is not stable between two loops, so the model IDs
	// are kept in a slice: index i of modelIDs always matches myRet[i].
	var modelIDs []primitive.ObjectID

	for dcIDobj, dcObj := range currentWorkflow.Datacenter {
		modelID := dcObj.getReference()

		totalsModel, reqErr := currentWorkflow.GetExecutionRequirements(dcIDobj)
		if reqErr != nil {
			return nil, reqErr
		}

		if previous, seen := totalDCs[modelID]; seen {
			totalDCs[modelID] = sumExecutionReqs(previous, totalsModel)
		} else {
			totalDCs[modelID] = totalsModel
			modelIDs = append(modelIDs, modelID)
		}
	}

	myRet = make([]DCstatus, len(modelIDs))

	// First pass: ask every datacenter if the schedule fits
	for i, modelID := range modelIDs {
		execReq := totalDCs[modelID]

		dcModel, modelErr := GetOneDatacenter(modelID.Hex())
		if modelErr != nil {
			return myRet, modelErr
		}

		myRet[i].DCname = dcModel.Name
		myRet[i].DCobjID = modelID.Hex()

		// retrieve the host of the DC
		host := GetHost(dcModel.Hosts)
		if host == nil {
			myRet[i].ErrorMessage = "Datacenter " + myRet[i].DCname + " doesn't have a host property"
			continue
		}

		cli := services.GetSelfAPI(*host)
		data, checkErr := cli.ScheduleApi.ScheduleControllerCheckIfScheduleCanBeCreatedInThisDC(context.Background(),
			currentWorkflow.Schedules.Cron,
			int32(currentWorkflow.Schedules.Duration),
			currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
			currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
			swagger.ModelsExecutionRequirementsModel{
				Cpus: int32(execReq.CPUs),
				Gpus: int32(execReq.GPUs),
				Ram:  int32(execReq.RAM),
			},
		)
		if checkErr != nil {
			myRet[i].ErrorMessage = checkErr.Error()
			// An API error still proves that the datacenter answered
			if swErr, isSwagger := checkErr.(swagger.GenericSwaggerError); isSwagger {
				myRet[i].IsReachable = true
				myRet[i].ErrorMessage += ": " + string(swErr.Body())
			}
			continue
		}

		myRet[i].IsReachable = true

		if data.StatusCode == 200 {
			myRet[i].IsAvailable = true
		}
	}

	// If we only check, we should exit here
	if !book {
		return myRet, nil
	}

	// Nothing is booked unless every datacenter is available
	for _, v := range myRet {
		if !v.IsAvailable {
			return myRet, nil
		}
	}

	// Second pass: book the schedule in every datacenter
	allBooked := true
	for i, modelID := range modelIDs {
		execReq := totalDCs[modelID]

		dcModel, modelErr := GetOneDatacenter(modelID.Hex())
		if modelErr != nil {
			// This datacenter could not be booked
			allBooked = false
			myRet[i].ErrorMessage = modelErr.Error()
			continue
		}

		cli := services.GetSelfAPI(*GetHost(dcModel.Hosts)) // If we are here, we already check that host exists and is reachable
		data, resp, bookErr := cli.ScheduleApi.ScheduleControllerCreateSchedule(context.Background(),
			services.DC_NAME,
			workflowName,
			currentWorkflow.Schedules.Cron,
			int32(currentWorkflow.Schedules.Duration),
			currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
			currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
			swagger.ModelsExecutionRequirementsModel{
				Cpus: int32(execReq.CPUs),
				Gpus: int32(execReq.GPUs),
				Ram:  int32(execReq.RAM),
			},
		)
		if bookErr != nil {
			allBooked = false
			myRet[i].ErrorMessage = bookErr.Error()
			if swErr, isSwagger := bookErr.(swagger.GenericSwaggerError); isSwagger {
				myRet[i].IsReachable = true
				myRet[i].ErrorMessage += ": " + string(swErr.Body())
			}
			continue
		}

		if resp.StatusCode == 200 {
			//FIXME: Maybe some better way of casting?
			// Copy at most len(nextExec) entries; the answer may hold fewer
			var nextExec [5]string
			for counter := 0; counter < len(nextExec) && counter < len(data.NextExecutions); counter++ {
				nextExec[counter] = data.NextExecutions[counter]
			}

			myRet[i].Booked = &ScheduleInfo{
				Total:          int(data.Total),
				NextExecutions: nextExec,
			}
		}
	}

	// If some DC fail, we must not mark the workflow as booked
	if !allBooked {
		return myRet, nil
	}

	currentWorkflow.Schedules.IsBooked = true
	_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
		primitive.M{"_id": username},
		primitive.M{"$set": primitive.M{
			WorkflowDB + "." +
				workflowName + "." +
				SchedulesDB: &currentWorkflow.Schedules}},
	)
	if err != nil {
		// The bookings are already done in the DCs, so the result is still
		// returned; the failure is only logged
		logs.Critical("Internal error when updating in DB: " + err.Error())
	}

	return myRet, nil
}