diff --git a/main.go b/main.go index dc6658a..5b49c19 100644 --- a/main.go +++ b/main.go @@ -143,8 +143,8 @@ func executeOutside(argo_file_path string, stepMax int) { var stdout, stderr io.ReadCloser // var stderr io.ReadCloser var err error - logger.Debug().Msg("executing :" + "argo submit --log " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID) - cmd := exec.Command("argo", "submit", "--log", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID ) + logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID) + cmd := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID ) if stdout, err = cmd.StdoutPipe(); err != nil { wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) return @@ -154,10 +154,11 @@ func executeOutside(argo_file_path string, stepMax int) { } var wg sync.WaitGroup split := strings.Split(argo_file_path, "_") - argoLogs := models.NewArgoLogs(split[0], "argo", stepMax) + argoLogs := models.NewArgoLogs(split[0], conf.GetConfig().ExecutionID, stepMax) argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger) - argoLogs.IsStreaming = true - go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg) + argoLogs.IsStreaming = true // Used to determine wether or not the logs are read from a docker container or on localhost + // go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg) + go models.LogLocalWorkflow(stdout,&wg) if err := cmd.Wait(); err != nil { wf_logger.Error().Msg("Could not execute argo submit") @@ -167,6 +168,10 @@ func executeOutside(argo_file_path string, stepMax int) { wg.Wait() } +// !!!! BUGGED !!!! +// Should be refactored to create a function dedicated to logging output from execution in a container +// LogLocalWorkflow() has been implemented to be used when oc-monitord is executed locally + // We could improve this function by creating an object with the same attribute as the output // and only send a new log if the current object has different values than the previous func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, @@ -183,12 +188,8 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg.Add(1) } seeit++ - } else if count == 0 { - if argoLogs.IsStreaming { - continue - } else { - break - } + } else if count == 0 && !argoLogs.IsStreaming { + break } if count == 1 { see = log @@ -202,7 +203,7 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, current_watch.Logs = append(current_watch.Logs, strings.ReplaceAll(log, "\"", "")) } count++ - if strings.Contains(log, "sub-process exited") { + if strings.Contains(log, "sub-process exited") || argoLogs.IsStreaming { current_watch = argoLogs.StopStepRecording(current_watch) argoLogs.Seen = append(argoLogs.Seen, see) if checkStatus(current_watch, previous_watch, argoLogs) { @@ -223,6 +224,9 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, } previous_watch = current_watch current_watch = &models.ArgoWatch{} + if argoLogs.IsStreaming { + current_watch.Logs = []string{} + } } } } diff --git a/models/local_argo_pods.go b/models/local_argo_pods.go new file mode 100644 index 0000000..0fbcef3 --- /dev/null +++ b/models/local_argo_pods.go @@ -0,0 +1,112 @@ +package models + +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "oc-monitord/conf" + "strings" + "sync" + + "cloud.o-forge.io/core/oc-lib/logs" + "github.com/rs/zerolog" +) + +var logger zerolog.Logger +var wf_logger zerolog.Logger + +// Take the slice of string that make up one round of stderr outputs from the --watch option in argo submit +func NewLocalArgoLogs(inputs []string) *ArgoWatch { + var workflow ArgoWatch + + for _, input := range inputs { + line := strings.TrimSpace(input) + if line == "" { + continue + } + switch { + case strings.HasPrefix(line, "Name:"): + workflow.Name = parseValue(line) + case strings.HasPrefix(line, "Namespace:"): + workflow.Namespace = parseValue(line) + case strings.HasPrefix(line, "Status:"): + workflow.Status = parseValue(line) + case strings.HasPrefix(line, "PodRunning"): + workflow.PodRunning = parseBoolValue(line) + case strings.HasPrefix(line, "Completed"): + workflow.Completed = parseBoolValue(line) + case strings.HasPrefix(line, "Created:"): + workflow.Created = parseValue(line) + case strings.HasPrefix(line, "Started:"): + workflow.Started = parseValue(line) + case strings.HasPrefix(line, "Duration:"): + workflow.Duration = parseValue(line) + case strings.HasPrefix(line, "Progress:"): + workflow.Progress = parseValue(line) + } + } + + return &workflow +} + + + +func parseValue(line string) string { + parts := strings.SplitN(line, ":", 2) + if len(parts) < 2 { + return "" + } + return strings.TrimSpace(parts[1]) +} + +func parseBoolValue(line string) bool { + value := parseValue(line) + return value == "True" +} + +func LogLocalWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) { + logger = logs.GetLogger() + + logger.Debug().Msg("created wf_logger") + fmt.Println("created wf_logger") + wf_logger = logger.With().Str("argo_name", "MON WF DE TEST").Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() + + var current_watch, previous_watch ArgoWatch + + watch_output := make([]string, 0) + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + log := scanner.Text() + watch_output = append(watch_output, log) + + if strings.HasPrefix(log, "Progress:") { + + current_watch = *NewLocalArgoLogs(watch_output) + workflowName := current_watch.Name + if !current_watch.Equals(&previous_watch) { + wg.Add(1) + // checkStatus(current_watch.Status, previous_watch.Status) + jsonified, err := json.Marshal(current_watch) + if err != nil { + logger.Error().Msg("Could not create watch log for " + workflowName) + } + wf_logger.Info().Msg(string(jsonified)) + previous_watch = current_watch + current_watch = ArgoWatch{} + wg.Done() + } + } + } +} + +// Debug, no logs sent +// func logPods(pipe io.ReadCloser, name string) { +// pods_logger = wf_logger.With().Str("pod_name", name).Logger() +// scanner := bufio.NewScanner(pipe) +// for scanner.Scan() { +// log := scanner.Text() +// pods_logger.Info().Msg(log) +// } + +// }