Modified the conditions for starting the Kubernetes pod log action and added WaitGroups to the logging method so the daemon waits for all logs to be retrieved before stopping
parent 03675d09ae
commit e1b0ad089c
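The core of the change is the standard sync.WaitGroup pattern: the watcher loop calls Add(1) before spawning each per-pod logging goroutine and Wait() once the workflow completes, so the daemon only stops after every log stream has been drained. Below is a minimal, self-contained sketch of that pattern; streamPodLogs and the pod names are hypothetical stand-ins, not the actual oc-monitord code.

package main

import (
    "fmt"
    "sync"
    "time"
)

// streamPodLogs is a stand-in for the real per-pod logging goroutine.
func streamPodLogs(podName string, wg *sync.WaitGroup) {
    defer wg.Done()                    // release the WaitGroup even on early return
    time.Sleep(100 * time.Millisecond) // simulate reading the pod's log stream
    fmt.Println("finished logging " + podName)
}

func main() {
    var wg sync.WaitGroup
    for _, pod := range []string{"wf.step-a", "wf.step-b"} {
        wg.Add(1) // register the goroutine before it starts
        go streamPodLogs(pod, &wg)
    }
    wg.Wait() // block until every logging goroutine has called Done
    fmt.Println("all logs retrieved, safe to stop the daemon")
}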
@@ -7,6 +7,8 @@ import (
     "oc-monitord/tools"
     "oc-monitord/utils"
     "slices"
+    "strings"
+    "sync"
     "time"
 
     "github.com/rs/zerolog"
@@ -113,6 +115,8 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
 
     wfl := utils.GetWFLogger("")
     wfl.Debug().Msg("Starting to log " + wfName)
 
+    var wg sync.WaitGroup
+
     for event := range (watcher.ResultChan()) {
         wf, ok := event.Object.(*wfv1.Workflow)
@@ -164,7 +168,9 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
         if !slices.Contains(pods,pod.Name){
             pl := wfl.With().Str("pod", pod.Name).Logger()
             if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name
-            go logKubernetesPods(namespace, wfName, pod.Name, pl)
+            fmt.Println("Found a new pod to log : " + pod.Name)
+            wg.Add(1)
+            go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
             pods = append(pods, pod.Name)
         }
     }
@@ -172,6 +178,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
         // Stop listening to the chan when the Workflow is completed or something bad happened
         if node.Phase.Completed() {
             wfl.Info().Msg(wfName + " worflow completed")
+            wg.Wait()
             break
         }
         if node.Phase.FailedOrError() {
@@ -197,24 +204,38 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
 }
 
 // Function needed to be executed as a go thread
-func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){
+func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){
+
+    s := strings.Split(podName, ".")
+    name := s[0] + "-" + s[1]
+    step := s[1]
+
     k, err := tools.NewKubernetesTool()
     if err != nil {
         logger.Error().Msg("Could not get Kubernetes tools")
         return
     }
 
     reader, err := k.GetPodLogger(executionId, wfName, podName)
     if err != nil {
         logger.Error().Msg(err.Error())
         return
     }
 
+    fmt.Println("=======================")
+    fmt.Println("Starting to log " + name)
+    fmt.Println("=======================")
+
     scanner := bufio.NewScanner(reader)
     for scanner.Scan() {
         log := scanner.Text()
-        podLog := NewArgoPodLog(wfName,podName,log)
+        podLog := NewArgoPodLog(name,step,log)
         jsonified, _ := json.Marshal(podLog)
         logger.Info().Msg(string(jsonified))
     }
+
+    wg.Done()
+    fmt.Println("=======================")
+    fmt.Println("Finishing to log " + name)
+    fmt.Println("=======================")
 }
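The rewritten goroutine body assumes Argo node names of the form <workflow>.<step>, splitting them into a pod display name and a step label for NewArgoPodLog. A tiny illustrative sketch of that split, with made-up values (and assuming the name always contains a dot, since s[1] would otherwise panic):

package main

import (
    "fmt"
    "strings"
)

func main() {
    podName := "my-workflow.fetch-data" // hypothetical "<workflow>.<step>" node name

    s := strings.Split(podName, ".")
    name := s[0] + "-" + s[1] // "my-workflow-fetch-data"
    step := s[1]              // "fetch-data"

    fmt.Println(name, step)
}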
@@ -132,17 +132,19 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch
 func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
     var targetPod v1.Pod
 
+
     pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
         LabelSelector: "workflows.argoproj.io/workflow="+wfName,
     })
     if err != nil {
         return nil, fmt.Errorf("failed to list pods: " + err.Error())
     }
     if len(pods.Items) == 0 {
 
         return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
     }
 
+    fmt.Println("Searching for workflows.argoproj.io/node-name=" + nodeName)
     for _, pod := range pods.Items {
         if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
             targetPod = pod
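This hunk only shows how the target pod is selected (workflow label plus node-name annotation); the log stream itself is typically opened with client-go's GetLogs request. The sketch below shows that step under the assumption that a configured *kubernetes.Clientset is available; it is not necessarily how KubernetesTools finishes GetPodLogger.

package logsketch

import (
    "context"
    "io"

    v1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes"
)

// openPodLogStream opens a follow-mode log stream for a pod via client-go.
func openPodLogStream(clientset *kubernetes.Clientset, ns, podName string) (io.ReadCloser, error) {
    req := clientset.CoreV1().Pods(ns).GetLogs(podName, &v1.PodLogOptions{
        Follow: true, // keep streaming while the pod is still running
    })
    return req.Stream(context.Background())
}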
@@ -171,9 +173,12 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
     }
 
     var initialized bool
+    fmt.Println("============= \n " + pod.Name + " not ready yet \n==============")
     for _, cond := range pod.Status.Conditions {
-        if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
+        // It seems that for remote pods the pod gets the Succeeded status before it has time to show that it is ready to run in .status.conditions, so we added the OR condition
+        if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
             initialized = true
+            fmt.Println("============= \n " + pod.Name + " ready \n==============")
             return
         }
     }
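The readiness tweak in the last hunk can be read as a small predicate: a pod counts as loggable once it reports PodReady, or once it has already jumped straight to Succeeded (as short-lived remote pods apparently do). A sketch of the same check factored into a helper with a hypothetical name, assuming only the k8s.io/api/core/v1 types:

package logsketch

import (
    v1 "k8s.io/api/core/v1"
)

// podReadyOrDone reports whether a pod is ready to serve or already finished,
// mirroring the OR condition added in testPodReady above.
func podReadyOrDone(pod v1.Pod) bool {
    if pod.Status.Phase == v1.PodSucceeded {
        return true // finished before ever publishing a PodReady condition
    }
    for _, cond := range pod.Status.Conditions {
        if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
            return true
        }
    }
    return false
}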