1157 lines
30 KiB
Go
1157 lines
30 KiB
Go
package models
|
|
|
|
import (
|
|
"context"
|
|
"encoding/xml"
|
|
"errors"
|
|
"net/url"
|
|
"os"
|
|
"sort"
|
|
"time"
|
|
|
|
"cloud.o-forge.io/core/deprecated-oc-catalog/models/rtype"
|
|
swagger "cloud.o-forge.io/core/deprecated-oc-catalog/selfapi"
|
|
"cloud.o-forge.io/core/deprecated-oc-catalog/services"
|
|
"github.com/beego/beego/v2/core/logs"
|
|
"github.com/vk496/cron"
|
|
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
)
|
|
|
|
// LinkingState tells how a resource object is linked to another one; its
// values are NO_LINK, INPUT and OUTPUT.
type LinkingState uint
|
|
|
|
// When we check the schedule of a workflow, we report with this
|
|
type DCstatus struct {
|
|
DCname string
|
|
DCobjID string //FIXME: Probably should be model ID
|
|
IsReachable bool
|
|
|
|
IsAvailable bool
|
|
Booked *ScheduleInfo
|
|
|
|
ErrorMessage string
|
|
}
|
|
|
|
const (
|
|
NO_LINK LinkingState = iota
|
|
INPUT
|
|
OUTPUT
|
|
)
|
|
|
|
func boolToLinkingState(boolState bool) LinkingState {
|
|
switch boolState {
|
|
case true:
|
|
return INPUT
|
|
case false:
|
|
return OUTPUT
|
|
default:
|
|
return NO_LINK
|
|
}
|
|
}
|
|
|
|
// SchedulesDB is the field name used to address the schedules of a workflow
// when building database update paths.
const SchedulesDB = "schedules"
|
|
|
|
type Workflow struct {
|
|
// The key of the map is the ID of the object itself
|
|
Data map[string]DataObject `json:"data"`
|
|
Computing map[string]ComputingObject `json:"computing"`
|
|
Storage map[string]StorageObject `json:"storage"`
|
|
Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter
|
|
Links map[string]Link `json:"link"`
|
|
|
|
Schedules WorkflowSchedule `json:"schedules"`
|
|
|
|
MxgraphXML string `description:"State of the mxgraph"`
|
|
}
|
|
|
|
// TODO : describe what use case this interface satisfies
|
|
|
|
type ResourceObject interface {
|
|
getHost() *string
|
|
getName() *string
|
|
getModel() (ResourceModel, error)
|
|
getRtype() rtype.Rtype
|
|
setReference(rObjID primitive.ObjectID)
|
|
getReference() primitive.ObjectID
|
|
isLinked(rObjID string) LinkingState
|
|
addLink(direction LinkingState, rObjID string)
|
|
}
|
|
|
|
// This type allows to process computing and storage component
|
|
// which can get input from the user
|
|
type EditableResourceObject interface {
|
|
ResourceObject
|
|
addUserInput(map[string]interface{})
|
|
}
|
|
|
|
// Get a sum of all execution requirements attached to a DC obj
|
|
func (wf Workflow) GetExecutionRequirements(dcIDobj string) (ret ExecutionRequirementsModel, err error) {
|
|
|
|
// Find the id of the DC obj
|
|
|
|
if _, ok := wf.Datacenter[dcIDobj]; !ok {
|
|
return ExecutionRequirementsModel{}, errors.New("DC obj" + dcIDobj + " doesn't exist in the Workflow")
|
|
}
|
|
|
|
// Get all elements that are attached to the DC
|
|
|
|
for _, computingObj := range wf.Computing {
|
|
if computingObj.DataCenterID == dcIDobj {
|
|
mymodel, err := computingObj.getModel()
|
|
if err != nil {
|
|
return ExecutionRequirementsModel{}, err
|
|
}
|
|
|
|
compModel := mymodel.(ComputingModel)
|
|
|
|
//TODO a generic way to concatenate execution requirements
|
|
ret.CPUs += compModel.ExecutionRequirements.CPUs
|
|
ret.GPUs += compModel.ExecutionRequirements.GPUs
|
|
ret.RAM += compModel.ExecutionRequirements.RAM
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) {
|
|
|
|
if rObjID == nil {
|
|
return nil
|
|
}
|
|
|
|
if datVal, ok := w.Data[*rObjID]; ok {
|
|
retObj = &datVal
|
|
return
|
|
}
|
|
|
|
if compVal, ok := w.Computing[*rObjID]; ok {
|
|
retObj = &compVal
|
|
return
|
|
}
|
|
|
|
if storVal, ok := w.Storage[*rObjID]; ok {
|
|
retObj = &storVal
|
|
return
|
|
}
|
|
|
|
if dcVal, ok := w.Datacenter[*rObjID]; ok {
|
|
retObj = &dcVal
|
|
return
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (w *Workflow) GetResourceMapByRtype(rt rtype.Rtype) interface{} {
|
|
switch rt {
|
|
case rtype.DATA:
|
|
return w.Data
|
|
case rtype.COMPUTING:
|
|
return w.Computing
|
|
case rtype.STORAGE:
|
|
return w.Storage
|
|
case rtype.DATACENTER:
|
|
return w.Datacenter
|
|
default:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (w *Workflow) CreateResourceObject(rt rtype.Rtype) ResourceObject {
|
|
var res ResourceObject
|
|
switch rt {
|
|
case rtype.DATA:
|
|
res = &DataObject{}
|
|
case rtype.COMPUTING:
|
|
res = &ComputingObject{}
|
|
case rtype.STORAGE:
|
|
res = &StorageObject{}
|
|
case rtype.DATACENTER:
|
|
res = &DatacenterObject{}
|
|
default:
|
|
res = nil
|
|
}
|
|
|
|
return res
|
|
}
|
|
|
|
func (w *Workflow) AddObj(robj ResourceObject) *primitive.ObjectID {
|
|
outputID := primitive.NewObjectID()
|
|
w.UpdateObj(robj, outputID.Hex())
|
|
|
|
return &outputID
|
|
}
|
|
|
|
func (w *Workflow) AddLinkToWorkflow(link Link, id string) {
|
|
if w.Links == nil {
|
|
w.Links = make(map[string]Link)
|
|
}
|
|
w.Links[id] = link
|
|
}
|
|
|
|
func (w *Workflow) UpdateDB(userID, workflowName string) error {
|
|
|
|
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": userID},
|
|
primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName: w}},
|
|
)
|
|
|
|
return err
|
|
}
|
|
|
|
func (w *Workflow) UpdateObj(robj ResourceObject, objID string) {
|
|
switch robj.getRtype() {
|
|
case rtype.DATA:
|
|
var target DataObject
|
|
if w.Data == nil {
|
|
//init
|
|
w.Data = make(map[string]DataObject)
|
|
}
|
|
target = *robj.(*DataObject)
|
|
w.Data[objID] = target
|
|
|
|
case rtype.COMPUTING:
|
|
var target ComputingObject
|
|
if w.Computing == nil {
|
|
//init
|
|
w.Computing = make(map[string]ComputingObject)
|
|
}
|
|
target = *robj.(*ComputingObject)
|
|
w.Computing[objID] = target
|
|
|
|
case rtype.STORAGE:
|
|
var target StorageObject
|
|
if w.Storage == nil {
|
|
//init
|
|
w.Storage = make(map[string]StorageObject)
|
|
}
|
|
target = *robj.(*StorageObject)
|
|
w.Storage[objID] = target
|
|
case rtype.DATACENTER:
|
|
var target DatacenterObject
|
|
if w.Datacenter == nil {
|
|
//init
|
|
w.Datacenter = make(map[string]DatacenterObject)
|
|
}
|
|
target = *robj.(*DatacenterObject)
|
|
w.Datacenter[objID] = target
|
|
}
|
|
|
|
}
|
|
|
|
func GetWorkflow(userID, workflowName string) (workflow *Workflow, err error) {
|
|
|
|
userWorkspace := GetWorkspace(userID)
|
|
|
|
if userWorkspace != nil {
|
|
if theWorkflow, ok := userWorkspace.Workflows[workflowName]; ok {
|
|
return &theWorkflow, nil
|
|
}
|
|
logs.Debug("No workflow name")
|
|
return nil, errors.New("No workflow name")
|
|
}
|
|
|
|
logs.Debug("No workspace")
|
|
return nil, errors.New("No workspace")
|
|
}
|
|
|
|
func CreateWorkflow(userID, workflowName string) (err error) {
|
|
|
|
//TODO: Maybe operate directly in the DB instead retriving the full object?
|
|
userWorkspace := GetWorkspace(userID)
|
|
|
|
// Exist in the DB
|
|
if userWorkspace != nil {
|
|
|
|
if _, ok := userWorkspace.Workflows[workflowName]; ok {
|
|
message := "Workspace workflow " + workflowName +
|
|
" is already created for user " + userID
|
|
logs.Debug(message)
|
|
return errors.New(message)
|
|
}
|
|
|
|
userWP := &Workflow{}
|
|
|
|
// New element
|
|
addElem := primitive.M{WorkflowDB + "." + workflowName: userWP}
|
|
|
|
// If user doesn't have workflows, we must init at least one
|
|
if userWorkspace.Workflows == nil {
|
|
addElem = primitive.M{WorkflowDB: map[string]*Workflow{
|
|
workflowName: userWP,
|
|
}}
|
|
}
|
|
|
|
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": userID},
|
|
primitive.M{"$set": addElem},
|
|
)
|
|
|
|
if err != nil {
|
|
message := "Internal error when updating in DB"
|
|
logs.Debug(message + "; " + err.Error())
|
|
return errors.New(message)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
return errors.New("Can't create a workflow without a workspace")
|
|
}
|
|
|
|
func CreateObjectInWorkflow(userID, workflowName, rID string) (rObjID2 *string, err error) {
|
|
|
|
userWorkspace := GetWorkspace(userID)
|
|
|
|
if userWorkspace == nil {
|
|
return nil, errors.New("No workspace for user " + userID)
|
|
}
|
|
|
|
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
|
|
return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
|
|
}
|
|
|
|
rIDObj, err := primitive.ObjectIDFromHex(rID)
|
|
|
|
if err != nil {
|
|
return nil, errors.New("ID " + rID + " is not valid")
|
|
}
|
|
|
|
//TODO: We are replacing the entire array instead of pushing
|
|
// a new element. Probably will have problems with multithread/async
|
|
// operations and consistency in the future
|
|
|
|
for rtyp, resource := range userWorkspace.GetResources() {
|
|
if contains(resource, rID) {
|
|
wWorkflow := userWorkspace.Workflows[workflowName]
|
|
|
|
newObj := wWorkflow.CreateResourceObject(rtyp)
|
|
newObj.setReference(rIDObj)
|
|
|
|
outputID := wWorkflow.AddObj(newObj)
|
|
|
|
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": userID},
|
|
primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName + "." + rtyp.String(): wWorkflow.GetResourceMapByRtype(rtyp)}},
|
|
)
|
|
|
|
if err != nil {
|
|
return nil, errors.New("Internal error when updating in DB: " + err.Error())
|
|
}
|
|
|
|
outStr := outputID.Hex()
|
|
|
|
return &outStr, nil
|
|
}
|
|
}
|
|
|
|
return nil, errors.New("rID " + rID + " doesn't exist in the user workspace")
|
|
}
|
|
|
|
func LinkObjectsInWorkspace(userID, workflowName, rObjIDsource string, isInput bool, rObjIDtarger string) (err error) {
|
|
userWorkspace := GetWorkspace(userID)
|
|
|
|
if userWorkspace == nil {
|
|
return errors.New("No workspace for user " + userID)
|
|
}
|
|
|
|
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
|
|
return errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
|
|
}
|
|
|
|
// Check rObjIDsource
|
|
if _, ok := userWorkspace.Workflows[workflowName].Data[rObjIDsource]; !ok {
|
|
return errors.New("rObjIDsource must be of type DATA for now")
|
|
}
|
|
|
|
// Check rObjIDtarger
|
|
|
|
wWorkflow := userWorkspace.Workflows[workflowName]
|
|
|
|
resObjTarget := wWorkflow.GetResource(&rObjIDtarger)
|
|
|
|
if resObjTarget == nil {
|
|
return errors.New("rObjIDtarger doesn't exist")
|
|
} else if resObjTarget.getRtype() == rtype.DATA {
|
|
return errors.New("rObjIDtarger of type Data doesn't have inputs/outputs")
|
|
}
|
|
|
|
if resObjTarget.isLinked(rObjIDsource) != NO_LINK {
|
|
return errors.New("rObjIDsource already exists in the inputs or outputs")
|
|
}
|
|
|
|
resObjTarget.addLink(boolToLinkingState(isInput), rObjIDsource)
|
|
|
|
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": userID},
|
|
primitive.M{"$set": primitive.M{
|
|
WorkflowDB + "." +
|
|
workflowName + "." +
|
|
resObjTarget.getRtype().String() + "." +
|
|
rObjIDtarger: resObjTarget}},
|
|
)
|
|
|
|
if err != nil {
|
|
return errors.New("Internal error when updating in DB: " + err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func GetWorkflowSchedule(username, workflowName string) (Currentschedule *WorkflowSchedule, err error) {
|
|
|
|
userWorkspace := GetWorkspace(username)
|
|
|
|
if userWorkspace == nil {
|
|
return nil, errors.New("No workspace for user " + username)
|
|
}
|
|
|
|
if workflow, ok := userWorkspace.Workflows[workflowName]; ok {
|
|
Currentschedule = &workflow.Schedules
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func SetWorkflowSchedule(username, workflowName, cronString, events string, isService bool, startDate, stopDate time.Time, duration uint) (NextSchedules *ScheduleInfo, err error) {
|
|
|
|
userWorkspace := GetWorkspace(username)
|
|
|
|
if userWorkspace == nil {
|
|
return nil, errors.New("No workspace for user " + username)
|
|
}
|
|
|
|
// Check if workflow exist
|
|
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
|
|
return
|
|
}
|
|
|
|
// We mustn't modify a booked schedule
|
|
if userWorkspace.Workflows[workflowName].Schedules.IsBooked {
|
|
return nil, errors.New("A booked schedule can't be modified")
|
|
}
|
|
|
|
sch := WorkflowSchedule{}
|
|
NextSchedules = &ScheduleInfo{}
|
|
|
|
sch.StartDate = startDate
|
|
sch.StopDate = stopDate
|
|
sch.IsService = isService
|
|
|
|
if isService { // Service
|
|
NextSchedules.NextExecutions[0] = startDate.String()
|
|
NextSchedules.Total = 1
|
|
}
|
|
|
|
if !isService { // Task
|
|
sch.Cron = cronString
|
|
sch.Duration = duration
|
|
sch.Events = events
|
|
|
|
// Obtain next executions
|
|
counter := 0
|
|
myCron, _ := cron.Parse(cronString) // NOTE: already checked in the controller
|
|
scheduledStart := myCron.Next(startDate) // Get the first execution starting from startDate
|
|
|
|
for !scheduledStart.IsZero() && counter < MAX_SCHEDULES {
|
|
scheduleStop := scheduledStart.Add(time.Second * time.Duration(duration))
|
|
if scheduleStop.After(stopDate) || scheduledStart.Before(startDate) {
|
|
// If a task is longer than last possible date, we ignore it
|
|
scheduledStart = myCron.Next(scheduledStart)
|
|
counter++
|
|
continue
|
|
}
|
|
|
|
if counter < len(NextSchedules.NextExecutions) {
|
|
NextSchedules.NextExecutions[counter] = scheduledStart.String()
|
|
}
|
|
|
|
scheduledStart = myCron.Next(scheduledStart)
|
|
counter++
|
|
}
|
|
|
|
NextSchedules.Total = counter
|
|
|
|
if NextSchedules.Total == 0 {
|
|
return nil, errors.New("Current Task configuration will have 0 executions")
|
|
}
|
|
}
|
|
|
|
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": username},
|
|
primitive.M{"$set": primitive.M{
|
|
WorkflowDB + "." +
|
|
workflowName + "." +
|
|
SchedulesDB: &sch}},
|
|
)
|
|
|
|
if err != nil {
|
|
return nil, errors.New("Internal error when updating in DB: " + err.Error())
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func GetMxGraph(username, workflowName string) (xmlData *string, err error) {
|
|
userWorkspace := GetWorkspace(username)
|
|
|
|
if userWorkspace == nil {
|
|
return nil, errors.New("No workspace for user " + username)
|
|
}
|
|
|
|
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
|
|
return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + username)
|
|
}
|
|
|
|
var data string = userWorkspace.Workflows[workflowName].MxgraphXML
|
|
|
|
if data == "" {
|
|
xmlData = nil
|
|
} else {
|
|
xmlData = &data
|
|
}
|
|
return
|
|
}
|
|
|
|
func ParseMxGraph(username, workflowName, xmlData string) (err error, mxissues []error) {
|
|
|
|
userWorkspace := GetWorkspace(username)
|
|
|
|
if userWorkspace == nil {
|
|
return errors.New("No workspace for user " + username), nil
|
|
}
|
|
|
|
currentWorkflow, ok := userWorkspace.Workflows[workflowName]
|
|
if !ok {
|
|
return errors.New("No workflow " + workflowName), nil
|
|
}
|
|
|
|
if currentWorkflow.Schedules.IsBooked {
|
|
//well, let's allow that
|
|
//return errors.New("Can't modify a booked workflow"), nil
|
|
}
|
|
|
|
decodedValue, err := url.QueryUnescape(xmlData)
|
|
if err != nil {
|
|
return err, nil
|
|
}
|
|
|
|
// TEMPORARY test the xml created
|
|
os.WriteFile("graph.xml", []byte(decodedValue), 0660)
|
|
|
|
var xmlModel MxGraphModel
|
|
|
|
// logs.Debug(xmlData)
|
|
err = xml.Unmarshal([]byte(xmlData), &xmlModel)
|
|
if err != nil {
|
|
return err, nil
|
|
}
|
|
|
|
xmlModel.createLinks()
|
|
|
|
targetWorkspaceWorkflow, err, mxissues := userWorkspace.ConsumeMxGraphModel(xmlModel)
|
|
if err != nil {
|
|
return err, nil
|
|
}
|
|
|
|
targetWorkspaceWorkflow.MxgraphXML = xmlData
|
|
targetWorkspaceWorkflow.Schedules = currentWorkflow.Schedules //TODO: Probably we should move schedules outside the workflow
|
|
|
|
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": username},
|
|
primitive.M{"$set": primitive.M{
|
|
WorkflowDB + "." +
|
|
workflowName: targetWorkspaceWorkflow}},
|
|
)
|
|
|
|
if err != nil {
|
|
return errors.New("Internal error when updating in DB: " + err.Error()), nil
|
|
}
|
|
|
|
return nil, mxissues
|
|
}
|
|
|
|
// FindInSlice searches slice for val. It returns the index of the first
// matching element and true, or -1 and false when val is not in the slice.
func FindInSlice(slice []string, val string) (int, bool) {
	for idx := 0; idx < len(slice); idx++ {
		if slice[idx] == val {
			return idx, true
		}
	}
	return -1, false
}
|
|
|
|
// FindSliceInSlice tells whether slice1 and slice2 share at least one
// element. For the first element of slice1 that is present in slice2 it
// returns its index in slice1, the index of its first occurrence in slice2
// and true. It returns -1, -1 and false when nothing is shared.
func FindSliceInSlice(slice1 []string, slice2 []string) (int, int, bool) {
	for a := 0; a < len(slice1); a++ {
		for b := 0; b < len(slice2); b++ {
			if slice1[a] == slice2[b] {
				return a, b, true
			}
		}
	}
	return -1, -1, false
}
|
|
|
|
func (ws Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (returned_wf *Workflow, err error, issues []error) {
|
|
|
|
returned_wf = &Workflow{}
|
|
|
|
// When we will iterate over the full array of cells, we first will register the resources
|
|
// and after the linkage between them
|
|
sort.Slice(xmlmodel.Root.MxCell, func(i, j int) bool {
|
|
return xmlmodel.Root.MxCell[i].RID != nil
|
|
})
|
|
|
|
// Create the object and add it to the appropriate list
|
|
// for all the components with setting, which are identified
|
|
// by a MxObject tag in the xml
|
|
if ok := xmlmodel.Root.MxObject != nil; ok {
|
|
for _, object := range *xmlmodel.Root.MxObject {
|
|
|
|
resObj, err, mxissues := returned_wf.mxCellToComponent(object.MxCell, ws)
|
|
if err != nil {
|
|
issues = append(issues, mxissues...)
|
|
}
|
|
|
|
// add the component to the worflow's attribute that stores
|
|
// all components in a map[string]Component where the key
|
|
// is the component's ID in the mxGraph and the value the Component object
|
|
returned_wf.UpdateObj(resObj, object.ID)
|
|
|
|
// Construct the object corresponding to the componant's type and use its addUserInput method
|
|
if resObj.getRtype() == rtype.COMPUTING {
|
|
comp_obj := returned_wf.GetResource(&object.ID).(*ComputingObject)
|
|
comp_obj.AddUserInput(object.Settings)
|
|
returned_wf.UpdateObj(comp_obj, object.ID)
|
|
}
|
|
// if(resObj.getRtype() == rtype.DATA){
|
|
// }
|
|
}
|
|
}
|
|
|
|
for _, cell := range xmlmodel.Root.MxCell {
|
|
|
|
switch {
|
|
case cell.RID != nil:
|
|
resObj, err, mxissues := returned_wf.mxCellToComponent(cell, ws)
|
|
if err != nil {
|
|
issues = append(issues, mxissues...)
|
|
}
|
|
|
|
returned_wf.UpdateObj(resObj, cell.ID)
|
|
|
|
case cell.ID == "0" || cell.ID == "1":
|
|
// ID 0 and 1 are special cases of mxeditor
|
|
continue
|
|
// issues = append(issues, errors.New("MxCell with ID "+cell.ID+" doesn't have a valid link"))
|
|
|
|
default:
|
|
// Not root nor resource. Should be only links
|
|
sourceObj := returned_wf.GetResource(cell.Source)
|
|
targetObj := returned_wf.GetResource(cell.Target)
|
|
|
|
if sourceObj == nil || targetObj == nil {
|
|
if sourceObj == nil && targetObj == nil {
|
|
issues = append(issues, errors.New("Arrow "+cell.ID+" is alone"))
|
|
} else if sourceObj == nil {
|
|
issues = append(issues, errors.New("Arrow ("+cell.ID+") to "+*targetObj.getName()+" without parent"))
|
|
} else {
|
|
issues = append(issues, errors.New("Arrow "+cell.ID+" from "+*sourceObj.getName()+" without target"))
|
|
}
|
|
|
|
// If is a invalid link, we can't save it in the DB
|
|
continue
|
|
}
|
|
|
|
// Not root nor resource. Should be only links
|
|
// If is a invalid link, we can't save it in the DB
|
|
// We should always get a ID because we already registered resources and discarded which doesn't correspond to existent models
|
|
// save back
|
|
// If we have a relationship of:
|
|
// Source ----> Target
|
|
//
|
|
// The Source will be in the INPUTs of the Target.
|
|
// But we also must make sure that the Target will be in the OUTPUTs of the Source
|
|
|
|
}
|
|
}
|
|
|
|
issues = returned_wf.createLinks(xmlmodel.Root.MxLink, issues)
|
|
issues = returned_wf.checkLinks(issues)
|
|
|
|
// dcslist := make(map[string]bool)
|
|
// dataslist := make(map[string]bool)
|
|
// // datalist := make(map[string]bool)
|
|
|
|
|
|
// // Test wether the computing components are linked with a DC
|
|
// for _, comp := range returned_wf.Computing {
|
|
// if comp.DataCenterID == "" {
|
|
// issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter"))
|
|
// } else {
|
|
// // If doesn't exist in the list, means is new element to register as used
|
|
// dcslist[comp.DataCenterID] = true
|
|
|
|
// }
|
|
|
|
// for _, dcin := range comp.Inputs {
|
|
// switch returned_wf.GetResource(&dcin).getRtype() {
|
|
// case rtype.DATA:
|
|
// dataslist[dcin] = true
|
|
// }
|
|
// }
|
|
|
|
// for _, dcout := range comp.Outputs {
|
|
// switch returned_wf.GetResource(&dcout).getRtype() {
|
|
// case rtype.DATA:
|
|
// dataslist[dcout] = true
|
|
// }
|
|
// }
|
|
|
|
// }
|
|
|
|
// for _, storage_component := range returned_wf.Storage {
|
|
// if storage_component.Inputs == nil && storage_component.Outputs == nil {
|
|
// issues = append(issues, errors.New("Storage "+*storage_component.getName()+" without compatible inputs and outputs"))
|
|
// }
|
|
// }
|
|
|
|
// for dcID, dc_component := range returned_wf.Datacenter {
|
|
// // if rID doesn't exist in the list, it means that it's not used
|
|
// if _, ok := dcslist[dcID]; !ok {
|
|
// issues = append(issues, errors.New("DC "+*dc_component.getName()+" not attached to any Computing"))
|
|
// }
|
|
// }
|
|
|
|
// for dcID, data_component := range returned_wf.Data {
|
|
// // if rID doesn't exist in the list, it means that it's not used
|
|
// if _, ok := dataslist[dcID]; !ok {
|
|
// issues = append(issues, errors.New("Data "+*data_component.getName()+" not attached to any Computing"))
|
|
// }
|
|
// }
|
|
|
|
//////////////////////////////////////////////////////////
|
|
// //
|
|
// Starting from here, we check the type of resources //
|
|
// //
|
|
//////////////////////////////////////////////////////////
|
|
|
|
// FIXME: Avoid checking twice the same cases (cycles). Ex:
|
|
//
|
|
// Comp1 ----> Comp2
|
|
//
|
|
// In this case, we will check Comp1 outputs with Comp2
|
|
// inputs AND Comp2 inputs with Comp1 outputs, since we are
|
|
// iterating over all existent Computing models in the Graph
|
|
|
|
for _, comp := range returned_wf.Computing {
|
|
|
|
compModel, err2 := comp.getModel()
|
|
if err = err2; err != nil {
|
|
return
|
|
}
|
|
|
|
currentCompModel := compModel.(ComputingModel)
|
|
|
|
// Come computings may not allow inputs or outputs
|
|
if len(currentCompModel.Dinputs) == 0 && len(comp.Inputs) > 0 {
|
|
issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any input"))
|
|
continue
|
|
}
|
|
|
|
if len(currentCompModel.Doutputs) == 0 && len(comp.Outputs) > 0 {
|
|
issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any output"))
|
|
continue
|
|
}
|
|
|
|
//TODO: We should allow heterogenous inputs?
|
|
for _, objIn := range comp.Inputs {
|
|
resIn := returned_wf.GetResource(&objIn)
|
|
resInType := resIn.getRtype()
|
|
switch resInType {
|
|
case rtype.DATA:
|
|
|
|
dataModel, err2 := resIn.getModel()
|
|
if err = err2; err != nil {
|
|
return
|
|
}
|
|
myDataModel := dataModel.(DataModel)
|
|
|
|
if _, ok := FindInSlice(currentCompModel.Dinputs, myDataModel.Dtype); !ok {
|
|
issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle inputs of type "+myDataModel.Dtype+" from Data "+dataModel.getName()))
|
|
}
|
|
|
|
case rtype.COMPUTING:
|
|
inCompModel, err2 := resIn.getModel()
|
|
if err = err2; err != nil {
|
|
return
|
|
}
|
|
|
|
myInComputingModel := inCompModel.(ComputingModel)
|
|
|
|
if _, _, ok := FindSliceInSlice(myInComputingModel.Doutputs, currentCompModel.Dinputs); !ok {
|
|
issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle any input from "+inCompModel.getName()))
|
|
}
|
|
case rtype.STORAGE:
|
|
// Storage can give use anything, so we always accept it's input for now
|
|
continue
|
|
|
|
default:
|
|
issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resInType.String()+" (behaviour not defined)"))
|
|
}
|
|
}
|
|
|
|
//TODO: We should allow heterogenous outputs?
|
|
for _, objOut := range comp.Outputs {
|
|
resOut := returned_wf.GetResource(&objOut)
|
|
resOutType := resOut.getRtype()
|
|
switch resOutType {
|
|
case rtype.COMPUTING:
|
|
outCompModel, err2 := resOut.getModel()
|
|
if err = err2; err != nil {
|
|
return
|
|
}
|
|
|
|
myOutComputingModel := outCompModel.(ComputingModel)
|
|
|
|
if _, _, ok := FindSliceInSlice(currentCompModel.Doutputs, myOutComputingModel.Dinputs); !ok {
|
|
issues = append(issues, errors.New("Computing "+compModel.getName()+" doesn't have output data compatible with "+outCompModel.getName()))
|
|
}
|
|
case rtype.STORAGE:
|
|
// Storage can save anything, so we always accept store it for now
|
|
continue
|
|
|
|
default:
|
|
issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resOutType.String()+" (behaviour not defined)"))
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (w *Workflow) createLinks(links []MxLink, issues []error) []error {
|
|
|
|
for _, link := range links {
|
|
if (len(link.Source) > 0 && len(link.Target) > 0){
|
|
sourceObj := w.GetResource(&link.Source)
|
|
targetObj := w.GetResource(&link.Target)
|
|
link_object := NewLink(sourceObj, link.Source, targetObj, link.Target)
|
|
w.AddLinkToWorkflow(link_object, link.ID)
|
|
} else {
|
|
issues = append(issues, w.processLinkErrors(link))
|
|
}
|
|
}
|
|
return issues
|
|
}
|
|
|
|
func (w *Workflow) processLinkErrors(link MxLink) (issue error) {
|
|
if len(link.Source) == 0 && len(link.Target) == 0 {
|
|
issue = errors.New("Arrow " + link.ID + " is alone")
|
|
} else if len(link.Source) == 0 {
|
|
targetObj := w.GetResource(&link.Target)
|
|
issue = errors.New("Arrow (" + link.ID + ") to " + *targetObj.getName() + " without parent")
|
|
} else {
|
|
sourceObj := w.GetResource(&link.Source)
|
|
issue = errors.New("Arrow " + link.ID + " from " + *sourceObj.getName() + " without target")
|
|
}
|
|
|
|
return issue
|
|
}
|
|
|
|
func (w *Workflow) checkLinks(issues []error) []error {
|
|
|
|
// Check that storage components have a valid link
|
|
for id, storage := range w.Storage {
|
|
if(!w.isComponentSrc(id) && !w.isComponentDst(id)){
|
|
issues = append(issues, errors.New("Storage "+*storage.getName()+" without compatible inputs and outputs"))
|
|
}
|
|
}
|
|
|
|
// Check that data components are linked to a computing component
|
|
for id, data := range w.Data {
|
|
if(!w.isLinkedToComputing(id)){
|
|
issues = append(issues, errors.New("Data "+*data.getName()+" not attached to any Computing"))
|
|
}
|
|
}
|
|
|
|
// Check that DC is linked to a computing component
|
|
for id, dc:= range w.Datacenter {
|
|
if(!w.isLinkedToComputing(id)){
|
|
issues = append(issues, errors.New("Datacenter "+*dc.getName()+" not attached to any Computing"))
|
|
}
|
|
|
|
}
|
|
|
|
// Check that all data computing components are linked to a DC
|
|
for id,comp:= range w.Computing {
|
|
if(!w.isLinkedToDC(id)){
|
|
issues = append(issues, errors.New("Computing "+*comp.getName()+" not attached to any datacenter"))
|
|
}
|
|
|
|
}
|
|
|
|
return issues
|
|
}
|
|
|
|
func (w *Workflow) isComponentSrc(id string) bool {
|
|
|
|
for _, link := range w.Links{
|
|
if(link.Source == id && link.Source != ""){
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (w *Workflow) isComponentDst(id string) bool {
|
|
|
|
for _, link := range w.Links{
|
|
if(link.Destination == id && link.Source != ""){
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (w *Workflow) isLinkedToComputing(id string) bool {
|
|
|
|
for idComputing, _ := range w.Computing {
|
|
if( (w.isComponentSrc(id) && w.isComponentDst(idComputing)) || (w.isComponentSrc(idComputing) && w.isComponentDst(id))){
|
|
return true
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func (w *Workflow) isLinkedToDC(id string) bool {
|
|
|
|
for _, link := range w.Links {
|
|
if link.Source == id && link.DCLink {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func sumExecutionReqs(exeqReq ...ExecutionRequirementsModel) (ret ExecutionRequirementsModel) {
|
|
for _, v := range exeqReq {
|
|
ret.CPUs += v.CPUs
|
|
ret.GPUs += v.GPUs
|
|
ret.RAM += v.RAM
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func CheckAndBookWorkflowSchedule(username, workflowName string, book bool) (myRet []DCstatus, err error) {
|
|
|
|
userWorkspace := GetWorkspace(username)
|
|
|
|
if userWorkspace == nil {
|
|
return nil, errors.New("No workspace for user " + username)
|
|
}
|
|
|
|
currentWorkflow, ok := userWorkspace.Workflows[workflowName]
|
|
if !ok {
|
|
return nil, errors.New("No workflow " + workflowName + " for user " + username)
|
|
}
|
|
|
|
if currentWorkflow.Schedules.IsBooked {
|
|
return nil, errors.New("Can't operate DCs for a already booked schedule")
|
|
}
|
|
|
|
// dd := ¤tWorkflow
|
|
|
|
// We can have multiple DCobjs pointing to the same DCmodel. We must sum all req of the same DCmodel
|
|
totalDCs := make(map[primitive.ObjectID]ExecutionRequirementsModel)
|
|
|
|
for dcIDobj, dcObj := range currentWorkflow.Datacenter {
|
|
modelID := dcObj.getReference()
|
|
|
|
var totalsModel ExecutionRequirementsModel
|
|
totalsModel, err = currentWorkflow.GetExecutionRequirements(dcIDobj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if _, ok := totalDCs[modelID]; ok {
|
|
totalDCs[modelID] = sumExecutionReqs(totalDCs[modelID], totalsModel)
|
|
} else {
|
|
totalDCs[modelID] = totalsModel
|
|
}
|
|
}
|
|
|
|
myRet = make([]DCstatus, len(totalDCs))
|
|
var i int
|
|
i = -1
|
|
for modelID, execReq := range totalDCs {
|
|
i++
|
|
var dcModel *DatacenterModel
|
|
dcModel, err = GetOneDatacenter(modelID.Hex())
|
|
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
myRet[i].DCname = dcModel.Name
|
|
myRet[i].DCobjID = modelID.Hex()
|
|
|
|
// retrieve the host of the DC
|
|
host := GetHost(dcModel.Hosts)
|
|
if host == nil {
|
|
myRet[i].ErrorMessage = "Datacenter " + myRet[i].DCname + " doesn't have a host property"
|
|
continue
|
|
}
|
|
|
|
cli := services.GetSelfAPI(*host)
|
|
data, err := cli.ScheduleApi.ScheduleControllerCheckIfScheduleCanBeCreatedInThisDC(context.Background(),
|
|
currentWorkflow.Schedules.Cron,
|
|
int32(currentWorkflow.Schedules.Duration),
|
|
currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
|
|
currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
|
|
swagger.ModelsExecutionRequirementsModel{
|
|
Cpus: int32(execReq.CPUs),
|
|
Gpus: int32(execReq.GPUs),
|
|
Ram: int32(execReq.RAM),
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
myRet[i].ErrorMessage = err.Error()
|
|
swErr, ok := err.(swagger.GenericSwaggerError)
|
|
if ok {
|
|
myRet[i].IsReachable = true
|
|
myRet[i].ErrorMessage += ": " + string(swErr.Body())
|
|
}
|
|
continue
|
|
}
|
|
|
|
myRet[i].IsReachable = true
|
|
|
|
if data.StatusCode == 200 {
|
|
myRet[i].IsAvailable = true
|
|
}
|
|
|
|
}
|
|
|
|
// If we only check, we should exit here
|
|
if !book {
|
|
return
|
|
}
|
|
|
|
for _, v := range myRet {
|
|
if !v.IsAvailable {
|
|
return
|
|
}
|
|
}
|
|
|
|
i = -1
|
|
allBooked := true
|
|
for modelID, execReq := range totalDCs {
|
|
i++
|
|
// _ = v
|
|
var dcModel *DatacenterModel
|
|
dcModel, err = GetOneDatacenter(modelID.Hex())
|
|
|
|
if err != nil {
|
|
myRet[i].ErrorMessage = err.Error()
|
|
continue
|
|
}
|
|
|
|
cli := services.GetSelfAPI(*GetHost(dcModel.Hosts)) // If we are here, we already check that host exists and is reachable
|
|
data, resp, err := cli.ScheduleApi.ScheduleControllerCreateSchedule(context.Background(),
|
|
services.DC_NAME,
|
|
workflowName,
|
|
currentWorkflow.Schedules.Cron,
|
|
int32(currentWorkflow.Schedules.Duration),
|
|
currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
|
|
currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
|
|
swagger.ModelsExecutionRequirementsModel{
|
|
Cpus: int32(execReq.CPUs),
|
|
Gpus: int32(execReq.GPUs),
|
|
Ram: int32(execReq.RAM),
|
|
},
|
|
)
|
|
|
|
if err != nil {
|
|
allBooked = false
|
|
myRet[i].ErrorMessage = err.Error()
|
|
swErr, ok := err.(swagger.GenericSwaggerError)
|
|
if ok {
|
|
myRet[i].IsReachable = true
|
|
myRet[i].ErrorMessage += ": " + string(swErr.Body())
|
|
}
|
|
continue
|
|
}
|
|
|
|
if resp.StatusCode == 200 {
|
|
//FIXME: Maybe some better way of casting?
|
|
|
|
var nextExec [5]string
|
|
for counter := 0; counter < 5; counter++ {
|
|
nextExec[counter] = data.NextExecutions[counter]
|
|
}
|
|
|
|
myRet[i].Booked = &ScheduleInfo{
|
|
Total: int(data.Total),
|
|
NextExecutions: nextExec,
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// If some DC fail, we must not mark the workflow as booked
|
|
if !allBooked {
|
|
return
|
|
}
|
|
|
|
currentWorkflow.Schedules.IsBooked = true
|
|
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
|
|
primitive.M{"_id": username},
|
|
primitive.M{"$set": primitive.M{
|
|
WorkflowDB + "." +
|
|
workflowName + "." +
|
|
SchedulesDB: ¤tWorkflow.Schedules}},
|
|
)
|
|
|
|
if err != nil {
|
|
logs.Critical("Internal error when updating in DB: " + err.Error())
|
|
}
|
|
|
|
return myRet, nil
|
|
}
|
|
|
|
// Not sure if this method handles error propagation well
|
|
func (wf Workflow) mxCellToComponent(cell MxCell, ws Workspace) (resObj ResourceObject, err error, issues []error) {
|
|
rType := ws.getRtype(*cell.RID)
|
|
|
|
if rType == rtype.INVALID {
|
|
return nil,
|
|
errors.New("Refering to a rID that is not in the workflow"),
|
|
nil
|
|
}
|
|
|
|
// Generate ObjectID for the reference ID
|
|
rIDObj, err := primitive.ObjectIDFromHex(*cell.RID)
|
|
if err != nil {
|
|
return nil,
|
|
errors.New("Bad ID format: " + *cell.RID),
|
|
nil
|
|
}
|
|
|
|
resObj = wf.CreateResourceObject(rType)
|
|
resObj.setReference(rIDObj)
|
|
|
|
return
|
|
}
|
|
|