update workflow execution status

This commit is contained in:
pb 2024-08-08 10:11:40 +02:00
parent 98b6cdaae5
commit 1871e067df
2 changed files with 61 additions and 56 deletions

113
main.go
View File

@ -10,7 +10,6 @@ import (
"regexp"
"strings"
"sync"
"time"
"oc-monitor/conf"
"oc-monitor/models"
@ -29,11 +28,11 @@ import (
var logger zerolog.Logger
var wf_logger zerolog.Logger
var pods_logger zerolog.Logger
var parser argparse.Parser
var monitorLocal bool
var workflowName string
const namespace = "-n argo"
const defaultConfigFile = "/etc/oc/ocmonitor_conf.json"
const localConfigFile = "./conf/ocmonitor_conf.json"
@ -105,7 +104,8 @@ func retrieveWorkflowId(exec_id string) string {
}
func executeWorkflow(argo_file_path string) {
var stdout, stderr io.ReadCloser
// var stdout, stderr, stdout_logs, stderr_logs io.ReadCloser
var stdout, stderr io.ReadCloser
// var stderr io.ReadCloser
var err error
@ -120,26 +120,50 @@ func executeWorkflow(argo_file_path string) {
return
}
// cmd_logs := exec.Command("argo","logs","oc-monitor-"+workflowName,"-f" , "-n", "argo")
// // if stdout_logs, err = cmd_logs.StdoutPipe(); err != nil{
// // wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
// // return
// // }
// // if stderr_logs, err = cmd_logs.StderrPipe(); err != nil{
// // wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error())
// // return
// // }
if err := cmd.Start(); err != nil {
panic(err)
}
var wg sync.WaitGroup
panic(err)
}
// if err := cmd_logs.Start(); err != nil {
// panic(err)
// }
var wg sync.WaitGroup
go logWorkflow(stdout, &wg)
go logWorkflow(stderr,&wg)
// go logWorkflow(stderr,&wg)
time.Sleep(time.Second * 1)
go logPods(workflowName)
// time.Sleep(time.Second * 1)
// go logPods(stdout_logs,"out")
// go logPods(stderr_logs,"err")
if err := cmd.Wait(); err != nil {
wf_logger.Error().Msg("Could not execute argo submit")
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderr).Text())
// updateStatus(exec_id, FATAL)
updateStatus("fatal")
}
// if err := cmd_logs.Wait(); err != nil {
// wf_logger.Error().Msg("Could not execute argo logs")
// wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stdout_logs).Text())
// wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderr_logs).Text())
// }
wg.Wait()
}
@ -150,58 +174,39 @@ func logWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) {
watch_output := make([]string,0)
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
log := scanner.Text()
watch_output = append(watch_output, log)
if(strings.HasPrefix(log, "Progress:")){
current_watch = *models.NewArgoLogs(watch_output)
fmt.Println("Status : " + current_watch.Status)
workflowName = current_watch.Name
if(!current_watch.Equals(previous_watch)){
wg.Add(1)
defer wg.Done()
if(current_watch.Status == "Succeeded"){
fmt.Print()
}
checkStatus(current_watch.Status, previous_watch.Status)
jsonified, err := json.Marshal(current_watch)
if err != nil {
logger.Error().Msg("Could not create watch log")
}
wf_logger.Info().Msg(string(jsonified))
previous_watch = current_watch
current_watch = models.ArgoWatch{}
for scanner.Scan() {
log := scanner.Text()
watch_output = append(watch_output, log)
if(strings.HasPrefix(log, "Progress:")){
current_watch = *models.NewArgoLogs(watch_output)
fmt.Println("Status : " + current_watch.Status)
workflowName = current_watch.Name
if(!current_watch.Equals(previous_watch)){
wg.Add(1)
checkStatus(current_watch.Status, previous_watch.Status)
jsonified, err := json.Marshal(current_watch)
if err != nil {
logger.Error().Msg("Could not create watch log")
}
wf_logger.Info().Msg(string(jsonified))
previous_watch = current_watch
current_watch = models.ArgoWatch{}
wg.Done()
}
}
}
}
// Debug, no logs sent
func logPods(workflow_name string){
var stderr io.ReadCloser
var err error
cmd := exec.Command("argo","logs",workflow_name, "-n", "argo")
if stderr, err = cmd.StderrPipe(); err != nil{
wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error())
return
}
if err := cmd.Start(); err != nil {
panic(err)
}
scanner := bufio.NewScanner(stderr)
func logPods(pipe io.ReadCloser,name string){
pods_logger = wf_logger.With().Str("sortie name",name).Logger()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
log := scanner.Text()
// fmt.Println(log)
wf_logger.Info().Msg(log)
}
if err := cmd.Wait(); err != nil {
wf_logger.Error().Msg("Could not execute argo logs")
pods_logger.Info().Msg(log)
}
}

View File

@ -27,7 +27,7 @@ type Workflow struct {
ApiVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata struct {
GenerateName string `yaml:"generateName"`
Name string `yaml:"Name"`
} `yaml:"metadata"`
Spec Spec `yaml:"spec,omitempty"`
}
@ -49,7 +49,7 @@ func (b *ArgoBuilder) CreateDAG() (string, error) {
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
b.Workflow.Kind = "Workflow"
random_name := generateWfName()
b.Workflow.Metadata.GenerateName = "oc-monitor-" + random_name
b.Workflow.Metadata.Name = "oc-monitor-" + random_name
yamlified, err := yaml.Marshal(b.Workflow)
if err != nil {