2024-08-02 13:34:39 +02:00
// Translates the information held in the graph object, via its lists of
// components, into an Argo workflow file, using the list of link IDs to
// build the DAG.
package workflow_builder
import (
2024-08-19 11:43:40 +02:00
. "oc-monitord/models"
2024-08-02 13:34:39 +02:00
"os"
2024-08-29 09:34:10 +02:00
"slices"
2024-08-02 13:34:39 +02:00
"strings"
"time"
2024-08-19 11:43:40 +02:00
oclib "cloud.o-forge.io/core/oc-lib"
2024-08-02 13:34:39 +02:00
"cloud.o-forge.io/core/oc-lib/models/resource_model"
"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
2024-08-29 09:34:10 +02:00
w "cloud.o-forge.io/core/oc-lib/models/workflow"
2024-08-02 13:34:39 +02:00
"github.com/nwtgck/go-fakelish"
"gopkg.in/yaml.v3"
)
type ArgoBuilder struct {
2024-08-29 09:34:10 +02:00
OriginWorkflow w . Workflow
Workflow Workflow
Timeout int
2024-08-02 13:34:39 +02:00
}
type Workflow struct {
ApiVersion string ` yaml:"apiVersion" `
Kind string ` yaml:"kind" `
Metadata struct {
2024-08-20 15:23:02 +02:00
Name string ` yaml:"name" `
2024-08-02 13:34:39 +02:00
} ` yaml:"metadata" `
Spec Spec ` yaml:"spec,omitempty" `
}
type Spec struct {
Entrypoint string ` yaml:"entrypoint" `
Arguments [ ] Parameter ` yaml:"arguments,omitempty" `
Volumes [ ] VolumeClaimTemplate ` yaml:"volumeClaimTemplates,omitempty" `
Templates [ ] Template ` yaml:"templates" `
2024-08-22 10:52:49 +02:00
Timeout int ` yaml:"activeDeadlineSeconds,omitempty" `
2024-08-02 13:34:39 +02:00
}
func ( b * ArgoBuilder ) CreateDAG ( ) ( string , error ) {
2024-08-29 10:17:31 +02:00
// handle services by checking if there is only one processing with hostname and port
if ( b . isService ( ) ) {
b . createNginxVolumes ( )
}
2024-08-02 13:34:39 +02:00
b . createTemplates ( )
b . createDAGstep ( )
b . createVolumes ( )
2024-08-29 10:17:31 +02:00
2024-08-22 10:52:49 +02:00
if b . Timeout > 0 {
b . Workflow . Spec . Timeout = b . Timeout
}
2024-08-02 13:34:39 +02:00
b . Workflow . Spec . Entrypoint = "dag"
b . Workflow . ApiVersion = "argoproj.io/v1alpha1"
b . Workflow . Kind = "Workflow"
random_name := generateWfName ( )
2024-08-08 10:11:40 +02:00
b . Workflow . Metadata . Name = "oc-monitor-" + random_name
2024-08-19 11:43:40 +02:00
logger := oclib . GetLogger ( )
2024-08-02 13:34:39 +02:00
yamlified , err := yaml . Marshal ( b . Workflow )
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not transform object to yaml file" )
2024-08-02 13:34:39 +02:00
return "" , err
}
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
current_timestamp := time . Now ( ) . Format ( "02_01_2006_150405" )
file_name := random_name + "_" + current_timestamp + ".yml"
2024-08-19 11:43:40 +02:00
workflows_dir := "./argo_workflows/"
2024-08-02 13:34:39 +02:00
err = os . WriteFile ( workflows_dir + file_name , [ ] byte ( yamlified ) , 0660 )
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not write the yaml file" )
2024-08-02 13:34:39 +02:00
return "" , err
}
return file_name , nil
}
func ( b * ArgoBuilder ) createTemplates ( ) {
for _ , comp := range b . getProcessings ( ) {
var command string
2024-08-19 11:43:40 +02:00
var args string
var env string
2024-08-02 13:34:39 +02:00
comp_res := comp . Processing
command = getStringValue ( comp_res . AbstractResource , "command" )
args = getStringValue ( comp_res . AbstractResource , "args" )
env = getStringValue ( comp_res . AbstractResource , "env" )
image_name := strings . Split ( command , " " ) [ 0 ] // TODO : decide where to store the image name, GUI or models.computing.Image
temp_container := Container { Image : image_name } // TODO : decide where to store the image name, GUI or models.computing.Image
temp_container . Command = getComputingCommands ( command )
2024-08-29 10:17:31 +02:00
temp_container . Args = getComputingArgs ( args , command )
2024-08-02 13:34:39 +02:00
// Only for dev purpose,
input_names := getComputingEnvironmentName ( strings . Split ( env , " " ) )
var inputs_container [ ] Parameter
for _ , name := range input_names {
inputs_container = append ( inputs_container , Parameter { Name : name } )
}
argo_name := getArgoName ( comp_res . GetName ( ) , comp . ID )
new_temp := Template { Name : argo_name , Container : temp_container }
new_temp . Inputs . Parameters = inputs_container
new_temp . Container . VolumeMounts = append ( new_temp . Container . VolumeMounts , VolumeMount { Name : "workdir" , MountPath : "/mnt/vol" } ) // TODO : replace this with a search of the storage / data source name
2024-08-29 10:17:31 +02:00
new_temp . Container . VolumeMounts = append ( new_temp . Container . VolumeMounts , VolumeMount { Name : "nginx-demo" , MountPath : "/usr/share/nginx" } ) // Used for processing services' demo with nginx
2024-08-02 13:34:39 +02:00
b . Workflow . Spec . Templates = append ( b . Workflow . Spec . Templates , new_temp )
}
}
func ( b * ArgoBuilder ) createDAGstep ( ) {
new_dag := Dag { }
for _ , comp := range b . getProcessings ( ) {
comp_res := comp . Processing
env := getStringValue ( comp_res . AbstractResource , "env" )
unique_name := getArgoName ( comp_res . GetName ( ) , comp . ID )
step := Task { Name : unique_name , Template : unique_name }
comp_envs := getComputingEnvironment ( strings . Split ( env , " " ) )
for name , value := range comp_envs {
step . Arguments . Parameters = append ( step . Arguments . Parameters , Parameter { Name : name , Value : value } )
}
// retrieves the name (computing.name-computing.ID)
2024-08-19 11:43:40 +02:00
step . Dependencies = b . getDependency ( comp . ID ) // Error : we use the component ID instead of the GraphItem ID -> store objects
2024-08-02 13:34:39 +02:00
new_dag . Tasks = append ( new_dag . Tasks , step )
}
b . Workflow . Spec . Templates = append ( b . Workflow . Spec . Templates , Template { Name : "dag" , Dag : new_dag } )
}
// createVolumes appends the "workdir" volume claim (1Gi, ReadWriteOnce) to
// the workflow spec.
//
// For testing purposes only this single volume is declared; it is the one
// mounted by the templates.
func (b *ArgoBuilder) createVolumes() {
	var claim VolumeClaimTemplate
	claim.Spec.Resources.Requests.Storage = "1Gi"
	claim.Spec.AccessModes = []string{"ReadWriteOnce"}
	claim.Metadata.Name = "workdir"

	spec := &b.Workflow.Spec
	spec.Volumes = append(spec.Volumes, claim)
}
2024-08-29 10:17:31 +02:00
// createNginxVolumes appends the "nginx-demo" volume claim (1Gi,
// ReadWriteOnce) to the workflow spec.
//
// For demo purposes, until we implement the use of storage resources.
func (b *ArgoBuilder) createNginxVolumes() {
	var claim VolumeClaimTemplate
	claim.Spec.Resources.Requests.Storage = "1Gi"
	claim.Spec.AccessModes = []string{"ReadWriteOnce"}
	claim.Metadata.Name = "nginx-demo"

	spec := &b.Workflow.Spec
	spec.Volumes = append(spec.Volumes, claim)
}
2024-08-02 13:34:39 +02:00
func ( b * ArgoBuilder ) getDependency ( current_computing_id string ) ( dependencies [ ] string ) {
2024-08-29 09:34:10 +02:00
for _ , link := range b . OriginWorkflow . Graph . Links {
2024-08-29 12:08:57 +02:00
if b . OriginWorkflow . Graph . Items [ link . Source . ID ] . Processing == nil {
2024-08-19 11:43:40 +02:00
continue
}
2024-08-29 09:34:10 +02:00
source := b . OriginWorkflow . Graph . Items [ link . Source . ID ] . Processing
2024-08-06 11:40:30 +02:00
if current_computing_id == link . Destination . ID && source != nil {
dependency_name := getArgoName ( source . GetName ( ) , link . Source . ID )
dependencies = append ( dependencies , dependency_name )
2024-08-02 13:34:39 +02:00
}
}
return
}
2024-08-19 11:43:40 +02:00
func getComputingCommands ( user_input string ) [ ] string {
2024-08-02 13:34:39 +02:00
user_input = removeImageName ( user_input )
if len ( user_input ) == 0 {
2024-08-19 11:43:40 +02:00
return [ ] string { }
2024-08-02 13:34:39 +02:00
}
2024-08-19 11:43:40 +02:00
return strings . Split ( user_input , " " )
2024-08-02 13:34:39 +02:00
}
2024-08-29 10:17:31 +02:00
// getComputingArgs turns the raw argument string of a processing into the
// list of arguments given to its container.
//
// When command contains "sh -c" the whole of user_input is returned untouched
// as a single argument, so that the shell receives the script as one string.
// Otherwise user_input is split on spaces and empty tokens (caused by
// repeated, leading or trailing spaces) are dropped. An empty user_input
// yields a nil slice.
func getComputingArgs(user_input string, command string) (list_args []string) {
	if len(user_input) == 0 {
		return nil
	}
	// quickfix that might need improvement
	if strings.Contains(command, "sh -c") {
		return []string{user_input}
	}
	for _, arg := range strings.Split(user_input, " ") {
		if arg != "" {
			list_args = append(list_args, arg)
		}
	}
	return list_args
}
// Currently implements code to overcome problems in data structure
func getComputingEnvironment ( user_input [ ] string ) ( map_env map [ string ] string ) {
2024-08-19 11:43:40 +02:00
logger := oclib . GetLogger ( )
2024-08-02 13:34:39 +02:00
is_empty := len ( user_input ) == 0
is_empty_string := len ( user_input ) == 1 && user_input [ 0 ] == ""
if is_empty || is_empty_string {
return
}
if len ( user_input ) == 1 {
user_input = strings . Split ( user_input [ 0 ] , "," )
}
map_env = make ( map [ string ] string , 0 )
for _ , str := range user_input {
new_pair := strings . Split ( str , "=" )
if len ( new_pair ) != 2 {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Error extracting the environment variable from " + str )
2024-08-02 13:34:39 +02:00
panic ( 0 )
}
map_env [ new_pair [ 0 ] ] = new_pair [ 1 ]
}
return
}
// getComputingEnvironmentName returns the names of the environment variables
// described by user_input (see getComputingEnvironment for the format).
//
// The names are sorted alphabetically so that callers get the same order on
// every call; ranging over the underlying map alone would give a random one.
// The result is nil when no variable is defined.
func getComputingEnvironmentName(user_input []string) (list_names []string) {
	for name := range getComputingEnvironment(user_input) {
		list_names = append(list_names, name)
	}
	slices.Sort(list_names)
	return list_names
}
// generateWfName returns a workflow name made of two generated fake words
// joined by a dash.
func generateWfName() (Name string) {
	first := fakelish.GenerateFakeWord(5, 8)
	second := fakelish.GenerateFakeWord(5, 8)
	return first + "-" + second
}
// getArgoName builds the name used for a component in the Argo file: the raw
// name with spaces replaced by dashes, followed by a dash and the component
// ID, the whole being lower-cased.
func getArgoName(raw_name string, component_id string) (formatedName string) {
	dashed := strings.ReplaceAll(raw_name, " ", "-")
	return strings.ToLower(dashed + "-" + component_id)
}
// removeImageName strips the first space-separated word of user_input, which
// is the name of the container image for now, and returns what follows the
// first space. It returns "" when user_input contains no space at all.
func removeImageName(user_input string) string {
	_, rest, hasSpace := strings.Cut(user_input, " ")
	if !hasSpace {
		return ""
	}
	return rest
}
// Return the graphItem containing a Processing resource, so that we have access to the ID of the graphItem in order to identify it in the links
func ( b * ArgoBuilder ) getProcessings ( ) ( list_computings [ ] graph . GraphItem ) {
2024-08-29 09:34:10 +02:00
for _ , item := range b . OriginWorkflow . Graph . Items {
2024-08-02 13:34:39 +02:00
if item . Processing != nil {
list_computings = append ( list_computings , item )
}
}
return
}
2024-08-19 11:43:40 +02:00
func ( b * ArgoBuilder ) IsProcessing ( id string ) bool {
2024-08-29 09:34:10 +02:00
return slices . Contains ( b . OriginWorkflow . Processings , id )
2024-08-06 11:40:30 +02:00
}
2024-08-19 11:43:40 +02:00
// getStringValue returns the model value stored under key in comp when it is
// a string.
//
// It returns "" when the key has no value or when the value is not a string;
// the checked type assertion avoids a panic on unexpected value types.
func getStringValue(comp resource_model.AbstractResource, key string) string {
	if value, ok := comp.GetModelValue(key).(string); ok {
		return value
	}
	return ""
}
2024-08-29 10:17:31 +02:00
// isService reports whether the workflow must be handled as a service: it
// holds exactly one processing and both the "port" and "hostname" values of
// that processing's model are set.
//
// Setting the "test_service" environment variable to any non-empty value
// forces the answer to true.
func (b *ArgoBuilder) isService() bool {
	// for dev purpose do not commit to main
	if forced := os.Getenv("test_service"); forced != "" {
		return true
	}

	processings := b.getProcessings()
	if len(processings) != 1 {
		return false
	}

	model := processings[0].Data.ResourceModel.Model
	if model["port"].Value == "" {
		return false
	}
	return model["hostname"].Value != ""
}