package models import ( "context" "encoding/xml" "errors" "net/url" "os" "sort" "time" "cloud.o-forge.io/core/deprecated-oc-catalog/models/rtype" swagger "cloud.o-forge.io/core/deprecated-oc-catalog/selfapi" "cloud.o-forge.io/core/deprecated-oc-catalog/services" "github.com/beego/beego/v2/core/logs" "github.com/vk496/cron" "go.mongodb.org/mongo-driver/bson/primitive" ) type LinkingState uint // When we check the schedule of a workflow, we report with this type DCstatus struct { DCname string DCobjID string //FIXME: Probably should be model ID IsReachable bool IsAvailable bool Booked *ScheduleInfo ErrorMessage string } const ( NO_LINK LinkingState = iota INPUT OUTPUT ) func boolToLinkingState(boolState bool) LinkingState { switch boolState { case true: return INPUT case false: return OUTPUT default: return NO_LINK } } const SchedulesDB = "schedules" type Workflow struct { // The key of the map is the ID of the object itself Data map[string]DataObject `json:"data"` Computing map[string]ComputingObject `json:"computing"` Storage map[string]StorageObject `json:"storage"` Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter Links map[string]Link `json:"link"` Schedules WorkflowSchedule `json:"schedules"` MxgraphXML string `description:"State of the mxgraph"` } // TODO : describe what use case this interface satisfies type ResourceObject interface { getHost() *string getName() *string getModel() (ResourceModel, error) getRtype() rtype.Rtype setReference(rObjID primitive.ObjectID) getReference() primitive.ObjectID isLinked(rObjID string) LinkingState addLink(direction LinkingState, rObjID string) } // This type allows to process computing and storage component // which can get input from the user type EditableResourceObject interface { ResourceObject addUserInput(map[string]interface{}) } // Get a sum of all execution requirements attached to a DC obj func (wf Workflow) GetExecutionRequirements(dcIDobj string) (ret ExecutionRequirementsModel, err error) { // Find the id of the DC obj if _, ok := wf.Datacenter[dcIDobj]; !ok { return ExecutionRequirementsModel{}, errors.New("DC obj" + dcIDobj + " doesn't exist in the Workflow") } // Get all elements that are attached to the DC for _, computingObj := range wf.Computing { if computingObj.DataCenterID == dcIDobj { mymodel, err := computingObj.getModel() if err != nil { return ExecutionRequirementsModel{}, err } compModel := mymodel.(ComputingModel) //TODO a generic way to concatenate execution requirements ret.CPUs += compModel.ExecutionRequirements.CPUs ret.GPUs += compModel.ExecutionRequirements.GPUs ret.RAM += compModel.ExecutionRequirements.RAM } } return } func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) { if rObjID == nil { return nil } if datVal, ok := w.Data[*rObjID]; ok { retObj = &datVal return } if compVal, ok := w.Computing[*rObjID]; ok { retObj = &compVal return } if storVal, ok := w.Storage[*rObjID]; ok { retObj = &storVal return } if dcVal, ok := w.Datacenter[*rObjID]; ok { retObj = &dcVal return } return nil } func (w *Workflow) GetResourceMapByRtype(rt rtype.Rtype) interface{} { switch rt { case rtype.DATA: return w.Data case rtype.COMPUTING: return w.Computing case rtype.STORAGE: return w.Storage case rtype.DATACENTER: return w.Datacenter default: return nil } } func (w *Workflow) CreateResourceObject(rt rtype.Rtype) ResourceObject { var res ResourceObject switch rt { case rtype.DATA: res = &DataObject{} case rtype.COMPUTING: res = &ComputingObject{} case rtype.STORAGE: res = &StorageObject{} case rtype.DATACENTER: res = &DatacenterObject{} default: res = nil } return res } func (w *Workflow) AddObj(robj ResourceObject) *primitive.ObjectID { outputID := primitive.NewObjectID() w.UpdateObj(robj, outputID.Hex()) return &outputID } func (w *Workflow) AddLinkToWorkflow(link Link, id string) { if w.Links == nil { w.Links = make(map[string]Link) } w.Links[id] = link } func (w *Workflow) UpdateDB(userID, workflowName string) error { _, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": userID}, primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName: w}}, ) return err } func (w *Workflow) UpdateObj(robj ResourceObject, objID string) { switch robj.getRtype() { case rtype.DATA: var target DataObject if w.Data == nil { //init w.Data = make(map[string]DataObject) } target = *robj.(*DataObject) w.Data[objID] = target case rtype.COMPUTING: var target ComputingObject if w.Computing == nil { //init w.Computing = make(map[string]ComputingObject) } target = *robj.(*ComputingObject) w.Computing[objID] = target case rtype.STORAGE: var target StorageObject if w.Storage == nil { //init w.Storage = make(map[string]StorageObject) } target = *robj.(*StorageObject) w.Storage[objID] = target case rtype.DATACENTER: var target DatacenterObject if w.Datacenter == nil { //init w.Datacenter = make(map[string]DatacenterObject) } target = *robj.(*DatacenterObject) w.Datacenter[objID] = target } } func GetWorkflow(userID, workflowName string) (workflow *Workflow, err error) { userWorkspace := GetWorkspace(userID) if userWorkspace != nil { if theWorkflow, ok := userWorkspace.Workflows[workflowName]; ok { return &theWorkflow, nil } logs.Debug("No workflow name") return nil, errors.New("No workflow name") } logs.Debug("No workspace") return nil, errors.New("No workspace") } func CreateWorkflow(userID, workflowName string) (err error) { //TODO: Maybe operate directly in the DB instead retriving the full object? userWorkspace := GetWorkspace(userID) // Exist in the DB if userWorkspace != nil { if _, ok := userWorkspace.Workflows[workflowName]; ok { message := "Workspace workflow " + workflowName + " is already created for user " + userID logs.Debug(message) return errors.New(message) } userWP := &Workflow{} // New element addElem := primitive.M{WorkflowDB + "." + workflowName: userWP} // If user doesn't have workflows, we must init at least one if userWorkspace.Workflows == nil { addElem = primitive.M{WorkflowDB: map[string]*Workflow{ workflowName: userWP, }} } _, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": userID}, primitive.M{"$set": addElem}, ) if err != nil { message := "Internal error when updating in DB" logs.Debug(message + "; " + err.Error()) return errors.New(message) } return nil } return errors.New("Can't create a workflow without a workspace") } func CreateObjectInWorkflow(userID, workflowName, rID string) (rObjID2 *string, err error) { userWorkspace := GetWorkspace(userID) if userWorkspace == nil { return nil, errors.New("No workspace for user " + userID) } if _, ok := userWorkspace.Workflows[workflowName]; !ok { return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID) } rIDObj, err := primitive.ObjectIDFromHex(rID) if err != nil { return nil, errors.New("ID " + rID + " is not valid") } //TODO: We are replacing the entire array instead of pushing // a new element. Probably will have problems with multithread/async // operations and consistency in the future for rtyp, resource := range userWorkspace.GetResources() { if contains(resource, rID) { wWorkflow := userWorkspace.Workflows[workflowName] newObj := wWorkflow.CreateResourceObject(rtyp) newObj.setReference(rIDObj) outputID := wWorkflow.AddObj(newObj) _, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": userID}, primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName + "." + rtyp.String(): wWorkflow.GetResourceMapByRtype(rtyp)}}, ) if err != nil { return nil, errors.New("Internal error when updating in DB: " + err.Error()) } outStr := outputID.Hex() return &outStr, nil } } return nil, errors.New("rID " + rID + " doesn't exist in the user workspace") } func LinkObjectsInWorkspace(userID, workflowName, rObjIDsource string, isInput bool, rObjIDtarger string) (err error) { userWorkspace := GetWorkspace(userID) if userWorkspace == nil { return errors.New("No workspace for user " + userID) } if _, ok := userWorkspace.Workflows[workflowName]; !ok { return errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID) } // Check rObjIDsource if _, ok := userWorkspace.Workflows[workflowName].Data[rObjIDsource]; !ok { return errors.New("rObjIDsource must be of type DATA for now") } // Check rObjIDtarger wWorkflow := userWorkspace.Workflows[workflowName] resObjTarget := wWorkflow.GetResource(&rObjIDtarger) if resObjTarget == nil { return errors.New("rObjIDtarger doesn't exist") } else if resObjTarget.getRtype() == rtype.DATA { return errors.New("rObjIDtarger of type Data doesn't have inputs/outputs") } if resObjTarget.isLinked(rObjIDsource) != NO_LINK { return errors.New("rObjIDsource already exists in the inputs or outputs") } resObjTarget.addLink(boolToLinkingState(isInput), rObjIDsource) _, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": userID}, primitive.M{"$set": primitive.M{ WorkflowDB + "." + workflowName + "." + resObjTarget.getRtype().String() + "." + rObjIDtarger: resObjTarget}}, ) if err != nil { return errors.New("Internal error when updating in DB: " + err.Error()) } return nil } func GetWorkflowSchedule(username, workflowName string) (Currentschedule *WorkflowSchedule, err error) { userWorkspace := GetWorkspace(username) if userWorkspace == nil { return nil, errors.New("No workspace for user " + username) } if workflow, ok := userWorkspace.Workflows[workflowName]; ok { Currentschedule = &workflow.Schedules } return } func SetWorkflowSchedule(username, workflowName, cronString, events string, isService bool, startDate, stopDate time.Time, duration uint) (NextSchedules *ScheduleInfo, err error) { userWorkspace := GetWorkspace(username) if userWorkspace == nil { return nil, errors.New("No workspace for user " + username) } // Check if workflow exist if _, ok := userWorkspace.Workflows[workflowName]; !ok { return } // We mustn't modify a booked schedule if userWorkspace.Workflows[workflowName].Schedules.IsBooked { return nil, errors.New("A booked schedule can't be modified") } sch := WorkflowSchedule{} NextSchedules = &ScheduleInfo{} sch.StartDate = startDate sch.StopDate = stopDate sch.IsService = isService if isService { // Service NextSchedules.NextExecutions[0] = startDate.String() NextSchedules.Total = 1 } if !isService { // Task sch.Cron = cronString sch.Duration = duration sch.Events = events // Obtain next executions counter := 0 myCron, _ := cron.Parse(cronString) // NOTE: already checked in the controller scheduledStart := myCron.Next(startDate) // Get the first execution starting from startDate for !scheduledStart.IsZero() && counter < MAX_SCHEDULES { scheduleStop := scheduledStart.Add(time.Second * time.Duration(duration)) if scheduleStop.After(stopDate) || scheduledStart.Before(startDate) { // If a task is longer than last possible date, we ignore it scheduledStart = myCron.Next(scheduledStart) counter++ continue } if counter < len(NextSchedules.NextExecutions) { NextSchedules.NextExecutions[counter] = scheduledStart.String() } scheduledStart = myCron.Next(scheduledStart) counter++ } NextSchedules.Total = counter if NextSchedules.Total == 0 { return nil, errors.New("Current Task configuration will have 0 executions") } } _, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": username}, primitive.M{"$set": primitive.M{ WorkflowDB + "." + workflowName + "." + SchedulesDB: &sch}}, ) if err != nil { return nil, errors.New("Internal error when updating in DB: " + err.Error()) } return } func GetMxGraph(username, workflowName string) (xmlData *string, err error) { userWorkspace := GetWorkspace(username) if userWorkspace == nil { return nil, errors.New("No workspace for user " + username) } if _, ok := userWorkspace.Workflows[workflowName]; !ok { return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + username) } var data string = userWorkspace.Workflows[workflowName].MxgraphXML if data == "" { xmlData = nil } else { xmlData = &data } return } func ParseMxGraph(username, workflowName, xmlData string) (err error, mxissues []error) { userWorkspace := GetWorkspace(username) if userWorkspace == nil { return errors.New("No workspace for user " + username), nil } currentWorkflow, ok := userWorkspace.Workflows[workflowName] if !ok { return errors.New("No workflow " + workflowName), nil } if currentWorkflow.Schedules.IsBooked { //well, let's allow that //return errors.New("Can't modify a booked workflow"), nil } decodedValue, err := url.QueryUnescape(xmlData) if err != nil { return err, nil } // TEMPORARY test the xml created os.WriteFile("graph.xml", []byte(decodedValue), 0660) var xmlModel MxGraphModel // logs.Debug(xmlData) err = xml.Unmarshal([]byte(xmlData), &xmlModel) if err != nil { return err, nil } xmlModel.createLinks() targetWorkspaceWorkflow, err, mxissues := userWorkspace.ConsumeMxGraphModel(xmlModel) if err != nil { return err, nil } targetWorkspaceWorkflow.MxgraphXML = xmlData targetWorkspaceWorkflow.Schedules = currentWorkflow.Schedules //TODO: Probably we should move schedules outside the workflow _, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": username}, primitive.M{"$set": primitive.M{ WorkflowDB + "." + workflowName: targetWorkspaceWorkflow}}, ) if err != nil { return errors.New("Internal error when updating in DB: " + err.Error()), nil } return nil, mxissues } // FindInSlice takes a slice and looks for an element in it. If found it will // return it's key, otherwise it will return -1 and a bool of false. func FindInSlice(slice []string, val string) (int, bool) { for i, item := range slice { if item == val { return i, true } } return -1, false } // At least one element exist in both slices // Return index1, index2 and if exist func FindSliceInSlice(slice1 []string, slice2 []string) (int, int, bool) { for i1, item1 := range slice1 { for i2, item2 := range slice2 { if item1 == item2 { return i1, i2, true } } } return -1, -1, false } func (ws Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (returned_wf *Workflow, err error, issues []error) { returned_wf = &Workflow{} // When we will iterate over the full array of cells, we first will register the resources // and after the linkage between them sort.Slice(xmlmodel.Root.MxCell, func(i, j int) bool { return xmlmodel.Root.MxCell[i].RID != nil }) // Create the object and add it to the appropriate list // for all the components with setting, which are identified // by a MxObject tag in the xml if ok := xmlmodel.Root.MxObject != nil; ok { for _, object := range *xmlmodel.Root.MxObject { resObj, err, mxissues := returned_wf.mxCellToComponent(object.MxCell, ws) if err != nil { issues = append(issues, mxissues...) } // add the component to the worflow's attribute that stores // all components in a map[string]Component where the key // is the component's ID in the mxGraph and the value the Component object returned_wf.UpdateObj(resObj, object.ID) // Construct the object corresponding to the componant's type and use its addUserInput method if resObj.getRtype() == rtype.COMPUTING { comp_obj := returned_wf.GetResource(&object.ID).(*ComputingObject) comp_obj.AddUserInput(object.Settings) returned_wf.UpdateObj(comp_obj, object.ID) } // if(resObj.getRtype() == rtype.DATA){ // } } } for _, cell := range xmlmodel.Root.MxCell { switch { case cell.RID != nil: resObj, err, mxissues := returned_wf.mxCellToComponent(cell, ws) if err != nil { issues = append(issues, mxissues...) } returned_wf.UpdateObj(resObj, cell.ID) case cell.ID == "0" || cell.ID == "1": // ID 0 and 1 are special cases of mxeditor continue // issues = append(issues, errors.New("MxCell with ID "+cell.ID+" doesn't have a valid link")) default: // Not root nor resource. Should be only links sourceObj := returned_wf.GetResource(cell.Source) targetObj := returned_wf.GetResource(cell.Target) if sourceObj == nil || targetObj == nil { if sourceObj == nil && targetObj == nil { issues = append(issues, errors.New("Arrow "+cell.ID+" is alone")) } else if sourceObj == nil { issues = append(issues, errors.New("Arrow ("+cell.ID+") to "+*targetObj.getName()+" without parent")) } else { issues = append(issues, errors.New("Arrow "+cell.ID+" from "+*sourceObj.getName()+" without target")) } // If is a invalid link, we can't save it in the DB continue } // Not root nor resource. Should be only links // If is a invalid link, we can't save it in the DB // We should always get a ID because we already registered resources and discarded which doesn't correspond to existent models // save back // If we have a relationship of: // Source ----> Target // // The Source will be in the INPUTs of the Target. // But we also must make sure that the Target will be in the OUTPUTs of the Source } } issues = returned_wf.createLinks(xmlmodel.Root.MxLink, issues) issues = returned_wf.checkLinks(issues) // dcslist := make(map[string]bool) // dataslist := make(map[string]bool) // // datalist := make(map[string]bool) // // Test wether the computing components are linked with a DC // for _, comp := range returned_wf.Computing { // if comp.DataCenterID == "" { // issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter")) // } else { // // If doesn't exist in the list, means is new element to register as used // dcslist[comp.DataCenterID] = true // } // for _, dcin := range comp.Inputs { // switch returned_wf.GetResource(&dcin).getRtype() { // case rtype.DATA: // dataslist[dcin] = true // } // } // for _, dcout := range comp.Outputs { // switch returned_wf.GetResource(&dcout).getRtype() { // case rtype.DATA: // dataslist[dcout] = true // } // } // } // for _, storage_component := range returned_wf.Storage { // if storage_component.Inputs == nil && storage_component.Outputs == nil { // issues = append(issues, errors.New("Storage "+*storage_component.getName()+" without compatible inputs and outputs")) // } // } // for dcID, dc_component := range returned_wf.Datacenter { // // if rID doesn't exist in the list, it means that it's not used // if _, ok := dcslist[dcID]; !ok { // issues = append(issues, errors.New("DC "+*dc_component.getName()+" not attached to any Computing")) // } // } // for dcID, data_component := range returned_wf.Data { // // if rID doesn't exist in the list, it means that it's not used // if _, ok := dataslist[dcID]; !ok { // issues = append(issues, errors.New("Data "+*data_component.getName()+" not attached to any Computing")) // } // } ////////////////////////////////////////////////////////// // // // Starting from here, we check the type of resources // // // ////////////////////////////////////////////////////////// // FIXME: Avoid checking twice the same cases (cycles). Ex: // // Comp1 ----> Comp2 // // In this case, we will check Comp1 outputs with Comp2 // inputs AND Comp2 inputs with Comp1 outputs, since we are // iterating over all existent Computing models in the Graph for _, comp := range returned_wf.Computing { compModel, err2 := comp.getModel() if err = err2; err != nil { return } currentCompModel := compModel.(ComputingModel) // Come computings may not allow inputs or outputs if len(currentCompModel.Dinputs) == 0 && len(comp.Inputs) > 0 { issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any input")) continue } if len(currentCompModel.Doutputs) == 0 && len(comp.Outputs) > 0 { issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any output")) continue } //TODO: We should allow heterogenous inputs? for _, objIn := range comp.Inputs { resIn := returned_wf.GetResource(&objIn) resInType := resIn.getRtype() switch resInType { case rtype.DATA: dataModel, err2 := resIn.getModel() if err = err2; err != nil { return } myDataModel := dataModel.(DataModel) if _, ok := FindInSlice(currentCompModel.Dinputs, myDataModel.Dtype); !ok { issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle inputs of type "+myDataModel.Dtype+" from Data "+dataModel.getName())) } case rtype.COMPUTING: inCompModel, err2 := resIn.getModel() if err = err2; err != nil { return } myInComputingModel := inCompModel.(ComputingModel) if _, _, ok := FindSliceInSlice(myInComputingModel.Doutputs, currentCompModel.Dinputs); !ok { issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle any input from "+inCompModel.getName())) } case rtype.STORAGE: // Storage can give use anything, so we always accept it's input for now continue default: issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resInType.String()+" (behaviour not defined)")) } } //TODO: We should allow heterogenous outputs? for _, objOut := range comp.Outputs { resOut := returned_wf.GetResource(&objOut) resOutType := resOut.getRtype() switch resOutType { case rtype.COMPUTING: outCompModel, err2 := resOut.getModel() if err = err2; err != nil { return } myOutComputingModel := outCompModel.(ComputingModel) if _, _, ok := FindSliceInSlice(currentCompModel.Doutputs, myOutComputingModel.Dinputs); !ok { issues = append(issues, errors.New("Computing "+compModel.getName()+" doesn't have output data compatible with "+outCompModel.getName())) } case rtype.STORAGE: // Storage can save anything, so we always accept store it for now continue default: issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resOutType.String()+" (behaviour not defined)")) } } } return } func (w *Workflow) createLinks(links []MxLink, issues []error) []error { for _, link := range links { if (len(link.Source) > 0 && len(link.Target) > 0){ sourceObj := w.GetResource(&link.Source) targetObj := w.GetResource(&link.Target) link_object := NewLink(sourceObj, link.Source, targetObj, link.Target) w.AddLinkToWorkflow(link_object, link.ID) } else { issues = append(issues, w.processLinkErrors(link)) } } return issues } func (w *Workflow) processLinkErrors(link MxLink) (issue error) { if len(link.Source) == 0 && len(link.Target) == 0 { issue = errors.New("Arrow " + link.ID + " is alone") } else if len(link.Source) == 0 { targetObj := w.GetResource(&link.Target) issue = errors.New("Arrow (" + link.ID + ") to " + *targetObj.getName() + " without parent") } else { sourceObj := w.GetResource(&link.Source) issue = errors.New("Arrow " + link.ID + " from " + *sourceObj.getName() + " without target") } return issue } func (w *Workflow) checkLinks(issues []error) []error { // Check that storage components have a valid link for id, storage := range w.Storage { if(!w.isComponentSrc(id) && !w.isComponentDst(id)){ issues = append(issues, errors.New("Storage "+*storage.getName()+" without compatible inputs and outputs")) } } // Check that data components are linked to a computing component for id, data := range w.Data { if(!w.isLinkedToComputing(id)){ issues = append(issues, errors.New("Data "+*data.getName()+" not attached to any Computing")) } } // Check that DC is linked to a computing component for id, dc:= range w.Datacenter { if(!w.isLinkedToComputing(id)){ issues = append(issues, errors.New("Datacenter "+*dc.getName()+" not attached to any Computing")) } } // Check that all data computing components are linked to a DC for id,comp:= range w.Computing { if(!w.isLinkedToDC(id)){ issues = append(issues, errors.New("Computing "+*comp.getName()+" not attached to any datacenter")) } } return issues } func (w *Workflow) isComponentSrc(id string) bool { for _, link := range w.Links{ if(link.Source == id && link.Source != ""){ return true } } return false } func (w *Workflow) isComponentDst(id string) bool { for _, link := range w.Links{ if(link.Destination == id && link.Source != ""){ return true } } return false } func (w *Workflow) isLinkedToComputing(id string) bool { for idComputing, _ := range w.Computing { if( (w.isComponentSrc(id) && w.isComponentDst(idComputing)) || (w.isComponentSrc(idComputing) && w.isComponentDst(id))){ return true } } return false } func (w *Workflow) isLinkedToDC(id string) bool { for _, link := range w.Links { if link.Source == id && link.DCLink { return true } } return false } func sumExecutionReqs(exeqReq ...ExecutionRequirementsModel) (ret ExecutionRequirementsModel) { for _, v := range exeqReq { ret.CPUs += v.CPUs ret.GPUs += v.GPUs ret.RAM += v.RAM } return } func CheckAndBookWorkflowSchedule(username, workflowName string, book bool) (myRet []DCstatus, err error) { userWorkspace := GetWorkspace(username) if userWorkspace == nil { return nil, errors.New("No workspace for user " + username) } currentWorkflow, ok := userWorkspace.Workflows[workflowName] if !ok { return nil, errors.New("No workflow " + workflowName + " for user " + username) } if currentWorkflow.Schedules.IsBooked { return nil, errors.New("Can't operate DCs for a already booked schedule") } // dd := ¤tWorkflow // We can have multiple DCobjs pointing to the same DCmodel. We must sum all req of the same DCmodel totalDCs := make(map[primitive.ObjectID]ExecutionRequirementsModel) for dcIDobj, dcObj := range currentWorkflow.Datacenter { modelID := dcObj.getReference() var totalsModel ExecutionRequirementsModel totalsModel, err = currentWorkflow.GetExecutionRequirements(dcIDobj) if err != nil { return } if _, ok := totalDCs[modelID]; ok { totalDCs[modelID] = sumExecutionReqs(totalDCs[modelID], totalsModel) } else { totalDCs[modelID] = totalsModel } } myRet = make([]DCstatus, len(totalDCs)) var i int i = -1 for modelID, execReq := range totalDCs { i++ var dcModel *DatacenterModel dcModel, err = GetOneDatacenter(modelID.Hex()) if err != nil { return } myRet[i].DCname = dcModel.Name myRet[i].DCobjID = modelID.Hex() // retrieve the host of the DC host := GetHost(dcModel.Hosts) if host == nil { myRet[i].ErrorMessage = "Datacenter " + myRet[i].DCname + " doesn't have a host property" continue } cli := services.GetSelfAPI(*host) data, err := cli.ScheduleApi.ScheduleControllerCheckIfScheduleCanBeCreatedInThisDC(context.Background(), currentWorkflow.Schedules.Cron, int32(currentWorkflow.Schedules.Duration), currentWorkflow.Schedules.StartDate.Format(time.RFC3339), currentWorkflow.Schedules.StopDate.Format(time.RFC3339), swagger.ModelsExecutionRequirementsModel{ Cpus: int32(execReq.CPUs), Gpus: int32(execReq.GPUs), Ram: int32(execReq.RAM), }, ) if err != nil { myRet[i].ErrorMessage = err.Error() swErr, ok := err.(swagger.GenericSwaggerError) if ok { myRet[i].IsReachable = true myRet[i].ErrorMessage += ": " + string(swErr.Body()) } continue } myRet[i].IsReachable = true if data.StatusCode == 200 { myRet[i].IsAvailable = true } } // If we only check, we should exit here if !book { return } for _, v := range myRet { if !v.IsAvailable { return } } i = -1 allBooked := true for modelID, execReq := range totalDCs { i++ // _ = v var dcModel *DatacenterModel dcModel, err = GetOneDatacenter(modelID.Hex()) if err != nil { myRet[i].ErrorMessage = err.Error() continue } cli := services.GetSelfAPI(*GetHost(dcModel.Hosts)) // If we are here, we already check that host exists and is reachable data, resp, err := cli.ScheduleApi.ScheduleControllerCreateSchedule(context.Background(), services.DC_NAME, workflowName, currentWorkflow.Schedules.Cron, int32(currentWorkflow.Schedules.Duration), currentWorkflow.Schedules.StartDate.Format(time.RFC3339), currentWorkflow.Schedules.StopDate.Format(time.RFC3339), swagger.ModelsExecutionRequirementsModel{ Cpus: int32(execReq.CPUs), Gpus: int32(execReq.GPUs), Ram: int32(execReq.RAM), }, ) if err != nil { allBooked = false myRet[i].ErrorMessage = err.Error() swErr, ok := err.(swagger.GenericSwaggerError) if ok { myRet[i].IsReachable = true myRet[i].ErrorMessage += ": " + string(swErr.Body()) } continue } if resp.StatusCode == 200 { //FIXME: Maybe some better way of casting? var nextExec [5]string for counter := 0; counter < 5; counter++ { nextExec[counter] = data.NextExecutions[counter] } myRet[i].Booked = &ScheduleInfo{ Total: int(data.Total), NextExecutions: nextExec, } } } // If some DC fail, we must not mark the workflow as booked if !allBooked { return } currentWorkflow.Schedules.IsBooked = true _, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx, primitive.M{"_id": username}, primitive.M{"$set": primitive.M{ WorkflowDB + "." + workflowName + "." + SchedulesDB: ¤tWorkflow.Schedules}}, ) if err != nil { logs.Critical("Internal error when updating in DB: " + err.Error()) } return myRet, nil } // Not sure if this method handles error propagation well func (wf Workflow) mxCellToComponent(cell MxCell, ws Workspace) (resObj ResourceObject, err error, issues []error) { rType := ws.getRtype(*cell.RID) if rType == rtype.INVALID { return nil, errors.New("Refering to a rID that is not in the workflow"), nil } // Generate ObjectID for the reference ID rIDObj, err := primitive.ObjectIDFromHex(*cell.RID) if err != nil { return nil, errors.New("Bad ID format: " + *cell.RID), nil } resObj = wf.CreateResourceObject(rType) resObj.setReference(rIDObj) return }