diff --git a/logger/argo_logs.go b/logger/argo_logs.go index 83d8bc6..3b0eeda 100644 --- a/logger/argo_logs.go +++ b/logger/argo_logs.go @@ -7,6 +7,8 @@ import ( "oc-monitord/tools" "oc-monitord/utils" "slices" + "strings" + "sync" "time" "github.com/rs/zerolog" @@ -113,6 +115,8 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) wfl := utils.GetWFLogger("") wfl.Debug().Msg("Starting to log " + wfName) + + var wg sync.WaitGroup for event := range (watcher.ResultChan()) { wf, ok := event.Object.(*wfv1.Workflow) @@ -164,7 +168,9 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) if !slices.Contains(pods,pod.Name){ pl := wfl.With().Str("pod", pod.Name).Logger() if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name - go logKubernetesPods(namespace, wfName, pod.Name, pl) + fmt.Println("Found a new pod to log : " + pod.Name) + wg.Add(1) + go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) pods = append(pods, pod.Name) } } @@ -172,6 +178,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) // Stop listening to the chan when the Workflow is completed or something bad happened if node.Phase.Completed() { wfl.Info().Msg(wfName + " worflow completed") + wg.Wait() break } if node.Phase.FailedOrError() { @@ -197,24 +204,38 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { } // Function needed to be executed as a go thread -func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){ +func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ + + s := strings.Split(podName, ".") + name := s[0] + "-" + s[1] + step := s[1] + k, err := tools.NewKubernetesTool() if err != nil { logger.Error().Msg("Could not get Kubernetes tools") return } - + reader, err := 
k.GetPodLogger(executionId, wfName, podName) if err != nil { logger.Error().Msg(err.Error()) return } - + + fmt.Println("=======================") + fmt.Println("Starting to log " + name) + fmt.Println("=======================") + scanner := bufio.NewScanner(reader) for scanner.Scan() { log := scanner.Text() - podLog := NewArgoPodLog(wfName,podName,log) + podLog := NewArgoPodLog(name,step,log) jsonified, _ := json.Marshal(podLog) logger.Info().Msg(string(jsonified)) } + + wg.Done() + fmt.Println("=======================") + fmt.Println("Finishing to log " + name) + fmt.Println("=======================") } \ No newline at end of file diff --git a/tools/kubernetes.go b/tools/kubernetes.go index 1e616bd..e3b2ce8 100644 --- a/tools/kubernetes.go +++ b/tools/kubernetes.go @@ -132,17 +132,19 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { var targetPod v1.Pod + pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ - LabelSelector: "workflows.argoproj.io/workflow="+wfName, + LabelSelector: "workflows.argoproj.io/workflow="+wfName, }) if err != nil { return nil, fmt.Errorf("failed to list pods: " + err.Error()) } if len(pods.Items) == 0 { - + return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns) } - + + fmt.Println("Searching for workflows.argoproj.io/node-name=" + nodeName) for _, pod := range pods.Items { if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName { targetPod = pod @@ -171,9 +173,12 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) { } var initialized bool + fmt.Println("============= \n " + pod.Name + " not ready yet \n==============") for _, cond := range pod.Status.Conditions { - if cond.Type == v1.PodReady && cond.Status == 
v1.ConditionTrue { + // It seems that for remote pods the pod gets the Succeeded status before it has time to display that it is ready to run in .status.conditions, so we added the OR condition + if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded { initialized = true + fmt.Println("============= \n " + pod.Name + " ready \n==============") return } }