From fb8d994be3909ef3398dc93317a68a5958ace39d Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 16:51:29 +0200 Subject: [PATCH] Modified how logging with monitord container is implemented, with simpler logic thanks to the argo client library and k8 client-go for pods' logs --- logger/argo_logs.go | 219 ++++++++++++++++++++++++++++++++++++++++++++ main.go | 139 +++++----------------------- models/argo_logs.go | 161 -------------------------------- tools/interface.go | 10 +- tools/kubernetes.go | 148 +++++++++++++++--------------- 5 files changed, 322 insertions(+), 355 deletions(-) create mode 100644 logger/argo_logs.go delete mode 100644 models/argo_logs.go diff --git a/logger/argo_logs.go b/logger/argo_logs.go new file mode 100644 index 0000000..9b59ba6 --- /dev/null +++ b/logger/argo_logs.go @@ -0,0 +1,219 @@ +package logger + +import ( + "bufio" + "encoding/json" + "fmt" + "oc-monitord/tools" + "oc-monitord/utils" + "slices" + "time" + + "github.com/rs/zerolog" + "k8s.io/apimachinery/pkg/watch" + + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" +) + +// An object to monitor the logs generated by a specific pod from a workflow execution +type ArgoWatch struct { + Name string + Namespace string + Status string + Conditions + Created string + Started string + Duration string + Progress string + Logs []string +} + +type Conditions struct { + PodRunning bool + Completed bool +} + +func (a *ArgoWatch) Equals(arg *ArgoWatch) bool { + if arg == nil { + return false + } + return a.Status == arg.Status && a.Progress == arg.Progress && a.Conditions.PodRunning == arg.Conditions.PodRunning && a.Conditions.Completed == arg.Conditions.Completed +} + +func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs { + return &ArgoLogs{ + Name: "oc-monitor-" + name, + Namespace: namespace, + CreatedDate: time.Now().Format("2006-01-02 15:04:05"), + StepCount: 0, + StepMax: stepMax, + stop: false, + Seen: []string{}, + } +} + +// An object to monitor and log the output of an argo submit +type ArgoLogs struct { + Name string + Namespace string + CreatedDate string + StepCount int + StepMax int + stop bool + Started time.Time + Seen []string + Logs []string + IsStreaming bool +} + +func (a *ArgoLogs) NewWatch() *ArgoWatch { + return &ArgoWatch{ + Name: a.Name, + Namespace: a.Namespace, + Status: "Pending", + Created: a.CreatedDate, + Started: a.Started.Format("2006-01-02 15:04:05"), + Conditions: Conditions{ + PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, + Completed: a.StepCount == a.StepMax, + }, + Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax), + Duration: "0s", + Logs: []string{}, + } + +} + +func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) { + jsonified, _ := json.Marshal(current_watch) + logger.Info().Msg(string(jsonified)) + a.StepCount += 1 + a.Started = time.Now() +} + + +type ArgoPodLog struct { + PodName string + Step string + Message string +} + +func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { + return ArgoPodLog{ + PodName: name, + Step: step, + Message: msg, + } +} + +func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) { + var argoWatcher *ArgoWatch + var pods []string + var node wfv1.NodeStatus + + wfl := utils.GetWFLogger("") + + for event := range (watcher.ResultChan()) { + wf, ok := event.Object.(*wfv1.Workflow) + if !ok { + wfl.Error().Msg("unexpected type") + continue + } + if len(wf.Status.Nodes) == 0 { + wfl.Debug().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it + continue + } + + conditions := retrieveCondition(wf) + + // Retrieving the Status for the main node, which is named after the workflow + if node, ok = wf.Status.Nodes[wfName]; !ok { + bytified, _ := json.MarshalIndent(wf.Status.Nodes,"","\t") + wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified)) + } + + now := time.Now() + start, _ := time.Parse(time.RFC3339, node.StartedAt.String() ) + duration := now.Sub(start) + + newWatcher := ArgoWatch{ + Name: node.Name, + Namespace: executionID, + Status: string(node.Phase), + Created: node.StartedAt.String(), + Started: node.StartedAt.String(), + Progress: string(node.Progress), + Duration: duration.String(), + Conditions: conditions, + } + + if argoWatcher == nil { + argoWatcher = &newWatcher + } + + if !newWatcher.Equals(argoWatcher){ + jsonified, _ := json.Marshal(newWatcher) + wfl.Info().Msg(string(jsonified)) + argoWatcher = &newWatcher + } + + // I don't think we need to use WaitGroup here, because the loop itself + // acts as blocking process for the main thread, because Argo watch never closes the channel + for _, pod := range wf.Status.Nodes{ + if !slices.Contains(pods,pod.Name){ + pl := wfl.With().Str("pod", pod.Name).Logger() + if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name + go logKubernetesPods(executionID, wfName, pod.Name, pl) + pods = append(pods, pod.Name) + } + } + + // Stop listening to the chan when the Workflow is completed or something bad happened + if node.Phase.Completed() { + wfl.Info().Msg(wfName + " worflow completed") + break + } + if node.Phase.FailedOrError() { + wfl.Error().Msg(wfName + "has failed, please refer to the logs") + wfl.Error().Msg(node.Message) + break + } + } +} + +func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { + for _, cond := range wf.Status.Conditions { + if cond.Type == "PodRunning" { + c.PodRunning = cond.Status == "True" + } + if cond.Type == "Completed" { + c.Completed = cond.Status == "True" + } + } + + return + +} + +// Function needed to be executed as a go thread +func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){ + k, err := tools.NewKubernetesTool() + if err != nil { + logger.Error().Msg("Could not get Kubernetes tools") + return + } + + reader, err := k.GetPodLogger(executionId, wfName, podName) + if err != nil { + logger.Error().Msg(err.Error()) + return + } + + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + log := scanner.Text() + podLog := NewArgoPodLog(wfName,podName,log) + jsonified, _ := json.Marshal(podLog) + logger.Info().Msg(string(jsonified)) + } +} \ No newline at end of file diff --git a/main.go b/main.go index 4b7f8e6..9e3b4e1 100644 --- a/main.go +++ b/main.go @@ -3,19 +3,17 @@ package main import ( "bufio" "encoding/base64" - "encoding/json" "fmt" "io" "os" "os/exec" "regexp" - "slices" "strings" "sync" "time" "oc-monitord/conf" - "oc-monitord/models" + l "oc-monitord/logger" u "oc-monitord/utils" "oc-monitord/workflow_builder" @@ -45,7 +43,6 @@ import ( var logger zerolog.Logger var wf_logger zerolog.Logger -var pods_logger zerolog.Logger var parser argparse.Parser var workflowName string @@ -90,7 +87,7 @@ func main() { logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") } - builder, stepMax, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) + builder, _, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore if err != nil { logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg(err.Error()) @@ -105,22 +102,21 @@ func main() { wf_logger := u.GetWFLogger(workflowName) wf_logger.Debug().Msg("Testing argo name") - _ = stepMax if conf.GetConfig().KubeHost == "" { // Not in a k8s environment, get conf from parameters fmt.Println("Executes outside of k8s") - executeOutside(argoFilePath, stepMax, builder.Workflow) + executeOutside(argoFilePath, builder.Workflow) } else { // Executed in a k8s environment fmt.Println("Executes inside a k8s") // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() - executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath, stepMax) + executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath) } } // So far we only log the output from -func executeInside(execID string, ns string, argo_file_path string, stepMax int) { +func executeInside(execID string, ns string, argo_file_path string) { t, err := tools2.NewService(conf.GetConfig().Mode) if err != nil { logger.Error().Msg("Could not create KubernetesTool") @@ -128,14 +124,20 @@ func executeInside(execID string, ns string, argo_file_path string, stepMax int) } name, err := t.CreateArgoWorkflow(argo_file_path, ns) - + _ = name if err != nil { logger.Error().Msg("Could not create argo workflow : " + err.Error()) + fmt.Println("CA :" + conf.GetConfig().KubeCA) + fmt.Println("Cert :" + conf.GetConfig().KubeCert) + fmt.Println("Data :" + conf.GetConfig().KubeData) return } else { - argoLogs := models.NewArgoLogs(workflowName, "argo", stepMax) - argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger) - err := t.LogWorkflow(execID, ns, name, argo_file_path, stepMax, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, logWorkflow) + watcher, err := t.GetArgoWatch(execID, workflowName) + if err != nil { + logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) + } + + l.LogKubernetesArgo(name, execID, watcher) if err != nil { logger.Error().Msg("Could not log workflow : " + err.Error()) } @@ -143,11 +145,9 @@ func executeInside(execID string, ns string, argo_file_path string, stepMax int) } -func executeOutside(argo_file_path string, stepMax int, workflow workflow_builder.Workflow) { - // var stdoutSubmit, stderrSubmit, stdout_logs, stderr_logs io.ReadCloser +func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { var stdoutSubmit, stderrSubmit io.ReadCloser var stdoutLogs, stderrLogs io.ReadCloser - // var stderr io.ReadCloser var wg sync.WaitGroup var err error @@ -158,29 +158,20 @@ func executeOutside(argo_file_path string, stepMax int, workflow workflow_builde wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) return } - - // //======== Code block that implemented a method that logs both locally and container executed wf - // // Need to be improved, did not log well for local executions - // split := strings.Split(argo_file_path, "_") - // argoLogs := models.NewArgoLogs(split[0], conf.GetConfig().ExecutionID, stepMax) - // argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger) - // argoLogs.IsStreaming = true // Used to determine wether or not the logs are read from a docker container or on localhost - // // go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg) - // // ======= - - var steps []string - for _, template := range workflow.Spec.Templates { - steps = append(steps, template.Name) - } - + cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color") if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil { wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error()) return } + + var steps []string + for _, template := range workflow.Spec.Templates { + steps = append(steps, template.Name) + } - go models.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) - go models.LogPods(workflowName, stdoutLogs, steps, &wg) + go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) + go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg) fmt.Println("Starting argo submit") if err := cmdSubmit.Start(); err != nil { @@ -209,68 +200,6 @@ func executeOutside(argo_file_path string, stepMax int, workflow workflow_builde wg.Wait() } -// !!!! BUGGED !!!! -// Should be refactored to create a function dedicated to logging output from execution in a container -// LogLocalWorkflow() has been implemented to be used when oc-monitord is executed locally - -// We could improve this function by creating an object with the same attribute as the output -// and only send a new log if the current object has different values than the previous -func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, - current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, - argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup) { - scanner := bufio.NewScanner(pipe) - count := 0 - see := "" - seeit := 0 - for scanner.Scan() { - log := scanner.Text() - if strings.Contains(log, "capturing logs") && count == 0 { - if !argoLogs.IsStreaming { - wg.Add(1) - } - seeit++ - } else if count == 0 && !argoLogs.IsStreaming { - break - } - if count == 1 { - see = log - if slices.Contains(argoLogs.Seen, see) && !argoLogs.IsStreaming { - wg.Done() - seeit-- - break - } - } - if !slices.Contains(current_watch.Logs, log) { - current_watch.Logs = append(current_watch.Logs, strings.ReplaceAll(log, "\"", "")) - } - count++ - if strings.Contains(log, "sub-process exited") || argoLogs.IsStreaming { - current_watch = argoLogs.StopStepRecording(current_watch) - argoLogs.Seen = append(argoLogs.Seen, see) - if checkStatus(current_watch, previous_watch, argoLogs) { - count = 0 - if !argoLogs.IsStreaming { - wg.Done() - } - seeit-- - } - jsonified, err := json.Marshal(current_watch) - if err != nil { - logger.Error().Msg("Could not create watch log") - } - if current_watch.Status == "Failed" { - wf_logger.Error().Msg(string(jsonified)) - } else { - wf_logger.Info().Msg(string(jsonified)) - } - previous_watch = current_watch - current_watch = &models.ArgoWatch{} - if argoLogs.IsStreaming { - current_watch.Logs = []string{} - } - } - } -} func loadConfig(is_k8s bool, parser *argparse.Parser) { var o *onion.Onion @@ -375,26 +304,6 @@ func getContainerName(argo_file string) string { return container_name } -// Uses the ArgoWatch object to update status of the workflow execution object -func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch, argoLogs *models.ArgoLogs) bool { - if previous == nil || current.Status != previous.Status || argoLogs.IsStreaming { - argoLogs.StepCount += 1 - if len(current.Logs) > 0 { - newLogs := []string{} - for _, log := range current.Logs { - if !slices.Contains(argoLogs.Logs, log) { - newLogs = append(newLogs, log) - } - } - updateStatus(current.Status, strings.Join(newLogs, "\n")) - current.Logs = newLogs - argoLogs.Logs = append(argoLogs.Logs, newLogs...) - } else { - updateStatus(current.Status, "") - } - } - return previous == nil || current.Status != previous.Status || argoLogs.IsStreaming -} func updateStatus(status string, log string) { exec_id := conf.GetConfig().ExecutionID diff --git a/models/argo_logs.go b/models/argo_logs.go deleted file mode 100644 index ecf150e..0000000 --- a/models/argo_logs.go +++ /dev/null @@ -1,161 +0,0 @@ -package models - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" - "time" - - "github.com/acarl005/stripansi" - "github.com/rs/zerolog" -) - -// An object to monitor the logs generated by a specific pod from a workflow execution -type ArgoWatch struct { - Name string - Namespace string - Status string - Conditions - Created string - Started string - Duration string - Progress string - Logs []string -} - -type Conditions struct { - PodRunning bool - Completed bool -} - -func (a *ArgoWatch) Equals(arg *ArgoWatch) bool { - if arg == nil { - return false - } - return a.Status == arg.Status && a.Progress == arg.Progress && a.Conditions.PodRunning == arg.Conditions.PodRunning && a.Conditions.Completed == arg.Conditions.Completed -} - -func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs { - return &ArgoLogs{ - Name: "oc-monitor-" + name, - Namespace: namespace, - CreatedDate: time.Now().Format("2006-01-02 15:04:05"), - StepCount: 0, - StepMax: stepMax, - stop: false, - Seen: []string{}, - } -} - -// An object to monitor and log the output of an argo submit -type ArgoLogs struct { - Name string - Namespace string - CreatedDate string - StepCount int - StepMax int - stop bool - Started time.Time - Seen []string - Logs []string - IsStreaming bool -} - -func (a *ArgoLogs) NewWatch() *ArgoWatch { - return &ArgoWatch{ - Name: a.Name, - Namespace: a.Namespace, - Status: "Pending", - Created: a.CreatedDate, - Started: a.Started.Format("2006-01-02 15:04:05"), - Conditions: Conditions{ - PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, - Completed: a.StepCount == a.StepMax, - }, - Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax), - Duration: "0s", - Logs: []string{}, - } - -} - -func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) { - jsonified, _ := json.Marshal(current_watch) - logger.Info().Msg(string(jsonified)) - a.StepCount += 1 - a.Started = time.Now() -} - -func (a *ArgoLogs) StopStepRecording(current *ArgoWatch) *ArgoWatch { - fn := strings.Split(a.Name, "_") - logs := []string{} - err := false - end := "" - for _, input := range current.Logs { - line := strings.TrimSpace(input) - if line == "" || !strings.Contains(line, fn[0]) || !strings.Contains(line, ":") { - continue - } - step := strings.Split(line, ":") - if strings.Contains(line, "sub-process exited") { - b := strings.Split(line, "time=\"") - if len(b) > 1 { - end = b[1][:19] - } - } - if len(step) < 2 || strings.Contains(line, "time=") || strings.TrimSpace(strings.Join(step[1:], " : ")) == "" || strings.TrimSpace(strings.Join(step[1:], " : ")) == a.Name { - continue - } - log := stripansi.Strip(strings.TrimSpace(strings.Join(step[1:], " : "))) - t, e := strconv.Unquote(log) - if e == nil { - logs = append(logs, t) - } else { - logs = append(logs, strings.ReplaceAll(log, "\"", "`")) - } - - if strings.Contains(logs[len(logs)-1], "Error") { - err = true - } - } - status := "Pending" - if a.StepCount > 0 { - status = "Running" - } - if a.StepCount == a.StepMax { - if err { - status = "Failed" - } else { - status = "Succeeded" - } - } - duration := float64(0) - if end != "" { - timeE, _ := time.Parse("2006-01-02T15:04:05", end) - duration = timeE.Sub(a.Started).Seconds() - } - current.Conditions = Conditions{ - PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, - Completed: a.StepCount == a.StepMax, - } - current.Progress = fmt.Sprintf("%v/%v", a.StepCount, a.StepMax) - current.Duration = fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s") - - current.Status = status - return current -} - -type ArgoPodLog struct { - PodName string - Step string - Message string -} - -func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { - return ArgoPodLog{ - PodName: name, - Step: step, - Message: msg, - } -} \ No newline at end of file diff --git a/tools/interface.go b/tools/interface.go index 2b356d8..b6f2610 100644 --- a/tools/interface.go +++ b/tools/interface.go @@ -3,17 +3,15 @@ package tools import ( "errors" "io" - "oc-monitord/models" - "sync" + + "k8s.io/apimachinery/pkg/watch" ) type Tool interface { CreateArgoWorkflow(path string, ns string) (string, error) CreateAccessSecret(ns string, login string, password string) (string, error) - LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, - argoLogs *models.ArgoLogs, seen []string, - logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, - argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error + GetArgoWatch(executionId string, wfName string) (watch.Interface, error) + GetPodLogger(ns string, wfName string, podName string) (io.ReadCloser, error) } var _service = map[string]func() (Tool, error){ diff --git a/tools/kubernetes.go b/tools/kubernetes.go index 794cbcd..1825cf7 100644 --- a/tools/kubernetes.go +++ b/tools/kubernetes.go @@ -7,21 +7,18 @@ import ( "fmt" "io" "oc-monitord/conf" - "oc-monitord/models" "oc-monitord/utils" "os" - "sync" "time" - "cloud.o-forge.io/core/oc-lib/models/common/enum" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/google/uuid" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) @@ -59,73 +56,6 @@ func NewKubernetesTool() (Tool, error) { }, nil } -func (k *KubernetesTools) LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, - seen []string, logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error { - exec := utils.GetExecution(execID) - if exec == nil { - return errors.New("Could not retrieve workflow ID from execution ID " + execID) - } - if exec.State == enum.DRAFT || exec.State == enum.FAILURE || exec.State == enum.SUCCESS { - l := utils.GetWFLogger("") - l.Error().Msg("The execution's state doesn't meet requirement, state is : " + exec.State.String()) - return nil - } - k.logWorkflow(namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc) - return k.LogWorkflow(execID, namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc) -} - -func (k *KubernetesTools) logWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, - seen []string, - logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error { - // List pods related to the Argo workflow - fmt.Println("\n!!!!!!!! !!!!!!!!!! !!!!!!!! &&&& & STARTING LOG\n\n") - labelSelector := fmt.Sprintf("workflows.argoproj.io/workflow=%s", workflowName) - for retries := 0; retries < 10; retries++ { // Retry for up to ~20 seconds - // List workflow pods - wfPods, err := k.Set.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - return err - } - // If we found pods, stream logs - if len(wfPods.Items) > 0 { - var wg sync.WaitGroup - // Stream logs from all matching pods - for _, pod := range wfPods.Items { - for _, container := range pod.Spec.Containers { - wg.Add(1) - go k.streamLogs(namespace, pod.Name, container.Name, argoFilePath, stepMax, &wg, current_watch, previous_watch, argoLogs, seen, logFunc) - } - } - wg.Wait() - return nil - } - time.Sleep(2 * time.Second) // Wait before retrying - } - return errors.New("no pods found for the workflow") -} - -// Function to stream logs -func (k *KubernetesTools) streamLogs(namespace string, podName string, containerName string, - argoFilePath string, stepMax int, wg *sync.WaitGroup, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, - logFunc func(argo_file_path string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) { - req := k.Set.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ - Container: containerName, // Main container - Follow: true, // Equivalent to -f flag in kubectl logs - }) - defer wg.Done() - // Open stream - stream, err := req.Stream(context.Background()) - if err != nil { - return - } - defer stream.Close() - var internalWg sync.WaitGroup - logFunc(argoFilePath, stepMax, stream, current_watch, previous_watch, argoLogs, seen, &internalWg) - internalWg.Wait() -} - func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) { // Read workflow YAML file workflowYAML, err := os.ReadFile(path) @@ -149,7 +79,7 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er } // Create the workflow in the "argo" namespace - createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.Background(), workflow, metav1.CreateOptions{}) + createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.TODO(), workflow, metav1.CreateOptions{}) if err != nil { return "", errors.New("failed to create workflow: " + err.Error()) } @@ -177,9 +107,81 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s Data: secretData, } // Create the Secret in Kubernetes - _, err := k.Set.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{}) + _, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) if err != nil { return "", errors.New("Error creating secret: " + err.Error()) } return name, nil } + +func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){ + wfl := utils.GetWFLogger("") + wfl.Debug().Msg("Starting argo watch with argo lib") + fmt.Println("metadata.name=oc-monitor-"+wfName + " in namespace : " + executionId) + options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName} + fmt.Println(options) + watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options) + if err != nil { + return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client") + } + + + return watcher, nil + +} + +func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { + var targetPod v1.Pod + + pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ + LabelSelector: "workflows.argoproj.io/workflow="+wfName, + }) + if err != nil { + return nil, fmt.Errorf("failed to list pods: " + err.Error()) + } + if len(pods.Items) == 0 { + return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName) + } + + for _, pod := range pods.Items { + if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName { + targetPod = pod + } + } + + // k8s API throws an error if we try getting logs while the container are not initialized, so we repeat status check there + k.testPodReady(targetPod, ns) + + // When using kubec logs for a pod we see it contacts /api/v1/namespaces/NAMESPACE/pods/oc-monitor-PODNAME/log?container=main so we add this container: main to the call + req, err := k.Set.CoreV1().Pods(ns).GetLogs(targetPod.Name, &v1.PodLogOptions{Follow: true, Container: "main"}). Stream(context.Background()) + if err != nil { + return nil, fmt.Errorf(" Error when trying to get logs for " + targetPod.Name + " : " + err.Error()) + } + + return req, nil +} + +func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) { + for { + pod, err := k.Set.CoreV1().Pods(ns).Get(context.Background(), pod.Name, metav1.GetOptions{}) + if err != nil { + wfl := utils.GetWFLogger("") + wfl.Error().Msg("Error fetching pod: " + err.Error() + "\n") + break + } + + var initialized bool + for _, cond := range pod.Status.Conditions { + if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { + initialized = true + return + } + } + + if initialized { + return + } + + time.Sleep(2 * time.Second) // avoid hammering the API + } +} \ No newline at end of file