Monitord Acces Change

This commit is contained in:
mr
2026-05-27 16:09:45 +02:00
parent a9284314ef
commit 7c91a8b032
19 changed files with 2496 additions and 332 deletions
+375 -165
View File
@@ -14,7 +14,6 @@ import (
"cloud.o-forge.io/core/oc-lib/models/common/enum"
octools "cloud.o-forge.io/core/oc-lib/tools"
"github.com/rs/zerolog"
"k8s.io/apimachinery/pkg/watch"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
)
@@ -114,15 +113,23 @@ func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
// delegated to oc-scheduler (WorkflowExecution) and oc-datacenter (Bookings)
// via the dedicated NATS channels.
//
// - wfName : Argo workflow name (also the name of the root DAG node)
// - execID : WorkflowExecution UUID (for oc-scheduler to update state)
// - executionsID: run-group ID shared by all bookings of this run
// - namespace : Kubernetes namespace
// - watcher : Argo watch stream
func LogKubernetesArgo(wfName string, execID string, executionsID string, namespace string, watcher watch.Interface) {
// When the Kubernetes watch channel closes before the workflow reaches a
// terminal state (API server timeout, network blip, etc.), the function
// fetches the actual workflow state directly. If the workflow is still
// running it reconnects the watcher and continues. Only a genuine terminal
// failure emits WORKFLOW_DONE_EVENT with FAILURE.
//
// - wfName : Argo workflow name as returned by CreateArgoWorkflow (used
// as the root DAG node key in wf.Status.Nodes)
// - execID : WorkflowExecution UUID
// - executionsID: run-group ID / Kubernetes namespace of the workflow
// - namespace : Kubernetes namespace for pod log streaming
// - tool : Kubernetes/Argo client (used to reconnect + fetch state)
// - rawWfName : workflow name without the "oc-monitor-" prefix (passed to
// GetArgoWatch which prepends it)
func LogKubernetesArgo(wfName string, execID string, executionsID string, namespace string, tool tools.Tool, rawWfName string) {
var argoWatcher *ArgoWatch
var pods []string
var node wfv1.NodeStatus
wfl := utils.GetWFLogger("")
wfl.Debug().Msg("Starting to log " + wfName)
@@ -131,181 +138,363 @@ func LogKubernetesArgo(wfName string, execID string, executionsID string, namesp
// nodePhases tracks the last known phase of each step node so we can detect
// phase transitions and emit WORKFLOW_STEP_DONE_EVENT exactly once per step.
// Kept across watcher reconnections so we never double-emit.
nodePhases := map[string]wfv1.NodePhase{}
// stepResults captures the final NodeStatus of every completed step so the
// WORKFLOW_DONE_EVENT can include a full recap (Steps slice) for oc-scheduler
// and oc-catalog to catch up if they missed individual STEP_DONE events.
// WORKFLOW_DONE_EVENT can include a full recap (Steps slice).
stepResults := map[string]wfv1.NodeStatus{}
workflowStartedEmitted := false
for event := range watcher.ResultChan() {
wf, ok := event.Object.(*wfv1.Workflow)
if !ok {
wfl.Error().Msg("unexpected type")
continue
}
if len(wf.Status.Nodes) == 0 {
wfl.Info().Msg("No node status yet")
for {
watcher, err := tool.GetArgoWatch(executionsID, rawWfName)
if err != nil {
wfl.Error().Msg("Could not create watcher: " + err.Error())
if resolveAndEmitTerminal(wfName, execID, executionsID, tool, nodePhases, stepResults, &wg, wfl) {
return
}
time.Sleep(5 * time.Second)
continue
}
// ── Emit WORKFLOW_STARTED_EVENT once ────────────────────────────────
if !workflowStartedEmitted {
realStart := wf.Status.StartedAt.Time
emitLifecycleEvent(octools.WORKFLOW_STARTED_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
State: enum.STARTED.EnumIndex(),
RealStart: &realStart,
})
workflowStartedEmitted = true
}
reachedTerminalState := false
var node wfv1.NodeStatus
conditions := retrieveCondition(wf)
// Retrieving the Status for the main node, which is named after the workflow
if node, ok = wf.Status.Nodes[wfName]; !ok {
bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t")
wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
}
now := time.Now().UTC()
start, _ := time.Parse(time.RFC3339, node.StartedAt.String())
duration := now.Sub(start.UTC())
newWatcher := ArgoWatch{
Name: node.Name,
Namespace: namespace,
Status: string(node.Phase),
Created: node.StartedAt.String(),
Started: node.StartedAt.String(),
Progress: string(node.Progress),
Duration: duration.String(),
Conditions: conditions,
}
if argoWatcher == nil {
argoWatcher = &newWatcher
}
if !newWatcher.Equals(argoWatcher) {
jsonified, _ := json.Marshal(newWatcher)
wfl.Info().Msg(string(jsonified))
argoWatcher = &newWatcher
}
// ── Per-step completion detection ────────────────────────────────────
for _, stepNode := range wf.Status.Nodes {
if stepNode.Name == wfName {
continue // skip the main DAG node
for event := range watcher.ResultChan() {
wf, ok := event.Object.(*wfv1.Workflow)
if !ok {
wfl.Error().Msg("unexpected type")
continue
}
prev := nodePhases[stepNode.Name]
nodePhases[stepNode.Name] = stepNode.Phase
if prev == stepNode.Phase {
continue // no change
}
if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() {
continue // not terminal yet
}
if prev.Completed() || prev.FailedOrError() {
continue // already processed
}
bookingID := extractBookingID(stepNode.Name)
if bookingID == "" {
if len(wf.Status.Nodes) == 0 {
wfl.Info().Msg("No node status yet")
continue
}
stepState := enum.SUCCESS
if stepNode.Phase.FailedOrError() {
// ── Emit WORKFLOW_STARTED_EVENT once ────────────────────────────────
if !workflowStartedEmitted {
realStart := wf.Status.StartedAt.Time
if realStart.IsZero() {
realStart = time.Now().UTC()
}
emitLifecycleEvent(octools.WORKFLOW_STARTED_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
State: enum.STARTED.EnumIndex(),
RealStart: &realStart,
})
workflowStartedEmitted = true
}
conditions := retrieveCondition(wf)
// Retrieving the Status for the main node, which is named after the workflow
if node, ok = wf.Status.Nodes[wfName]; !ok {
bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t")
wfl.Error().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
continue
}
now := time.Now().UTC()
start := node.StartedAt.Time.UTC()
var duration time.Duration
if !start.IsZero() {
duration = now.Sub(start)
}
newWatcher := ArgoWatch{
Name: node.Name,
Namespace: namespace,
Status: string(node.Phase),
Created: node.StartedAt.String(),
Started: node.StartedAt.String(),
Progress: string(node.Progress),
Duration: duration.String(),
Conditions: conditions,
}
if argoWatcher == nil {
argoWatcher = &newWatcher
}
if !newWatcher.Equals(argoWatcher) {
jsonified, _ := json.Marshal(newWatcher)
wfl.Info().Msg(string(jsonified))
argoWatcher = &newWatcher
}
// ── Per-step completion detection ────────────────────────────────────
for _, stepNode := range wf.Status.Nodes {
if stepNode.Name == wfName {
continue // skip the main DAG node
}
prev := nodePhases[stepNode.Name]
nodePhases[stepNode.Name] = stepNode.Phase
if prev == stepNode.Phase {
continue // no change
}
if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() {
continue // not terminal yet
}
if prev.Completed() || prev.FailedOrError() {
continue // already processed
}
bookingID := extractBookingID(stepNode.Name)
if bookingID == "" {
continue
}
stepState := enum.SUCCESS
if stepNode.Phase.FailedOrError() {
if !(strings.Contains(stepNode.Message, "context cancel") || strings.Contains(stepNode.Message, "exit")) {
fmt.Println("1 baraka", stepNode.Message)
stepState = enum.FAILURE
}
}
realStart := stepNode.StartedAt.Time
realEnd := stepNode.FinishedAt.Time
if realEnd.IsZero() {
realEnd = time.Now().UTC()
}
if realStart.IsZero() {
realStart = realEnd
}
emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
BookingID: bookingID,
State: stepState.EnumIndex(),
RealStart: &realStart,
RealEnd: &realEnd,
})
// Store for the final recap emitted with WORKFLOW_DONE_EVENT.
stepResults[bookingID] = stepNode
}
// ── Pod log streaming ────────────────────────────────────────────────
for _, pod := range wf.Status.Nodes {
if pod.Type != wfv1.NodeTypePod {
continue
}
if !slices.Contains(pods, pod.Name) {
pl := wfl.With().Str("pod", pod.Name).Logger()
pl.Info().Msg("Found a new pod to log : " + pod.Name)
wg.Add(1)
go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
pods = append(pods, pod.Name)
}
}
// ── Workflow terminal phase ──────────────────────────────────────────
if node.Phase.Completed() || node.Phase.FailedOrError() {
if node.Phase.Completed() {
wfl.Info().Msg(wfName + " workflow completed")
} else {
wfl.Error().Msg(wfName + " has failed, please refer to the logs")
wfl.Error().Msg(node.Message)
}
wg.Wait()
wfl.Info().Msg(wfName + " exiting")
finalState := enum.SUCCESS
if node.Phase.FailedOrError() {
if !(strings.Contains(node.Message, "context cancel") || strings.Contains(node.Message, "exit")) {
fmt.Println("2 baraka", node.Message)
finalState = enum.FAILURE
}
}
realStart := node.StartedAt.Time
realEnd := node.FinishedAt.Time
if realEnd.IsZero() {
realEnd = time.Now().UTC()
}
if realStart.IsZero() {
realStart = realEnd
}
// Build recap from all observed step results.
steps := make([]octools.StepMetric, 0, len(stepResults))
for bookingID, s := range stepResults {
stepState := enum.SUCCESS
if s.Phase.FailedOrError() {
if !(strings.Contains(s.Message, "context cancel") || strings.Contains(s.Message, "exit")) {
fmt.Println("3 baraka", s.Message)
stepState = enum.FAILURE
}
}
start := s.StartedAt.Time
end := s.FinishedAt.Time
if end.IsZero() {
end = realEnd
}
steps = append(steps, octools.StepMetric{
BookingID: bookingID,
State: stepState.EnumIndex(),
RealStart: &start,
RealEnd: &end,
})
}
emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
State: finalState.EnumIndex(),
RealStart: &realStart,
RealEnd: &realEnd,
Steps: steps,
})
reachedTerminalState = true
break
}
}
watcher.Stop()
if reachedTerminalState {
return
}
// ── Watcher closed before terminal state ─────────────────────────────
// The Kubernetes watch API closes the channel on server-side timeouts,
// API server restarts, or network blips. Do NOT assume failure: fetch
// the actual workflow state and reconnect if still running.
wfl.Warn().Msg(wfName + " watcher closed before workflow reached terminal state — checking actual state")
if resolveAndEmitTerminal(wfName, execID, executionsID, tool, nodePhases, stepResults, &wg, wfl) {
return
}
wfl.Info().Msg(wfName + " workflow still running, reconnecting watcher")
time.Sleep(3 * time.Second)
}
}
// resolveAndEmitTerminal fetches the live workflow state via the API. If the
// workflow has reached a terminal phase it emits any missing STEP_DONE events
// followed by WORKFLOW_DONE_EVENT and returns true. Returns false when the
// workflow is still running or the state cannot be determined (caller should
// reconnect the watcher).
func resolveAndEmitTerminal(
wfName string, execID string, executionsID string,
tool tools.Tool,
nodePhases map[string]wfv1.NodePhase,
stepResults map[string]wfv1.NodeStatus,
wg *sync.WaitGroup,
wfl zerolog.Logger,
) bool {
wf, err := tool.GetArgoWorkflow(executionsID, wfName)
if err != nil {
wfl.Warn().Msg("Could not fetch workflow state: " + err.Error())
return false
}
rootNode, ok := wf.Status.Nodes[wfName]
if !ok {
wfl.Warn().Msg("Root node " + wfName + " not found in live workflow status")
return false
}
if !rootNode.Phase.Completed() && !rootNode.Phase.FailedOrError() {
wfl.Info().Msgf("%s still running (phase=%s), will reconnect watcher", wfName, rootNode.Phase)
return false
}
// Emit any STEP_DONE events that were missed while the watcher was down.
for _, stepNode := range wf.Status.Nodes {
if stepNode.Name == wfName {
continue
}
prev := nodePhases[stepNode.Name]
if prev.Completed() || prev.FailedOrError() {
continue // already emitted
}
if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() {
continue
}
bookingID := extractBookingID(stepNode.Name)
if bookingID == "" {
continue
}
stepState := enum.SUCCESS
if stepNode.Phase.FailedOrError() {
if !(strings.Contains(stepNode.Message, "context cancel") || strings.Contains(stepNode.Message, "exit")) {
fmt.Println("4 baraka", stepNode.Message)
stepState = enum.FAILURE
}
realStart := stepNode.StartedAt.Time
realEnd := stepNode.FinishedAt.Time
if realEnd.IsZero() {
realEnd = time.Now().UTC()
}
emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
BookingID: bookingID,
State: stepState.EnumIndex(),
RealStart: &realStart,
RealEnd: &realEnd,
})
// Store for the final recap emitted with WORKFLOW_DONE_EVENT.
stepResults[bookingID] = stepNode
}
// ── Pod log streaming ────────────────────────────────────────────────
for _, pod := range wf.Status.Nodes {
if pod.Type != wfv1.NodeTypePod {
continue
}
if !slices.Contains(pods, pod.Name) {
pl := wfl.With().Str("pod", pod.Name).Logger()
pl.Info().Msg("Found a new pod to log : " + pod.Name)
wg.Add(1)
go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
pods = append(pods, pod.Name)
}
realStart := stepNode.StartedAt.Time
realEnd := stepNode.FinishedAt.Time
if realEnd.IsZero() {
realEnd = time.Now().UTC()
}
if realStart.IsZero() {
realStart = realEnd
}
fmt.Println("STEP DONE !!!!! ", stepNode.Name, stepState)
emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
BookingID: bookingID,
State: stepState.EnumIndex(),
RealStart: &realStart,
RealEnd: &realEnd,
})
stepResults[bookingID] = stepNode
nodePhases[stepNode.Name] = stepNode.Phase
}
// ── Workflow terminal phase ──────────────────────────────────────────
if node.Phase.Completed() || node.Phase.FailedOrError() {
if node.Phase.Completed() {
wfl.Info().Msg(wfName + " workflow completed")
} else {
wfl.Error().Msg(wfName + " has failed, please refer to the logs")
wfl.Error().Msg(node.Message)
}
wg.Wait()
wfl.Info().Msg(wfName + " exiting")
wg.Wait()
finalState := enum.SUCCESS
if node.Phase.FailedOrError() {
finalState = enum.FAILURE
}
realStart := node.StartedAt.Time
realEnd := node.FinishedAt.Time
if realEnd.IsZero() {
realEnd = time.Now().UTC()
}
// Build recap from all observed step results.
steps := make([]octools.StepMetric, 0, len(stepResults))
for bookingID, s := range stepResults {
stepState := enum.SUCCESS
if s.Phase.FailedOrError() {
stepState = enum.FAILURE
}
start := s.StartedAt.Time
end := s.FinishedAt.Time
if end.IsZero() {
end = realEnd
}
steps = append(steps, octools.StepMetric{
BookingID: bookingID,
State: stepState.EnumIndex(),
RealStart: &start,
RealEnd: &end,
})
}
emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
State: finalState.EnumIndex(),
RealStart: &realStart,
RealEnd: &realEnd,
Steps: steps,
})
break
finalState := enum.SUCCESS
if rootNode.Phase.FailedOrError() {
if !(strings.Contains(rootNode.Message, "context cancel") || strings.Contains(rootNode.Message, "exit")) {
fmt.Println("5 baraka", rootNode.Message)
finalState = enum.FAILURE
}
}
realStart := rootNode.StartedAt.Time
realEnd := rootNode.FinishedAt.Time
if realEnd.IsZero() {
realEnd = time.Now().UTC()
}
if realStart.IsZero() {
realStart = realEnd
}
steps := make([]octools.StepMetric, 0, len(stepResults))
for bookingID, s := range stepResults {
stepState := enum.SUCCESS
if s.Phase.FailedOrError() {
if !(strings.Contains(s.Message, "context cancel") || strings.Contains(s.Message, "exit")) {
fmt.Println("6 baraka", s.Message)
stepState = enum.FAILURE
}
}
start := s.StartedAt.Time
end := s.FinishedAt.Time
if end.IsZero() {
end = realEnd
}
steps = append(steps, octools.StepMetric{
BookingID: bookingID,
State: stepState.EnumIndex(),
RealStart: &start,
RealEnd: &end,
})
}
emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{
ExecutionID: execID,
ExecutionsID: executionsID,
State: finalState.EnumIndex(),
RealStart: &realStart,
RealEnd: &realEnd,
Steps: steps,
})
return true
}
// emitLifecycleEvent publishes a WorkflowLifecycleEvent on the given NATS channel.
@@ -357,7 +546,11 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger, wg *sync.WaitGroup) {
defer wg.Done()
s := strings.Split(podName, ".")
s := strings.SplitN(podName, ".", 2)
if len(s) < 2 {
logger.Error().Str("pod", podName).Msg("Unexpected pod name format, expected wfName.stepName")
return
}
name := s[0] + "-" + s[1]
step := s[1]
@@ -374,10 +567,27 @@ func logKubernetesPods(executionId string, wfName string, podName string, logger
}
scanner := bufio.NewScanner(reader)
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
for scanner.Scan() {
log := scanner.Text()
podLog := NewArgoPodLog(name, step, log)
jsonified, _ := json.Marshal(podLog)
logger.Info().Msg(string(jsonified))
line := scanner.Text()
podLog := NewArgoPodLog(name, step, line)
jsonified, err := json.Marshal(podLog)
if err != nil {
logger.Error().Err(err).Msg("Failed to marshal pod log")
continue
}
// Propagate the log level from the container output so errors
// are visible above Debug-only sinks.
switch {
case strings.Contains(line, "level=error") || strings.Contains(line, " ERR "):
logger.Error().Msg(string(jsonified))
case strings.Contains(line, "level=warning") || strings.Contains(line, " WRN "):
logger.Warn().Msg(string(jsonified))
default:
logger.Info().Msg(string(jsonified))
}
}
if err := scanner.Err(); err != nil {
logger.Error().Err(err).Str("pod", podName).Msg("Pod log scanner error")
}
}