bugfix empty workflow

This commit is contained in:
pb 2024-04-25 15:48:32 +02:00
parent 9bd5be7758
commit a1f04dc2f0
2 changed files with 94 additions and 140 deletions

View File

@ -52,11 +52,11 @@ const SchedulesDB = "schedules"
type Workflow struct { type Workflow struct {
// The key of the map is the ID of the object itself // The key of the map is the ID of the object itself
Data map[string]DataObject `json:"data"` Data map[string]DataObject `json:"data"`
Computing map[string]ComputingObject `json:"computing"` Computing map[string]ComputingObject `json:"computing"`
Storage map[string]StorageObject `json:"storage"` Storage map[string]StorageObject `json:"storage"`
Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter Datacenter map[string]DatacenterObject `json:"datacenter"` //TODO: Decide if there should be multiple objects of a datacenter
Links map[string]Link `json:"link"` Links map[string]Link `json:"link"`
Schedules WorkflowSchedule `json:"schedules"` Schedules WorkflowSchedule `json:"schedules"`
@ -78,7 +78,7 @@ type ResourceObject interface {
// This type allows to process computing and storage component // This type allows to process computing and storage component
// which can get input from the user // which can get input from the user
type EditableResourceObject interface{ type EditableResourceObject interface {
ResourceObject ResourceObject
addUserInput(map[string]interface{}) addUserInput(map[string]interface{})
} }
@ -119,13 +119,13 @@ func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) {
return nil return nil
} }
if storVal, ok := w.Data[*rObjID]; ok { if datVal, ok := w.Data[*rObjID]; ok {
retObj = &storVal retObj = &datVal
return return
} }
if storVal, ok := w.Computing[*rObjID]; ok { if compVal, ok := w.Computing[*rObjID]; ok {
retObj = &storVal retObj = &compVal
return return
} }
@ -134,8 +134,8 @@ func (w *Workflow) GetResource(rObjID *string) (retObj ResourceObject) {
return return
} }
if storVal, ok := w.Datacenter[*rObjID]; ok { if dcVal, ok := w.Datacenter[*rObjID]; ok {
retObj = &storVal retObj = &dcVal
return return
} }
@ -182,7 +182,7 @@ func (w *Workflow) AddObj(robj ResourceObject) *primitive.ObjectID {
return &outputID return &outputID
} }
func (w *Workflow) AddLinkToWorkflow (link Link, id string){ func (w *Workflow) AddLinkToWorkflow(link Link, id string) {
if w.Links == nil { if w.Links == nil {
w.Links = make(map[string]Link) w.Links = make(map[string]Link)
} }
@ -547,7 +547,6 @@ func ParseMxGraph(username, workflowName, xmlData string) (err error, mxissues [
} }
xmlModel.createLinks() xmlModel.createLinks()
targetWorkspaceWorkflow, err, mxissues := userWorkspace.ConsumeMxGraphModel(xmlModel) targetWorkspaceWorkflow, err, mxissues := userWorkspace.ConsumeMxGraphModel(xmlModel)
if err != nil { if err != nil {
@ -605,37 +604,38 @@ func (ws Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (returned_wf *Wor
return xmlmodel.Root.MxCell[i].RID != nil return xmlmodel.Root.MxCell[i].RID != nil
}) })
// Create the object and add it to the appropriate list // Create the object and add it to the appropriate list
// for all the components with setting, which are identified // for all the components with setting, which are identified
// by a MxObject tag in the xml // by a MxObject tag in the xml
for _, object := range *xmlmodel.Root.MxObject{ if ok := xmlmodel.Root.MxObject != nil; ok {
for _, object := range *xmlmodel.Root.MxObject {
resObj, err, mxissues := returned_wf.mxCellToComponent(object.MxCell,ws)
if err != nil {
issues = append(issues, mxissues...)
}
// add the component to the worflow's attribute that stores
// all components in a map[string]Component where the key
// is the component's ID in the mxGraph and the value the Component object
returned_wf.UpdateObj(resObj,object.ID)
// Construct the object corresponding to the componant's type and use its addUserInput method
if(resObj.getRtype() == rtype.COMPUTING){
comp_obj := returned_wf.GetResource(&object.ID).(*ComputingObject)
comp_obj.AddUserInput(object.Settings)
returned_wf.UpdateObj(comp_obj,object.ID)
}
// if(resObj.getRtype() == rtype.DATA){
// }
}
resObj, err, mxissues := returned_wf.mxCellToComponent(object.MxCell, ws)
if err != nil {
issues = append(issues, mxissues...)
}
// add the component to the worflow's attribute that stores
// all components in a map[string]Component where the key
// is the component's ID in the mxGraph and the value the Component object
returned_wf.UpdateObj(resObj, object.ID)
// Construct the object corresponding to the componant's type and use its addUserInput method
if resObj.getRtype() == rtype.COMPUTING {
comp_obj := returned_wf.GetResource(&object.ID).(*ComputingObject)
comp_obj.AddUserInput(object.Settings)
returned_wf.UpdateObj(comp_obj, object.ID)
}
// if(resObj.getRtype() == rtype.DATA){
// }
}
}
for _, cell := range xmlmodel.Root.MxCell { for _, cell := range xmlmodel.Root.MxCell {
switch { switch {
case cell.RID != nil: case cell.RID != nil:
resObj, err, mxissues := returned_wf.mxCellToComponent(cell,ws) resObj, err, mxissues := returned_wf.mxCellToComponent(cell, ws)
if err != nil { if err != nil {
issues = append(issues, mxissues...) issues = append(issues, mxissues...)
} }
@ -674,63 +674,13 @@ func (ws Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (returned_wf *Wor
// //
// The Source will be in the INPUTs of the Target. // The Source will be in the INPUTs of the Target.
// But we also must make sure that the Target will be in the OUTPUTs of the Source // But we also must make sure that the Target will be in the OUTPUTs of the Source
} }
} }
issues = returned_wf.CreateLinks(xmlmodel.Root.MxLink, issues) issues = returned_wf.CreateLinks(xmlmodel.Root.MxLink, issues)
issues = returned_wf.CheckLinks(issues) issues = returned_wf.CheckLinks(issues)
// dcslist := make(map[string]bool)
// dataslist := make(map[string]bool)
// // datalist := make(map[string]bool)
// // Test wether the computing components are linked with a DC
// for _, comp := range returned_wf.Computing {
// if comp.DataCenterID == "" {
// issues = append(issues, errors.New("Computing "+*comp.getName()+" without a Datacenter"))
// } else {
// // If doesn't exist in the list, means is new element to register as used
// dcslist[comp.DataCenterID] = true
// }
// for _, dcin := range comp.Inputs {
// switch returned_wf.GetResource(&dcin).getRtype() {
// case rtype.DATA:
// dataslist[dcin] = true
// }
// }
// for _, dcout := range comp.Outputs {
// switch returned_wf.GetResource(&dcout).getRtype() {
// case rtype.DATA:
// dataslist[dcout] = true
// }
// }
// }
// for _, storage_component := range returned_wf.Storage {
// if storage_component.Inputs == nil && storage_component.Outputs == nil {
// issues = append(issues, errors.New("Storage "+*storage_component.getName()+" without compatible inputs and outputs"))
// }
// }
// for dcID, dc_component := range returned_wf.Datacenter {
// // if rID doesn't exist in the list, it means that it's not used
// if _, ok := dcslist[dcID]; !ok {
// issues = append(issues, errors.New("DC "+*dc_component.getName()+" not attached to any Computing"))
// }
// }
// for dcID, data_component := range returned_wf.Data {
// // if rID doesn't exist in the list, it means that it's not used
// if _, ok := dataslist[dcID]; !ok {
// issues = append(issues, errors.New("Data "+*data_component.getName()+" not attached to any Computing"))
// }
// }
////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////
// // // //
@ -834,13 +784,13 @@ func (ws Workspace) ConsumeMxGraphModel(xmlmodel MxGraphModel) (returned_wf *Wor
} }
func (w *Workflow) CreateLinks(links []MxLink, issues []error) []error { func (w *Workflow) CreateLinks(links []MxLink, issues []error) []error {
for _, link := range links { for _, link := range links {
if (len(link.Source) > 0 && len(link.Target) > 0){ if len(link.Source) > 0 && len(link.Target) > 0 {
sourceObj := w.GetResource(&link.Source) sourceObj := w.GetResource(&link.Source)
targetObj := w.GetResource(&link.Target) targetObj := w.GetResource(&link.Target)
link_object := NewLink(sourceObj,link.Source, targetObj, link.Target) link_object := NewLink(sourceObj, link.Source, targetObj, link.Target)
w.AddLinkToWorkflow(link_object,link.ID) w.AddLinkToWorkflow(link_object, link.ID)
} else { } else {
issues = append(issues, w.processLinkErrors(link)) issues = append(issues, w.processLinkErrors(link))
} }
@ -849,59 +799,58 @@ func (w *Workflow) CreateLinks(links []MxLink, issues []error) []error {
} }
func (w *Workflow) processLinkErrors(link MxLink) (issue error) { func (w *Workflow) processLinkErrors(link MxLink) (issue error) {
if len(link.Source) == 0 && len(link.Target) == 0 { if len(link.Source) == 0 && len(link.Target) == 0 {
issue = errors.New("Arrow "+link.ID+" is alone") issue = errors.New("Arrow " + link.ID + " is alone")
} else if len(link.Source) == 0{ } else if len(link.Source) == 0 {
targetObj := w.GetResource(&link.Target) targetObj := w.GetResource(&link.Target)
issue = errors.New("Arrow ("+link.ID+") to "+*targetObj.getName()+" without parent") issue = errors.New("Arrow (" + link.ID + ") to " + *targetObj.getName() + " without parent")
} else { } else {
sourceObj := w.GetResource(&link.Source) sourceObj := w.GetResource(&link.Source)
issue = errors.New("Arrow "+link.ID+" from "+*sourceObj.getName()+" without target") issue = errors.New("Arrow " + link.ID + " from " + *sourceObj.getName() + " without target")
} }
return issue return issue
} }
func (w *Workflow) CheckLinks(issues []error) []error { func (w *Workflow) CheckLinks(issues []error) []error {
// Check that storage components have a valid link // Check that storage components have a valid link
for id, storage := range w.Storage { for id, storage := range w.Storage {
if(!w.IsComponentSrc(id) && !w.IsComponentDst(id)){ if !w.IsComponentSrc(id) && !w.IsComponentDst(id) {
issues = append(issues, errors.New("Storage "+*storage.getName()+" without compatible inputs and outputs")) issues = append(issues, errors.New("Storage "+*storage.getName()+" without compatible inputs and outputs"))
} }
} }
// Check that data components are linked to a computing component // Check that data components are linked to a computing component
for id, data := range w.Data { for id, data := range w.Data {
if(!w.HasLinkageToComputing(id)){ if !w.HasLinkageToComputing(id) {
issues = append(issues, errors.New("Data "+*data.getName()+" not attached to any Computing")) issues = append(issues, errors.New("Data "+*data.getName()+" not attached to any Computing"))
} }
} }
// Check that DC is linked to a computing component // Check that DC is linked to a computing component
for id, dc:= range w.Datacenter { for id, dc := range w.Datacenter {
if(!w.HasLinkageToComputing(id)){ if !w.HasLinkageToComputing(id) {
issues = append(issues, errors.New("Datacenter "+*dc.getName()+" not attached to any Computing")) issues = append(issues, errors.New("Datacenter "+*dc.getName()+" not attached to any Computing"))
} }
} }
// Check that all data computing components are linked to a DC // Check that all data computing components are linked to a DC
for id,comp:= range w.Computing { for id, comp := range w.Computing {
if(!w.HasLinkageToDC(id)){ if !w.HasLinkageToDC(id) {
issues = append(issues, errors.New("Computing "+*comp.getName()+" not attached to any datacenter")) issues = append(issues, errors.New("Computing "+*comp.getName()+" not attached to any datacenter"))
} }
} }
return issues return issues
} }
func (w *Workflow) IsComponentSrc(id string) bool { func (w *Workflow) IsComponentSrc(id string) bool {
for _, link := range w.Links{ for _, link := range w.Links {
if(link.Source == id && link.Source != ""){ if link.Source == id && link.Source != "" {
return true return true
} }
} }
@ -910,9 +859,9 @@ func (w *Workflow) IsComponentSrc(id string) bool {
} }
func (w *Workflow) IsComponentDst(id string) bool { func (w *Workflow) IsComponentDst(id string) bool {
for _, link := range w.Links{ for _, link := range w.Links {
if(link.Destination == id && link.Source != ""){ if link.Destination == id && link.Source != "" {
return true return true
} }
} }
@ -923,10 +872,10 @@ func (w *Workflow) IsComponentDst(id string) bool {
func (w *Workflow) HasLinkageToComputing(id string) bool { func (w *Workflow) HasLinkageToComputing(id string) bool {
for idComputing, _ := range w.Computing { for idComputing, _ := range w.Computing {
if( (w.IsComponentSrc(id) && w.IsComponentDst(idComputing)) || (w.IsComponentSrc(idComputing) && w.IsComponentDst(id))){ if (w.IsComponentSrc(id) && w.IsComponentDst(idComputing)) || (w.IsComponentSrc(idComputing) && w.IsComponentDst(id)) {
return true return true
} }
} }
return false return false
@ -934,8 +883,8 @@ func (w *Workflow) HasLinkageToComputing(id string) bool {
func (w *Workflow) HasLinkageToDC(id string) bool { func (w *Workflow) HasLinkageToDC(id string) bool {
for _, link := range w.Links{ for _, link := range w.Links {
if(link.Source == id && link.DCLink){ if link.Source == id && link.DCLink {
return true return true
} }
} }
@ -1124,15 +1073,15 @@ func CheckAndBookWorkflowSchedule(username, workflowName string, book bool) (myR
SchedulesDB: &currentWorkflow.Schedules}}, SchedulesDB: &currentWorkflow.Schedules}},
) )
if err != nil { if err != nil {
logs.Critical("Internal error when updating in DB: " + err.Error()) logs.Critical("Internal error when updating in DB: " + err.Error())
} }
return myRet, nil return myRet, nil
} }
// Not sure if this method handles error propagation well // Not sure if this method handles error propagation well
func (wf Workflow) mxCellToComponent(cell MxCell, ws Workspace) (resObj ResourceObject,err error, issues []error){ func (wf Workflow) mxCellToComponent(cell MxCell, ws Workspace) (resObj ResourceObject, err error, issues []error) {
rType := ws.getRtype(*cell.RID) rType := ws.getRtype(*cell.RID)
if rType == rtype.INVALID { if rType == rtype.INVALID {
@ -1151,7 +1100,7 @@ func (wf Workflow) mxCellToComponent(cell MxCell, ws Workspace) (resObj Resource
resObj = wf.CreateResourceObject(rType) resObj = wf.CreateResourceObject(rType)
resObj.setReference(rIDObj) resObj.setReference(rIDObj)
return return
} }

View File

@ -236,6 +236,27 @@
}, },
"inputs": [], "inputs": [],
"outputs": [] "outputs": []
},
{
"name": "Mosquito server",
"short_description": "open source message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1.",
"logo": "./local_imgs/mosquitto-logo.png",
"description": "A very long description of what this storage is",
"type": "computing",
"owner": "IRT",
"price": 300,
"license": "GPLv2",
"execution_requirements": {
"cpus": 1,
"ram": 1024,
"storage": 300,
"gpus": 1,
"disk_io": "30 MB/s",
"parallel": true,
"scaling_model": 2
},
"inputs": [],
"outputs": []
} }
] ]
}, },
@ -273,22 +294,6 @@
"inputs": [], "inputs": [],
"outputs": [], "outputs": [],
"URL" : "" "URL" : ""
},
{
"name": "Mosquito server",
"short_description": "open source message broker that implements the MQTT protocol versions 5.0, 3.1.1 and 3.1.",
"logo": "./local_imgs/mosquitto-logo.png",
"description": "A very long description of what this storage is",
"type": "storage",
"DCacronym": "DC_myDC",
"size": 40000,
"encryption": false,
"redundancy": "RAID5S",
"throughput": "r:300,w:350",
"bookingPrice": 90,
"inputs": [],
"outputs": [],
"URL" : ""
} }
] ]
}, },