// Package workflow_builder translates the information held in the graph
// object, via its lists of components, into an Argo workflow file, using a
// list of link IDs to build the DAG.
package workflow_builder
import (
	"fmt"
	"os"
	"sort"
	"strings"
	"time"

	"cloud.o-forge.io/core/oc-lib/models/resource_model"
	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
	"github.com/nwtgck/go-fakelish"
	"gopkg.in/yaml.v3"

	"oc-monitor/logger"
	. "oc-monitor/models"
)
// ArgoBuilder turns a graph into an Argo workflow description.
type ArgoBuilder struct {
// graph is the source graph; its Items and Links are read by
// getProcessings and getDependency.
graph graph . Graph
// Workflow is the object filled by CreateDAG and marshalled to YAML.
Workflow Workflow
}
// Workflow is the top-level document marshalled to YAML by CreateDAG.
// The struct tags give the YAML key used for each field.
type Workflow struct {
// ApiVersion is set to "argoproj.io/v1alpha1" by CreateDAG.
ApiVersion string ` yaml:"apiVersion" `
// Kind is set to "Workflow" by CreateDAG.
Kind string ` yaml:"kind" `
Metadata struct {
// GenerateName is set by CreateDAG to "oc-monitor-" followed by a
// randomly generated name.
GenerateName string ` yaml:"generateName" `
} ` yaml:"metadata" `
// Spec holds the entrypoint, templates and volumes of the workflow.
Spec Spec ` yaml:"spec,omitempty" `
}
// Spec is the "spec" section of the marshalled Workflow.
type Spec struct {
// Entrypoint is the name of the template to start from; CreateDAG sets
// it to "dag".
Entrypoint string ` yaml:"entrypoint" `
// Arguments is not written by any function visible in this file.
Arguments [ ] Parameter ` yaml:"arguments,omitempty" `
// Volumes is filled by createVolumes and emitted under the
// "volumeClaimTemplates" key.
Volumes [ ] VolumeClaimTemplate ` yaml:"volumeClaimTemplates,omitempty" `
// Templates receives one entry per processing (createTemplates) plus the
// "dag" template (createDAGstep).
Templates [ ] Template ` yaml:"templates" `
}
func ( b * ArgoBuilder ) CreateDAG ( ) ( string , error ) {
b . createTemplates ( )
b . createDAGstep ( )
b . createVolumes ( )
b . Workflow . Spec . Entrypoint = "dag"
b . Workflow . ApiVersion = "argoproj.io/v1alpha1"
b . Workflow . Kind = "Workflow"
random_name := generateWfName ( )
2024-08-06 11:40:30 +02:00
b . Workflow . Metadata . GenerateName = "oc-monitor-" + random_name
2024-08-02 13:34:39 +02:00
yamlified , err := yaml . Marshal ( b . Workflow )
if err != nil {
logger . Logger . Error ( ) . Msg ( "Could not transform object to yaml file" )
return "" , err
}
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
current_timestamp := time . Now ( ) . Format ( "02_01_2006_150405" )
file_name := random_name + "_" + current_timestamp + ".yml"
workflows_dir := "argo_workflows/"
err = os . WriteFile ( workflows_dir + file_name , [ ] byte ( yamlified ) , 0660 )
if err != nil {
logger . Logger . Error ( ) . Msg ( "Could not write the yaml file" )
return "" , err
}
return file_name , nil
}
// createTemplates appends one container template to b.Workflow.Spec.Templates
// for every processing item returned by getProcessings.
//
// The "command", "args" and "env" model values of the processing are read as
// space-separated strings. The first word of the command is used as the
// container image, the rest as the container command. Each environment
// variable name becomes an input parameter of the template.
func (b *ArgoBuilder) createTemplates() {
	for _, item := range b.getProcessings() {
		processing := item.Processing

		command := getStringValue(processing.AbstractResource, "command")
		args := getStringValue(processing.AbstractResource, "args")
		env := getStringValue(processing.AbstractResource, "env")

		// TODO : decide where to store the image name, GUI or models.computing.Image
		container := Container{
			Image:   strings.Split(command, " ")[0],
			Command: getComputingCommands(command),
			Args:    getComputingArgs(strings.Split(args, " "), command),
			// TODO : replace this with a search of the storage / data source name
			VolumeMounts: []VolumeMount{{Name: "workdir", MountPath: "/mnt/vol"}},
		}

		// Only for dev purpose: expose every environment variable name as an
		// input parameter. The slice stays nil when there is no variable.
		var inputs []Parameter
		for _, name := range getComputingEnvironmentName(strings.Split(env, " ")) {
			inputs = append(inputs, Parameter{Name: name})
		}

		tmpl := Template{
			Name:      getArgoName(processing.GetName(), item.ID),
			Container: container,
		}
		tmpl.Inputs.Parameters = inputs

		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, tmpl)
	}
}
// createDAGstep builds the "dag" template and appends it to
// b.Workflow.Spec.Templates.
//
// One task is created per processing item returned by getProcessings. The task
// and the template it references share the name produced by getArgoName. The
// NAME=VALUE pairs found in the "env" model value are passed as task
// parameters, and the task dependencies are resolved with getDependency.
func (b *ArgoBuilder) createDAGstep() {
	dag := Dag{}

	for _, item := range b.getProcessings() {
		processing := item.Processing
		env := getStringValue(processing.AbstractResource, "env")

		uniqueName := getArgoName(processing.GetName(), item.ID)
		step := Task{Name: uniqueName, Template: uniqueName}

		// Map iteration order is unspecified in Go: sort the variable names so
		// that the generated parameters appear in a stable order.
		envs := getComputingEnvironment(strings.Split(env, " "))
		names := make([]string, 0, len(envs))
		for name := range envs {
			names = append(names, name)
		}
		sort.Strings(names)
		for _, name := range names {
			step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{Name: name, Value: envs[name]})
		}

		// Retrieves the names (computing.name-computing.ID) of the tasks this
		// one depends on.
		// NOTE (from the original author): we use the component ID instead of
		// the GraphItem ID -> store objects.
		step.Dependencies = b.getDependency(item.ID)

		dag.Tasks = append(dag.Tasks, step)
	}

	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: dag})
}
// createVolumes appends a single volume claim template named "workdir"
// (ReadWriteOnce, 1Gi) to b.Workflow.Spec.Volumes.
//
// For testing purposes only one volume is declared; createTemplates mounts a
// volume of the same name in each computing container.
func (b *ArgoBuilder) createVolumes() {
	var workdir VolumeClaimTemplate
	workdir.Metadata.Name = "workdir"
	workdir.Spec.Resources.Requests.Storage = "1Gi"
	workdir.Spec.AccessModes = []string{"ReadWriteOnce"}

	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, workdir)
}
func ( b * ArgoBuilder ) getDependency ( current_computing_id string ) ( dependencies [ ] string ) {
for _ , link := range b . graph . Links {
2024-08-06 11:40:30 +02:00
source := b . graph . Items [ link . Source . ID ] . Processing
if current_computing_id == link . Destination . ID && source != nil {
dependency_name := getArgoName ( source . GetName ( ) , link . Source . ID )
dependencies = append ( dependencies , dependency_name )
2024-08-02 13:34:39 +02:00
}
}
return
}
// func (b *ArgoBuilder) componentInBranch(component_id string, branch []string) bool {
// for _, link := range branch {
// if b.graph.Links[link].Source == component_id || b.graph.Links[link].Destination == component_id {
// return true
// }
// }
// return false
// }
// func (b *ArgoBuilder) findPreviousComputing(computing_id string, branch []string, index int) string {
// for i := index; i >= 0 ; i-- {
// previousLink := b.graph.Links[branch[i]]
// if previousLink.Source != computing_id && b.graph.GetComponentType(previousLink.Source) == "computing"{
// name := getArgoName(b.graph.getComponentName(previousLink.Source),previousLink.Source)
// return name
// }
// if previousLink.Destination != computing_id && b.graph.GetComponentType(previousLink.Destination) == "computing"{
// name := getArgoName(b.graph.getComponentName(previousLink.Destination),previousLink.Destination)
// return name
// }
// }
// return ""
// }
// getComputingCommands returns the words of user_input that follow the image
// name (its first word), split on single spaces.
//
// It returns nil when nothing is left once the image name is removed.
func getComputingCommands(user_input string) (list_command []string) {
	user_input = removeImageName(user_input)
	if len(user_input) == 0 {
		return nil
	}
	return strings.Split(user_input, " ")
}
// getComputingArgs converts the already split arguments of a computing into
// the argument list of its container.
//
// When command contains "sh -c" the arguments are joined back into a single
// string, so that the shell receives the whole script as one argument
// (quickfix that might need improvement). Otherwise a copy of user_input is
// returned.
//
// It returns nil when user_input is empty or holds only one empty string,
// which is what strings.Split produces for an empty input.
func getComputingArgs(user_input []string, command string) (list_args []string) {
	if len(user_input) == 0 || (len(user_input) == 1 && user_input[0] == "") {
		return nil
	}

	if strings.Contains(command, "sh -c") {
		return []string{strings.Join(user_input, " ")}
	}

	// Copy so that the caller's slice is never aliased by the result.
	return append(list_args, user_input...)
}
// getComputingEnvironment parses NAME=VALUE pairs into a map.
//
// It currently implements code to overcome problems in the data structure:
// when user_input holds a single element, that element is split again on ","
// so that a comma-separated list is accepted as well.
//
// Only the first "=" separates the name from the value, so a value may itself
// contain "=". It returns nil when user_input is empty or holds only one empty
// string. An element without "=" is logged and makes the function panic.
func getComputingEnvironment(user_input []string) (map_env map[string]string) {
	if len(user_input) == 0 || (len(user_input) == 1 && user_input[0] == "") {
		return nil
	}

	if len(user_input) == 1 {
		user_input = strings.Split(user_input[0], ",")
	}

	map_env = make(map[string]string, len(user_input))
	for _, str := range user_input {
		pair := strings.SplitN(str, "=", 2)
		if len(pair) != 2 {
			logger.Logger.Error().Msg("Error extracting the environment variable from " + str)
			panic("malformed environment variable (expected NAME=VALUE): " + str)
		}
		map_env[pair[0]] = pair[1]
	}

	return map_env
}
// getComputingEnvironmentName returns the names of the environment variables
// found in user_input, as parsed by getComputingEnvironment.
//
// The names are sorted because map iteration order is unspecified in Go;
// sorting keeps the generated workflow identical from one run to the next.
// The result is nil when no variable is defined.
func getComputingEnvironmentName(user_input []string) (list_names []string) {
	for name := range getComputingEnvironment(user_input) {
		list_names = append(list_names, name)
	}
	sort.Strings(list_names)
	return list_names
}
// generateWfName returns a random workflow name made of two fake words, each
// generated with fakelish.GenerateFakeWord(5, 8), joined by a dash.
func generateWfName() (Name string) {
	first := fakelish.GenerateFakeWord(5, 8)
	second := fakelish.GenerateFakeWord(5, 8)
	return first + "-" + second
}
// getArgoName builds the name used for a component in the Argo file: spaces
// in raw_name are replaced by dashes, component_id is appended after a dash
// and the whole result is lower-cased.
func getArgoName(raw_name string, component_id string) (formatedName string) {
	dashed := strings.ReplaceAll(raw_name, " ", "-")
	return strings.ToLower(dashed + "-" + component_id)
}
// printYAML marshals data to YAML and prints the result on standard output.
// When the marshalling fails, the error is printed instead.
func printYAML(data interface{}) {
	if yamlData, err := yaml.Marshal(data); err != nil {
		fmt.Printf("Error marshalling YAML: %v\n", err)
	} else {
		fmt.Println(string(yamlData))
	}
}
// removeImageName returns user_input without its first word, i.e. everything
// that follows the first space.
//
// The first word is the name of the container image for now. An input without
// any space (including the empty string) yields "".
func removeImageName(user_input string) string {
	firstSpace := strings.Index(user_input, " ")
	if firstSpace < 0 {
		return ""
	}
	return user_input[firstSpace+1:]
}
// getProcessings returns the graph items that contain a Processing resource,
// so that we have access to the ID of the graph item in order to identify it
// in the links.
//
// The items are sorted by ID: b.graph.Items is indexed by item ID elsewhere in
// this file, and ranging over a map yields an unspecified order, which would
// make the order of the generated templates and tasks change between runs.
func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) {
	for _, item := range b.graph.Items {
		if item.Processing != nil {
			list_computings = append(list_computings, item)
		}
	}

	sort.Slice(list_computings, func(i, j int) bool {
		return list_computings[i].ID < list_computings[j].ID
	})
	return list_computings
}
func getStringValue ( comp resource_model . AbstractResource , key string ) string {
if res := comp . GetModelValue ( key ) ; res != nil {
return res . ( string )
}
return ""
2024-08-06 11:40:30 +02:00
}