package logger

import (
    "bufio"
    "encoding/json"
    "fmt"
    "oc-monitord/tools"
    "oc-monitord/utils"
    "slices"
    "time"

    "github.com/rs/zerolog"
    "k8s.io/apimachinery/pkg/watch"

    wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// ArgoWatch is a snapshot of the state of a workflow execution, used to monitor
// and log the progress reported by the Argo watch events.
type ArgoWatch struct {
    Name      string
    Namespace string
    Status    string
    Conditions
    Created  string
    Started  string
    Duration string
    Progress string
    Logs     []string
}

// Conditions mirrors the workflow-level conditions reported by Argo.
type Conditions struct {
    PodRunning bool
    Completed  bool
}

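// Equals reports whether two snapshots describe the same workflow state, so
// that only actual state changes get logged.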
func (a *ArgoWatch) Equals(arg *ArgoWatch) bool {
    if arg == nil {
        return false
    }
    return a.Status == arg.Status &&
        a.Progress == arg.Progress &&
        a.Conditions.PodRunning == arg.Conditions.PodRunning &&
        a.Conditions.Completed == arg.Conditions.Completed
}

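// NewArgoLogs initialises an ArgoLogs tracker (named with an "oc-monitor-"
// prefix) for the given workflow name and namespace, expecting stepMax steps.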
func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs {
    return &ArgoLogs{
        Name:        "oc-monitor-" + name,
        Namespace:   namespace,
        CreatedDate: time.Now().Format("2006-01-02 15:04:05"),
        StepCount:   0,
        StepMax:     stepMax,
        stop:        false,
        Seen:        []string{},
    }
}

// An object to monitor and log the output of an argo submit
type ArgoLogs struct {
    Name        string
    Namespace   string
    CreatedDate string
    StepCount   int
    StepMax     int
    stop        bool
    Started     time.Time
    Seen        []string
    Logs        []string
    IsStreaming bool
}

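// NewWatch builds an initial ArgoWatch snapshot from the current step counters:
// the workflow counts as running while 0 < StepCount < StepMax and as completed
// once StepCount reaches StepMax.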
func (a *ArgoLogs) NewWatch() *ArgoWatch {
    return &ArgoWatch{
        Name:      a.Name,
        Namespace: a.Namespace,
        Status:    "Pending",
        Created:   a.CreatedDate,
        Started:   a.Started.Format("2006-01-02 15:04:05"),
        Conditions: Conditions{
            PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
            Completed:  a.StepCount == a.StepMax,
        },
        Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
        Duration: "0s",
        Logs:     []string{},
    }
}

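// StartStepRecording logs the given snapshot as JSON, increments the step
// counter and resets the start time of the current step.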
func (a *ArgoLogs) StartStepRecording(currentWatch *ArgoWatch, logger zerolog.Logger) {
    jsonified, _ := json.Marshal(currentWatch)
    logger.Info().Msg(string(jsonified))
    a.StepCount += 1
    a.Started = time.Now()
}

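// ArgoPodLog is the JSON payload emitted for every log line read from a pod.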
type ArgoPodLog struct {
    PodName string
    Step    string
    Message string
}

func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
    return ArgoPodLog{
        PodName: name,
        Step:    step,
        Message: msg,
    }
}

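// LogKubernetesArgo consumes the events of a Kubernetes watch on an Argo
// workflow: it logs every state change of the main workflow node as JSON and
// starts a log-streaming goroutine for each pod node it discovers. It returns
// once the workflow completes, fails or errors.
//
// A sketch of a possible call site (the clientset, context and variable names
// below are assumptions for illustration, not part of this package):
//
//	watcher, err := wfClient.ArgoprojV1alpha1().Workflows(executionID).Watch(ctx, metav1.ListOptions{})
//	if err != nil {
//		// handle the error
//	}
//	LogKubernetesArgo(wfName, executionID, watcher)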
func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) {
    var argoWatcher *ArgoWatch
    var pods []string
    var node wfv1.NodeStatus

    wfl := utils.GetWFLogger("")

    for event := range watcher.ResultChan() {
        wf, ok := event.Object.(*wfv1.Workflow)
        if !ok {
            wfl.Error().Msg("unexpected type")
            continue
        }
        if len(wf.Status.Nodes) == 0 {
            wfl.Debug().Msg("No node status yet") // the first event on the channel carries no node statuses, so skip it
            continue
        }

        conditions := retrieveCondition(wf)

        // Retrieve the status of the main node, which is named after the workflow
        if node, ok = wf.Status.Nodes[wfName]; !ok {
            bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t")
            wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
        }

        // StartedAt is a metav1.Time, so the elapsed time can be computed directly
        // instead of re-parsing its string representation
        duration := time.Since(node.StartedAt.Time)

        newWatcher := ArgoWatch{
            Name:       node.Name,
            Namespace:  executionID,
            Status:     string(node.Phase),
            Created:    node.StartedAt.String(),
            Started:    node.StartedAt.String(),
            Progress:   string(node.Progress),
            Duration:   duration.String(),
            Conditions: conditions,
        }

        if argoWatcher == nil {
            argoWatcher = &newWatcher
        }

        if !newWatcher.Equals(argoWatcher) {
            jsonified, _ := json.Marshal(newWatcher)
            wfl.Info().Msg(string(jsonified))
            argoWatcher = &newWatcher
        }

        // No WaitGroup is needed here: the range over the watch channel blocks the
        // main goroutine, because the Argo watch never closes the channel on its own.
        for _, pod := range wf.Status.Nodes {
            if !slices.Contains(pods, pod.Name) {
                pl := wfl.With().Str("pod", pod.Name).Logger()
                if wfName == pod.Name {
                    // One of the nodes is the workflow itself; don't try to stream pod logs for it
                    pods = append(pods, pod.Name)
                    continue
                }
                go logKubernetesPods(executionID, wfName, pod.Name, pl)
                pods = append(pods, pod.Name)
            }
        }

        // Stop listening to the channel when the workflow is completed or something bad happened
        if node.Phase.Completed() {
            wfl.Info().Msg(wfName + " workflow completed")
            break
        }
        if node.Phase.FailedOrError() {
            wfl.Error().Msg(wfName + " has failed, please refer to the logs")
            wfl.Error().Msg(node.Message)
            break
        }
    }
}

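// retrieveCondition extracts the PodRunning and Completed workflow conditions
// reported by Argo into a Conditions value.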
func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
    for _, cond := range wf.Status.Conditions {
        if cond.Type == "PodRunning" {
            c.PodRunning = cond.Status == "True"
        }
        if cond.Type == "Completed" {
            c.Completed = cond.Status == "True"
        }
    }
    return
}

// logKubernetesPods streams the logs of a single pod and logs each line as JSON.
// It is meant to be run as a goroutine.
func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger) {
    k, err := tools.NewKubernetesTool()
    if err != nil {
        logger.Error().Msg("Could not get Kubernetes tools")
        return
    }

    reader, err := k.GetPodLogger(executionId, wfName, podName)
    if err != nil {
        logger.Error().Msg(err.Error())
        return
    }

    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        log := scanner.Text()
        podLog := NewArgoPodLog(wfName, podName, log)
        jsonified, _ := json.Marshal(podLog)
        logger.Info().Msg(string(jsonified))
    }
    // Surface any error that interrupted the scan
    if err := scanner.Err(); err != nil {
        logger.Error().Msg(err.Error())
    }
}