Corrected how pods generated by the argo workflow are logged

pb 2025-05-20 09:25:54 +02:00
commit bdbbd7697a
2 changed files with 24 additions and 9 deletions

@@ -7,6 +7,8 @@ import (
     "oc-monitord/tools"
     "oc-monitord/utils"
     "slices"
+    "strings"
+    "sync"
     "time"

     "github.com/rs/zerolog"
@@ -113,6 +115,8 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
     wfl := utils.GetWFLogger("")
     wfl.Debug().Msg("Starting to log " + wfName)
+    var wg sync.WaitGroup
+
     for event := range (watcher.ResultChan()) {
         wf, ok := event.Object.(*wfv1.Workflow)
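For context, this loop consumes Argo watch events and type-asserts each object to a Workflow before inspecting its nodes. A minimal sketch of that pattern, assuming the upstream argo-workflows v3 API types; the package and helper names here are hypothetical, not taken from this repository:

package monitor

import (
    "fmt"

    wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
    "k8s.io/apimachinery/pkg/watch"
)

// drainWorkflowEvents consumes events until the channel closes, skipping
// anything on the stream that is not a *wfv1.Workflow.
func drainWorkflowEvents(watcher watch.Interface) {
    for event := range watcher.ResultChan() {
        wf, ok := event.Object.(*wfv1.Workflow)
        if !ok {
            continue // e.g. an error object delivered on the watch channel
        }
        fmt.Println("workflow phase:", wf.Status.Phase)
    }
}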
@@ -164,7 +168,9 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
             if !slices.Contains(pods, pod.Name) {
                 pl := wfl.With().Str("pod", pod.Name).Logger()
                 if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the nodes is the Workflow, the others are the pods, so don't try to log on the wf name
-                go logKubernetesPods(namespace, wfName, pod.Name, pl)
+                pl.Info().Msg("Found a new pod to log: " + pod.Name)
+                wg.Add(1)
+                go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
                 pods = append(pods, pod.Name)
             }
         }
@@ -172,6 +178,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
         // Stop listening to the chan when the Workflow is completed or something bad happened
         if node.Phase.Completed() {
             wfl.Info().Msg(wfName + " workflow completed")
+            wg.Wait()
             break
         }
         if node.Phase.FailedOrError() {
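The three hunks above form one sync.WaitGroup pattern: wg.Add(1) before each logger goroutine is spawned, wg.Done() inside the goroutine, and wg.Wait() before breaking out of the event loop, so the watcher is not torn down while pod logs are still streaming. A self-contained sketch of that pattern (logPod is a hypothetical stand-in for logKubernetesPods):

package main

import (
    "fmt"
    "sync"
)

func logPod(name string, wg *sync.WaitGroup) {
    defer wg.Done() // always signal completion, even on an early return
    fmt.Println("streaming logs for", name)
}

func main() {
    var wg sync.WaitGroup
    for _, pod := range []string{"wf.step-a", "wf.step-b"} {
        wg.Add(1) // count the goroutine before it starts
        go logPod(pod, &wg)
    }
    wg.Wait() // the equivalent of the wg.Wait() added before break
}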
@@ -197,24 +204,30 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
 }

 // Function meant to be executed as a goroutine
-func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger) {
+func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger, wg *sync.WaitGroup) {
+    defer wg.Done() // lets the wg.Wait() in LogKubernetesArgo return once this logger is finished
+    s := strings.Split(podName, ".")
+    name := s[0] + "-" + s[1]
+    step := s[1]
     k, err := tools.NewKubernetesTool()
     if err != nil {
         logger.Error().Msg("Could not get Kubernetes tools")
         return
     }

     reader, err := k.GetPodLogger(executionId, wfName, podName)
     if err != nil {
         logger.Error().Msg(err.Error())
         return
     }

     scanner := bufio.NewScanner(reader)
     for scanner.Scan() {
         log := scanner.Text()
-        podLog := NewArgoPodLog(wfName, podName, log)
+        podLog := NewArgoPodLog(name, step, log)
         jsonified, _ := json.Marshal(podLog)
         logger.Info().Msg(string(jsonified))
     }
 }
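The new name/step derivation assumes Argo node names of the form "<workflow>.<step>"; indexing s[1] would panic on a name with no dot. A sketch of the same split with a defensive length check (the check itself is an addition here, not part of the diff):

package main

import (
    "fmt"
    "strings"
)

// splitNodeName derives a display name and a step label from an Argo node
// name, assuming the "<workflow>.<step>" convention.
func splitNodeName(podName string) (name, step string) {
    s := strings.Split(podName, ".")
    if len(s) < 2 {
        return podName, "" // defensive fallback, not in the original code
    }
    return s[0] + "-" + s[1], s[1]
}

func main() {
    name, step := splitNodeName("my-workflow.fetch-data")
    fmt.Println(name, step) // my-workflow-fetch-data fetch-data
}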

@@ -132,17 +132,18 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch
 func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
     var targetPod v1.Pod
     pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
         LabelSelector: "workflows.argoproj.io/workflow=" + wfName,
     })
     if err != nil {
         return nil, fmt.Errorf("failed to list pods: " + err.Error())
     }
     if len(pods.Items) == 0 {
         return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow=" + wfName + " and no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
     }
     for _, pod := range pods.Items {
         if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
             targetPod = pod
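GetPodLogger's body is cut off above after the annotation match; the io.ReadCloser it returns is presumably obtained by streaming the target pod's logs. A sketch of that final step, assuming the standard client-go GetLogs call rather than whatever this repository actually does (the streamPodLogs helper is hypothetical):

package monitor

import (
    "context"
    "io"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
)

// streamPodLogs follows a pod's logs and returns the reader the caller scans.
func streamPodLogs(set kubernetes.Interface, ns, podName string) (io.ReadCloser, error) {
    req := set.CoreV1().Pods(ns).GetLogs(podName, &v1.PodLogOptions{
        Follow: true, // keep the stream open while the pod is running
    })
    return req.Stream(context.Background())
}

Follow: true matters here: without it the request returns a snapshot of the logs so far and closes, and the bufio.Scanner loop in logKubernetesPods would miss anything the pod prints afterwards.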
@@ -172,7 +173,8 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
     var initialized bool
     for _, cond := range pod.Status.Conditions {
-        if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
+        // It seems that for remote pods the pod gets the Succeeded status before it has time to report that it is ready to run in .status.conditions, so we added the OR condition
+        if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
             initialized = true
             return
         }
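The amended condition can be read as a single predicate: a pod counts as ready either when its PodReady condition is True, or when it has already reached the Succeeded phase and will never publish a Ready condition at all. A sketch of that predicate in isolation, using only fields visible in the diff:

package monitor

import v1 "k8s.io/api/core/v1"

// podReadyOrSucceeded reports whether the pod is Ready, or has already
// Succeeded; the latter covers remote pods that finish before the Ready
// condition ever appears in .status.conditions.
func podReadyOrSucceeded(pod v1.Pod) bool {
    if pod.Status.Phase == v1.PodSucceeded {
        return true
    }
    for _, cond := range pod.Status.Conditions {
        if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}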