From e1b0ad089c39d42baf17b8972b636e94b90d3bcf Mon Sep 17 00:00:00 2001
From: pb
Date: Mon, 19 May 2025 18:56:41 +0200
Subject: [PATCH 1/2] modified the conditions to start the Kubernetes pod
 logging action and added WaitGroups to the logging method in order to wait
 for the logs to be retrieved before stopping the daemon

---
 logger/argo_logs.go | 31 ++++++++++++++++++++++++++-----
 tools/kubernetes.go | 13 +++++++++----
 2 files changed, 35 insertions(+), 9 deletions(-)

diff --git a/logger/argo_logs.go b/logger/argo_logs.go
index 83d8bc6..3b0eeda 100644
--- a/logger/argo_logs.go
+++ b/logger/argo_logs.go
@@ -7,6 +7,8 @@ import (
 	"oc-monitord/tools"
 	"oc-monitord/utils"
 	"slices"
+	"strings"
+	"sync"
 	"time"
 
 	"github.com/rs/zerolog"
@@ -113,6 +115,8 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
 
 	wfl := utils.GetWFLogger("")
 	wfl.Debug().Msg("Starting to log " + wfName)
+
+	var wg sync.WaitGroup
 
 	for event := range (watcher.ResultChan()) {
 		wf, ok := event.Object.(*wfv1.Workflow)
@@ -164,7 +168,9 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
 			if !slices.Contains(pods,pod.Name){
 				pl := wfl.With().Str("pod", pod.Name).Logger()
 				if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name
-				go logKubernetesPods(namespace, wfName, pod.Name, pl)
+				fmt.Println("Found a new pod to log : " + pod.Name)
+				wg.Add(1)
+				go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
 				pods = append(pods, pod.Name)
 			}
 		}
@@ -172,6 +178,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
 			// Stop listening to the chan when the Workflow is completed or something bad happened
 			if node.Phase.Completed() {
 				wfl.Info().Msg(wfName + " worflow completed")
+				wg.Wait()
 				break
 			}
 			if node.Phase.FailedOrError() {
@@ -197,24 +204,38 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
 }
 
 // Function needed to be executed as a go thread
-func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){
+func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){
+
+	s := strings.Split(podName, ".")
+	name := s[0] + "-" + s[1]
+	step := s[1]
+
 	k, err := tools.NewKubernetesTool()
 	if err != nil {
 		logger.Error().Msg("Could not get Kubernetes tools")
 		return
 	}
-	
+
 	reader, err := k.GetPodLogger(executionId, wfName, podName)
 	if err != nil {
 		logger.Error().Msg(err.Error())
 		return
 	}
-	
+
+	fmt.Println("=======================")
+	fmt.Println("Starting to log " + name)
+	fmt.Println("=======================")
+
 	scanner := bufio.NewScanner(reader)
 	for scanner.Scan() {
 		log := scanner.Text()
-		podLog := NewArgoPodLog(wfName,podName,log)
+		podLog := NewArgoPodLog(name,step,log)
 		jsonified, _ := json.Marshal(podLog)
 		logger.Info().Msg(string(jsonified))
 	}
+
+	wg.Done()
+	fmt.Println("=======================")
+	fmt.Println("Finishing to log " + name)
+	fmt.Println("=======================")
 }
\ No newline at end of file
diff --git a/tools/kubernetes.go b/tools/kubernetes.go
index 1e616bd..e3b2ce8 100644
--- a/tools/kubernetes.go
+++ b/tools/kubernetes.go
@@ -132,17 +132,19 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch
 
 func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
 	var targetPod v1.Pod
+
 	pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
-		LabelSelector: "workflows.argoproj.io/workflow="+wfName,
+		LabelSelector: "workflows.argoproj.io/workflow="+wfName,
 	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to list pods: " + err.Error())
 	}
 	if len(pods.Items) == 0 {
-	
+
 		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
 	}
-	
+
+	fmt.Println("Searching for workflows.argoproj.io/node-name=" + nodeName)
 	for _, pod := range pods.Items {
 		if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
 			targetPod = pod
@@ -171,9 +173,12 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
 	}
 
 	var initialized bool
+	fmt.Println("============= \n " + pod.Name + " not ready yet \n==============")
 	for _, cond := range pod.Status.Conditions {
-		if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
+		// It seems that for remote pods the pod gets the Succeeded status before it has time to report that it is ready to run in .status.conditions, so we added the OR condition
+		if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
 			initialized = true
+			fmt.Println("============= \n " + pod.Name + " ready \n==============")
 			return
 		}
 	}

From 6917295fbd60d89fa940780d64637aa042a8e6e9 Mon Sep 17 00:00:00 2001
From: pb
Date: Mon, 19 May 2025 18:58:33 +0200
Subject: [PATCH 2/2] removed the debug prints

---
 logger/argo_logs.go | 9 +--------
 tools/kubernetes.go | 3 ---
 2 files changed, 1 insertion(+), 11 deletions(-)

diff --git a/logger/argo_logs.go b/logger/argo_logs.go
index 3b0eeda..547ab0e 100644
--- a/logger/argo_logs.go
+++ b/logger/argo_logs.go
@@ -168,7 +168,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
 			if !slices.Contains(pods,pod.Name){
 				pl := wfl.With().Str("pod", pod.Name).Logger()
 				if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name
-				fmt.Println("Found a new pod to log : " + pod.Name)
+				pl.Info().Msg("Found a new pod to log : " + pod.Name)
 				wg.Add(1)
 				go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
 				pods = append(pods, pod.Name)
@@ -222,10 +222,6 @@ func logKubernetesPods(executionId string, wfName string,podName string, logger
 		return
 	}
 
-	fmt.Println("=======================")
-	fmt.Println("Starting to log " + name)
-	fmt.Println("=======================")
-
 	scanner := bufio.NewScanner(reader)
 	for scanner.Scan() {
 		log := scanner.Text()
@@ -234,8 +230,5 @@ func logKubernetesPods(executionId string, wfName string,podName string, logger
 		logger.Info().Msg(string(jsonified))
 	}
 
 	wg.Done()
-	fmt.Println("=======================")
-	fmt.Println("Finishing to log " + name)
-	fmt.Println("=======================")
 }
\ No newline at end of file
diff --git a/tools/kubernetes.go b/tools/kubernetes.go
index e3b2ce8..d46e45d 100644
--- a/tools/kubernetes.go
+++ b/tools/kubernetes.go
@@ -144,7 +144,6 @@ func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string
 		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
 	}
 
-	fmt.Println("Searching for workflows.argoproj.io/node-name=" + nodeName)
 	for _, pod := range pods.Items {
 		if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
 			targetPod = pod
@@ -173,12 +172,10 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
 	}
 
 	var initialized bool
-	fmt.Println("============= \n " + pod.Name + " not ready yet \n==============")
 	for _, cond := range pod.Status.Conditions {
 		// It seems that for remote pods the pod gets the Succeeded status before it has time to report that it is ready to run in .status.conditions, so we added the OR condition
 		if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
 			initialized = true
-			fmt.Println("============= \n " + pod.Name + " ready \n==============")
 			return
 		}
 	}
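
Note: the sketch below is a minimal, self-contained illustration of the coordination pattern these two patches introduce in LogKubernetesArgo and logKubernetesPods: wg.Add(1) before launching each per-pod logging goroutine, Done inside the goroutine once its stream is drained, and wg.Wait() before letting the daemon stop. The pod names and the strings.Reader stand-ins for the readers returned by GetPodLogger are hypothetical; this is not code from oc-monitord.

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// streamLogs drains one pod's log stream line by line, then signals the WaitGroup.
func streamLogs(podName string, r io.Reader, wg *sync.WaitGroup) {
	// Deferring Done also covers early error returns, so Wait can never hang.
	defer wg.Done()

	scanner := bufio.NewScanner(r)
	for scanner.Scan() {
		fmt.Printf("[%s] %s\n", podName, scanner.Text())
	}
}

func main() {
	// Fake log streams standing in for the per-pod readers.
	streams := map[string]io.Reader{
		"pod-a": strings.NewReader("starting\ndone\n"),
		"pod-b": strings.NewReader("starting\ndone\n"),
	}

	var wg sync.WaitGroup
	for name, r := range streams {
		wg.Add(1) // Add before launching the goroutine, as LogKubernetesArgo does
		go streamLogs(name, r, &wg)
	}

	// Block until every log stream has been fully read before shutting down.
	wg.Wait()
	fmt.Println("all pod logs retrieved, safe to stop")
}

In the sketch, calling defer wg.Done() at the top of the goroutine (rather than a trailing wg.Done() call) keeps wg.Wait() from blocking forever when the log reader cannot be obtained and the goroutine returns early.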