2024-08-02 13:34:39 +02:00
// Package workflow_builder translates the information held in the graph object,
// via its lists of components, into an Argo workflow file, using a list of
// link IDs to build the DAG.
package workflow_builder
import (
2024-09-25 15:46:39 +02:00
"fmt"
2024-08-19 11:43:40 +02:00
. "oc-monitord/models"
2024-08-02 13:34:39 +02:00
"os"
2024-10-11 13:44:16 +02:00
"regexp"
2024-08-02 13:34:39 +02:00
"strings"
"time"
2024-08-19 11:43:40 +02:00
oclib "cloud.o-forge.io/core/oc-lib"
2024-11-07 13:36:28 +01:00
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
2024-10-11 13:44:16 +02:00
"cloud.o-forge.io/core/oc-lib/models/resources/processing"
2024-08-02 13:34:39 +02:00
"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
2024-08-29 09:34:10 +02:00
w "cloud.o-forge.io/core/oc-lib/models/workflow"
2024-08-02 13:34:39 +02:00
"github.com/nwtgck/go-fakelish"
2024-09-03 14:24:03 +02:00
"github.com/rs/zerolog"
2024-08-02 13:34:39 +02:00
"gopkg.in/yaml.v3"
)
2024-09-03 14:24:03 +02:00
// logger is the package-level zerolog logger used by the builder functions in
// this file. It is assigned from oclib.GetLogger() inside CreateDAG.
var logger zerolog . Logger
2024-08-02 13:34:39 +02:00
type ArgoBuilder struct {
2024-10-11 13:44:16 +02:00
OriginWorkflow * w . Workflow
2024-09-25 15:46:39 +02:00
Workflow Workflow
2024-10-11 13:44:16 +02:00
Services [ ] * Service
2024-09-25 15:46:39 +02:00
Timeout int
2024-08-02 13:34:39 +02:00
}
type Workflow struct {
ApiVersion string ` yaml:"apiVersion" `
Kind string ` yaml:"kind" `
Metadata struct {
2024-08-20 15:23:02 +02:00
Name string ` yaml:"name" `
2024-08-02 13:34:39 +02:00
} ` yaml:"metadata" `
Spec Spec ` yaml:"spec,omitempty" `
}
2024-10-11 13:44:16 +02:00
// setDag replaces the Dag of every template named "dag" with the given one.
//
// Templates is a slice of values, so the elements are addressed by index:
// assigning through the range value would only modify a copy and leave the
// workflow untouched.
func (b *Workflow) setDag(dag *Dag) {
	for i := range b.Spec.Templates {
		if b.Spec.Templates[i].Name == "dag" {
			b.Spec.Templates[i].Dag = dag
		}
	}
}
// getDag returns the Dag held by the first template named "dag". When no such
// template exists, a new one with an empty Dag is appended to the spec and
// that Dag is returned.
func (b *Workflow) getDag() *Dag {
	templates := b.Spec.Templates
	for i := range templates {
		if templates[i].Name == "dag" {
			return templates[i].Dag
		}
	}
	created := &Dag{}
	b.Spec.Templates = append(templates, Template{Name: "dag", Dag: created})
	return created
}
2024-08-02 13:34:39 +02:00
type Spec struct {
Entrypoint string ` yaml:"entrypoint" `
Arguments [ ] Parameter ` yaml:"arguments,omitempty" `
Volumes [ ] VolumeClaimTemplate ` yaml:"volumeClaimTemplates,omitempty" `
Templates [ ] Template ` yaml:"templates" `
2024-08-22 10:52:49 +02:00
Timeout int ` yaml:"activeDeadlineSeconds,omitempty" `
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) CreateDAG ( write bool ) ( string , int , [ ] string , [ ] string , error ) {
2024-09-25 15:46:39 +02:00
2024-08-29 10:17:31 +02:00
// handle services by checking if there is only one processing with hostname and port
2024-10-11 13:44:16 +02:00
firstItems , lastItems , volumes := b . createTemplates ( )
b . createVolumes ( volumes )
2024-08-29 10:17:31 +02:00
2024-08-22 10:52:49 +02:00
if b . Timeout > 0 {
b . Workflow . Spec . Timeout = b . Timeout
}
2024-08-02 13:34:39 +02:00
b . Workflow . Spec . Entrypoint = "dag"
b . Workflow . ApiVersion = "argoproj.io/v1alpha1"
b . Workflow . Kind = "Workflow"
2024-10-11 13:44:16 +02:00
if ! write {
return "" , len ( b . Workflow . getDag ( ) . Tasks ) , firstItems , lastItems , nil
}
random_name := fakelish . GenerateFakeWord ( 5 , 8 ) + "-" + fakelish . GenerateFakeWord ( 5 , 8 )
2024-08-08 10:11:40 +02:00
b . Workflow . Metadata . Name = "oc-monitor-" + random_name
2024-09-03 14:24:03 +02:00
logger = oclib . GetLogger ( )
2024-08-02 13:34:39 +02:00
yamlified , err := yaml . Marshal ( b . Workflow )
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not transform object to yaml file" )
2024-10-11 13:44:16 +02:00
return "" , 0 , firstItems , lastItems , err
2024-08-02 13:34:39 +02:00
}
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
current_timestamp := time . Now ( ) . Format ( "02_01_2006_150405" )
file_name := random_name + "_" + current_timestamp + ".yml"
2024-08-19 11:43:40 +02:00
workflows_dir := "./argo_workflows/"
2024-08-02 13:34:39 +02:00
err = os . WriteFile ( workflows_dir + file_name , [ ] byte ( yamlified ) , 0660 )
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not write the yaml file" )
2024-10-11 13:44:16 +02:00
return "" , 0 , firstItems , lastItems , err
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
return file_name , len ( b . Workflow . getDag ( ) . Tasks ) , firstItems , lastItems , nil
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) createTemplates ( ) ( [ ] string , [ ] string , [ ] VolumeMount ) {
volumes := [ ] VolumeMount { }
firstItems := [ ] string { }
lastItems := [ ] string { }
for _ , comp := range b . OriginWorkflow . GetProcessings ( ) {
if comp . Processing . Container != nil {
volumes , firstItems , lastItems = b . createArgoTemplates (
comp . ID , comp . Processing , volumes , firstItems , lastItems )
} else {
logger . Error ( ) . Msg ( "Not enough configuration setup, template can't be created : " + comp . Processing . GetName ( ) )
return firstItems , lastItems , volumes
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
}
firstWfTasks := map [ string ] [ ] string { }
latestWfTasks := map [ string ] [ ] string { }
relatedWfTasks := map [ string ] [ ] string { }
for _ , wf := range b . OriginWorkflow . GetWorkflows ( ) {
realWorkflow , code , err := w . New ( ) . LoadOne ( wf . Workflow . WorkflowID )
if code != 200 {
logger . Error ( ) . Msg ( "Error loading the workflow : " + err . Error ( ) )
continue
2024-09-03 14:24:03 +02:00
}
2024-10-11 13:44:16 +02:00
subBuilder := ArgoBuilder { OriginWorkflow : realWorkflow . ( * w . Workflow ) , Timeout : b . Timeout }
_ , _ , fi , li , err := subBuilder . CreateDAG ( false )
if err != nil {
logger . Error ( ) . Msg ( "Error creating the subworkflow : " + err . Error ( ) )
continue
}
firstWfTasks [ wf . ID ] = fi
if ok , depsOfIds := subBuilder . isArgoDependancy ( wf . ID ) ; ok { // IS BEFORE
latestWfTasks [ wf . ID ] = li
relatedWfTasks [ wf . ID ] = depsOfIds
}
subDag := subBuilder . Workflow . getDag ( )
d := b . Workflow . getDag ( )
d . Tasks = append ( d . Tasks , subDag . Tasks ... ) // add the tasks of the subworkflow to the main workflow
b . Workflow . Spec . Templates = append ( b . Workflow . Spec . Templates , subBuilder . Workflow . Spec . Templates ... )
b . Workflow . Spec . Volumes = append ( b . Workflow . Spec . Volumes , subBuilder . Workflow . Spec . Volumes ... )
b . Workflow . Spec . Arguments = append ( b . Workflow . Spec . Arguments , subBuilder . Workflow . Spec . Arguments ... )
b . Services = append ( b . Services , subBuilder . Services ... )
2024-09-03 14:24:03 +02:00
}
2024-10-11 13:44:16 +02:00
for wfID , depsOfIds := range relatedWfTasks {
for _ , dep := range depsOfIds {
for _ , task := range b . Workflow . getDag ( ) . Tasks {
if strings . Contains ( task . Name , dep ) {
index := - 1
for i , depp := range task . Dependencies {
if strings . Contains ( depp , wfID ) {
index = i
break
}
}
if index != - 1 {
task . Dependencies = append ( task . Dependencies [ : index ] , task . Dependencies [ index + 1 : ] ... )
}
task . Dependencies = append ( task . Dependencies , latestWfTasks [ wfID ] ... )
}
}
}
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
for wfID , fi := range firstWfTasks {
deps := b . getArgoDependencies ( wfID )
if len ( deps ) > 0 {
for _ , dep := range fi {
for _ , task := range b . Workflow . getDag ( ) . Tasks {
if strings . Contains ( task . Name , dep ) {
task . Dependencies = append ( task . Dependencies , deps ... )
}
}
}
2024-08-02 13:34:39 +02:00
}
}
2024-09-03 14:24:03 +02:00
if b . Services != nil {
2024-10-11 13:44:16 +02:00
dag := b . Workflow . getDag ( )
dag . Tasks = append ( dag . Tasks , Task { Name : "workflow-service-pod" , Template : "workflow-service-pod" } )
b . addServiceToArgo ( )
2024-09-03 14:24:03 +02:00
}
2024-10-11 13:44:16 +02:00
return firstItems , lastItems , volumes
2024-08-02 13:34:39 +02:00
}
2024-11-07 13:36:28 +01:00
// getAllComputeFromProcessing returns the compute resources found at the other
// end of every graph link attached to the processing p. For each link, the
// destination is considered first (p being the source), then the source
// (p being the destination); at most one compute is added per link.
func (b *ArgoBuilder) getAllComputeFromProcessing(p *processing.ProcessingResource) []*compute.ComputeResource {
	linked := []*compute.ComputeResource{}
	items := b.OriginWorkflow.Graph.Items
	for _, link := range b.OriginWorkflow.Graph.Links {
		procID := p.GetID()
		switch {
		case link.Source.ID == procID && items[link.Destination.ID].Compute != nil:
			linked = append(linked, items[link.Destination.ID].Compute)
		case link.Destination.ID == procID && items[link.Source.ID].Compute != nil:
			linked = append(linked, items[link.Source.ID].Compute)
		}
	}
	return linked
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) createArgoTemplates ( id string ,
processing * processing . ProcessingResource ,
volumes [ ] VolumeMount ,
firstItems [ ] string ,
lastItems [ ] string ) ( [ ] VolumeMount , [ ] string , [ ] string ) {
_ , firstItems , lastItems = b . addTaskToArgo ( b . Workflow . getDag ( ) , id , processing , firstItems , lastItems )
template := & Template { Name : getArgoName ( processing . GetName ( ) , id ) }
template . CreateContainer ( processing , b . Workflow . getDag ( ) )
2024-11-07 13:36:28 +01:00
// get datacenter from the processing
computes := b . getAllComputeFromProcessing ( processing )
for _ , compute := range computes {
2024-10-11 13:44:16 +02:00
b . CreateService ( id , processing )
template . Metadata . Labels = make ( map [ string ] string )
2024-11-07 13:36:28 +01:00
template . Metadata . Labels [ "app" ] = "oc-service-" + processing . GetName ( ) + "-" + compute . GetName ( ) // Construct the template for the k8s service and add a link in graph between k8s service and processing
2024-10-11 13:44:16 +02:00
}
storages := b . OriginWorkflow . GetStoragesByRelatedProcessing ( id )
for _ , storage := range storages {
if storage . Local {
volumes = template . Container . AddVolumeMount ( VolumeMount {
Name : strings . ReplaceAll ( strings . ToLower ( storage . GetName ( ) ) , " " , "-" ) ,
MountPath : storage . Path ,
Storage : storage ,
} , volumes )
2024-08-02 13:34:39 +02:00
}
}
2024-10-11 13:44:16 +02:00
b . Workflow . Spec . Templates = append ( b . Workflow . Spec . Templates , * template )
return volumes , firstItems , lastItems
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) addTaskToArgo ( dag * Dag , graphItemID string , processing * processing . ProcessingResource ,
firstItems [ ] string , lastItems [ ] string ) ( * Dag , [ ] string , [ ] string ) {
unique_name := getArgoName ( processing . GetName ( ) , graphItemID )
step := Task { Name : unique_name , Template : unique_name }
if processing . Container != nil {
for name , value := range processing . Container . Env {
step . Arguments . Parameters = append ( step . Arguments . Parameters , Parameter {
Name : name ,
Value : b . affectVariableEnv ( value , b . OriginWorkflow . Graph ) ,
} )
}
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
step . Dependencies = b . getArgoDependencies ( graphItemID )
name := ""
if b . OriginWorkflow . Graph . Items [ graphItemID ] . Processing != nil {
name = b . OriginWorkflow . Graph . Items [ graphItemID ] . Processing . GetName ( )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
if b . OriginWorkflow . Graph . Items [ graphItemID ] . Workflow != nil {
name = b . OriginWorkflow . Graph . Items [ graphItemID ] . Workflow . GetName ( )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
if len ( step . Dependencies ) == 0 && name != "" {
firstItems = append ( firstItems , getArgoName ( name , graphItemID ) )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
if ok , _ := b . isArgoDependancy ( graphItemID ) ; ! ok && name != "" {
lastItems = append ( lastItems , getArgoName ( name , graphItemID ) )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
dag . Tasks = append ( dag . Tasks , step )
return dag , firstItems , lastItems
}
2024-08-02 13:34:39 +02:00
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) affectVariableEnv ( envVar string , graph * graph . Graph ) string {
var myExp = regexp . MustCompile ( ` (\ { \ { .*\}\}) ` ) // regex to find all the variables in the command
matches := myExp . FindAllString ( envVar , - 1 ) // find all the variables in the command
for _ , match := range matches { // for each variable in the command
splitted := strings . Split ( // split the variable to get the inout and the vars only
strings . ReplaceAll ( strings . ReplaceAll ( strings . ReplaceAll ( match , "{{" , "" ) , "}}" , "" ) , " " , "" ) , "_" )
if len ( splitted ) < 3 { // if the variable is not well formatted, we skip it
logger . Error ( ) . Msgf ( "The variable %v is not well formatted" , match )
continue
}
graphItemID := splitted [ 1 ] // graphitemid is the id of the object
vars := splitted [ 2 ] // vars is the name of the variable of the object
_ , obj := graph . GetResource ( graphItemID )
if obj != nil {
envVar = strings . ReplaceAll ( envVar , match , fmt . Sprintf ( "%v" , obj . Serialize ( ) [ vars ] ) )
2024-08-02 13:34:39 +02:00
}
}
2024-10-11 13:44:16 +02:00
return envVar
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) createVolumes ( volumes [ ] VolumeMount ) { // TODO : one think about remote volume but TG
for _ , volume := range volumes {
new_volume := VolumeClaimTemplate { }
new_volume . Metadata . Name = strings . ReplaceAll ( strings . ToLower ( volume . Name ) , " " , "-" )
new_volume . Spec . AccessModes = [ ] string { "ReadWriteOnce" }
new_volume . Spec . Resources . Requests . Storage = fmt . Sprintf ( "%v" , volume . Storage . Size ) + volume . Storage . SizeType . ToArgo ( )
b . Workflow . Spec . Volumes = append ( b . Workflow . Spec . Volumes , new_volume )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
}
2024-08-02 13:34:39 +02:00
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) isArgoDependancy ( id string ) ( bool , [ ] string ) {
dependancyOfIDs := [ ] string { }
isDeps := false
for _ , link := range b . OriginWorkflow . Graph . Links {
source := b . OriginWorkflow . Graph . Items [ link . Destination . ID ] . Processing
if id == link . Source . ID && source != nil {
isDeps = true
dependancyOfIDs = append ( dependancyOfIDs , getArgoName ( source . GetName ( ) , link . Destination . ID ) )
}
wourceWF := b . OriginWorkflow . Graph . Items [ link . Destination . ID ] . Workflow
if id == link . Source . ID && wourceWF != nil {
isDeps = true
dependancyOfIDs = append ( dependancyOfIDs , getArgoName ( wourceWF . GetName ( ) , link . Destination . ID ) )
}
}
return isDeps , dependancyOfIDs
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) getArgoDependencies ( id string ) ( dependencies [ ] string ) {
for _ , link := range b . OriginWorkflow . Graph . Links {
source := b . OriginWorkflow . Graph . Items [ link . Source . ID ] . Processing
if id == link . Destination . ID && source != nil {
dependency_name := getArgoName ( source . GetName ( ) , link . Source . ID )
dependencies = append ( dependencies , dependency_name )
continue
}
}
2024-08-02 13:34:39 +02:00
return
}
// getArgoName builds the name used for Argo tasks and templates: the raw name
// with spaces turned into dashes, a dash, then the component ID, the whole
// being lower-cased.
func getArgoName(raw_name string, component_id string) (formatedName string) {
	dashed := strings.ReplaceAll(raw_name, " ", "-")
	return strings.ToLower(dashed + "-" + component_id)
}