Changed component's retrieving from parsing XML to using the stored info in DB
This commit is contained in:
parent
23ee20a59f
commit
86062f3d88
16
README.md
16
README.md
@ -1,2 +1,18 @@
|
||||
# oc-scheduler
|
||||
|
||||
OC-Scheduler retrieves the content of submitted workflows and prepare them to be executed.
|
||||
|
||||
## Parsing
|
||||
|
||||
From a workflow's name we retrieve the xml graph associated and parse it in order to create the object representing each componant.
|
||||
Each object is linked to another, represented by a links object with the two object IDs has attributes.
|
||||
|
||||
TODO :
|
||||
- [x] Retrieve the user input's for each component.
|
||||
|
||||
## Organising
|
||||
|
||||
TODO :
|
||||
- [ ] create an argo file from the graph/worfklow
|
||||
- [ ] Create a different entry for each component
|
||||
- [ ] execute each element in the right order
|
86
componentParser.go
Normal file
86
componentParser.go
Normal file
@ -0,0 +1,86 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"cloud.o-forge.io/core/oc-catalog/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
)
|
||||
|
||||
// This module allows us to :
|
||||
// - Reduce the size of graph.go
|
||||
// - apply specific object construction logic depending on the component's type
|
||||
|
||||
// So far the unpacking of json/map data into the models has the caveat of not applying to the nested struct
|
||||
|
||||
func ConstructComputingObject(id string, component_info map[string]interface{}, user_inputs map[string]interface{}) (computing models.ComputingModel) {
|
||||
|
||||
computing.ID = id
|
||||
err := mapstructure.Decode(component_info,&computing.ComputingNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Computing : Error unpacking json into objects info")
|
||||
log.Err(err)
|
||||
}
|
||||
|
||||
err = mapstructure.Decode(user_inputs,&computing.ComputingNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Computing : Error unpacking data input into comp object")
|
||||
log.Err(err).Msg(err.Error())
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func ConstructDataObject(id string, component_info map[string]interface{}, user_inputs map[string]interface{}) (data models.DataModel) {
|
||||
|
||||
data.ID = id
|
||||
err := mapstructure.Decode(component_info,&data.DataNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Data: Error unpacking json into objects info")
|
||||
log.Err(err)
|
||||
}
|
||||
|
||||
err = mapstructure.Decode(user_inputs,&data.DataNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Data: Error unpacking data input into comp object")
|
||||
log.Err(err).Msg(err.Error())
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func ConstructDatacenterObject(id string, component_info map[string]interface{}, user_inputs map[string]interface{}) (datacenter models.DatacenterModel) {
|
||||
|
||||
datacenter.ID = id
|
||||
err := mapstructure.Decode(component_info,&datacenter.DatacenterNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Datacenter: Error unpacking json into objects info")
|
||||
log.Err(err)
|
||||
}
|
||||
|
||||
err = mapstructure.Decode(user_inputs,&datacenter.DatacenterNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Datacenter: Error unpacking data input into comp object")
|
||||
log.Err(err).Msg(err.Error())
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func ConstructStorageObject(id string, component_info map[string]interface{}, user_inputs map[string]interface{}) (storage models.StorageModel) {
|
||||
|
||||
storage.ID = id
|
||||
err := mapstructure.Decode(component_info,&storage.StorageNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Storage: Error unpacking json into objects info")
|
||||
log.Err(err)
|
||||
}
|
||||
|
||||
err = mapstructure.Decode(user_inputs,&storage.StorageNEWModel)
|
||||
if err != nil {
|
||||
fmt.Println("Storage: Error unpacking data input into comp object")
|
||||
log.Err(err).Msg(err.Error())
|
||||
}
|
||||
|
||||
return
|
||||
}
|
205
graph.go
205
graph.go
@ -8,7 +8,8 @@ import (
|
||||
"strings"
|
||||
|
||||
"cloud.o-forge.io/core/oc-catalog/models"
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/beego/beego/v2/core/logs"
|
||||
"github.com/sbabiv/xml2map"
|
||||
"github.com/tidwall/gjson"
|
||||
)
|
||||
@ -83,46 +84,47 @@ func (g *Graph) LoadFrom(workspace string) error {
|
||||
g.GetWorkflowComponents(workspace)
|
||||
|
||||
|
||||
for _, element := range cells {
|
||||
id := element["@id"].(string)
|
||||
// Case MXCell
|
||||
if _, ok := element["@style"]; ok {
|
||||
if _, ok2 := element["@rID"]; ok2 {
|
||||
// Resolve elements
|
||||
// fmt.Print(id + ": ")
|
||||
// fmt.Println(element["@rID"], element["@rType"])
|
||||
// fmt.Println(element)
|
||||
dictionnary[id] = element["@rID"].(string)
|
||||
g.addElementByType(element)
|
||||
}
|
||||
} else {
|
||||
// Case object : contains user's input through the GUI
|
||||
if _, ok := element["mxCell"]; ok {
|
||||
// Attribute values
|
||||
// for _, element := range cells {
|
||||
// id := element["@id"].(string)
|
||||
// // Case MXCell
|
||||
// if _, ok := element["@style"]; ok {
|
||||
// if _, ok2 := element["@rID"]; ok2 {
|
||||
// // Resolve elements
|
||||
// // fmt.Print(id + ": ")
|
||||
// // fmt.Println(element["@rID"], element["@rType"])
|
||||
// // fmt.Println(element)
|
||||
// dictionnary[id] = element["@rID"].(string)
|
||||
// g.addElementByType(element)
|
||||
// }
|
||||
// } else {
|
||||
// // Case object : contains user's input through the GUI
|
||||
// if _, ok := element["mxCell"]; ok {
|
||||
// // Attribute values
|
||||
|
||||
// Extracts the cell ids
|
||||
element = element["mxCell"].(map[string]interface{})
|
||||
if _, ok := element["@style"]; ok {
|
||||
if _, ok2 := element["@rID"]; ok2 {
|
||||
// Resolve elements
|
||||
// fmt.Print(id + ": ")
|
||||
// fmt.Println(element["@rID"], element["@rType"])
|
||||
// fmt.Println(element)
|
||||
dictionnary[id] = element["@rID"].(string)
|
||||
g.addElementByType(element)
|
||||
}
|
||||
}
|
||||
}
|
||||
// // Extracts the cell ids
|
||||
// element = element["mxCell"].(map[string]interface{})
|
||||
// if _, ok := element["@style"]; ok {
|
||||
// if _, ok2 := element["@rID"]; ok2 {
|
||||
// // Resolve elements
|
||||
// // fmt.Print(id + ": ")
|
||||
// // fmt.Println(element["@rID"], element["@rType"])
|
||||
// // fmt.Println(element)
|
||||
// dictionnary[id] = element["@rID"].(string)
|
||||
// g.addElementByType(element)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
}
|
||||
// register links
|
||||
if src, ok := element["@source"]; ok {
|
||||
//src = element["@source"].(string)
|
||||
idlinks = append(idlinks, Link{Src: src.(string), Dst: element["@target"].(string)})
|
||||
// fmt.Println("Link: " + src.(string) + " " + element["@target"].(string))
|
||||
}
|
||||
// }
|
||||
// // register links
|
||||
// if src, ok := element["@source"]; ok {
|
||||
// //src = element["@source"].(string)
|
||||
// idlinks = append(idlinks, Link{Src: src.(string), Dst: element["@target"].(string)})
|
||||
// // fmt.Println("Link: " + src.(string) + " " + element["@target"].(string))
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
}
|
||||
// translate links
|
||||
for _, link := range idlinks {
|
||||
g.Links = append(g.Links, Link{Src: dictionnary[link.Src], Dst: dictionnary[link.Dst]})
|
||||
@ -140,48 +142,96 @@ func (g *Graph) addElementByType(element map[string]interface{}) {
|
||||
}
|
||||
if element["@rType"] == "computing" {
|
||||
g.AddComputingModel(element["@rID"].(string))
|
||||
g.AddUserInput(element["@rID"].(string))
|
||||
}
|
||||
if element["@rType"] == "storage" {
|
||||
g.AddStorageModel(element["@rID"].(string))
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Graph) GetWorkflowComponents(workspace string){
|
||||
// retrieve if list of components' object
|
||||
result := gjson.Get(workspace, "computing")
|
||||
os.WriteFile("computing.json", []byte(result.String()), 0660)
|
||||
// TODO : extract all the JSON/data processing to a new object that takes
|
||||
// the current graph as an attribute and deals with adding new objects
|
||||
// to it, depending on their type
|
||||
|
||||
result.ForEach(func(id, value gjson.Result) bool{
|
||||
m, ok := value.Value().(map[string]interface{})
|
||||
if !ok {
|
||||
fmt.Printf("error getting map from json")
|
||||
}
|
||||
// Create the objects that correspond to each component
|
||||
// in a workflow, combining the user input and the base components attributes
|
||||
|
||||
var comp_obj models.ComputingModel
|
||||
comp_obj.ID = id.Str
|
||||
err := mapstructure.Decode(m,&comp_obj.ComputingNEWModel)
|
||||
if err != nil {
|
||||
fmt.Print("Error unpacking json into comp object")
|
||||
}
|
||||
func (g *Graph) GetWorkflowComponents(workflow string){
|
||||
types := []string{"computing","datacenter","data","storage"} // create a constant for more maintainability OR even better get the list of all component's type for this WF
|
||||
for _, component_type := range types {
|
||||
// Retrieve the dict of component for a specific type in the workflow
|
||||
result := gjson.Get(workflow, component_type)
|
||||
|
||||
if (result.Type != gjson.Null) {
|
||||
result.ForEach(func(id, value gjson.Result) bool{
|
||||
component_db, ok := value.Value().(map[string]interface{})
|
||||
|
||||
if !ok {
|
||||
fmt.Printf("error getting map from json")
|
||||
}
|
||||
|
||||
g.Computings = append(g.Computings, comp_obj)
|
||||
comp_id := component_db["referenceID"].(string)
|
||||
var obj interface{}
|
||||
if (comp_id != "") {
|
||||
component_map, _ := g.GetComponentById(component_type, comp_id)
|
||||
switch component_type {
|
||||
case "computing":
|
||||
obj = ConstructComputingObject(id.Str,component_map,component_db)
|
||||
case "data":
|
||||
obj = ConstructDataObject(id.Str,component_map,component_db)
|
||||
case "datacenter":
|
||||
obj = ConstructDatacenterObject(id.Str,component_map,component_db)
|
||||
case "storage":
|
||||
obj = ConstructStorageObject(id.Str,component_map,component_db)
|
||||
default :
|
||||
logs.Critical("Component type doesn't match a know type : " + component_type)
|
||||
}
|
||||
g.AddComponentObject(component_type, obj)
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return true
|
||||
func (g *Graph) AddComponentObject(comp_type string, component interface{}){
|
||||
|
||||
switch comp_type {
|
||||
case "computing":
|
||||
g.Computings = append(g.Computings, component.(models.ComputingModel))
|
||||
|
||||
case "data":
|
||||
g.Datas = append(g.Datas, component.(models.DataModel))
|
||||
|
||||
case "datacenter":
|
||||
g.Datacenters = append(g.Datacenters, component.(models.DatacenterModel))
|
||||
|
||||
case "storage":
|
||||
g.Storages = append(g.Storages, component.(models.StorageModel))
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Construct the object corresponding to component_type from its ID, retrieved in
|
||||
// the xml graph, in order to merge the user input with the base model
|
||||
|
||||
func (g *Graph) GetComponentById(component_type string, id string) (map[string]interface{}, error) {
|
||||
// TODO : Add a verification that g.ws is initialized ?
|
||||
|
||||
body , err := g.ws.Get("v1/"+component_type+"/"+id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
comp := make(map[string]interface{})
|
||||
jsonified := gjson.ParseBytes(body)
|
||||
jsonified.ForEach(func(key, value gjson.Result) bool {
|
||||
comp[key.Str] = value.String()
|
||||
return true // keep iterating
|
||||
})
|
||||
|
||||
// var slice_obj []models.ComputingModel
|
||||
// var response map[string]interface{}
|
||||
// json.Unmarshal(computings_obj,response)
|
||||
|
||||
// if len(computings_obj) != 0 {
|
||||
// for _, obj := range(computings_obj){
|
||||
// // var new_comp models.ComputingModel
|
||||
// // json.Unmarshal(obj,&new_comp)
|
||||
// fmt.Print(obj)
|
||||
// }
|
||||
// }
|
||||
|
||||
return comp, nil
|
||||
}
|
||||
|
||||
func (g *Graph) AddDataModel(id string) error {
|
||||
@ -228,25 +278,6 @@ func (g *Graph) AddStorageModel(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add the user input to the object
|
||||
// So far only computing (command, args, env var) and storage (path to the storing point) allow user input
|
||||
func (g *Graph) AddUserInput(id string) error {
|
||||
// TODO refactor the code in oc-catalog models so that we can call the addUserInput regardless of the class
|
||||
|
||||
// Detect which type of component
|
||||
// Retrieve the component from its ID
|
||||
|
||||
fmt.Printf("component %s",id)
|
||||
|
||||
// switch type
|
||||
// case computing
|
||||
// Computing.addUserInput(map[string]interface{})
|
||||
|
||||
// case storage
|
||||
// Storage..addUserInput(map[string]interface{})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Graph) ExportToArgo(id string) error {
|
||||
return nil
|
||||
}
|
||||
|
7
main.go
7
main.go
@ -1,6 +1,7 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
@ -28,6 +29,7 @@ func main() {
|
||||
_ = loglevel
|
||||
|
||||
var g Graph
|
||||
|
||||
list, err := g.GetGraphList(apiurl)
|
||||
if err != nil {
|
||||
log.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + apiurl)
|
||||
@ -35,6 +37,9 @@ func main() {
|
||||
for workspace, _ := range list {
|
||||
println(workspace)
|
||||
}
|
||||
g.LoadFrom(list["TLE-feed"])
|
||||
|
||||
g.LoadFrom(list["test-alpr"])
|
||||
|
||||
fmt.Print("stop")
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user