From 1871e067df9b5962898a7f062c8f091488199e67 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 8 Aug 2024 10:11:40 +0200 Subject: [PATCH] update workflow execution status --- main.go | 113 ++++++++++++++++--------------- workflow_builder/argo_builder.go | 4 +- 2 files changed, 61 insertions(+), 56 deletions(-) diff --git a/main.go b/main.go index 67c67c4..ff479e8 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "regexp" "strings" "sync" - "time" "oc-monitor/conf" "oc-monitor/models" @@ -29,11 +28,11 @@ import ( var logger zerolog.Logger var wf_logger zerolog.Logger +var pods_logger zerolog.Logger var parser argparse.Parser var monitorLocal bool var workflowName string -const namespace = "-n argo" const defaultConfigFile = "/etc/oc/ocmonitor_conf.json" const localConfigFile = "./conf/ocmonitor_conf.json" @@ -105,7 +104,8 @@ func retrieveWorkflowId(exec_id string) string { } func executeWorkflow(argo_file_path string) { - var stdout, stderr io.ReadCloser + // var stdout, stderr, stdout_logs, stderr_logs io.ReadCloser + var stdout, stderr io.ReadCloser // var stderr io.ReadCloser var err error @@ -120,26 +120,50 @@ func executeWorkflow(argo_file_path string) { return } + // cmd_logs := exec.Command("argo","logs","oc-monitor-"+workflowName,"-f" , "-n", "argo") + + // // if stdout_logs, err = cmd_logs.StdoutPipe(); err != nil{ + // // wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) + // // return + // // } + + // // if stderr_logs, err = cmd_logs.StderrPipe(); err != nil{ + // // wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error()) + // // return + // // } + if err := cmd.Start(); err != nil { - panic(err) - } - - var wg sync.WaitGroup + panic(err) + } + // if err := cmd_logs.Start(); err != nil { + // panic(err) + // } + + + var wg sync.WaitGroup go logWorkflow(stdout, &wg) - go logWorkflow(stderr,&wg) + // go logWorkflow(stderr,&wg) - time.Sleep(time.Second * 1) - go logPods(workflowName) + // time.Sleep(time.Second * 1) + // go logPods(stdout_logs,"out") + // go logPods(stderr_logs,"err") if err := cmd.Wait(); err != nil { wf_logger.Error().Msg("Could not execute argo submit") wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderr).Text()) - // updateStatus(exec_id, FATAL) + updateStatus("fatal") } + // if err := cmd_logs.Wait(); err != nil { + // wf_logger.Error().Msg("Could not execute argo logs") + // wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stdout_logs).Text()) + // wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderr_logs).Text()) + // } + + wg.Wait() } @@ -150,58 +174,39 @@ func logWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) { watch_output := make([]string,0) scanner := bufio.NewScanner(pipe) - for scanner.Scan() { - log := scanner.Text() - watch_output = append(watch_output, log) - - if(strings.HasPrefix(log, "Progress:")){ - current_watch = *models.NewArgoLogs(watch_output) - fmt.Println("Status : " + current_watch.Status) - workflowName = current_watch.Name - if(!current_watch.Equals(previous_watch)){ - wg.Add(1) - defer wg.Done() - if(current_watch.Status == "Succeeded"){ - fmt.Print() - } - checkStatus(current_watch.Status, previous_watch.Status) - jsonified, err := json.Marshal(current_watch) - if err != nil { - logger.Error().Msg("Could not create watch log") - } - wf_logger.Info().Msg(string(jsonified)) - previous_watch = current_watch - current_watch = models.ArgoWatch{} + for scanner.Scan() { + log := scanner.Text() + watch_output = append(watch_output, log) + + if(strings.HasPrefix(log, "Progress:")){ + + current_watch = *models.NewArgoLogs(watch_output) + fmt.Println("Status : " + current_watch.Status) + workflowName = current_watch.Name + if(!current_watch.Equals(previous_watch)){ + wg.Add(1) + checkStatus(current_watch.Status, previous_watch.Status) + jsonified, err := json.Marshal(current_watch) + if err != nil { + logger.Error().Msg("Could not create watch log") } + wf_logger.Info().Msg(string(jsonified)) + previous_watch = current_watch + current_watch = models.ArgoWatch{} + wg.Done() } } + } } // Debug, no logs sent -func logPods(workflow_name string){ - var stderr io.ReadCloser - var err error - - cmd := exec.Command("argo","logs",workflow_name, "-n", "argo") - - if stderr, err = cmd.StderrPipe(); err != nil{ - wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error()) - return - } - - if err := cmd.Start(); err != nil { - panic(err) - } - - scanner := bufio.NewScanner(stderr) +func logPods(pipe io.ReadCloser,name string){ + pods_logger = wf_logger.With().Str("sortie name",name).Logger() + scanner := bufio.NewScanner(pipe) for scanner.Scan() { log := scanner.Text() // fmt.Println(log) - wf_logger.Info().Msg(log) - } - - if err := cmd.Wait(); err != nil { - wf_logger.Error().Msg("Could not execute argo logs") + pods_logger.Info().Msg(log) } } diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index b19006f..3e5c0f4 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -27,7 +27,7 @@ type Workflow struct { ApiVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata struct { - GenerateName string `yaml:"generateName"` + Name string `yaml:"Name"` } `yaml:"metadata"` Spec Spec `yaml:"spec,omitempty"` } @@ -49,7 +49,7 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" random_name := generateWfName() - b.Workflow.Metadata.GenerateName = "oc-monitor-" + random_name + b.Workflow.Metadata.Name = "oc-monitor-" + random_name yamlified, err := yaml.Marshal(b.Workflow) if err != nil {