2024-08-02 13:34:39 +02:00
// A class that translates the informations held in the graph object
// via its lists of components into an argo file, using the a list of
// link ID to build the dag
package workflow_builder
import (
2024-09-25 15:46:39 +02:00
"fmt"
2025-02-14 12:00:29 +01:00
"oc-monitord/conf"
2024-08-19 11:43:40 +02:00
. "oc-monitord/models"
2025-02-14 12:00:29 +01:00
tools2 "oc-monitord/tools"
2024-08-02 13:34:39 +02:00
"os"
"strings"
"time"
2024-08-19 11:43:40 +02:00
oclib "cloud.o-forge.io/core/oc-lib"
2025-02-05 08:36:26 +01:00
"cloud.o-forge.io/core/oc-lib/models/common/enum"
2025-01-13 14:05:21 +01:00
"cloud.o-forge.io/core/oc-lib/models/resources"
2024-08-29 09:34:10 +02:00
w "cloud.o-forge.io/core/oc-lib/models/workflow"
2024-08-02 13:34:39 +02:00
"github.com/nwtgck/go-fakelish"
2024-09-03 14:24:03 +02:00
"github.com/rs/zerolog"
2024-08-02 13:34:39 +02:00
"gopkg.in/yaml.v3"
)
2024-09-03 14:24:03 +02:00
var logger zerolog . Logger
2024-08-02 13:34:39 +02:00
type ArgoBuilder struct {
2024-10-11 13:44:16 +02:00
OriginWorkflow * w . Workflow
2024-09-25 15:46:39 +02:00
Workflow Workflow
2024-10-11 13:44:16 +02:00
Services [ ] * Service
2024-09-25 15:46:39 +02:00
Timeout int
2024-08-02 13:34:39 +02:00
}
type Workflow struct {
ApiVersion string ` yaml:"apiVersion" `
Kind string ` yaml:"kind" `
Metadata struct {
2024-08-20 15:23:02 +02:00
Name string ` yaml:"name" `
2024-08-02 13:34:39 +02:00
} ` yaml:"metadata" `
Spec Spec ` yaml:"spec,omitempty" `
}
2024-10-11 13:44:16 +02:00
func ( b * Workflow ) getDag ( ) * Dag {
for _ , t := range b . Spec . Templates {
if t . Name == "dag" {
return t . Dag
}
}
b . Spec . Templates = append ( b . Spec . Templates , Template { Name : "dag" , Dag : & Dag { } } )
return b . Spec . Templates [ len ( b . Spec . Templates ) - 1 ] . Dag
}
2024-08-02 13:34:39 +02:00
type Spec struct {
Entrypoint string ` yaml:"entrypoint" `
Arguments [ ] Parameter ` yaml:"arguments,omitempty" `
Volumes [ ] VolumeClaimTemplate ` yaml:"volumeClaimTemplates,omitempty" `
Templates [ ] Template ` yaml:"templates" `
2024-08-22 10:52:49 +02:00
Timeout int ` yaml:"activeDeadlineSeconds,omitempty" `
2024-08-02 13:34:39 +02:00
}
2025-02-05 08:36:26 +01:00
// TODO: found on a processing instance linked to storage
// add s3, gcs, azure, etc if needed on a link between processing and storage
2025-02-14 12:00:29 +01:00
func ( b * ArgoBuilder ) CreateDAG ( namespace string , write bool ) ( string , int , [ ] string , [ ] string , error ) {
2025-02-05 08:36:26 +01:00
fmt . Println ( "Creating DAG" , b . OriginWorkflow . Graph . Items )
2024-08-29 10:17:31 +02:00
// handle services by checking if there is only one processing with hostname and port
2025-02-14 12:00:29 +01:00
firstItems , lastItems , volumes := b . createTemplates ( namespace )
2024-10-11 13:44:16 +02:00
b . createVolumes ( volumes )
2024-08-29 10:17:31 +02:00
2024-08-22 10:52:49 +02:00
if b . Timeout > 0 {
b . Workflow . Spec . Timeout = b . Timeout
}
2024-08-02 13:34:39 +02:00
b . Workflow . Spec . Entrypoint = "dag"
b . Workflow . ApiVersion = "argoproj.io/v1alpha1"
b . Workflow . Kind = "Workflow"
2024-10-11 13:44:16 +02:00
if ! write {
return "" , len ( b . Workflow . getDag ( ) . Tasks ) , firstItems , lastItems , nil
}
random_name := fakelish . GenerateFakeWord ( 5 , 8 ) + "-" + fakelish . GenerateFakeWord ( 5 , 8 )
2024-08-08 10:11:40 +02:00
b . Workflow . Metadata . Name = "oc-monitor-" + random_name
2024-09-03 14:24:03 +02:00
logger = oclib . GetLogger ( )
2024-08-02 13:34:39 +02:00
yamlified , err := yaml . Marshal ( b . Workflow )
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not transform object to yaml file" )
2024-10-11 13:44:16 +02:00
return "" , 0 , firstItems , lastItems , err
2024-08-02 13:34:39 +02:00
}
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
current_timestamp := time . Now ( ) . Format ( "02_01_2006_150405" )
file_name := random_name + "_" + current_timestamp + ".yml"
2024-08-19 11:43:40 +02:00
workflows_dir := "./argo_workflows/"
2024-08-02 13:34:39 +02:00
err = os . WriteFile ( workflows_dir + file_name , [ ] byte ( yamlified ) , 0660 )
if err != nil {
2024-08-19 11:43:40 +02:00
logger . Error ( ) . Msg ( "Could not write the yaml file" )
2024-10-11 13:44:16 +02:00
return "" , 0 , firstItems , lastItems , err
2024-08-02 13:34:39 +02:00
}
2025-02-17 16:54:25 +01:00
return workflows_dir + file_name , len ( b . Workflow . getDag ( ) . Tasks ) , firstItems , lastItems , nil
2024-08-02 13:34:39 +02:00
}
2025-02-14 12:00:29 +01:00
func ( b * ArgoBuilder ) createTemplates ( namespace string ) ( [ ] string , [ ] string , [ ] VolumeMount ) {
2024-10-11 13:44:16 +02:00
volumes := [ ] VolumeMount { }
firstItems := [ ] string { }
lastItems := [ ] string { }
2025-02-05 08:36:26 +01:00
items := b . OriginWorkflow . GetGraphItems ( b . OriginWorkflow . Graph . IsProcessing )
fmt . Println ( "Creating templates" , len ( items ) )
for _ , item := range b . OriginWorkflow . GetGraphItems ( b . OriginWorkflow . Graph . IsProcessing ) {
instance := item . Processing . GetSelectedInstance ( )
fmt . Println ( "Creating template for" , item . Processing . GetName ( ) , instance )
if instance == nil || instance . ( * resources . ProcessingInstance ) . Access == nil && instance . ( * resources . ProcessingInstance ) . Access . Container != nil {
logger . Error ( ) . Msg ( "Not enough configuration setup, template can't be created : " + item . Processing . GetName ( ) )
2024-10-11 13:44:16 +02:00
return firstItems , lastItems , volumes
2024-08-02 13:34:39 +02:00
}
2025-02-14 12:00:29 +01:00
volumes , firstItems , lastItems = b . createArgoTemplates ( namespace ,
2025-02-05 08:36:26 +01:00
item . ID , item . Processing , volumes , firstItems , lastItems )
2024-10-11 13:44:16 +02:00
}
firstWfTasks := map [ string ] [ ] string { }
latestWfTasks := map [ string ] [ ] string { }
relatedWfTasks := map [ string ] [ ] string { }
2025-01-13 14:05:21 +01:00
for _ , wf := range b . OriginWorkflow . Workflows {
realWorkflow , code , err := w . NewAccessor ( nil ) . LoadOne ( wf )
2024-10-11 13:44:16 +02:00
if code != 200 {
logger . Error ( ) . Msg ( "Error loading the workflow : " + err . Error ( ) )
continue
2024-09-03 14:24:03 +02:00
}
2024-10-11 13:44:16 +02:00
subBuilder := ArgoBuilder { OriginWorkflow : realWorkflow . ( * w . Workflow ) , Timeout : b . Timeout }
2025-02-14 12:00:29 +01:00
_ , _ , fi , li , err := subBuilder . CreateDAG ( namespace , false )
2024-10-11 13:44:16 +02:00
if err != nil {
logger . Error ( ) . Msg ( "Error creating the subworkflow : " + err . Error ( ) )
continue
}
2025-01-13 14:05:21 +01:00
firstWfTasks [ wf ] = fi
if ok , depsOfIds := subBuilder . isArgoDependancy ( wf ) ; ok { // IS BEFORE
latestWfTasks [ wf ] = li
relatedWfTasks [ wf ] = depsOfIds
2024-10-11 13:44:16 +02:00
}
subDag := subBuilder . Workflow . getDag ( )
d := b . Workflow . getDag ( )
d . Tasks = append ( d . Tasks , subDag . Tasks ... ) // add the tasks of the subworkflow to the main workflow
b . Workflow . Spec . Templates = append ( b . Workflow . Spec . Templates , subBuilder . Workflow . Spec . Templates ... )
b . Workflow . Spec . Volumes = append ( b . Workflow . Spec . Volumes , subBuilder . Workflow . Spec . Volumes ... )
b . Workflow . Spec . Arguments = append ( b . Workflow . Spec . Arguments , subBuilder . Workflow . Spec . Arguments ... )
b . Services = append ( b . Services , subBuilder . Services ... )
2024-09-03 14:24:03 +02:00
}
2024-10-11 13:44:16 +02:00
for wfID , depsOfIds := range relatedWfTasks {
for _ , dep := range depsOfIds {
for _ , task := range b . Workflow . getDag ( ) . Tasks {
if strings . Contains ( task . Name , dep ) {
index := - 1
for i , depp := range task . Dependencies {
if strings . Contains ( depp , wfID ) {
index = i
break
}
}
if index != - 1 {
task . Dependencies = append ( task . Dependencies [ : index ] , task . Dependencies [ index + 1 : ] ... )
}
task . Dependencies = append ( task . Dependencies , latestWfTasks [ wfID ] ... )
}
}
}
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
for wfID , fi := range firstWfTasks {
deps := b . getArgoDependencies ( wfID )
if len ( deps ) > 0 {
for _ , dep := range fi {
for _ , task := range b . Workflow . getDag ( ) . Tasks {
if strings . Contains ( task . Name , dep ) {
task . Dependencies = append ( task . Dependencies , deps ... )
}
}
}
2024-08-02 13:34:39 +02:00
}
}
2024-09-03 14:24:03 +02:00
if b . Services != nil {
2024-10-11 13:44:16 +02:00
dag := b . Workflow . getDag ( )
dag . Tasks = append ( dag . Tasks , Task { Name : "workflow-service-pod" , Template : "workflow-service-pod" } )
b . addServiceToArgo ( )
2024-09-03 14:24:03 +02:00
}
2024-10-11 13:44:16 +02:00
return firstItems , lastItems , volumes
2024-08-02 13:34:39 +02:00
}
2025-02-14 12:00:29 +01:00
func ( b * ArgoBuilder ) createArgoTemplates ( namespace string ,
id string ,
2025-01-13 14:05:21 +01:00
processing * resources . ProcessingResource ,
2024-10-11 13:44:16 +02:00
volumes [ ] VolumeMount ,
firstItems [ ] string ,
lastItems [ ] string ) ( [ ] VolumeMount , [ ] string , [ ] string ) {
_ , firstItems , lastItems = b . addTaskToArgo ( b . Workflow . getDag ( ) , id , processing , firstItems , lastItems )
template := & Template { Name : getArgoName ( processing . GetName ( ) , id ) }
2025-02-05 08:36:26 +01:00
fmt . Println ( "Creating template for" , template . Name )
2025-02-14 12:00:29 +01:00
template . CreateContainer ( processing , b . Workflow . getDag ( ) , template . Name )
2024-11-07 13:36:28 +01:00
// get datacenter from the processing
2024-11-12 09:10:33 +01:00
if processing . IsService {
2024-10-11 13:44:16 +02:00
b . CreateService ( id , processing )
template . Metadata . Labels = make ( map [ string ] string )
2024-11-12 09:10:33 +01:00
template . Metadata . Labels [ "app" ] = "oc-service-" + processing . GetName ( ) // Construct the template for the k8s service and add a link in graph between k8s service and processing
2024-10-11 13:44:16 +02:00
}
2025-02-05 08:36:26 +01:00
related := b . OriginWorkflow . GetByRelatedProcessing ( id , b . OriginWorkflow . Graph . IsStorage )
for _ , r := range related {
storage := r . Node . ( * resources . StorageResource )
for _ , linkToStorage := range r . Links {
for _ , rw := range linkToStorage . StorageLinkInfos {
art := Artifact { Path : template . ReplacePerEnv ( rw . Source , linkToStorage . Env ) }
if rw . Write {
art . Name = storage . GetName ( ) + "-" + rw . Destination + "-input-write"
} else {
art . Name = storage . GetName ( ) + "-" + rw . Destination + "-input-read"
}
if storage . StorageType == enum . S3 {
art . S3 = & Key {
2025-02-14 12:00:29 +01:00
Key : template . ReplacePerEnv ( rw . Destination + "/" + rw . FileName , linkToStorage . Env ) ,
Insecure : true , // temporary
}
sel := storage . GetSelectedInstance ( )
if sel != nil {
if sel . ( * resources . StorageResourceInstance ) . Credentials != nil {
tool , err := tools2 . NewService ( conf . GetConfig ( ) . Mode )
if err != nil || tool == nil {
logger . Error ( ) . Msg ( "Could not create the access secret" )
} else {
id , err := tool . CreateAccessSecret ( namespace ,
sel . ( * resources . StorageResourceInstance ) . Credentials . Login ,
sel . ( * resources . StorageResourceInstance ) . Credentials . Pass )
if err == nil {
art . S3 . AccessKeySecret = & Secret {
Name : id ,
Key : "access-key" ,
}
art . S3 . SecretKeySecret = & Secret {
Name : id ,
Key : "secret-key" ,
}
}
}
}
art . S3 . Key = strings . ReplaceAll ( art . S3 . Key , sel . ( * resources . StorageResourceInstance ) . Source + "/" , "" )
art . S3 . Key = strings . ReplaceAll ( art . S3 . Key , sel . ( * resources . StorageResourceInstance ) . Source , "" )
splits := strings . Split ( art . S3 . EndPoint , "/" )
if len ( splits ) > 1 {
art . S3 . Bucket = splits [ 0 ]
art . S3 . EndPoint = strings . Join ( splits [ 1 : ] , "/" )
} else {
art . S3 . Bucket = splits [ 0 ]
}
2025-02-05 08:36:26 +01:00
}
}
if rw . Write {
template . Outputs . Artifacts = append ( template . Inputs . Artifacts , art )
} else {
template . Inputs . Artifacts = append ( template . Outputs . Artifacts , art )
}
}
}
index := 0
if storage . SelectedInstanceIndex != nil && ( * storage . SelectedInstanceIndex ) >= 0 {
index = * storage . SelectedInstanceIndex
2025-01-13 14:05:21 +01:00
}
2025-02-05 08:36:26 +01:00
s := storage . Instances [ index ]
2025-01-13 14:05:21 +01:00
if s . Local {
2024-10-11 13:44:16 +02:00
volumes = template . Container . AddVolumeMount ( VolumeMount {
Name : strings . ReplaceAll ( strings . ToLower ( storage . GetName ( ) ) , " " , "-" ) ,
2025-01-13 14:05:21 +01:00
MountPath : s . Source ,
2025-02-05 08:36:26 +01:00
Storage : storage ,
2024-10-11 13:44:16 +02:00
} , volumes )
2024-08-02 13:34:39 +02:00
}
}
2024-10-11 13:44:16 +02:00
b . Workflow . Spec . Templates = append ( b . Workflow . Spec . Templates , * template )
return volumes , firstItems , lastItems
2024-08-02 13:34:39 +02:00
}
2025-01-13 14:05:21 +01:00
func ( b * ArgoBuilder ) addTaskToArgo ( dag * Dag , graphItemID string , processing * resources . ProcessingResource ,
2024-10-11 13:44:16 +02:00
firstItems [ ] string , lastItems [ ] string ) ( * Dag , [ ] string , [ ] string ) {
unique_name := getArgoName ( processing . GetName ( ) , graphItemID )
step := Task { Name : unique_name , Template : unique_name }
2025-02-05 08:36:26 +01:00
instance := processing . GetSelectedInstance ( )
if instance != nil {
for _ , value := range instance . ( * resources . ProcessingInstance ) . Env {
2024-10-11 13:44:16 +02:00
step . Arguments . Parameters = append ( step . Arguments . Parameters , Parameter {
2025-02-05 08:36:26 +01:00
Name : value . Name ,
Value : value . Value ,
} )
}
for _ , value := range instance . ( * resources . ProcessingInstance ) . Inputs {
step . Arguments . Parameters = append ( step . Arguments . Parameters , Parameter {
Name : value . Name ,
Value : value . Value ,
} )
}
for _ , value := range instance . ( * resources . ProcessingInstance ) . Outputs {
step . Arguments . Parameters = append ( step . Arguments . Parameters , Parameter {
Name : value . Name ,
Value : value . Value ,
2024-10-11 13:44:16 +02:00
} )
}
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
step . Dependencies = b . getArgoDependencies ( graphItemID )
name := ""
if b . OriginWorkflow . Graph . Items [ graphItemID ] . Processing != nil {
name = b . OriginWorkflow . Graph . Items [ graphItemID ] . Processing . GetName ( )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
if b . OriginWorkflow . Graph . Items [ graphItemID ] . Workflow != nil {
name = b . OriginWorkflow . Graph . Items [ graphItemID ] . Workflow . GetName ( )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
if len ( step . Dependencies ) == 0 && name != "" {
firstItems = append ( firstItems , getArgoName ( name , graphItemID ) )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
if ok , _ := b . isArgoDependancy ( graphItemID ) ; ! ok && name != "" {
lastItems = append ( lastItems , getArgoName ( name , graphItemID ) )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
dag . Tasks = append ( dag . Tasks , step )
return dag , firstItems , lastItems
}
2024-08-02 13:34:39 +02:00
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) createVolumes ( volumes [ ] VolumeMount ) { // TODO : one think about remote volume but TG
for _ , volume := range volumes {
2025-02-05 08:36:26 +01:00
index := 0
if volume . Storage . SelectedInstanceIndex != nil && ( * volume . Storage . SelectedInstanceIndex ) >= 0 {
index = * volume . Storage . SelectedInstanceIndex
2025-01-13 14:05:21 +01:00
}
2025-02-05 08:36:26 +01:00
storage := volume . Storage . Instances [ index ]
2024-10-11 13:44:16 +02:00
new_volume := VolumeClaimTemplate { }
new_volume . Metadata . Name = strings . ReplaceAll ( strings . ToLower ( volume . Name ) , " " , "-" )
new_volume . Spec . AccessModes = [ ] string { "ReadWriteOnce" }
2025-01-13 14:05:21 +01:00
new_volume . Spec . Resources . Requests . Storage = fmt . Sprintf ( "%v" , storage . SizeGB ) + storage . SizeType . ToArgo ( )
2024-10-11 13:44:16 +02:00
b . Workflow . Spec . Volumes = append ( b . Workflow . Spec . Volumes , new_volume )
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
}
2024-08-02 13:34:39 +02:00
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) isArgoDependancy ( id string ) ( bool , [ ] string ) {
dependancyOfIDs := [ ] string { }
isDeps := false
for _ , link := range b . OriginWorkflow . Graph . Links {
2025-02-06 08:55:28 +01:00
if _ , ok := b . OriginWorkflow . Graph . Items [ link . Destination . ID ] ; ! ok {
fmt . Println ( "Could not find the source of the link" , link . Destination . ID )
continue
}
2024-10-11 13:44:16 +02:00
source := b . OriginWorkflow . Graph . Items [ link . Destination . ID ] . Processing
if id == link . Source . ID && source != nil {
isDeps = true
dependancyOfIDs = append ( dependancyOfIDs , getArgoName ( source . GetName ( ) , link . Destination . ID ) )
}
wourceWF := b . OriginWorkflow . Graph . Items [ link . Destination . ID ] . Workflow
if id == link . Source . ID && wourceWF != nil {
isDeps = true
dependancyOfIDs = append ( dependancyOfIDs , getArgoName ( wourceWF . GetName ( ) , link . Destination . ID ) )
}
}
return isDeps , dependancyOfIDs
2024-08-02 13:34:39 +02:00
}
2024-10-11 13:44:16 +02:00
func ( b * ArgoBuilder ) getArgoDependencies ( id string ) ( dependencies [ ] string ) {
for _ , link := range b . OriginWorkflow . Graph . Links {
2025-02-05 08:36:26 +01:00
if _ , ok := b . OriginWorkflow . Graph . Items [ link . Source . ID ] ; ! ok {
fmt . Println ( "Could not find the source of the link" , link . Source . ID )
continue
}
2024-10-11 13:44:16 +02:00
source := b . OriginWorkflow . Graph . Items [ link . Source . ID ] . Processing
if id == link . Destination . ID && source != nil {
dependency_name := getArgoName ( source . GetName ( ) , link . Source . ID )
dependencies = append ( dependencies , dependency_name )
continue
}
}
2024-08-02 13:34:39 +02:00
return
}
func getArgoName ( raw_name string , component_id string ) ( formatedName string ) {
formatedName = strings . ReplaceAll ( raw_name , " " , "-" )
formatedName += "-" + component_id
formatedName = strings . ToLower ( formatedName )
return
}