Implemented logging of local execution of argo submit --watch and logs produced by pods

This commit is contained in:
pb
2025-04-09 18:59:37 +02:00
parent 5d8143c93e
commit c31184e2ec
2 changed files with 95 additions and 35 deletions

View File

@@ -14,7 +14,8 @@ import (
)
var logger zerolog.Logger
var wf_logger zerolog.Logger
var wfLogger zerolog.Logger
// Take the slice of string that make up one round of stderr outputs from the --watch option in argo submit
func NewLocalArgoLogs(inputs []string) *ArgoWatch {
@@ -65,12 +66,12 @@ func parseBoolValue(line string) bool {
return value == "True"
}
func LogLocalWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) {
func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) {
logger = logs.GetLogger()
logger.Debug().Msg("created wf_logger")
fmt.Println("created wf_logger")
wf_logger = logger.With().Str("argo_name", "MON WF DE TEST").Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
var current_watch, previous_watch ArgoWatch
@@ -80,6 +81,7 @@ func LogLocalWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) {
log := scanner.Text()
watch_output = append(watch_output, log)
// Log the progress of the WF
if strings.HasPrefix(log, "Progress:") {
current_watch = *NewLocalArgoLogs(watch_output)
@@ -91,22 +93,43 @@ func LogLocalWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) {
if err != nil {
logger.Error().Msg("Could not create watch log for " + workflowName)
}
wf_logger.Info().Msg(string(jsonified))
wfLogger.Info().Msg(string(jsonified))
previous_watch = current_watch
current_watch = ArgoWatch{}
wg.Done()
}
}
}
}
// Debug, no logs sent
// func logPods(pipe io.ReadCloser, name string) {
// pods_logger = wf_logger.With().Str("pod_name", name).Logger()
// scanner := bufio.NewScanner(pipe)
// for scanner.Scan() {
// log := scanner.Text()
// pods_logger.Info().Msg(log)
// }
func LogPods(pipe io.ReadCloser, steps []string, wg *sync.WaitGroup) {
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
fmt.Println("new line")
wg.Add(1)
var podLogger zerolog.Logger
line := scanner.Text()
podName := strings.Split(line, ":")[0]
podLogger = wfLogger.With().Str("step_name", getStepName(podName, steps)).Logger()
log := strings.Split(line,podName+":")[1]
podLogger.Info().Msg(log)
wg.Done()
}
}
func getStepName(podName string, steps []string) string {
for _, step := range(steps) {
if strings.Contains(podName,step){
return step
}
}
return "error"
}
// }