2023-03-03 14:43:11 +01:00
package models
import (
"context"
"encoding/xml"
"errors"
2024-03-28 14:43:33 +01:00
"net/url"
"os"
2023-03-03 14:43:11 +01:00
"sort"
"time"
"cloud.o-forge.io/core/oc-catalog/models/rtype"
swagger "cloud.o-forge.io/core/oc-catalog/selfapi"
"cloud.o-forge.io/core/oc-catalog/services"
"github.com/beego/beego/v2/core/logs"
"github.com/vk496/cron"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type LinkingState uint
// When we check the schedule of a workflow, we report with this
type DCstatus struct {
DCname string
DCobjID string //FIXME: Probably should be model ID
IsReachable bool
IsAvailable bool
Booked * ScheduleInfo
ErrorMessage string
}
const (
NO_LINK LinkingState = iota
INPUT
OUTPUT
)
func boolToLinkingState ( boolState bool ) LinkingState {
switch boolState {
case true :
return INPUT
case false :
return OUTPUT
default :
return NO_LINK
}
}
const SchedulesDB = "schedules"
type Workflow struct {
// The key of the map is the ID of the object itself
2024-04-15 11:42:17 +02:00
Data map [ string ] DataObject ` json:"data" `
Computing map [ string ] ComputingObject ` json:"computing" `
Storage map [ string ] StorageObject ` json:"storage" `
Datacenter map [ string ] DatacenterObject ` json:"datacenter" ` //TODO: Decide if there should be multiple objects of a datacenter
Links map [ string ] Link ` json:"link" `
2023-03-03 14:43:11 +01:00
Schedules WorkflowSchedule ` json:"schedules" `
MxgraphXML string ` description:"State of the mxgraph" `
}
2024-04-12 13:25:01 +02:00
// TODO : describe what use case this interface satisfies
2023-03-03 14:43:11 +01:00
type ResourceObject interface {
getHost ( ) * string
getName ( ) * string
getModel ( ) ( ResourceModel , error )
getRtype ( ) rtype . Rtype
setReference ( rObjID primitive . ObjectID )
getReference ( ) primitive . ObjectID
isLinked ( rObjID string ) LinkingState
addLink ( direction LinkingState , rObjID string )
}
2024-03-29 17:00:53 +01:00
// This type allows to process computing and storage component
2024-03-28 14:43:33 +01:00
// which can get input from the user
type EditableResourceObject interface {
ResourceObject
addUserInput ( map [ string ] interface { } )
}
2023-03-03 14:43:11 +01:00
// Get a sum of all execution requirements attached to a DC obj
2024-03-28 14:43:33 +01:00
func ( wf Workflow ) GetExecutionRequirements ( dcIDobj string ) ( ret ExecutionRequirementsModel , err error ) {
2023-03-03 14:43:11 +01:00
// Find the id of the DC obj
2024-03-28 14:43:33 +01:00
if _ , ok := wf . Datacenter [ dcIDobj ] ; ! ok {
2023-03-03 14:43:11 +01:00
return ExecutionRequirementsModel { } , errors . New ( "DC obj" + dcIDobj + " doesn't exist in the Workflow" )
}
// Get all elements that are attached to the DC
2024-03-28 14:43:33 +01:00
for _ , computingObj := range wf . Computing {
2023-03-03 14:43:11 +01:00
if computingObj . DataCenterID == dcIDobj {
mymodel , err := computingObj . getModel ( )
if err != nil {
return ExecutionRequirementsModel { } , err
}
compModel := mymodel . ( ComputingModel )
//TODO a generic way to concatenate execution requirements
ret . CPUs += compModel . ExecutionRequirements . CPUs
ret . GPUs += compModel . ExecutionRequirements . GPUs
ret . RAM += compModel . ExecutionRequirements . RAM
}
}
return
}
func ( w * Workflow ) GetResource ( rObjID * string ) ( retObj ResourceObject ) {
if rObjID == nil {
return nil
}
if storVal , ok := w . Data [ * rObjID ] ; ok {
retObj = & storVal
return
}
if storVal , ok := w . Computing [ * rObjID ] ; ok {
retObj = & storVal
return
}
if storVal , ok := w . Storage [ * rObjID ] ; ok {
retObj = & storVal
return
}
if storVal , ok := w . Datacenter [ * rObjID ] ; ok {
retObj = & storVal
return
}
return nil
}
func ( w * Workflow ) GetResourceMapByRtype ( rt rtype . Rtype ) interface { } {
switch rt {
case rtype . DATA :
return w . Data
case rtype . COMPUTING :
return w . Computing
case rtype . STORAGE :
return w . Storage
case rtype . DATACENTER :
return w . Datacenter
default :
return nil
}
}
func ( w * Workflow ) CreateResourceObject ( rt rtype . Rtype ) ResourceObject {
var res ResourceObject
switch rt {
case rtype . DATA :
res = & DataObject { }
case rtype . COMPUTING :
res = & ComputingObject { }
case rtype . STORAGE :
res = & StorageObject { }
case rtype . DATACENTER :
res = & DatacenterObject { }
default :
res = nil
}
2024-03-28 14:43:33 +01:00
2023-03-03 14:43:11 +01:00
return res
}
func ( w * Workflow ) AddObj ( robj ResourceObject ) * primitive . ObjectID {
outputID := primitive . NewObjectID ( )
w . UpdateObj ( robj , outputID . Hex ( ) )
return & outputID
}
2024-04-15 11:42:17 +02:00
func ( w * Workflow ) AddLinkToWorkflow ( link Link , id string ) {
if w . Links == nil {
w . Links = make ( map [ string ] Link )
}
w . Links [ id ] = link
}
2023-03-03 14:43:11 +01:00
func ( w * Workflow ) UpdateDB ( userID , workflowName string ) error {
_ , err := services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : userID } ,
primitive . M { "$set" : primitive . M { WorkflowDB + "." + workflowName : w } } ,
)
return err
}
func ( w * Workflow ) UpdateObj ( robj ResourceObject , objID string ) {
switch robj . getRtype ( ) {
case rtype . DATA :
var target DataObject
if w . Data == nil {
//init
w . Data = make ( map [ string ] DataObject )
}
target = * robj . ( * DataObject )
w . Data [ objID ] = target
case rtype . COMPUTING :
var target ComputingObject
if w . Computing == nil {
//init
w . Computing = make ( map [ string ] ComputingObject )
}
target = * robj . ( * ComputingObject )
w . Computing [ objID ] = target
case rtype . STORAGE :
var target StorageObject
if w . Storage == nil {
//init
w . Storage = make ( map [ string ] StorageObject )
}
target = * robj . ( * StorageObject )
w . Storage [ objID ] = target
case rtype . DATACENTER :
var target DatacenterObject
if w . Datacenter == nil {
//init
w . Datacenter = make ( map [ string ] DatacenterObject )
}
target = * robj . ( * DatacenterObject )
w . Datacenter [ objID ] = target
}
}
func GetWorkflow ( userID , workflowName string ) ( workflow * Workflow , err error ) {
userWorkspace := GetWorkspace ( userID )
if userWorkspace != nil {
if theWorkflow , ok := userWorkspace . Workflows [ workflowName ] ; ok {
return & theWorkflow , nil
}
logs . Debug ( "No workflow name" )
return nil , errors . New ( "No workflow name" )
}
logs . Debug ( "No workspace" )
return nil , errors . New ( "No workspace" )
}
func CreateWorkflow ( userID , workflowName string ) ( err error ) {
//TODO: Maybe operate directly in the DB instead retriving the full object?
userWorkspace := GetWorkspace ( userID )
// Exist in the DB
if userWorkspace != nil {
if _ , ok := userWorkspace . Workflows [ workflowName ] ; ok {
message := "Workspace workflow " + workflowName +
" is already created for user " + userID
logs . Debug ( message )
return errors . New ( message )
}
userWP := & Workflow { }
// New element
addElem := primitive . M { WorkflowDB + "." + workflowName : userWP }
// If user doesn't have workflows, we must init at least one
if userWorkspace . Workflows == nil {
addElem = primitive . M { WorkflowDB : map [ string ] * Workflow {
workflowName : userWP ,
} }
}
_ , err := services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : userID } ,
primitive . M { "$set" : addElem } ,
)
if err != nil {
message := "Internal error when updating in DB"
logs . Debug ( message + "; " + err . Error ( ) )
return errors . New ( message )
}
return nil
}
return errors . New ( "Can't create a workflow without a workspace" )
}
func CreateObjectInWorkflow ( userID , workflowName , rID string ) ( rObjID2 * string , err error ) {
userWorkspace := GetWorkspace ( userID )
if userWorkspace == nil {
return nil , errors . New ( "No workspace for user " + userID )
}
if _ , ok := userWorkspace . Workflows [ workflowName ] ; ! ok {
return nil , errors . New ( "Workspace workflow " + workflowName + " doesn't exist for user " + userID )
}
rIDObj , err := primitive . ObjectIDFromHex ( rID )
if err != nil {
return nil , errors . New ( "ID " + rID + " is not valid" )
}
//TODO: We are replacing the entire array instead of pushing
// a new element. Probably will have problems with multithread/async
// operations and consistency in the future
for rtyp , resource := range userWorkspace . GetResources ( ) {
if contains ( resource , rID ) {
wWorkflow := userWorkspace . Workflows [ workflowName ]
newObj := wWorkflow . CreateResourceObject ( rtyp )
newObj . setReference ( rIDObj )
outputID := wWorkflow . AddObj ( newObj )
_ , err := services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : userID } ,
primitive . M { "$set" : primitive . M { WorkflowDB + "." + workflowName + "." + rtyp . String ( ) : wWorkflow . GetResourceMapByRtype ( rtyp ) } } ,
)
if err != nil {
return nil , errors . New ( "Internal error when updating in DB: " + err . Error ( ) )
}
outStr := outputID . Hex ( )
return & outStr , nil
}
}
return nil , errors . New ( "rID " + rID + " doesn't exist in the user workspace" )
}
func LinkObjectsInWorkspace ( userID , workflowName , rObjIDsource string , isInput bool , rObjIDtarger string ) ( err error ) {
userWorkspace := GetWorkspace ( userID )
if userWorkspace == nil {
return errors . New ( "No workspace for user " + userID )
}
if _ , ok := userWorkspace . Workflows [ workflowName ] ; ! ok {
return errors . New ( "Workspace workflow " + workflowName + " doesn't exist for user " + userID )
}
// Check rObjIDsource
if _ , ok := userWorkspace . Workflows [ workflowName ] . Data [ rObjIDsource ] ; ! ok {
return errors . New ( "rObjIDsource must be of type DATA for now" )
}
// Check rObjIDtarger
wWorkflow := userWorkspace . Workflows [ workflowName ]
resObjTarget := wWorkflow . GetResource ( & rObjIDtarger )
if resObjTarget == nil {
return errors . New ( "rObjIDtarger doesn't exist" )
} else if resObjTarget . getRtype ( ) == rtype . DATA {
return errors . New ( "rObjIDtarger of type Data doesn't have inputs/outputs" )
}
if resObjTarget . isLinked ( rObjIDsource ) != NO_LINK {
return errors . New ( "rObjIDsource already exists in the inputs or outputs" )
}
resObjTarget . addLink ( boolToLinkingState ( isInput ) , rObjIDsource )
_ , err = services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : userID } ,
primitive . M { "$set" : primitive . M {
WorkflowDB + "." +
workflowName + "." +
resObjTarget . getRtype ( ) . String ( ) + "." +
rObjIDtarger : resObjTarget } } ,
)
if err != nil {
return errors . New ( "Internal error when updating in DB: " + err . Error ( ) )
}
return nil
}
func GetWorkflowSchedule ( username , workflowName string ) ( Currentschedule * WorkflowSchedule , err error ) {
userWorkspace := GetWorkspace ( username )
if userWorkspace == nil {
return nil , errors . New ( "No workspace for user " + username )
}
if workflow , ok := userWorkspace . Workflows [ workflowName ] ; ok {
Currentschedule = & workflow . Schedules
}
return
}
func SetWorkflowSchedule ( username , workflowName , cronString , events string , isService bool , startDate , stopDate time . Time , duration uint ) ( NextSchedules * ScheduleInfo , err error ) {
userWorkspace := GetWorkspace ( username )
if userWorkspace == nil {
return nil , errors . New ( "No workspace for user " + username )
}
// Check if workflow exist
if _ , ok := userWorkspace . Workflows [ workflowName ] ; ! ok {
return
}
// We mustn't modify a booked schedule
if userWorkspace . Workflows [ workflowName ] . Schedules . IsBooked {
return nil , errors . New ( "A booked schedule can't be modified" )
}
sch := WorkflowSchedule { }
NextSchedules = & ScheduleInfo { }
sch . StartDate = startDate
sch . StopDate = stopDate
sch . IsService = isService
if isService { // Service
NextSchedules . NextExecutions [ 0 ] = startDate . String ( )
NextSchedules . Total = 1
}
if ! isService { // Task
sch . Cron = cronString
sch . Duration = duration
sch . Events = events
// Obtain next executions
counter := 0
myCron , _ := cron . Parse ( cronString ) // NOTE: already checked in the controller
scheduledStart := myCron . Next ( startDate ) // Get the first execution starting from startDate
for ! scheduledStart . IsZero ( ) && counter < MAX_SCHEDULES {
scheduleStop := scheduledStart . Add ( time . Second * time . Duration ( duration ) )
if scheduleStop . After ( stopDate ) || scheduledStart . Before ( startDate ) {
// If a task is longer than last possible date, we ignore it
scheduledStart = myCron . Next ( scheduledStart )
counter ++
continue
}
if counter < len ( NextSchedules . NextExecutions ) {
NextSchedules . NextExecutions [ counter ] = scheduledStart . String ( )
}
scheduledStart = myCron . Next ( scheduledStart )
counter ++
}
NextSchedules . Total = counter
if NextSchedules . Total == 0 {
return nil , errors . New ( "Current Task configuration will have 0 executions" )
}
}
_ , err = services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : username } ,
primitive . M { "$set" : primitive . M {
WorkflowDB + "." +
workflowName + "." +
SchedulesDB : & sch } } ,
)
if err != nil {
return nil , errors . New ( "Internal error when updating in DB: " + err . Error ( ) )
}
return
}
func GetMxGraph ( username , workflowName string ) ( xmlData * string , err error ) {
userWorkspace := GetWorkspace ( username )
if userWorkspace == nil {
return nil , errors . New ( "No workspace for user " + username )
}
if _ , ok := userWorkspace . Workflows [ workflowName ] ; ! ok {
return nil , errors . New ( "Workspace workflow " + workflowName + " doesn't exist for user " + username )
}
var data string = userWorkspace . Workflows [ workflowName ] . MxgraphXML
if data == "" {
xmlData = nil
} else {
xmlData = & data
}
return
}
func ParseMxGraph ( username , workflowName , xmlData string ) ( err error , mxissues [ ] error ) {
userWorkspace := GetWorkspace ( username )
if userWorkspace == nil {
return errors . New ( "No workspace for user " + username ) , nil
}
currentWorkflow , ok := userWorkspace . Workflows [ workflowName ]
if ! ok {
return errors . New ( "No workflow " + workflowName ) , nil
}
if currentWorkflow . Schedules . IsBooked {
2023-10-18 11:01:41 +02:00
//well, let's allow that
//return errors.New("Can't modify a booked workflow"), nil
2023-03-03 14:43:11 +01:00
}
2024-03-28 14:43:33 +01:00
decodedValue , err := url . QueryUnescape ( xmlData )
if err != nil {
return err , nil
}
// TEMPORARY test the xml created
os . WriteFile ( "graph.xml" , [ ] byte ( decodedValue ) , 0660 )
2023-03-03 14:43:11 +01:00
var xmlModel MxGraphModel
// logs.Debug(xmlData)
err = xml . Unmarshal ( [ ] byte ( xmlData ) , & xmlModel )
if err != nil {
return err , nil
}
2024-04-16 16:12:54 +02:00
xmlModel . createLinks ( )
2024-03-28 14:43:33 +01:00
2023-03-03 14:43:11 +01:00
targetWorkspaceWorkflow , err , mxissues := userWorkspace . ConsumeMxGraphModel ( xmlModel )
if err != nil {
return err , nil
}
targetWorkspaceWorkflow . MxgraphXML = xmlData
2024-03-28 14:43:33 +01:00
targetWorkspaceWorkflow . Schedules = currentWorkflow . Schedules //TODO: Probably we should move schedules outside the workflow
2023-03-03 14:43:11 +01:00
_ , err = services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : username } ,
primitive . M { "$set" : primitive . M {
WorkflowDB + "." +
workflowName : targetWorkspaceWorkflow } } ,
)
if err != nil {
return errors . New ( "Internal error when updating in DB: " + err . Error ( ) ) , nil
}
return nil , mxissues
}
// FindInSlice takes a slice and looks for an element in it. If found it will
// return it's key, otherwise it will return -1 and a bool of false.
func FindInSlice ( slice [ ] string , val string ) ( int , bool ) {
for i , item := range slice {
if item == val {
return i , true
}
}
return - 1 , false
}
// At least one element exist in both slices
// Return index1, index2 and if exist
func FindSliceInSlice ( slice1 [ ] string , slice2 [ ] string ) ( int , int , bool ) {
for i1 , item1 := range slice1 {
for i2 , item2 := range slice2 {
if item1 == item2 {
return i1 , i2 , true
}
}
}
return - 1 , - 1 , false
}
2024-03-28 14:43:33 +01:00
func ( ws Workspace ) ConsumeMxGraphModel ( xmlmodel MxGraphModel ) ( returned_wf * Workflow , err error , issues [ ] error ) {
2023-03-03 14:43:11 +01:00
2024-03-22 10:00:35 +01:00
returned_wf = & Workflow { }
2023-03-03 14:43:11 +01:00
// When we will iterate over the full array of cells, we first will register the resources
// and after the linkage between them
sort . Slice ( xmlmodel . Root . MxCell , func ( i , j int ) bool {
return xmlmodel . Root . MxCell [ i ] . RID != nil
} )
2024-04-12 13:25:01 +02:00
// Create the object and add it to the appropriate list
// for all the components with setting, which are identified
// by a MxObject tag in the xml
2024-03-28 14:43:33 +01:00
for _ , object := range * xmlmodel . Root . MxObject {
2024-03-29 17:00:53 +01:00
resObj , err , mxissues := returned_wf . mxCellToComponent ( object . MxCell , ws )
2024-03-28 14:43:33 +01:00
if err != nil {
issues = append ( issues , mxissues ... )
}
2024-03-29 17:00:53 +01:00
// add the component to the worflow's attribute that stores
// all components in a map[string]Component where the key
// is the component's ID in the mxGraph and the value the Component object
returned_wf . UpdateObj ( resObj , object . ID )
// Construct the object corresponding to the componant's type and use its addUserInput method
if ( resObj . getRtype ( ) == rtype . COMPUTING ) {
comp_obj := returned_wf . GetResource ( & object . ID ) . ( * ComputingObject )
comp_obj . AddUserInput ( object . Settings )
returned_wf . UpdateObj ( comp_obj , object . ID )
}
// if(resObj.getRtype() == rtype.DATA){
// }
2024-03-28 14:43:33 +01:00
}
2023-03-03 14:43:11 +01:00
for _ , cell := range xmlmodel . Root . MxCell {
switch {
case cell . RID != nil :
2024-03-29 17:00:53 +01:00
resObj , err , mxissues := returned_wf . mxCellToComponent ( cell , ws )
2023-03-03 14:43:11 +01:00
if err != nil {
2024-03-28 14:43:33 +01:00
issues = append ( issues , mxissues ... )
2023-03-03 14:43:11 +01:00
}
2024-03-22 10:00:35 +01:00
returned_wf . UpdateObj ( resObj , cell . ID )
2023-03-03 14:43:11 +01:00
case cell . ID == "0" || cell . ID == "1" :
// ID 0 and 1 are special cases of mxeditor
continue
// issues = append(issues, errors.New("MxCell with ID "+cell.ID+" doesn't have a valid link"))
default :
2024-04-16 16:12:54 +02:00
// Not root nor resource. Should be only links
sourceObj := returned_wf . GetResource ( cell . Source )
targetObj := returned_wf . GetResource ( cell . Target )
if sourceObj == nil || targetObj == nil {
if sourceObj == nil && targetObj == nil {
issues = append ( issues , errors . New ( "Arrow " + cell . ID + " is alone" ) )
} else if sourceObj == nil {
issues = append ( issues , errors . New ( "Arrow (" + cell . ID + ") to " + * targetObj . getName ( ) + " without parent" ) )
} else {
issues = append ( issues , errors . New ( "Arrow " + cell . ID + " from " + * sourceObj . getName ( ) + " without target" ) )
}
// If is a invalid link, we can't save it in the DB
continue
}
2023-03-03 14:43:11 +01:00
// Not root nor resource. Should be only links
2024-04-15 11:42:17 +02:00
// If is a invalid link, we can't save it in the DB
// We should always get a ID because we already registered resources and discarded which doesn't correspond to existent models
// save back
// If we have a relationship of:
// Source ----> Target
//
// The Source will be in the INPUTs of the Target.
// But we also must make sure that the Target will be in the OUTPUTs of the Source
2023-03-03 14:43:11 +01:00
}
}
2024-04-16 16:12:54 +02:00
issues = returned_wf . CreateLinks ( xmlmodel . Root . MxLink , issues )
issues = returned_wf . CheckLinks ( issues )
// dcslist := make(map[string]bool)
// dataslist := make(map[string]bool)
// // datalist := make(map[string]bool)
// // Test wether the computing components are linked with a DC
// for _, comp := range returned_wf.Computing {
// if comp.DataCenterID == "" {
// issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter"))
// } else {
// // If doesn't exist in the list, means is new element to register as used
// dcslist[comp.DataCenterID] = true
// }
// for _, dcin := range comp.Inputs {
// switch returned_wf.GetResource(&dcin).getRtype() {
// case rtype.DATA:
// dataslist[dcin] = true
// }
// }
// for _, dcout := range comp.Outputs {
// switch returned_wf.GetResource(&dcout).getRtype() {
// case rtype.DATA:
// dataslist[dcout] = true
// }
// }
// }
// for _, storage_component := range returned_wf.Storage {
// if storage_component.Inputs == nil && storage_component.Outputs == nil {
// issues = append(issues, errors.New("Storage "+*storage_component.getName()+" without compatible inputs and outputs"))
// }
// }
// for dcID, dc_component := range returned_wf.Datacenter {
// // if rID doesn't exist in the list, it means that it's not used
// if _, ok := dcslist[dcID]; !ok {
// issues = append(issues, errors.New("DC "+*dc_component.getName()+" not attached to any Computing"))
// }
// }
// for dcID, data_component := range returned_wf.Data {
// // if rID doesn't exist in the list, it means that it's not used
// if _, ok := dataslist[dcID]; !ok {
// issues = append(issues, errors.New("Data "+*data_component.getName()+" not attached to any Computing"))
// }
// }
2023-03-03 14:43:11 +01:00
//////////////////////////////////////////////////////////
// //
// Starting from here, we check the type of resources //
// //
//////////////////////////////////////////////////////////
// FIXME: Avoid checking twice the same cases (cycles). Ex:
//
// Comp1 ----> Comp2
//
// In this case, we will check Comp1 outputs with Comp2
// inputs AND Comp2 inputs with Comp1 outputs, since we are
// iterating over all existent Computing models in the Graph
2024-03-22 10:00:35 +01:00
for _ , comp := range returned_wf . Computing {
2023-03-03 14:43:11 +01:00
compModel , err2 := comp . getModel ( )
if err = err2 ; err != nil {
return
}
currentCompModel := compModel . ( ComputingModel )
// Come computings may not allow inputs or outputs
if len ( currentCompModel . Dinputs ) == 0 && len ( comp . Inputs ) > 0 {
issues = append ( issues , errors . New ( "Computing " + compModel . getName ( ) + " must not have any input" ) )
continue
}
if len ( currentCompModel . Doutputs ) == 0 && len ( comp . Outputs ) > 0 {
issues = append ( issues , errors . New ( "Computing " + compModel . getName ( ) + " must not have any output" ) )
continue
}
//TODO: We should allow heterogenous inputs?
for _ , objIn := range comp . Inputs {
2024-03-22 10:00:35 +01:00
resIn := returned_wf . GetResource ( & objIn )
2023-03-03 14:43:11 +01:00
resInType := resIn . getRtype ( )
switch resInType {
case rtype . DATA :
dataModel , err2 := resIn . getModel ( )
if err = err2 ; err != nil {
return
}
myDataModel := dataModel . ( DataModel )
if _ , ok := FindInSlice ( currentCompModel . Dinputs , myDataModel . Dtype ) ; ! ok {
issues = append ( issues , errors . New ( "Computing " + compModel . getName ( ) + " can't handle inputs of type " + myDataModel . Dtype + " from Data " + dataModel . getName ( ) ) )
}
case rtype . COMPUTING :
inCompModel , err2 := resIn . getModel ( )
if err = err2 ; err != nil {
return
}
myInComputingModel := inCompModel . ( ComputingModel )
if _ , _ , ok := FindSliceInSlice ( myInComputingModel . Doutputs , currentCompModel . Dinputs ) ; ! ok {
issues = append ( issues , errors . New ( "Computing " + compModel . getName ( ) + " can't handle any input from " + inCompModel . getName ( ) ) )
}
case rtype . STORAGE :
// Storage can give use anything, so we always accept it's input for now
continue
default :
issues = append ( issues , errors . New ( "Computing " + currentCompModel . getName ( ) + " can't have any resource of type " + resInType . String ( ) + " (behaviour not defined)" ) )
}
}
//TODO: We should allow heterogenous outputs?
for _ , objOut := range comp . Outputs {
2024-03-22 10:00:35 +01:00
resOut := returned_wf . GetResource ( & objOut )
2023-03-03 14:43:11 +01:00
resOutType := resOut . getRtype ( )
switch resOutType {
case rtype . COMPUTING :
outCompModel , err2 := resOut . getModel ( )
if err = err2 ; err != nil {
return
}
myOutComputingModel := outCompModel . ( ComputingModel )
if _ , _ , ok := FindSliceInSlice ( currentCompModel . Doutputs , myOutComputingModel . Dinputs ) ; ! ok {
issues = append ( issues , errors . New ( "Computing " + compModel . getName ( ) + " doesn't have output data compatible with " + outCompModel . getName ( ) ) )
}
case rtype . STORAGE :
// Storage can save anything, so we always accept store it for now
continue
default :
issues = append ( issues , errors . New ( "Computing " + currentCompModel . getName ( ) + " can't have any resource of type " + resOutType . String ( ) + " (behaviour not defined)" ) )
}
}
}
return
}
2024-04-16 16:12:54 +02:00
func ( w * Workflow ) CreateLinks ( links [ ] MxLink , issues [ ] error ) [ ] error {
2024-04-15 11:42:17 +02:00
for _ , link := range links {
2024-04-16 16:12:54 +02:00
if ( len ( link . Source ) > 0 && len ( link . Target ) > 0 ) {
sourceObj := w . GetResource ( & link . Source )
targetObj := w . GetResource ( & link . Target )
link_object := NewLink ( sourceObj , link . Source , targetObj , link . Target )
w . AddLinkToWorkflow ( link_object , link . ID )
} else {
issues = append ( issues , w . processLinkErrors ( link ) )
}
}
return issues
}
2024-04-15 11:42:17 +02:00
2024-04-16 16:12:54 +02:00
func ( w * Workflow ) processLinkErrors ( link MxLink ) ( issue error ) {
if len ( link . Source ) == 0 && len ( link . Target ) == 0 {
issue = errors . New ( "Arrow " + link . ID + " is alone" )
} else if len ( link . Source ) == 0 {
targetObj := w . GetResource ( & link . Target )
issue = errors . New ( "Arrow (" + link . ID + ") to " + * targetObj . getName ( ) + " without parent" )
} else {
sourceObj := w . GetResource ( & link . Source )
issue = errors . New ( "Arrow " + link . ID + " from " + * sourceObj . getName ( ) + " without target" )
}
return issue
}
func ( w * Workflow ) CheckLinks ( issues [ ] error ) [ ] error {
// Check that storage components have a valid link
for id , storage := range w . Storage {
if ( ! w . IsComponentSrc ( id ) && ! w . IsComponentDst ( id ) ) {
issues = append ( issues , errors . New ( "Storage " + * storage . getName ( ) + " without compatible inputs and outputs" ) )
}
}
// Check that data components are linked to a computing component
for id , data := range w . Data {
if ( ! w . HasLinkageToComputing ( id ) ) {
issues = append ( issues , errors . New ( "Data " + * data . getName ( ) + " not attached to any Computing" ) )
}
}
// Check that DC is linked to a computing component
for id , dc := range w . Datacenter {
if ( ! w . HasLinkageToComputing ( id ) ) {
issues = append ( issues , errors . New ( "Datacenter " + * dc . getName ( ) + " not attached to any Computing" ) )
2024-04-15 11:42:17 +02:00
}
}
2024-04-16 16:12:54 +02:00
// Check that all data computing components are linked to a DC
for id , comp := range w . Computing {
if ( ! w . HasLinkageToDC ( id ) ) {
issues = append ( issues , errors . New ( "Computing " + * comp . getName ( ) + " not attached to any datacenter" ) )
}
}
2024-04-15 11:42:17 +02:00
return issues
}
2024-04-16 16:12:54 +02:00
func ( w * Workflow ) IsComponentSrc ( id string ) bool {
for _ , link := range w . Links {
if ( link . Source == id && link . Source != "" ) {
return true
}
}
return false
}
func ( w * Workflow ) IsComponentDst ( id string ) bool {
for _ , link := range w . Links {
if ( link . Destination == id && link . Source != "" ) {
return true
}
}
return false
}
func ( w * Workflow ) HasLinkageToComputing ( id string ) bool {
for idComputing , _ := range w . Computing {
if ( ( w . IsComponentSrc ( id ) && w . IsComponentDst ( idComputing ) ) || ( w . IsComponentSrc ( idComputing ) && w . IsComponentDst ( id ) ) ) {
return true
}
}
return false
}
func ( w * Workflow ) HasLinkageToDC ( id string ) bool {
for _ , link := range w . Links {
if ( link . Source == id && link . DCLink ) {
return true
}
}
return false
}
2023-03-03 14:43:11 +01:00
func sumExecutionReqs ( exeqReq ... ExecutionRequirementsModel ) ( ret ExecutionRequirementsModel ) {
for _ , v := range exeqReq {
ret . CPUs += v . CPUs
ret . GPUs += v . GPUs
ret . RAM += v . RAM
}
return
}
func CheckAndBookWorkflowSchedule ( username , workflowName string , book bool ) ( myRet [ ] DCstatus , err error ) {
userWorkspace := GetWorkspace ( username )
if userWorkspace == nil {
return nil , errors . New ( "No workspace for user " + username )
}
currentWorkflow , ok := userWorkspace . Workflows [ workflowName ]
if ! ok {
return nil , errors . New ( "No workflow " + workflowName + " for user " + username )
}
if currentWorkflow . Schedules . IsBooked {
return nil , errors . New ( "Can't operate DCs for a already booked schedule" )
}
// dd := ¤tWorkflow
// We can have multiple DCobjs pointing to the same DCmodel. We must sum all req of the same DCmodel
totalDCs := make ( map [ primitive . ObjectID ] ExecutionRequirementsModel )
for dcIDobj , dcObj := range currentWorkflow . Datacenter {
modelID := dcObj . getReference ( )
var totalsModel ExecutionRequirementsModel
totalsModel , err = currentWorkflow . GetExecutionRequirements ( dcIDobj )
if err != nil {
return
}
if _ , ok := totalDCs [ modelID ] ; ok {
totalDCs [ modelID ] = sumExecutionReqs ( totalDCs [ modelID ] , totalsModel )
} else {
totalDCs [ modelID ] = totalsModel
}
}
myRet = make ( [ ] DCstatus , len ( totalDCs ) )
var i int
i = - 1
for modelID , execReq := range totalDCs {
i ++
var dcModel * DatacenterModel
dcModel , err = GetOneDatacenter ( modelID . Hex ( ) )
if err != nil {
return
}
myRet [ i ] . DCname = dcModel . Name
myRet [ i ] . DCobjID = modelID . Hex ( )
// retrieve the host of the DC
host := GetHost ( dcModel . Hosts )
if host == nil {
myRet [ i ] . ErrorMessage = "Datacenter " + myRet [ i ] . DCname + " doesn't have a host property"
continue
}
cli := services . GetSelfAPI ( * host )
data , err := cli . ScheduleApi . ScheduleControllerCheckIfScheduleCanBeCreatedInThisDC ( context . Background ( ) ,
currentWorkflow . Schedules . Cron ,
int32 ( currentWorkflow . Schedules . Duration ) ,
currentWorkflow . Schedules . StartDate . Format ( time . RFC3339 ) ,
currentWorkflow . Schedules . StopDate . Format ( time . RFC3339 ) ,
swagger . ModelsExecutionRequirementsModel {
Cpus : int32 ( execReq . CPUs ) ,
Gpus : int32 ( execReq . GPUs ) ,
Ram : int32 ( execReq . RAM ) ,
} ,
)
if err != nil {
myRet [ i ] . ErrorMessage = err . Error ( )
swErr , ok := err . ( swagger . GenericSwaggerError )
if ok {
myRet [ i ] . IsReachable = true
myRet [ i ] . ErrorMessage += ": " + string ( swErr . Body ( ) )
}
continue
}
myRet [ i ] . IsReachable = true
if data . StatusCode == 200 {
myRet [ i ] . IsAvailable = true
}
}
// If we only check, we should exit here
if ! book {
return
}
for _ , v := range myRet {
if ! v . IsAvailable {
return
}
}
i = - 1
allBooked := true
for modelID , execReq := range totalDCs {
i ++
// _ = v
var dcModel * DatacenterModel
dcModel , err = GetOneDatacenter ( modelID . Hex ( ) )
if err != nil {
myRet [ i ] . ErrorMessage = err . Error ( )
continue
}
cli := services . GetSelfAPI ( * GetHost ( dcModel . Hosts ) ) // If we are here, we already check that host exists and is reachable
data , resp , err := cli . ScheduleApi . ScheduleControllerCreateSchedule ( context . Background ( ) ,
services . DC_NAME ,
workflowName ,
currentWorkflow . Schedules . Cron ,
int32 ( currentWorkflow . Schedules . Duration ) ,
currentWorkflow . Schedules . StartDate . Format ( time . RFC3339 ) ,
currentWorkflow . Schedules . StopDate . Format ( time . RFC3339 ) ,
swagger . ModelsExecutionRequirementsModel {
Cpus : int32 ( execReq . CPUs ) ,
Gpus : int32 ( execReq . GPUs ) ,
Ram : int32 ( execReq . RAM ) ,
} ,
)
if err != nil {
allBooked = false
myRet [ i ] . ErrorMessage = err . Error ( )
swErr , ok := err . ( swagger . GenericSwaggerError )
if ok {
myRet [ i ] . IsReachable = true
myRet [ i ] . ErrorMessage += ": " + string ( swErr . Body ( ) )
}
continue
}
if resp . StatusCode == 200 {
//FIXME: Maybe some better way of casting?
var nextExec [ 5 ] string
for counter := 0 ; counter < 5 ; counter ++ {
nextExec [ counter ] = data . NextExecutions [ counter ]
}
myRet [ i ] . Booked = & ScheduleInfo {
Total : int ( data . Total ) ,
NextExecutions : nextExec ,
}
}
}
// If some DC fail, we must not mark the workflow as booked
if ! allBooked {
return
}
currentWorkflow . Schedules . IsBooked = true
_ , err = services . MngoCollWorkspace . UpdateOne ( services . MngoCtx ,
primitive . M { "_id" : username } ,
primitive . M { "$set" : primitive . M {
WorkflowDB + "." +
workflowName + "." +
SchedulesDB : & currentWorkflow . Schedules } } ,
)
2024-03-28 14:43:33 +01:00
if err != nil {
2023-03-03 14:43:11 +01:00
logs . Critical ( "Internal error when updating in DB: " + err . Error ( ) )
}
return myRet , nil
}
2024-03-28 14:43:33 +01:00
// Not sure if this method handles error propagation well
2024-03-29 17:00:53 +01:00
func ( wf Workflow ) mxCellToComponent ( cell MxCell , ws Workspace ) ( resObj ResourceObject , err error , issues [ ] error ) {
2024-03-28 14:43:33 +01:00
rType := ws . getRtype ( * cell . RID )
if rType == rtype . INVALID {
return nil ,
errors . New ( "Refering to a rID that is not in the workflow" ) ,
nil
}
// Generate ObjectID for the reference ID
rIDObj , err := primitive . ObjectIDFromHex ( * cell . RID )
if err != nil {
return nil ,
errors . New ( "Bad ID format: " + * cell . RID ) ,
nil
}
resObj = wf . CreateResourceObject ( rType )
resObj . setReference ( rIDObj )
return
}
// func (ws Workspace) extractMxCell(xmlModel MxGraphModel){
// // Iterate through all objects of the MxGraph
// graphObjects := xmlModel.Root.MxObject
// for _, object := range(*graphObjects){
// current_obj_id, _ := strconv.Atoi(object.ID)
// inside_cell_id := strconv.Itoa(current_obj_id + 1)
// cell := ws.GetResource(&inside_cell_id)
// // component := w.GetResource(cell.RID)
// fmt.Print(cell)
// }
// // Extract the mxCell object
// // Invoke the addParameter method from the component
// // Edit the ID to get the object's one
// }