diff --git a/daemons/execute_monitor_container.go b/daemons/execute_monitor_container.go index ee416a7..53cddc5 100644 --- a/daemons/execute_monitor_container.go +++ b/daemons/execute_monitor_container.go @@ -178,37 +178,43 @@ func (cm *ContainerMonitor) watchJob(clientset *kubernetes.Clientset, execID str } if podName == "" { - l.Error().Msg("No pod found for job after 60s") + cm.failExec(execID, l, "No pod found for job after 60s") return } l.Info().Str("pod", podName).Msg("Pod found for job") // Wait for the pod to be Running or terminal (up to 120s) + podReady := false for i := 0; i < 120; i++ { pod, err := clientset.CoreV1().Pods(ns).Get(context.Background(), podName, metav1.GetOptions{}) if err != nil { - l.Error().Err(err).Str("pod", podName).Msg("Failed to get pod status") + cm.failExec(execID, l, "Failed to get pod status: "+err.Error()) return } phase := pod.Status.Phase if phase == corev1.PodRunning || phase == corev1.PodSucceeded || phase == corev1.PodFailed { l.Info().Str("pod", podName).Str("phase", string(phase)).Msg("Pod phase") + podReady = true break } time.Sleep(time.Second) } + if !podReady { + cm.failExec(execID, l, "Pod never reached a running/terminal phase after 120s") + return + } // Stream pod logs req := clientset.CoreV1().Pods(ns).GetLogs(podName, &corev1.PodLogOptions{Follow: true}) stream, err := req.Stream(context.Background()) if err != nil { - l.Error().Err(err).Str("pod", podName).Msg("Failed to stream pod logs") - } else { - defer stream.Close() - l.Info().Str("pod", podName).Msg("Streaming pod logs") - logExecution(stream, l) + cm.failExec(execID, l, "Failed to stream pod logs: "+err.Error()) + return } + defer stream.Close() + l.Info().Str("pod", podName).Msg("Streaming pod logs") + logExecution(stream, l) // Log final job status job, err := clientset.BatchV1().Jobs(ns).Get(context.Background(), jobName, metav1.GetOptions{}) diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index 14ac5f3..812b705 100644 
--- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -7,10 +7,8 @@ import ( "oc-schedulerd/conf" oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/common/enum" workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" - "go.mongodb.org/mongo-driver/bson/primitive" ) var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}} @@ -36,12 +34,10 @@ func (em *ExecutionManager) RetrieveNextExecutions() { if orderedExec[i] == nil { continue } - + fmt.Println("Next exec", i) lead := time.Duration(conf.GetConfig().PrepLeadSeconds) * time.Second for execId, exec := range orderedExec[i] { - if i == 0 && em.isAStartingExecutionBeforeEnd(&exec) { // BEST EFFORT exception - continue - } + fmt.Println("ExecDate Before", exec.ExecDate.Before(time.Now().UTC().Add(lead))) // Fire PrepLeadSeconds before the scheduled start so oc-monitord // has time to pre-pull images and set up infra before ExecDate. 
if exec.ExecDate.Before(time.Now().UTC().Add(lead)) { @@ -64,20 +60,6 @@ func (em *ExecutionManager) RetrieveNextExecutions() { } } -func (em *ExecutionManager) isAStartingExecutionBeforeEnd(execution *workflow_execution.WorkflowExecution) bool { - access := workflow_execution.NewAccessor(nil) - l, _, err := access.Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ - "execution_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*execution.EndDate)}}, - "state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}}, - }, // TODO later should refine on each endpoint - }, "", false) - if err != nil && len(l) == 0 { - return false - } - return true -} - func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) { // start execution // create the yaml that describes the pod : filename, path/url to Loki diff --git a/daemons/interface.go b/daemons/interface.go index ab25580..40354bb 100644 --- a/daemons/interface.go +++ b/daemons/interface.go @@ -2,8 +2,8 @@ package daemons import ( "bufio" - "fmt" "io" + "strings" "github.com/rs/zerolog" ) @@ -13,11 +13,28 @@ type Executor interface { LaunchMonitor(args []string, execID string, ns string, l zerolog.Logger) } +// logExecution streams lines from reader and re-logs them at the appropriate +// level by inspecting the zerolog level token already present in each line. +// Lines that contain " ERR " or "level=error" (case-sensitive) are emitted at +// Error so that they are visible beyond Debug-only sinks. func logExecution(reader io.ReadCloser, l zerolog.Logger) { scanner := bufio.NewScanner(reader) + // Increase buffer to 1 MB to handle wide JSON payloads. 
+ scanner.Buffer(make([]byte, 1024*1024), 1024*1024) for scanner.Scan() { - output := scanner.Text() - fmt.Println(output) - l.Debug().Msg(output) + line := scanner.Text() + switch { + case strings.Contains(line, " ERR ") || strings.Contains(line, "level=error"): + l.Error().Msg(line) + case strings.Contains(line, " WRN ") || strings.Contains(line, "level=warning"): + l.Warn().Msg(line) + case strings.Contains(line, " INF ") || strings.Contains(line, "level=info"): + l.Info().Msg(line) + default: + l.Debug().Msg(line) + } + } + if err := scanner.Err(); err != nil { + l.Error().Err(err).Msg("log scanner error") } } diff --git a/go.mod b/go.mod index 810fbc1..91b26b9 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-schedulerd go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b + cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 github.com/google/uuid v1.6.0 github.com/rs/zerolog v1.34.0 go.mongodb.org/mongo-driver v1.17.4 diff --git a/go.sum b/go.sum index d7a5d56..a315176 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe h1:CHiWQAX7j/bMf cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIyvapWwHZ8t20wMaSaMU6NoZLkMCui8w= cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 h1:pQf9k+GSzNGEmrUa00jn9Zcqfp9X4N1Z5ie7InvUf3g= +cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod 
h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=