logger adjust
This commit is contained in:
@@ -178,37 +178,43 @@ func (cm *ContainerMonitor) watchJob(clientset *kubernetes.Clientset, execID str
|
|||||||
}
|
}
|
||||||
|
|
||||||
if podName == "" {
|
if podName == "" {
|
||||||
l.Error().Msg("No pod found for job after 60s")
|
cm.failExec(execID, l, "No pod found for job after 60s")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
l.Info().Str("pod", podName).Msg("Pod found for job")
|
l.Info().Str("pod", podName).Msg("Pod found for job")
|
||||||
|
|
||||||
// Wait for the pod to be Running or terminal (up to 120s)
|
// Wait for the pod to be Running or terminal (up to 120s)
|
||||||
|
podReady := false
|
||||||
for i := 0; i < 120; i++ {
|
for i := 0; i < 120; i++ {
|
||||||
pod, err := clientset.CoreV1().Pods(ns).Get(context.Background(), podName, metav1.GetOptions{})
|
pod, err := clientset.CoreV1().Pods(ns).Get(context.Background(), podName, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Error().Err(err).Str("pod", podName).Msg("Failed to get pod status")
|
cm.failExec(execID, l, "Failed to get pod status: "+err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
phase := pod.Status.Phase
|
phase := pod.Status.Phase
|
||||||
if phase == corev1.PodRunning || phase == corev1.PodSucceeded || phase == corev1.PodFailed {
|
if phase == corev1.PodRunning || phase == corev1.PodSucceeded || phase == corev1.PodFailed {
|
||||||
l.Info().Str("pod", podName).Str("phase", string(phase)).Msg("Pod phase")
|
l.Info().Str("pod", podName).Str("phase", string(phase)).Msg("Pod phase")
|
||||||
|
podReady = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
|
if !podReady {
|
||||||
|
cm.failExec(execID, l, "Pod never reached a running/terminal phase after 120s")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Stream pod logs
|
// Stream pod logs
|
||||||
req := clientset.CoreV1().Pods(ns).GetLogs(podName, &corev1.PodLogOptions{Follow: true})
|
req := clientset.CoreV1().Pods(ns).GetLogs(podName, &corev1.PodLogOptions{Follow: true})
|
||||||
stream, err := req.Stream(context.Background())
|
stream, err := req.Stream(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Error().Err(err).Str("pod", podName).Msg("Failed to stream pod logs")
|
cm.failExec(execID, l, "Failed to stream pod logs: "+err.Error())
|
||||||
} else {
|
return
|
||||||
|
}
|
||||||
defer stream.Close()
|
defer stream.Close()
|
||||||
l.Info().Str("pod", podName).Msg("Streaming pod logs")
|
l.Info().Str("pod", podName).Msg("Streaming pod logs")
|
||||||
logExecution(stream, l)
|
logExecution(stream, l)
|
||||||
}
|
|
||||||
|
|
||||||
// Log final job status
|
// Log final job status
|
||||||
job, err := clientset.BatchV1().Jobs(ns).Get(context.Background(), jobName, metav1.GetOptions{})
|
job, err := clientset.BatchV1().Jobs(ns).Get(context.Background(), jobName, metav1.GetOptions{})
|
||||||
|
|||||||
@@ -7,10 +7,8 @@ import (
|
|||||||
"oc-schedulerd/conf"
|
"oc-schedulerd/conf"
|
||||||
|
|
||||||
oclib "cloud.o-forge.io/core/oc-lib"
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
|
||||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||||
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}}
|
var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}}
|
||||||
@@ -36,12 +34,10 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
|
|||||||
if orderedExec[i] == nil {
|
if orderedExec[i] == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
fmt.Println("Next exec", i)
|
||||||
lead := time.Duration(conf.GetConfig().PrepLeadSeconds) * time.Second
|
lead := time.Duration(conf.GetConfig().PrepLeadSeconds) * time.Second
|
||||||
for execId, exec := range orderedExec[i] {
|
for execId, exec := range orderedExec[i] {
|
||||||
if i == 0 && em.isAStartingExecutionBeforeEnd(&exec) { // BEST EFFORT exception
|
fmt.Println("ExecDate Before", exec.ExecDate.Before(time.Now().UTC().Add(lead)))
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Fire PrepLeadSeconds before the scheduled start so oc-monitord
|
// Fire PrepLeadSeconds before the scheduled start so oc-monitord
|
||||||
// has time to pre-pull images and set up infra before ExecDate.
|
// has time to pre-pull images and set up infra before ExecDate.
|
||||||
if exec.ExecDate.Before(time.Now().UTC().Add(lead)) {
|
if exec.ExecDate.Before(time.Now().UTC().Add(lead)) {
|
||||||
@@ -64,20 +60,6 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (em *ExecutionManager) isAStartingExecutionBeforeEnd(execution *workflow_execution.WorkflowExecution) bool {
|
|
||||||
access := workflow_execution.NewAccessor(nil)
|
|
||||||
l, _, err := access.Search(&dbs.Filters{
|
|
||||||
And: map[string][]dbs.Filter{
|
|
||||||
"execution_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*execution.EndDate)}},
|
|
||||||
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
|
|
||||||
}, // TODO later should refine on each endpoint
|
|
||||||
}, "", false)
|
|
||||||
if err != nil && len(l) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) {
|
func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) {
|
||||||
// start execution
|
// start execution
|
||||||
// create the yaml that describes the pod : filename, path/url to Loki
|
// create the yaml that describes the pod : filename, path/url to Loki
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ package daemons
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
)
|
)
|
||||||
@@ -13,11 +13,28 @@ type Executor interface {
|
|||||||
LaunchMonitor(args []string, execID string, ns string, l zerolog.Logger)
|
LaunchMonitor(args []string, execID string, ns string, l zerolog.Logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// logExecution streams lines from reader and re-logs them at the appropriate
|
||||||
|
// level by inspecting the zerolog level token already present in each line.
|
||||||
|
// Lines that contain " ERR " or " error" (case-insensitive) are emitted at
|
||||||
|
// Error so that they are visible beyond Debug-only sinks.
|
||||||
func logExecution(reader io.ReadCloser, l zerolog.Logger) {
|
func logExecution(reader io.ReadCloser, l zerolog.Logger) {
|
||||||
scanner := bufio.NewScanner(reader)
|
scanner := bufio.NewScanner(reader)
|
||||||
|
// Increase buffer to 1 MB to handle wide JSON payloads.
|
||||||
|
scanner.Buffer(make([]byte, 1024*1024), 1024*1024)
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
output := scanner.Text()
|
line := scanner.Text()
|
||||||
fmt.Println(output)
|
switch {
|
||||||
l.Debug().Msg(output)
|
case strings.Contains(line, " ERR ") || strings.Contains(line, "level=error"):
|
||||||
|
l.Error().Msg(line)
|
||||||
|
case strings.Contains(line, " WRN ") || strings.Contains(line, "level=warning"):
|
||||||
|
l.Warn().Msg(line)
|
||||||
|
case strings.Contains(line, " INF ") || strings.Contains(line, "level=info"):
|
||||||
|
l.Info().Msg(line)
|
||||||
|
default:
|
||||||
|
l.Debug().Msg(line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := scanner.Err(); err != nil {
|
||||||
|
l.Error().Err(err).Msg("log scanner error")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module oc-schedulerd
|
|||||||
go 1.25.0
|
go 1.25.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/rs/zerolog v1.34.0
|
github.com/rs/zerolog v1.34.0
|
||||||
go.mongodb.org/mongo-driver v1.17.4
|
go.mongodb.org/mongo-driver v1.17.4
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -4,6 +4,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe h1:CHiWQAX7j/bMf
|
|||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260319071818-28b5b7d39ffe/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIyvapWwHZ8t20wMaSaMU6NoZLkMCui8w=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b h1:y0rppyzGIQTIyvapWwHZ8t20wMaSaMU6NoZLkMCui8w=
|
||||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260324114937-6d0c78946e8b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 h1:pQf9k+GSzNGEmrUa00jn9Zcqfp9X4N1Z5ie7InvUf3g=
|
||||||
|
cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA=
|
||||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||||
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
|
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
|
||||||
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
|
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
|
||||||
|
|||||||
Reference in New Issue
Block a user