package logger

import (
	"bufio"
	"encoding/json"
	"fmt"
	"oc-monitord/tools"
	"oc-monitord/utils"
	"slices"
	"strings"
	"sync"
	"time"

	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	octools "cloud.o-forge.io/core/oc-lib/tools"
	"github.com/rs/zerolog"
	"k8s.io/apimachinery/pkg/watch"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)

// ArgoWatch is a snapshot of the state of a workflow execution, used to
// monitor the logs generated by a specific pod from a workflow execution.
type ArgoWatch struct {
	Name      string
	Namespace string
	Status    string
	Conditions
	Created  string
	Started  string
	Duration string
	Progress string
	Logs     []string
}

// Conditions mirrors the two Argo workflow conditions this monitor cares about.
type Conditions struct {
	PodRunning bool
	Completed  bool
}

// Equals reports whether two watches describe the same observable state
// (status, progress and conditions). A nil argument is never equal.
func (a *ArgoWatch) Equals(arg *ArgoWatch) bool {
	if arg == nil {
		return false
	}
	return a.Status == arg.Status &&
		a.Progress == arg.Progress &&
		a.Conditions.PodRunning == arg.Conditions.PodRunning &&
		a.Conditions.Completed == arg.Conditions.Completed
}

// NewArgoLogs builds an ArgoLogs monitor for the workflow `name` in
// `namespace`, expecting `stepMax` steps in total.
func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs {
	return &ArgoLogs{
		Name:        "oc-monitor-" + name,
		Namespace:   namespace,
		CreatedDate: time.Now().UTC().Format("2006-01-02 15:04:05"),
		StepCount:   0,
		StepMax:     stepMax,
		stop:        false,
		Seen:        []string{},
	}
}

// ArgoLogs monitors and logs the output of an argo submit.
type ArgoLogs struct {
	Name        string
	Namespace   string
	CreatedDate string
	StepCount   int
	StepMax     int
	stop        bool
	Started     time.Time
	Seen        []string
	Logs        []string
	IsStreaming bool
}

// NewWatch derives an ArgoWatch snapshot from the current monitor state.
// PodRunning is true while at least one step has started but not all have
// finished; Completed is true once StepCount reaches StepMax.
func (a *ArgoLogs) NewWatch() *ArgoWatch {
	return &ArgoWatch{
		Name:      a.Name,
		Namespace: a.Namespace,
		Status:    "Pending",
		Created:   a.CreatedDate,
		Started:   a.Started.Format("2006-01-02 15:04:05"),
		Conditions: Conditions{
			PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
			Completed:  a.StepCount == a.StepMax,
		},
		Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
		Duration: "0s",
		Logs:     []string{},
	}
}

// StartStepRecording logs the given watch snapshot as JSON, bumps the step
// counter and marks now as the start of the new step.
func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) {
	// Marshal error ignored deliberately: logging is best-effort.
	jsonified, _ := json.Marshal(current_watch)
	logger.Info().Msg(string(jsonified))
	a.StepCount += 1
	a.Started = time.Now()
}

// ArgoPodLog is the JSON shape of a single pod log line.
type ArgoPodLog struct {
	PodName string
	Step    string
	Message string
}

// NewArgoPodLog builds an ArgoPodLog entry for the given pod, step and message.
func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
	return ArgoPodLog{
		PodName: name,
		Step:    step,
		Message: msg,
	}
}

// LogKubernetesArgo watches an Argo workflow and emits NATS lifecycle events.
// It no longer writes directly to the database — all state transitions are
// delegated to oc-scheduler (WorkflowExecution) and oc-datacenter (Bookings)
// via the dedicated NATS channels.
//
//   - wfName      : Argo workflow name (also the name of the root DAG node)
//   - execID      : WorkflowExecution UUID (for oc-scheduler to update state)
//   - executionsID: run-group ID shared by all bookings of this run
//   - namespace   : Kubernetes namespace
//   - watcher     : Argo watch stream
func LogKubernetesArgo(wfName string, execID string, executionsID string, namespace string, watcher watch.Interface) {
	var argoWatcher *ArgoWatch
	var pods []string
	var node wfv1.NodeStatus

	wfl := utils.GetWFLogger("")
	wfl.Debug().Msg("Starting to log " + wfName)

	var wg sync.WaitGroup

	// nodePhases tracks the last known phase of each step node so we can detect
	// phase transitions and emit WORKFLOW_STEP_DONE_EVENT exactly once per step.
	nodePhases := map[string]wfv1.NodePhase{}

	// stepResults captures the final NodeStatus of every completed step so the
	// WORKFLOW_DONE_EVENT can include a full recap (Steps slice) for oc-scheduler
	// and oc-catalog to catch up if they missed individual STEP_DONE events.
	stepResults := map[string]wfv1.NodeStatus{}

	workflowStartedEmitted := false

	for event := range watcher.ResultChan() {
		wf, ok := event.Object.(*wfv1.Workflow)
		if !ok {
			wfl.Error().Msg("unexpected type")
			continue
		}
		if len(wf.Status.Nodes) == 0 {
			wfl.Info().Msg("No node status yet")
			continue
		}

		// ── Emit WORKFLOW_STARTED_EVENT once ────────────────────────────────
		if !workflowStartedEmitted {
			realStart := wf.Status.StartedAt.Time
			emitLifecycleEvent(octools.WORKFLOW_STARTED_EVENT, octools.WorkflowLifecycleEvent{
				ExecutionID:  execID,
				ExecutionsID: executionsID,
				State:        enum.STARTED.EnumIndex(),
				RealStart:    &realStart,
			})
			workflowStartedEmitted = true
		}

		conditions := retrieveCondition(wf)

		// Retrieving the Status for the main node, which is named after the workflow
		if node, ok = wf.Status.Nodes[wfName]; !ok {
			bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t")
			wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
		}

		// BUG FIX: the previous code did
		//   start, _ := time.Parse(time.RFC3339, node.StartedAt.String())
		// but metav1.Time.String() is NOT RFC3339 formatted, so the parse
		// always failed silently and the duration was measured from the zero
		// time. Use the wrapped time.Time directly instead.
		duration := time.Now().UTC().Sub(node.StartedAt.Time.UTC())

		newWatcher := ArgoWatch{
			Name:       node.Name,
			Namespace:  namespace,
			Status:     string(node.Phase),
			Created:    node.StartedAt.String(),
			Started:    node.StartedAt.String(),
			Progress:   string(node.Progress),
			Duration:   duration.String(),
			Conditions: conditions,
		}

		if argoWatcher == nil {
			argoWatcher = &newWatcher
		}

		// Only log when something observable actually changed.
		if !newWatcher.Equals(argoWatcher) {
			jsonified, _ := json.Marshal(newWatcher)
			wfl.Info().Msg(string(jsonified))
			argoWatcher = &newWatcher
		}

		// ── Per-step completion detection ────────────────────────────────────
		for _, stepNode := range wf.Status.Nodes {
			if stepNode.Name == wfName {
				continue // skip the main DAG node
			}
			prev := nodePhases[stepNode.Name]
			nodePhases[stepNode.Name] = stepNode.Phase
			if prev == stepNode.Phase {
				continue // no change
			}
			if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() {
				continue // not terminal yet
			}
			if prev.Completed() || prev.FailedOrError() {
				continue // already processed
			}
			bookingID := extractBookingID(stepNode.Name)
			if bookingID == "" {
				continue
			}
			stepState := enum.SUCCESS
			if stepNode.Phase.FailedOrError() {
				stepState = enum.FAILURE
			}
			realStart := stepNode.StartedAt.Time
			realEnd := stepNode.FinishedAt.Time
			if realEnd.IsZero() {
				realEnd = time.Now().UTC()
			}
			emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{
				ExecutionID:  execID,
				ExecutionsID: executionsID,
				BookingID:    bookingID,
				State:        stepState.EnumIndex(),
				RealStart:    &realStart,
				RealEnd:      &realEnd,
			})
			// Store for the final recap emitted with WORKFLOW_DONE_EVENT.
			stepResults[bookingID] = stepNode
		}

		// ── Pod log streaming ────────────────────────────────────────────────
		for _, pod := range wf.Status.Nodes {
			if pod.Type != wfv1.NodeTypePod {
				continue
			}
			if !slices.Contains(pods, pod.Name) {
				pl := wfl.With().Str("pod", pod.Name).Logger()
				pl.Info().Msg("Found a new pod to log : " + pod.Name)
				wg.Add(1)
				go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
				pods = append(pods, pod.Name)
			}
		}

		// ── Workflow terminal phase ──────────────────────────────────────────
		if node.Phase.Completed() || node.Phase.FailedOrError() {
			if node.Phase.Completed() {
				wfl.Info().Msg(wfName + " workflow completed")
			} else {
				wfl.Error().Msg(wfName + " has failed, please refer to the logs")
				wfl.Error().Msg(node.Message)
			}

			// Wait for every pod-log streaming goroutine to drain before exit.
			wg.Wait()
			wfl.Info().Msg(wfName + " exiting")

			finalState := enum.SUCCESS
			if node.Phase.FailedOrError() {
				finalState = enum.FAILURE
			}
			realStart := node.StartedAt.Time
			realEnd := node.FinishedAt.Time
			if realEnd.IsZero() {
				realEnd = time.Now().UTC()
			}

			// Build recap from all observed step results.
			steps := make([]octools.StepMetric, 0, len(stepResults))
			for bookingID, s := range stepResults {
				stepState := enum.SUCCESS
				if s.Phase.FailedOrError() {
					stepState = enum.FAILURE
				}
				start := s.StartedAt.Time
				end := s.FinishedAt.Time
				if end.IsZero() {
					end = realEnd
				}
				steps = append(steps, octools.StepMetric{
					BookingID: bookingID,
					State:     stepState.EnumIndex(),
					RealStart: &start,
					RealEnd:   &end,
				})
			}

			emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{
				ExecutionID:  execID,
				ExecutionsID: executionsID,
				State:        finalState.EnumIndex(),
				RealStart:    &realStart,
				RealEnd:      &realEnd,
				Steps:        steps,
			})
			break
		}
	}
}

// emitLifecycleEvent publishes a WorkflowLifecycleEvent on the given NATS channel.
// Publishing is best-effort: a marshal failure drops the event silently.
func emitLifecycleEvent(method octools.NATSMethod, evt octools.WorkflowLifecycleEvent) {
	payload, err := json.Marshal(evt)
	if err != nil {
		return
	}
	octools.NewNATSCaller().SetNATSPub(method, octools.NATSResponse{
		FromApp: "oc-monitord",
		Method:  int(method),
		Payload: payload,
	})
}

// extractBookingID extracts the bookingID (UUID, 36 chars) from an Argo node
// display name. Argo step nodes are named "{wfName}.{taskName}" where taskName
// is "{resource-name}-{bookingID}" as generated by getArgoName in argo_builder.
// Returns "" when the name does not carry a plausibly UUID-shaped suffix.
func extractBookingID(nodeName string) string {
	parts := strings.SplitN(nodeName, ".", 2)
	if len(parts) < 2 {
		return ""
	}
	taskName := parts[1]
	if len(taskName) < 36 {
		return ""
	}
	candidate := taskName[len(taskName)-36:]
	// Validate UUID shape: 8-4-4-4-12 with dashes at positions 8,13,18,23.
	if candidate[8] == '-' && candidate[13] == '-' && candidate[18] == '-' && candidate[23] == '-' {
		return candidate
	}
	return ""
}

// retrieveCondition maps the Argo workflow conditions onto our Conditions struct.
func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
	for _, cond := range wf.Status.Conditions {
		if cond.Type == "PodRunning" {
			c.PodRunning = cond.Status == "True"
		}
		if cond.Type == "Completed" {
			c.Completed = cond.Status == "True"
		}
	}
	return
}

// logKubernetesPods streams pod logs to the structured logger.
func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger, wg *sync.WaitGroup) { defer wg.Done() s := strings.Split(podName, ".") name := s[0] + "-" + s[1] step := s[1] k, err := tools.NewKubernetesTool() if err != nil { logger.Error().Msg("Could not get Kubernetes tools") return } reader, err := k.GetPodLogger(executionId, wfName, podName) if err != nil { logger.Error().Msg(err.Error()) return } scanner := bufio.NewScanner(reader) for scanner.Scan() { log := scanner.Text() podLog := NewArgoPodLog(name, step, log) jsonified, _ := json.Marshal(podLog) logger.Info().Msg(string(jsonified)) } }