deprecated-oc-catalog/models/workflow.go

1157 lines
30 KiB
Go
Raw Permalink Normal View History

2023-03-03 14:43:11 +01:00
package models
import (
"context"
"encoding/xml"
"errors"
"net/url"
"os"
2023-03-03 14:43:11 +01:00
"sort"
"time"
2024-07-29 18:02:29 +02:00
"cloud.o-forge.io/core/deprecated-oc-catalog/models/rtype"
swagger "cloud.o-forge.io/core/deprecated-oc-catalog/selfapi"
"cloud.o-forge.io/core/deprecated-oc-catalog/services"
2023-03-03 14:43:11 +01:00
"github.com/beego/beego/v2/core/logs"
"github.com/vk496/cron"
"go.mongodb.org/mongo-driver/bson/primitive"
)
type LinkingState uint
// When we check the schedule of a workflow, we report with this
type DCstatus struct {
DCname string
DCobjID string //FIXME: Probably should be model ID
IsReachable bool
IsAvailable bool
Booked *ScheduleInfo
ErrorMessage string
}
const (
NO_LINK LinkingState = iota
INPUT
OUTPUT
)
func boolToLinkingState(boolState bool) LinkingState {
switch boolState {
case true:
return INPUT
case false:
return OUTPUT
default:
return NO_LINK
}
}
const SchedulesDB = "schedules"
type Workflow struct {
// The key of the map is the ID of the object itself
2024-04-25 15:48:32 +02:00
Data map[string]DataObject `json:"data"`
Computing map[string]ComputingObject `json:"computing"`
Storage map[string]StorageObject `json:"storage"`
Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter
Links map[string]Link `json:"link"`
2023-03-03 14:43:11 +01:00
Schedules WorkflowSchedule `json:"schedules"`
MxgraphXML string `description:"State of the mxgraph"`
}
// TODO : describe what use case this interface satisfies
2023-03-03 14:43:11 +01:00
type ResourceObject interface {
getHost() *string
getName() *string
getModel() (ResourceModel, error)
getRtype() rtype.Rtype
setReference(rObjID primitive.ObjectID)
getReference() primitive.ObjectID
isLinked(rObjID string) LinkingState
addLink(direction LinkingState, rObjID string)
}
// This type allows to process computing and storage component
// which can get input from the user
2024-04-25 15:48:32 +02:00
type EditableResourceObject interface {
ResourceObject
addUserInput(map[string]interface{})
}
2023-03-03 14:43:11 +01:00
// Get a sum of all execution requirements attached to a DC obj
func (wf Workflow) GetExecutionRequirements(dcIDobj string) (ret ExecutionRequirementsModel, err error) {
2023-03-03 14:43:11 +01:00
// Find the id of the DC obj
if _, ok := wf.Datacenter[dcIDobj]; !ok {
2023-03-03 14:43:11 +01:00
return ExecutionRequirementsModel{}, errors.New("DC obj" + dcIDobj + " doesn't exist in the Workflow")
}
// Get all elements that are attached to the DC
for _, computingObj := range wf.Computing {
2023-03-03 14:43:11 +01:00
if computingObj.DataCenterID == dcIDobj {
mymodel, err := computingObj.getModel()
if err != nil {
return ExecutionRequirementsModel{}, err
}
compModel := mymodel.(ComputingModel)
//TODO a generic way to concatenate execution requirements
ret.CPUs += compModel.ExecutionRequirements.CPUs
ret.GPUs += compModel.ExecutionRequirements.GPUs
ret.RAM += compModel.ExecutionRequirements.RAM
}
}
return
}
func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) {
if rObjID == nil {
return nil
}
2024-04-25 15:48:32 +02:00
if datVal, ok := w.Data[*rObjID]; ok {
retObj = &datVal
2023-03-03 14:43:11 +01:00
return
}
2024-04-25 15:48:32 +02:00
if compVal, ok := w.Computing[*rObjID]; ok {
retObj = &compVal
2023-03-03 14:43:11 +01:00
return
}
if storVal, ok := w.Storage[*rObjID]; ok {
retObj = &storVal
return
}
2024-04-25 15:48:32 +02:00
if dcVal, ok := w.Datacenter[*rObjID]; ok {
retObj = &dcVal
2023-03-03 14:43:11 +01:00
return
}
return nil
}
func (w *Workflow) GetResourceMapByRtype(rt rtype.Rtype) interface{} {
switch rt {
case rtype.DATA:
return w.Data
case rtype.COMPUTING:
return w.Computing
case rtype.STORAGE:
return w.Storage
case rtype.DATACENTER:
return w.Datacenter
default:
return nil
}
}
func (w *Workflow) CreateResourceObject(rt rtype.Rtype) ResourceObject {
var res ResourceObject
switch rt {
case rtype.DATA:
res = &DataObject{}
case rtype.COMPUTING:
res = &ComputingObject{}
case rtype.STORAGE:
res = &StorageObject{}
case rtype.DATACENTER:
res = &DatacenterObject{}
default:
res = nil
}
2023-03-03 14:43:11 +01:00
return res
}
func (w *Workflow) AddObj(robj ResourceObject) *primitive.ObjectID {
outputID := primitive.NewObjectID()
w.UpdateObj(robj, outputID.Hex())
return &outputID
}
2024-04-25 15:48:32 +02:00
func (w *Workflow) AddLinkToWorkflow(link Link, id string) {
2024-04-15 11:42:17 +02:00
if w.Links == nil {
w.Links = make(map[string]Link)
}
w.Links[id] = link
}
2023-03-03 14:43:11 +01:00
func (w *Workflow) UpdateDB(userID, workflowName string) error {
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName: w}},
)
return err
}
func (w *Workflow) UpdateObj(robj ResourceObject, objID string) {
switch robj.getRtype() {
case rtype.DATA:
var target DataObject
if w.Data == nil {
//init
w.Data = make(map[string]DataObject)
}
target = *robj.(*DataObject)
w.Data[objID] = target
case rtype.COMPUTING:
var target ComputingObject
if w.Computing == nil {
//init
w.Computing = make(map[string]ComputingObject)
}
target = *robj.(*ComputingObject)
w.Computing[objID] = target
case rtype.STORAGE:
var target StorageObject
if w.Storage == nil {
//init
w.Storage = make(map[string]StorageObject)
}
target = *robj.(*StorageObject)
w.Storage[objID] = target
case rtype.DATACENTER:
var target DatacenterObject
if w.Datacenter == nil {
//init
w.Datacenter = make(map[string]DatacenterObject)
}
target = *robj.(*DatacenterObject)
w.Datacenter[objID] = target
}
}
func GetWorkflow(userID, workflowName string) (workflow *Workflow, err error) {
userWorkspace := GetWorkspace(userID)
if userWorkspace != nil {
if theWorkflow, ok := userWorkspace.Workflows[workflowName]; ok {
return &theWorkflow, nil
}
logs.Debug("No workflow name")
return nil, errors.New("No workflow name")
}
logs.Debug("No workspace")
return nil, errors.New("No workspace")
}
func CreateWorkflow(userID, workflowName string) (err error) {
//TODO: Maybe operate directly in the DB instead retriving the full object?
userWorkspace := GetWorkspace(userID)
// Exist in the DB
if userWorkspace != nil {
if _, ok := userWorkspace.Workflows[workflowName]; ok {
message := "Workspace workflow " + workflowName +
" is already created for user " + userID
logs.Debug(message)
return errors.New(message)
}
userWP := &Workflow{}
// New element
addElem := primitive.M{WorkflowDB + "." + workflowName: userWP}
// If user doesn't have workflows, we must init at least one
if userWorkspace.Workflows == nil {
addElem = primitive.M{WorkflowDB: map[string]*Workflow{
workflowName: userWP,
}}
}
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": addElem},
)
if err != nil {
message := "Internal error when updating in DB"
logs.Debug(message + "; " + err.Error())
return errors.New(message)
}
return nil
}
return errors.New("Can't create a workflow without a workspace")
}
func CreateObjectInWorkflow(userID, workflowName, rID string) (rObjID2 *string, err error) {
userWorkspace := GetWorkspace(userID)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + userID)
}
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
}
rIDObj, err := primitive.ObjectIDFromHex(rID)
if err != nil {
return nil, errors.New("ID " + rID + " is not valid")
}
//TODO: We are replacing the entire array instead of pushing
// a new element. Probably will have problems with multithread/async
// operations and consistency in the future
for rtyp, resource := range userWorkspace.GetResources() {
if contains(resource, rID) {
wWorkflow := userWorkspace.Workflows[workflowName]
newObj := wWorkflow.CreateResourceObject(rtyp)
newObj.setReference(rIDObj)
outputID := wWorkflow.AddObj(newObj)
_, err := services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": primitive.M{WorkflowDB + "." + workflowName + "." + rtyp.String(): wWorkflow.GetResourceMapByRtype(rtyp)}},
)
if err != nil {
return nil, errors.New("Internal error when updating in DB: " + err.Error())
}
outStr := outputID.Hex()
return &outStr, nil
}
}
return nil, errors.New("rID " + rID + " doesn't exist in the user workspace")
}
func LinkObjectsInWorkspace(userID, workflowName, rObjIDsource string, isInput bool, rObjIDtarger string) (err error) {
userWorkspace := GetWorkspace(userID)
if userWorkspace == nil {
return errors.New("No workspace for user " + userID)
}
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + userID)
}
// Check rObjIDsource
if _, ok := userWorkspace.Workflows[workflowName].Data[rObjIDsource]; !ok {
return errors.New("rObjIDsource must be of type DATA for now")
}
// Check rObjIDtarger
wWorkflow := userWorkspace.Workflows[workflowName]
resObjTarget := wWorkflow.GetResource(&rObjIDtarger)
if resObjTarget == nil {
return errors.New("rObjIDtarger doesn't exist")
} else if resObjTarget.getRtype() == rtype.DATA {
return errors.New("rObjIDtarger of type Data doesn't have inputs/outputs")
}
if resObjTarget.isLinked(rObjIDsource) != NO_LINK {
return errors.New("rObjIDsource already exists in the inputs or outputs")
}
resObjTarget.addLink(boolToLinkingState(isInput), rObjIDsource)
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": userID},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName + "." +
resObjTarget.getRtype().String() + "." +
rObjIDtarger: resObjTarget}},
)
if err != nil {
return errors.New("Internal error when updating in DB: " + err.Error())
}
return nil
}
func GetWorkflowSchedule(username, workflowName string) (Currentschedule *WorkflowSchedule, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
if workflow, ok := userWorkspace.Workflows[workflowName]; ok {
Currentschedule = &workflow.Schedules
}
return
}
func SetWorkflowSchedule(username, workflowName, cronString, events string, isService bool, startDate, stopDate time.Time, duration uint) (NextSchedules *ScheduleInfo, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
// Check if workflow exist
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return
}
// We mustn't modify a booked schedule
if userWorkspace.Workflows[workflowName].Schedules.IsBooked {
return nil, errors.New("A booked schedule can't be modified")
}
sch := WorkflowSchedule{}
NextSchedules = &ScheduleInfo{}
sch.StartDate = startDate
sch.StopDate = stopDate
sch.IsService = isService
if isService { // Service
NextSchedules.NextExecutions[0] = startDate.String()
NextSchedules.Total = 1
}
if !isService { // Task
sch.Cron = cronString
sch.Duration = duration
sch.Events = events
// Obtain next executions
counter := 0
myCron, _ := cron.Parse(cronString) // NOTE: already checked in the controller
scheduledStart := myCron.Next(startDate) // Get the first execution starting from startDate
for !scheduledStart.IsZero() && counter < MAX_SCHEDULES {
scheduleStop := scheduledStart.Add(time.Second * time.Duration(duration))
if scheduleStop.After(stopDate) || scheduledStart.Before(startDate) {
// If a task is longer than last possible date, we ignore it
scheduledStart = myCron.Next(scheduledStart)
counter++
continue
}
if counter < len(NextSchedules.NextExecutions) {
NextSchedules.NextExecutions[counter] = scheduledStart.String()
}
scheduledStart = myCron.Next(scheduledStart)
counter++
}
NextSchedules.Total = counter
if NextSchedules.Total == 0 {
return nil, errors.New("Current Task configuration will have 0 executions")
}
}
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": username},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName + "." +
SchedulesDB: &sch}},
)
if err != nil {
return nil, errors.New("Internal error when updating in DB: " + err.Error())
}
return
}
func GetMxGraph(username, workflowName string) (xmlData *string, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
if _, ok := userWorkspace.Workflows[workflowName]; !ok {
return nil, errors.New("Workspace workflow " + workflowName + " doesn't exist for user " + username)
}
var data string = userWorkspace.Workflows[workflowName].MxgraphXML
if data == "" {
xmlData = nil
} else {
xmlData = &data
}
return
}
func ParseMxGraph(username, workflowName, xmlData string) (err error, mxissues []error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return errors.New("No workspace for user " + username), nil
}
currentWorkflow, ok := userWorkspace.Workflows[workflowName]
if !ok {
return errors.New("No workflow " + workflowName), nil
}
if currentWorkflow.Schedules.IsBooked {
2023-10-18 11:01:41 +02:00
//well, let's allow that
//return errors.New("Can't modify a booked workflow"), nil
2023-03-03 14:43:11 +01:00
}
decodedValue, err := url.QueryUnescape(xmlData)
if err != nil {
return err, nil
}
// TEMPORARY test the xml created
os.WriteFile("graph.xml", []byte(decodedValue), 0660)
2023-03-03 14:43:11 +01:00
var xmlModel MxGraphModel
// logs.Debug(xmlData)
err = xml.Unmarshal([]byte(xmlData), &xmlModel)
if err != nil {
return err, nil
}
xmlModel.createLinks()
2023-03-03 14:43:11 +01:00
targetWorkspaceWorkflow, err, mxissues := userWorkspace.ConsumeMxGraphModel(xmlModel)
if err != nil {
return err, nil
}
targetWorkspaceWorkflow.MxgraphXML = xmlData
targetWorkspaceWorkflow.Schedules = currentWorkflow.Schedules //TODO: Probably we should move schedules outside the workflow
2023-03-03 14:43:11 +01:00
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": username},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName: targetWorkspaceWorkflow}},
)
if err != nil {
return errors.New("Internal error when updating in DB: " + err.Error()), nil
}
return nil, mxissues
}
// FindInSlice takes a slice and looks for an element in it. If found it will
// return it's key, otherwise it will return -1 and a bool of false.
func FindInSlice(slice []string, val string) (int, bool) {
for i, item := range slice {
if item == val {
return i, true
}
}
return -1, false
}
// At least one element exist in both slices
// Return index1, index2 and if exist
func FindSliceInSlice(slice1 []string, slice2 []string) (int, int, bool) {
for i1, item1 := range slice1 {
for i2, item2 := range slice2 {
if item1 == item2 {
return i1, i2, true
}
}
}
return -1, -1, false
}
func (ws Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (returned_wf *Workflow, err error, issues []error) {
2023-03-03 14:43:11 +01:00
2024-03-22 10:00:35 +01:00
returned_wf = &Workflow{}
2023-03-03 14:43:11 +01:00
// When we will iterate over the full array of cells, we first will register the resources
// and after the linkage between them
sort.Slice(xmlmodel.Root.MxCell, func(i, j int) bool {
return xmlmodel.Root.MxCell[i].RID != nil
})
2024-04-25 15:48:32 +02:00
// Create the object and add it to the appropriate list
// for all the components with setting, which are identified
// by a MxObject tag in the xml
2024-04-25 15:48:32 +02:00
if ok := xmlmodel.Root.MxObject != nil; ok {
for _, object := range *xmlmodel.Root.MxObject {
resObj, err, mxissues := returned_wf.mxCellToComponent(object.MxCell, ws)
if err != nil {
issues = append(issues, mxissues...)
}
// add the component to the worflow's attribute that stores
// all components in a map[string]Component where the key
// is the component's ID in the mxGraph and the value the Component object
returned_wf.UpdateObj(resObj, object.ID)
// Construct the object corresponding to the componant's type and use its addUserInput method
if resObj.getRtype() == rtype.COMPUTING {
comp_obj := returned_wf.GetResource(&object.ID).(*ComputingObject)
comp_obj.AddUserInput(object.Settings)
returned_wf.UpdateObj(comp_obj, object.ID)
}
// if(resObj.getRtype() == rtype.DATA){
// }
}
}
2024-04-25 15:48:32 +02:00
2023-03-03 14:43:11 +01:00
for _, cell := range xmlmodel.Root.MxCell {
switch {
case cell.RID != nil:
2024-04-25 15:48:32 +02:00
resObj, err, mxissues := returned_wf.mxCellToComponent(cell, ws)
2023-03-03 14:43:11 +01:00
if err != nil {
issues = append(issues, mxissues...)
2023-03-03 14:43:11 +01:00
}
2024-03-22 10:00:35 +01:00
returned_wf.UpdateObj(resObj, cell.ID)
2023-03-03 14:43:11 +01:00
case cell.ID == "0" || cell.ID == "1":
// ID 0 and 1 are special cases of mxeditor
continue
// issues = append(issues, errors.New("MxCell with ID "+cell.ID+" doesn't have a valid link"))
default:
// Not root nor resource. Should be only links
sourceObj := returned_wf.GetResource(cell.Source)
targetObj := returned_wf.GetResource(cell.Target)
if sourceObj == nil || targetObj == nil {
if sourceObj == nil && targetObj == nil {
issues = append(issues, errors.New("Arrow "+cell.ID+" is alone"))
} else if sourceObj == nil {
issues = append(issues, errors.New("Arrow ("+cell.ID+") to "+*targetObj.getName()+" without parent"))
} else {
issues = append(issues, errors.New("Arrow "+cell.ID+" from "+*sourceObj.getName()+" without target"))
}
// If is a invalid link, we can't save it in the DB
continue
}
2023-03-03 14:43:11 +01:00
// Not root nor resource. Should be only links
2024-04-15 11:42:17 +02:00
// If is a invalid link, we can't save it in the DB
// We should always get a ID because we already registered resources and discarded which doesn't correspond to existent models
// save back
// If we have a relationship of:
// Source ----> Target
//
// The Source will be in the INPUTs of the Target.
// But we also must make sure that the Target will be in the OUTPUTs of the Source
2024-04-25 15:48:32 +02:00
2023-03-03 14:43:11 +01:00
}
}
2024-04-16 18:49:49 +02:00
issues = returned_wf.createLinks(xmlmodel.Root.MxLink, issues)
issues = returned_wf.checkLinks(issues)
2024-07-04 16:04:19 +02:00
// dcslist := make(map[string]bool)
// dataslist := make(map[string]bool)
// // datalist := make(map[string]bool)
// // Test wether the computing components are linked with a DC
// for _, comp := range returned_wf.Computing {
// if comp.DataCenterID == "" {
// issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter"))
// } else {
// // If doesn't exist in the list, means is new element to register as used
// dcslist[comp.DataCenterID] = true
// }
// for _, dcin := range comp.Inputs {
// switch returned_wf.GetResource(&dcin).getRtype() {
// case rtype.DATA:
// dataslist[dcin] = true
// }
// }
// for _, dcout := range comp.Outputs {
// switch returned_wf.GetResource(&dcout).getRtype() {
// case rtype.DATA:
// dataslist[dcout] = true
// }
// }
// }
2024-07-04 16:04:19 +02:00
// for _, storage_component := range returned_wf.Storage {
// if storage_component.Inputs == nil && storage_component.Outputs == nil {
// issues = append(issues, errors.New("Storage "+*storage_component.getName()+" without compatible inputs and outputs"))
// }
// }
// for dcID, dc_component := range returned_wf.Datacenter {
// // if rID doesn't exist in the list, it means that it's not used
// if _, ok := dcslist[dcID]; !ok {
// issues = append(issues, errors.New("DC "+*dc_component.getName()+" not attached to any Computing"))
// }
// }
// for dcID, data_component := range returned_wf.Data {
// // if rID doesn't exist in the list, it means that it's not used
// if _, ok := dataslist[dcID]; !ok {
// issues = append(issues, errors.New("Data "+*data_component.getName()+" not attached to any Computing"))
// }
// }
2023-03-03 14:43:11 +01:00
//////////////////////////////////////////////////////////
// //
// Starting from here, we check the type of resources //
// //
//////////////////////////////////////////////////////////
// FIXME: Avoid checking twice the same cases (cycles). Ex:
//
// Comp1 ----> Comp2
//
// In this case, we will check Comp1 outputs with Comp2
// inputs AND Comp2 inputs with Comp1 outputs, since we are
// iterating over all existent Computing models in the Graph
2024-03-22 10:00:35 +01:00
for _, comp := range returned_wf.Computing {
2023-03-03 14:43:11 +01:00
compModel, err2 := comp.getModel()
if err = err2; err != nil {
return
}
currentCompModel := compModel.(ComputingModel)
// Come computings may not allow inputs or outputs
if len(currentCompModel.Dinputs) == 0 && len(comp.Inputs) > 0 {
issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any input"))
continue
}
if len(currentCompModel.Doutputs) == 0 && len(comp.Outputs) > 0 {
issues = append(issues, errors.New("Computing "+compModel.getName()+" must not have any output"))
continue
}
//TODO: We should allow heterogenous inputs?
for _, objIn := range comp.Inputs {
2024-03-22 10:00:35 +01:00
resIn := returned_wf.GetResource(&objIn)
2023-03-03 14:43:11 +01:00
resInType := resIn.getRtype()
switch resInType {
case rtype.DATA:
dataModel, err2 := resIn.getModel()
if err = err2; err != nil {
return
}
myDataModel := dataModel.(DataModel)
if _, ok := FindInSlice(currentCompModel.Dinputs, myDataModel.Dtype); !ok {
issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle inputs of type "+myDataModel.Dtype+" from Data "+dataModel.getName()))
}
case rtype.COMPUTING:
inCompModel, err2 := resIn.getModel()
if err = err2; err != nil {
return
}
myInComputingModel := inCompModel.(ComputingModel)
if _, _, ok := FindSliceInSlice(myInComputingModel.Doutputs, currentCompModel.Dinputs); !ok {
issues = append(issues, errors.New("Computing "+compModel.getName()+" can't handle any input from "+inCompModel.getName()))
}
case rtype.STORAGE:
// Storage can give use anything, so we always accept it's input for now
continue
default:
issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resInType.String()+" (behaviour not defined)"))
}
}
//TODO: We should allow heterogenous outputs?
for _, objOut := range comp.Outputs {
2024-03-22 10:00:35 +01:00
resOut := returned_wf.GetResource(&objOut)
2023-03-03 14:43:11 +01:00
resOutType := resOut.getRtype()
switch resOutType {
case rtype.COMPUTING:
outCompModel, err2 := resOut.getModel()
if err = err2; err != nil {
return
}
myOutComputingModel := outCompModel.(ComputingModel)
if _, _, ok := FindSliceInSlice(currentCompModel.Doutputs, myOutComputingModel.Dinputs); !ok {
issues = append(issues, errors.New("Computing "+compModel.getName()+" doesn't have output data compatible with "+outCompModel.getName()))
}
case rtype.STORAGE:
// Storage can save anything, so we always accept store it for now
continue
default:
issues = append(issues, errors.New("Computing "+currentCompModel.getName()+" can't have any resource of type "+resOutType.String()+" (behaviour not defined)"))
}
}
}
return
}
2024-04-16 18:49:49 +02:00
func (w *Workflow) createLinks(links []MxLink, issues []error) []error {
2024-04-15 11:42:17 +02:00
for _, link := range links {
if (len(link.Source) > 0 && len(link.Target) > 0){
sourceObj := w.GetResource(&link.Source)
targetObj := w.GetResource(&link.Target)
2024-04-25 15:48:32 +02:00
link_object := NewLink(sourceObj, link.Source, targetObj, link.Target)
w.AddLinkToWorkflow(link_object, link.ID)
} else {
issues = append(issues, w.processLinkErrors(link))
}
}
return issues
}
2024-04-15 11:42:17 +02:00
func (w *Workflow) processLinkErrors(link MxLink) (issue error) {
2024-04-25 15:48:32 +02:00
if len(link.Source) == 0 && len(link.Target) == 0 {
issue = errors.New("Arrow " + link.ID + " is alone")
} else if len(link.Source) == 0 {
targetObj := w.GetResource(&link.Target)
2024-04-25 15:48:32 +02:00
issue = errors.New("Arrow (" + link.ID + ") to " + *targetObj.getName() + " without parent")
} else {
sourceObj := w.GetResource(&link.Source)
2024-04-25 15:48:32 +02:00
issue = errors.New("Arrow " + link.ID + " from " + *sourceObj.getName() + " without target")
}
return issue
}
2024-04-16 18:49:49 +02:00
func (w *Workflow) checkLinks(issues []error) []error {
// Check that storage components have a valid link
for id, storage := range w.Storage {
2024-04-16 18:49:49 +02:00
if(!w.isComponentSrc(id) && !w.isComponentDst(id)){
issues = append(issues, errors.New("Storage "+*storage.getName()+" without compatible inputs and outputs"))
}
}
2024-04-25 15:48:32 +02:00
// Check that data components are linked to a computing component
for id, data := range w.Data {
2024-04-16 18:49:49 +02:00
if(!w.isLinkedToComputing(id)){
issues = append(issues, errors.New("Data "+*data.getName()+" not attached to any Computing"))
}
}
// Check that DC is linked to a computing component
for id, dc:= range w.Datacenter {
2024-04-16 18:49:49 +02:00
if(!w.isLinkedToComputing(id)){
issues = append(issues, errors.New("Datacenter "+*dc.getName()+" not attached to any Computing"))
2024-04-15 11:42:17 +02:00
}
}
2024-04-25 15:48:32 +02:00
// Check that all data computing components are linked to a DC
for id,comp:= range w.Computing {
2024-04-16 18:49:49 +02:00
if(!w.isLinkedToDC(id)){
issues = append(issues, errors.New("Computing "+*comp.getName()+" not attached to any datacenter"))
}
2024-04-25 15:48:32 +02:00
}
2024-04-15 11:42:17 +02:00
return issues
}
2024-04-16 18:49:49 +02:00
func (w *Workflow) isComponentSrc(id string) bool {
for _, link := range w.Links{
if(link.Source == id && link.Source != ""){
return true
}
}
return false
}
2024-04-16 18:49:49 +02:00
func (w *Workflow) isComponentDst(id string) bool {
for _, link := range w.Links{
if(link.Destination == id && link.Source != ""){
return true
}
}
return false
}
2024-04-16 18:49:49 +02:00
func (w *Workflow) isLinkedToComputing(id string) bool {
for idComputing, _ := range w.Computing {
2024-04-16 18:49:49 +02:00
if( (w.isComponentSrc(id) && w.isComponentDst(idComputing)) || (w.isComponentSrc(idComputing) && w.isComponentDst(id))){
return true
}
2024-04-25 15:48:32 +02:00
}
return false
}
2024-04-16 18:49:49 +02:00
func (w *Workflow) isLinkedToDC(id string) bool {
2024-04-25 15:48:32 +02:00
for _, link := range w.Links {
if link.Source == id && link.DCLink {
return true
}
}
return false
}
2023-03-03 14:43:11 +01:00
func sumExecutionReqs(exeqReq ...ExecutionRequirementsModel) (ret ExecutionRequirementsModel) {
for _, v := range exeqReq {
ret.CPUs += v.CPUs
ret.GPUs += v.GPUs
ret.RAM += v.RAM
}
return
}
func CheckAndBookWorkflowSchedule(username, workflowName string, book bool) (myRet []DCstatus, err error) {
userWorkspace := GetWorkspace(username)
if userWorkspace == nil {
return nil, errors.New("No workspace for user " + username)
}
currentWorkflow, ok := userWorkspace.Workflows[workflowName]
if !ok {
return nil, errors.New("No workflow " + workflowName + " for user " + username)
}
if currentWorkflow.Schedules.IsBooked {
return nil, errors.New("Can't operate DCs for a already booked schedule")
}
// dd := &currentWorkflow
// We can have multiple DCobjs pointing to the same DCmodel. We must sum all req of the same DCmodel
totalDCs := make(map[primitive.ObjectID]ExecutionRequirementsModel)
for dcIDobj, dcObj := range currentWorkflow.Datacenter {
modelID := dcObj.getReference()
var totalsModel ExecutionRequirementsModel
totalsModel, err = currentWorkflow.GetExecutionRequirements(dcIDobj)
if err != nil {
return
}
if _, ok := totalDCs[modelID]; ok {
totalDCs[modelID] = sumExecutionReqs(totalDCs[modelID], totalsModel)
} else {
totalDCs[modelID] = totalsModel
}
}
myRet = make([]DCstatus, len(totalDCs))
var i int
i = -1
for modelID, execReq := range totalDCs {
i++
var dcModel *DatacenterModel
dcModel, err = GetOneDatacenter(modelID.Hex())
if err != nil {
return
}
myRet[i].DCname = dcModel.Name
myRet[i].DCobjID = modelID.Hex()
// retrieve the host of the DC
host := GetHost(dcModel.Hosts)
if host == nil {
myRet[i].ErrorMessage = "Datacenter " + myRet[i].DCname + " doesn't have a host property"
continue
}
cli := services.GetSelfAPI(*host)
data, err := cli.ScheduleApi.ScheduleControllerCheckIfScheduleCanBeCreatedInThisDC(context.Background(),
currentWorkflow.Schedules.Cron,
int32(currentWorkflow.Schedules.Duration),
currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
swagger.ModelsExecutionRequirementsModel{
Cpus: int32(execReq.CPUs),
Gpus: int32(execReq.GPUs),
Ram: int32(execReq.RAM),
},
)
if err != nil {
myRet[i].ErrorMessage = err.Error()
swErr, ok := err.(swagger.GenericSwaggerError)
if ok {
myRet[i].IsReachable = true
myRet[i].ErrorMessage += ": " + string(swErr.Body())
}
continue
}
myRet[i].IsReachable = true
if data.StatusCode == 200 {
myRet[i].IsAvailable = true
}
}
// If we only check, we should exit here
if !book {
return
}
for _, v := range myRet {
if !v.IsAvailable {
return
}
}
i = -1
allBooked := true
for modelID, execReq := range totalDCs {
i++
// _ = v
var dcModel *DatacenterModel
dcModel, err = GetOneDatacenter(modelID.Hex())
if err != nil {
myRet[i].ErrorMessage = err.Error()
continue
}
cli := services.GetSelfAPI(*GetHost(dcModel.Hosts)) // If we are here, we already check that host exists and is reachable
data, resp, err := cli.ScheduleApi.ScheduleControllerCreateSchedule(context.Background(),
services.DC_NAME,
workflowName,
currentWorkflow.Schedules.Cron,
int32(currentWorkflow.Schedules.Duration),
currentWorkflow.Schedules.StartDate.Format(time.RFC3339),
currentWorkflow.Schedules.StopDate.Format(time.RFC3339),
swagger.ModelsExecutionRequirementsModel{
Cpus: int32(execReq.CPUs),
Gpus: int32(execReq.GPUs),
Ram: int32(execReq.RAM),
},
)
if err != nil {
allBooked = false
myRet[i].ErrorMessage = err.Error()
swErr, ok := err.(swagger.GenericSwaggerError)
if ok {
myRet[i].IsReachable = true
myRet[i].ErrorMessage += ": " + string(swErr.Body())
}
continue
}
if resp.StatusCode == 200 {
//FIXME: Maybe some better way of casting?
var nextExec [5]string
for counter := 0; counter < 5; counter++ {
nextExec[counter] = data.NextExecutions[counter]
}
myRet[i].Booked = &ScheduleInfo{
Total: int(data.Total),
NextExecutions: nextExec,
}
}
}
// If some DC fail, we must not mark the workflow as booked
if !allBooked {
return
}
currentWorkflow.Schedules.IsBooked = true
_, err = services.MngoCollWorkspace.UpdateOne(services.MngoCtx,
primitive.M{"_id": username},
primitive.M{"$set": primitive.M{
WorkflowDB + "." +
workflowName + "." +
SchedulesDB: &currentWorkflow.Schedules}},
)
2024-04-25 15:48:32 +02:00
if err != nil {
2023-03-03 14:43:11 +01:00
logs.Critical("Internal error when updating in DB: " + err.Error())
}
return myRet, nil
}
2024-04-25 15:48:32 +02:00
// Not sure if this method handles error propagation well
func (wf Workflow) mxCellToComponent(cell MxCell, ws Workspace) (resObj ResourceObject, err error, issues []error) {
rType := ws.getRtype(*cell.RID)
if rType == rtype.INVALID {
return nil,
errors.New("Refering to a rID that is not in the workflow"),
nil
}
// Generate ObjectID for the reference ID
rIDObj, err := primitive.ObjectIDFromHex(*cell.RID)
if err != nil {
return nil,
errors.New("Bad ID format: " + *cell.RID),
nil
}
resObj = wf.CreateResourceObject(rType)
resObj.setReference(rIDObj)
2024-04-25 15:48:32 +02:00
return
}