reimplemented logging of wf when executed locally
This commit is contained in:
		
							
								
								
									
										28
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								main.go
									
									
									
									
									
								
							@@ -143,8 +143,8 @@ func executeOutside(argo_file_path string, stepMax int) {
 | 
			
		||||
	var stdout, stderr io.ReadCloser
 | 
			
		||||
	// var stderr 	io.ReadCloser
 | 
			
		||||
	var err error
 | 
			
		||||
	logger.Debug().Msg("executing :" + "argo submit --log " +  argo_file_path +  " --serviceaccount sa-" + conf.GetConfig().ExecutionID +  " -n "  +  conf.GetConfig().ExecutionID)
 | 
			
		||||
	cmd := exec.Command("argo", "submit", "--log", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID )
 | 
			
		||||
	logger.Debug().Msg("executing :" + "argo submit --watch " +  argo_file_path +  " --serviceaccount sa-" + conf.GetConfig().ExecutionID +  " -n "  +  conf.GetConfig().ExecutionID)
 | 
			
		||||
	cmd := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID )
 | 
			
		||||
	if stdout, err = cmd.StdoutPipe(); err != nil {
 | 
			
		||||
		wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
 | 
			
		||||
		return
 | 
			
		||||
@@ -154,10 +154,11 @@ func executeOutside(argo_file_path string, stepMax int) {
 | 
			
		||||
	}
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	split := strings.Split(argo_file_path, "_")
 | 
			
		||||
	argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
 | 
			
		||||
	argoLogs := models.NewArgoLogs(split[0], conf.GetConfig().ExecutionID, stepMax)
 | 
			
		||||
	argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger)
 | 
			
		||||
	argoLogs.IsStreaming = true
 | 
			
		||||
	go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg)
 | 
			
		||||
	argoLogs.IsStreaming = true	// Used to determine wether or not the logs are read from a docker container or on localhost
 | 
			
		||||
	// go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg)
 | 
			
		||||
	go models.LogLocalWorkflow(stdout,&wg)
 | 
			
		||||
 | 
			
		||||
	if err := cmd.Wait(); err != nil {
 | 
			
		||||
		wf_logger.Error().Msg("Could not execute argo submit")
 | 
			
		||||
@@ -167,6 +168,10 @@ func executeOutside(argo_file_path string, stepMax int) {
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// !!!! BUGGED !!!!
 | 
			
		||||
// Should be refactored to create a function dedicated to logging output from execution in a container
 | 
			
		||||
// LogLocalWorkflow() has been implemented to be used when oc-monitord is executed locally
 | 
			
		||||
 | 
			
		||||
// We could improve this function by creating an object with the same attribute as the output
 | 
			
		||||
// and only send a new log if the current object has different values than the previous
 | 
			
		||||
func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser,
 | 
			
		||||
@@ -183,12 +188,8 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser,
 | 
			
		||||
				wg.Add(1)
 | 
			
		||||
			}
 | 
			
		||||
			seeit++
 | 
			
		||||
		} else if count == 0 {
 | 
			
		||||
			if argoLogs.IsStreaming {
 | 
			
		||||
				continue
 | 
			
		||||
			} else {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
		} else if count == 0 && !argoLogs.IsStreaming  {
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		if count == 1 {
 | 
			
		||||
			see = log
 | 
			
		||||
@@ -202,7 +203,7 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser,
 | 
			
		||||
			current_watch.Logs = append(current_watch.Logs, strings.ReplaceAll(log, "\"", ""))
 | 
			
		||||
		}
 | 
			
		||||
		count++
 | 
			
		||||
		if strings.Contains(log, "sub-process exited") {
 | 
			
		||||
		if strings.Contains(log, "sub-process exited") || argoLogs.IsStreaming {
 | 
			
		||||
			current_watch = argoLogs.StopStepRecording(current_watch)
 | 
			
		||||
			argoLogs.Seen = append(argoLogs.Seen, see)
 | 
			
		||||
			if checkStatus(current_watch, previous_watch, argoLogs) {
 | 
			
		||||
@@ -223,6 +224,9 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser,
 | 
			
		||||
			}
 | 
			
		||||
			previous_watch = current_watch
 | 
			
		||||
			current_watch = &models.ArgoWatch{}
 | 
			
		||||
			if argoLogs.IsStreaming {
 | 
			
		||||
				current_watch.Logs = []string{}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										112
									
								
								models/local_argo_pods.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										112
									
								
								models/local_argo_pods.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,112 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"oc-monitord/conf"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/logs"
 | 
			
		||||
	"github.com/rs/zerolog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var logger zerolog.Logger
 | 
			
		||||
var wf_logger zerolog.Logger
 | 
			
		||||
 | 
			
		||||
// Take the slice of string that make up one round of stderr outputs from the --watch option in argo submit
 | 
			
		||||
func NewLocalArgoLogs(inputs []string) *ArgoWatch {
 | 
			
		||||
	var workflow ArgoWatch
 | 
			
		||||
 | 
			
		||||
	for _, input := range inputs {
 | 
			
		||||
		line := strings.TrimSpace(input)
 | 
			
		||||
		if line == "" {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		switch {
 | 
			
		||||
		case strings.HasPrefix(line, "Name:"):
 | 
			
		||||
			workflow.Name = parseValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Namespace:"):
 | 
			
		||||
			workflow.Namespace = parseValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Status:"):
 | 
			
		||||
			workflow.Status = parseValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "PodRunning"):
 | 
			
		||||
			workflow.PodRunning = parseBoolValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Completed"):
 | 
			
		||||
			workflow.Completed = parseBoolValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Created:"):
 | 
			
		||||
			workflow.Created = parseValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Started:"):
 | 
			
		||||
			workflow.Started = parseValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Duration:"):
 | 
			
		||||
			workflow.Duration = parseValue(line)
 | 
			
		||||
		case strings.HasPrefix(line, "Progress:"):
 | 
			
		||||
			workflow.Progress = parseValue(line)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return &workflow
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func parseValue(line string) string {
 | 
			
		||||
	parts := strings.SplitN(line, ":", 2)
 | 
			
		||||
	if len(parts) < 2 {
 | 
			
		||||
		return ""
 | 
			
		||||
	}
 | 
			
		||||
	return strings.TrimSpace(parts[1])
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseBoolValue(line string) bool {
 | 
			
		||||
	value := parseValue(line)
 | 
			
		||||
	return value == "True"
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func LogLocalWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) {
 | 
			
		||||
	logger = logs.GetLogger()
 | 
			
		||||
 | 
			
		||||
	logger.Debug().Msg("created wf_logger")
 | 
			
		||||
	fmt.Println("created wf_logger")
 | 
			
		||||
	wf_logger = logger.With().Str("argo_name", "MON WF DE TEST").Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
 | 
			
		||||
 | 
			
		||||
	var current_watch, previous_watch ArgoWatch
 | 
			
		||||
 | 
			
		||||
	watch_output := make([]string, 0)
 | 
			
		||||
	scanner := bufio.NewScanner(pipe)
 | 
			
		||||
	for scanner.Scan() {
 | 
			
		||||
		log := scanner.Text()
 | 
			
		||||
		watch_output = append(watch_output, log)
 | 
			
		||||
 | 
			
		||||
		if strings.HasPrefix(log, "Progress:") {
 | 
			
		||||
 | 
			
		||||
			current_watch = *NewLocalArgoLogs(watch_output)
 | 
			
		||||
			workflowName := current_watch.Name
 | 
			
		||||
			if !current_watch.Equals(&previous_watch) {
 | 
			
		||||
				wg.Add(1)
 | 
			
		||||
				// checkStatus(current_watch.Status, previous_watch.Status)
 | 
			
		||||
				jsonified, err := json.Marshal(current_watch)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					logger.Error().Msg("Could not create watch log for " + workflowName)
 | 
			
		||||
				}
 | 
			
		||||
				wf_logger.Info().Msg(string(jsonified))
 | 
			
		||||
				previous_watch = current_watch
 | 
			
		||||
				current_watch = ArgoWatch{}
 | 
			
		||||
				wg.Done()
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Debug, no logs sent
 | 
			
		||||
// func logPods(pipe io.ReadCloser, name string) {
 | 
			
		||||
// 	pods_logger = wf_logger.With().Str("pod_name", name).Logger()
 | 
			
		||||
// 	scanner := bufio.NewScanner(pipe)
 | 
			
		||||
// 	for scanner.Scan() {
 | 
			
		||||
// 		log := scanner.Text()
 | 
			
		||||
// 		pods_logger.Info().Msg(log)
 | 
			
		||||
// 	}
 | 
			
		||||
 | 
			
		||||
// }
 | 
			
		||||
		Reference in New Issue
	
	Block a user