package logger

import (
	"bufio"
	"encoding/json"
	"fmt"
	"oc-monitord/tools"
	"oc-monitord/utils"
	"slices"
	"time"

	"github.com/rs/zerolog"
	"k8s.io/apimachinery/pkg/watch"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// An object to monitor the logs generated by a specific pod from a workflow execution
type ArgoWatch struct {
	Name      string
	Namespace string
	Status    string
	Conditions
	Created  string
	Started  string
	Duration string
	Progress string
	Logs     []string
}

type Conditions struct {
	PodRunning bool
	Completed  bool
}

func (a *ArgoWatch) Equals(arg *ArgoWatch) bool {
	if arg == nil {
		return false
	}
	return a.Status == arg.Status &&
		a.Progress == arg.Progress &&
		a.Conditions.PodRunning == arg.Conditions.PodRunning &&
		a.Conditions.Completed == arg.Conditions.Completed
}

func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs {
	return &ArgoLogs{
		Name:        "oc-monitor-" + name,
		Namespace:   namespace,
		CreatedDate: time.Now().Format("2006-01-02 15:04:05"),
		StepCount:   0,
		StepMax:     stepMax,
		stop:        false,
		Seen:        []string{},
	}
}

// An object to monitor and log the output of an argo submit
type ArgoLogs struct {
	Name        string
	Namespace   string
	CreatedDate string
	StepCount   int
	StepMax     int
	stop        bool
	Started     time.Time
	Seen        []string
	Logs        []string
	IsStreaming bool
}

func (a *ArgoLogs) NewWatch() *ArgoWatch {
	return &ArgoWatch{
		Name:      a.Name,
		Namespace: a.Namespace,
		Status:    "Pending",
		Created:   a.CreatedDate,
		Started:   a.Started.Format("2006-01-02 15:04:05"),
		Conditions: Conditions{
			PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
			Completed:  a.StepCount == a.StepMax,
		},
		Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
		Duration: "0s",
		Logs:     []string{},
	}
}

func (a *ArgoLogs) StartStepRecording(currentWatch *ArgoWatch, logger zerolog.Logger) {
	jsonified, _ := json.Marshal(currentWatch)
	logger.Info().Msg(string(jsonified))
	a.StepCount += 1
	a.Started = time.Now()
}

type ArgoPodLog struct {
	PodName string
	Step    string
	Message string
}

func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
	return ArgoPodLog{
		PodName: name,
		Step:    step,
		Message: msg,
	}
}

func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) {
	var argoWatcher *ArgoWatch
	var pods []string
	var node wfv1.NodeStatus

	wfl := utils.GetWFLogger("")

	for event := range watcher.ResultChan() {
		wf, ok := event.Object.(*wfv1.Workflow)
		if !ok {
			wfl.Error().Msg("unexpected type")
			continue
		}

		if len(wf.Status.Nodes) == 0 {
			wfl.Debug().Msg("No node status yet") // The first event on the channel doesn't contain nodes, so we skip it
			continue
		}

		conditions := retrieveCondition(wf)

		// Retrieve the status of the main node, which is named after the workflow
		if node, ok = wf.Status.Nodes[wfName]; !ok {
			bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t")
			wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
		}

		now := time.Now()
		start, _ := time.Parse(time.RFC3339, node.StartedAt.String())
		duration := now.Sub(start)

		newWatcher := ArgoWatch{
			Name:       node.Name,
			Namespace:  executionID,
			Status:     string(node.Phase),
			Created:    node.StartedAt.String(),
			Started:    node.StartedAt.String(),
			Progress:   string(node.Progress),
			Duration:   duration.String(),
			Conditions: conditions,
		}

		if argoWatcher == nil {
			argoWatcher = &newWatcher
		}

		if !newWatcher.Equals(argoWatcher) {
			jsonified, _ := json.Marshal(newWatcher)
			wfl.Info().Msg(string(jsonified))
			argoWatcher = &newWatcher
		}

		// No WaitGroup is needed here: the loop itself blocks the main thread,
		// because the Argo watch never closes the channel
		for _, pod := range wf.Status.Nodes {
			if !slices.Contains(pods, pod.Name) {
				pl := wfl.With().Str("pod", pod.Name).Logger()
				if wfName == pod.Name {
					// One of the nodes is the workflow itself, the others are the pods,
					// so don't start a pod logger on the workflow name
					pods = append(pods, pod.Name)
					continue
				}
				go logKubernetesPods(executionID, wfName, pod.Name, pl)
				pods = append(pods, pod.Name)
			}
		}

		// Stop listening to the channel when the workflow has completed or failed
		if node.Phase.Completed() {
			wfl.Info().Msg(wfName + " workflow completed")
			break
		}
		if node.Phase.FailedOrError() {
			wfl.Error().Msg(wfName + " has failed, please refer to the logs")
			wfl.Error().Msg(node.Message)
			break
		}
	}
}

func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
	for _, cond := range wf.Status.Conditions {
		if cond.Type == "PodRunning" {
			c.PodRunning = cond.Status == "True"
		}
		if cond.Type == "Completed" {
			c.Completed = cond.Status == "True"
		}
	}
	return
}

// Function meant to be executed as a goroutine
func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger) {
	k, err := tools.NewKubernetesTool()
	if err != nil {
		logger.Error().Msg("Could not get Kubernetes tools")
		return
	}

	reader, err := k.GetPodLogger(executionId, wfName, podName)
	if err != nil {
		logger.Error().Msg(err.Error())
		return
	}

	scanner := bufio.NewScanner(reader)
	for scanner.Scan() {
		log := scanner.Text()
		podLog := NewArgoPodLog(wfName, podName, log)
		jsonified, _ := json.Marshal(podLog)
		logger.Info().Msg(string(jsonified))
	}
}