generates the steps of a dag yaml

This commit is contained in:
pb 2024-05-02 09:52:28 +02:00
parent 603dd62f99
commit b7441c7430
5 changed files with 279 additions and 4 deletions

108
argo_builder.go Normal file
View File

@ -0,0 +1,108 @@
// A class that translates the informations held in the graph object
// via its lists of components into an argo file, using the a list of
// link ID to build the dag
package main
import (
"fmt"
"slices"
"github.com/beego/beego/v2/core/logs"
"gopkg.in/yaml.v3"
)
type ArgoBuilder struct {
graph Graph
branches [][]string
steps []DagStep
}
type DagStep struct {
Name string `yaml:"name"`
Template string `yaml:"template"`
Arguments []string `yaml:"arguments"`
Dependencies []string `yaml:"dependencies"`
}
func (b *ArgoBuilder) CreateDAG() bool {
fmt.Println("list of branches : ", b.branches)
b.createDAGstep()
yamlified, err := yaml.Marshal(b.steps)
if err != nil {
logs.Error("Could not produce yaml file")
}
fmt.Println(string(yamlified))
return true
}
func (b *ArgoBuilder) createDAGstep() {
for _, comp := range b.graph.Computings{
unique_name := comp.Name + "_" + comp.ID
step := DagStep{Name: unique_name, Template: unique_name, Arguments: comp.Arguments}
// For each branch, check if the computing has a dependency
for _, branch := range b.branches {
if b.componentInBranch(comp.ID,branch) {
// retrieves the name (computing.name_computing.ID)
dependency := b.getDependency(comp.ID,branch)
if dependency != "" && !slices.Contains(step.Dependencies,dependency) {
step.Dependencies = append(step.Dependencies,dependency)
}
}
}
b.steps = append(b.steps, step)
}
}
func (b *ArgoBuilder) getDependency (current_computing_id string, branch []string) string {
for i := len(branch)-1; i >= 0 ; i-- {
current_link := b.graph.Links[branch[i]]
current_computing_found := false
if current_link.Source == current_computing_id || current_link.Destination == current_computing_id {
current_computing_found = true
}
if current_computing_found {
return b.findPreviousComputing(current_computing_id,branch,i-1)
}
}
return ""
}
func (b *ArgoBuilder) componentInBranch(component_id string, branch []string) bool {
for _, link := range branch {
if b.graph.Links[link].Source == component_id || b.graph.Links[link].Destination == component_id {
return true
}
}
return false
}
func (b *ArgoBuilder) findPreviousComputing(computing_id string, branch []string, index int) string {
for i := index; i >= 0 ; i-- {
previousLink := b.graph.Links[branch[i]]
if previousLink.Source != computing_id && b.graph.getComponentType(previousLink.Source) == "computing"{
name := b.graph.getComponentName(previousLink.Source) + "_" + previousLink.Source
return name
}
if previousLink.Destination != computing_id && b.graph.getComponentType(previousLink.Destination) == "computing"{
name := b.graph.getComponentName(previousLink.Destination) + "_" + previousLink.Destination
return name
}
}
return ""
}

View File

@ -0,0 +1,58 @@
@startuml process_branches
actor oc_scheduler as oc
participant graph as graph
participant argoBuilder as argo
entity list_branches as branches
entity graph as argoGraph
oc -> graph : loadGraphFrom(nom_workflow)
activate oc
activate graph
graph --> oc : Graph
oc -> graph : generateArgo()
graph -> graph : createBranches
graph -> argo : new ArgoBuilder(Graph,list_branches)
activate argo
argo -> argo : create list of nodes already written
note top : a node is a component in a link object,\n represented by its ID
argo -> argo
note bottom : We want to prepare the list of branches by removing\n nodes that appears several times\ne.g : two branches :\n1 - 2 2 - 3 3 - 4\n1 - 2 2 - 3 3 - 5\nWe will iterate over each branch to create the component's YAML\n and don't want to create components 1 2 et 3 twice or more
loop branch in branches
group if Source first link is not in list node written
loop while link Source is shared
argo -> argo : create an object representing the component
argo -> argo : add component to list of created nodes
end
end
end
loop branch in branches
loop link in branch
group if link.Destination not in list of nodes already written
argo -> argoGraph : get component
argoGraph --> argo : component
argo -> argo : generate YAML depending on type (attributes)
argo -> argo : add ID to list of node already written
end
end
end
loop for each struct created
argo -> argo : Marshal into yaml
argo -> argo : write the marshalled string to a file
end
@enduml

View File

@ -0,0 +1,67 @@
@startuml
actor oc_scheduler as oc
participant graph as graph
oc -> graph : loadFrom(worflow_name)
activate graph
activate oc
graph --> oc: Graph
group if g.Links size > 0
oc -> graph: generateArgo()
graph -> graph: create slice of end links (strings)
loop link in links
alt link is not a DC Link && destination id IS NOT a source id in any other link
graph -> graph: add link to list of endLinks
note right : exemple\n If current link is A-->B\n we look for another link like C-->A
end
end
loop link in endLinks
graph -> graph : copy Links id on a list
loop while hasParent
graph -> graph : currentLink = idList[i]
group if link.dst == currentLink.Src
graph -> graph : do something
note right : TEST ALGO : edit a\n string that shows every branch
graph -> graph : change currentSrc with link.Src
graph -> graph : remove currentLink from idList
end
group else
graph -> graph : hasParent = false
end
end
end
end
@enduml
créer une liste sans les liens composant - DC
// selectionner toutes les fins de branches :
creer une liste d'identifiant
pour chaque lien
si dst n'est pas source on ajoute l'id du lien a la liste
// recréer chaque branche
pour chaque id de fin de branche
copier les liens dans un nouvel objet
declarer une variable pour controller s'il y'a encore un lien parent
tant qu'il y'a un lien parent
iterer dans la liste des liens
si un lien a en dst le même id que la source actuelle :
afficher "{id_src} s'execute une fois que {id_dst_trouve} est fini"
changer l'id de la source actuelle par celui de la source du lien trouvé
sinon
lien parent = faux

View File

@ -0,0 +1,43 @@
@startuml processing links in workflow
participant graph as graph
participant argoBuilder as argo
entity list_branches as branches
' entity graph as argoGraph
graph -> argo : create new argo file (graph)
activate graph
activate argo
argo -> branches : identify each branches from graph
activate branches
branches --> argo : list of branches
deactivate branches
loop branch in branches
loop link in branch
argo -> argo : retrieve the computing component
note right: a link is always made\n of a computing \nand a data source or \ndata storage component
loop branch in branches
group if component id has a previous computing component
argo -> branches : get the name of previous computing
branches --> argo : computing.Name + computing.ID
argo -> argo : add list of dependencie to DagStep object
end
end
group if other component is storage
argo -> argo : retrieve the path
argo -> argo : declare a volume in the argo format
argo -> argo : add the created volume to computing as an output
else
argo -> argo : create a container to retrieve the data
argo -> argo : mount a local folder in computing to share data
note right: how do we make sure that\n the data is mounted in\na place where the computing\nknows it can find it ?
end
end
end
@enduml

View File

@ -270,7 +270,6 @@ func (g *Graph) ExportToArgo(id string) error {
// Return a list containing the IDs of each link that make up a branch in the graph // Return a list containing the IDs of each link that make up a branch in the graph
func (g *Graph) getListBranches(end_links map[string]models.Link, editable_link_list map[string]models.Link, current_branch []string) (list_branches [][]string) { func (g *Graph) getListBranches(end_links map[string]models.Link, editable_link_list map[string]models.Link, current_branch []string) (list_branches [][]string) {
fmt.Printf("Working on %v \n", current_branch)
if current_branch == nil { if current_branch == nil {
current_branch = make([]string, 0) current_branch = make([]string, 0)
} }
@ -289,8 +288,9 @@ func (g *Graph) getListBranches(end_links map[string]models.Link, editable_link_
previous_index := g.hasPreviousLink(j, editable_link_list) previous_index := g.hasPreviousLink(j, editable_link_list)
if len(previous_index) == 0 { if len(previous_index) == 0 {
new_branches = append(new_branches, []string{link_id}) // new_branches = append(new_branches, []string{link_id})
return new_branches // return new_branches
list_branches = append(list_branches, []string{link_id})
} }
for _, id_link := range previous_index { for _, id_link := range previous_index {
@ -309,7 +309,6 @@ func (g *Graph) getListBranches(end_links map[string]models.Link, editable_link_
} }
return
} }