package daemons

import (
	"context"
	"encoding/base64"
	"fmt"
	"oc-schedulerd/conf"
	"time" // already used for ContainerMonitor.watchJob

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/models/common/enum"
	"github.com/rs/zerolog"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// ContainerMonitor runs oc-monitord as a Kubernetes Job instead of a local process.
type ContainerMonitor struct {
	Monitor       LocalMonitor
	KubeCA        string
	KubeCert      string
	KubeData      string
	KubeHost      string
	KubePort      string
	KubeNamespace string
	KubeImage     string
}

// NewContainerMonitor builds an Executor that launches the monitor for the given
// execution inside the cluster described by the scheduler configuration.
func NewContainerMonitor(UUID string, peerId string, duration int, scheduledTime time.Time) Executor {
	return &ContainerMonitor{
		Monitor: LocalMonitor{
			ExecutionID:   UUID,
			PeerID:        peerId,
			Duration:      duration,
			LokiUrl:       oclib.GetConfig().LokiUrl,
			MongoUrl:      oclib.GetConfig().MongoUrl,
			DBName:        oclib.GetConfig().MongoDatabase,
			ScheduledTime: scheduledTime,
		},
		KubeCA:        conf.GetConfig().KubeCA,
		KubeCert:      conf.GetConfig().KubeCert,
		KubeData:      conf.GetConfig().KubeData,
		KubeHost:      conf.GetConfig().KubeHost,
		KubePort:      conf.GetConfig().KubePort,
		KubeNamespace: conf.GetConfig().KubeNamespace,
		KubeImage:     conf.GetConfig().KubeImage,
	}
}

// PrepareMonitorExec builds the command-line arguments passed to the oc-monitord container.
func (cm *ContainerMonitor) PrepareMonitorExec() []string {
	args := []string{
		"-e", cm.Monitor.ExecutionID,
		"-p", cm.Monitor.PeerID,
		"-M", "kubernetes",
		"-H", cm.KubeHost,
		"-P", cm.KubePort,
		"-C", cm.KubeCert,
		"-D", cm.KubeData,
		"-c", cm.KubeCA,
	}
	if cm.Monitor.Duration > 0 {
		args = append(args, "-t", fmt.Sprintf("%d", cm.Monitor.Duration))
	}
	return args
}

// failExec logs the error and marks the workflow execution as FAILURE in the database.
func (cm *ContainerMonitor) failExec(execID string, l zerolog.Logger, msg string) {
	l.Error().Msg(msg)
	oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
		"state": enum.FAILURE.EnumIndex(),
	}, execID)
}

// LaunchMonitor decodes the base64-encoded cluster credentials, creates a Kubernetes
// Job running oc-monitord, and watches it in the background.
func (cm *ContainerMonitor) LaunchMonitor(args []string, execID string, ns string, l zerolog.Logger) {
	ca, err := base64.StdEncoding.DecodeString(cm.KubeCA)
	if err != nil {
		cm.failExec(execID, l, "Failed to decode KubeCA: "+err.Error())
		return
	}
	cert, err := base64.StdEncoding.DecodeString(cm.KubeCert)
	if err != nil {
		cm.failExec(execID, l, "Failed to decode KubeCert: "+err.Error())
		return
	}
	key, err := base64.StdEncoding.DecodeString(cm.KubeData)
	if err != nil {
		cm.failExec(execID, l, "Failed to decode KubeData: "+err.Error())
		return
	}

	cfg := &rest.Config{
		Host: "https://" + cm.KubeHost + ":" + cm.KubePort,
		TLSClientConfig: rest.TLSClientConfig{
			CAData:   ca,
			CertData: cert,
			KeyData:  key,
		},
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		cm.failExec(execID, l, "Failed to build Kubernetes client: "+err.Error())
		return
	}

	// Never retry the Job: a failed monitor is reported as a failed execution.
	backoffLimit := int32(0)
	l.Info().Str("mongo_url", oclib.GetConfig().MongoUrl).Msg("Env vars for job")
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "oc-monitord-" + execID,
			Namespace: ns,
		},
		Spec: batchv1.JobSpec{
			BackoffLimit: &backoffLimit,
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Annotations: map[string]string{
						"k8s.v1.cni.cncf.io/networks": "docker-oc-network",
					},
				},
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{
							Name:  "oc-monitord",
							Image: cm.KubeImage,
							Args:  args,
							Env: []corev1.EnvVar{
								{Name: "OC_MONGO_URL", Value: oclib.GetConfig().MongoUrl},
								{Name: "OC_MONGO_DATABASE", Value: oclib.GetConfig().MongoDatabase},
								{Name: "OC_LOKI_URL", Value: oclib.GetConfig().LokiUrl},
								{Name: "OC_NATS_URL", Value: oclib.GetConfig().NATSUrl},
							},
						},
					},
				},
			},
		},
	}

	_, err = clientset.BatchV1().Jobs(ns).Create(context.Background(), job, metav1.CreateOptions{})
	if err != nil {
		cm.failExec(execID, l, "Failed to create Kubernetes Job: "+err.Error())
		return
	}
	l.Info().Str("job", "oc-monitord-"+execID).Msg("Kubernetes Job created")
	go cm.watchJob(clientset, execID, ns, l)
}

// watchJob waits for the pod spawned by the Job, streams its logs through
// logExecution, and records the final Job status.
func (cm *ContainerMonitor) watchJob(clientset *kubernetes.Clientset, execID string, ns string, l zerolog.Logger) {
	jobName := "oc-monitord-" + execID
	l = l.With().Str("job", jobName).Logger()

	// Poll until the pod spawned by the job appears (up to 60s).
	podName := ""
	for i := 0; i < 60; i++ {
		pods, err := clientset.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
			LabelSelector: "job-name=" + jobName,
		})
		if err != nil {
			l.Error().Err(err).Msg("Failed to list pods for job")
			return
		}
		if len(pods.Items) > 0 {
			podName = pods.Items[0].Name
			break
		}
		time.Sleep(time.Second)
	}
	if podName == "" {
		l.Error().Msg("No pod found for job after 60s")
		return
	}
	l.Info().Str("pod", podName).Msg("Pod found for job")

	// Wait for the pod to be Running or terminal (up to 120s).
	for i := 0; i < 120; i++ {
		pod, err := clientset.CoreV1().Pods(ns).Get(context.Background(), podName, metav1.GetOptions{})
		if err != nil {
			l.Error().Err(err).Str("pod", podName).Msg("Failed to get pod status")
			return
		}
		phase := pod.Status.Phase
		if phase == corev1.PodRunning || phase == corev1.PodSucceeded || phase == corev1.PodFailed {
			l.Info().Str("pod", podName).Str("phase", string(phase)).Msg("Pod phase")
			break
		}
		time.Sleep(time.Second)
	}

	// Stream pod logs; a streaming failure is logged but does not abort the final status check.
	req := clientset.CoreV1().Pods(ns).GetLogs(podName, &corev1.PodLogOptions{Follow: true})
	stream, err := req.Stream(context.Background())
	if err != nil {
		l.Error().Err(err).Str("pod", podName).Msg("Failed to stream pod logs")
	} else {
		defer stream.Close()
		l.Info().Str("pod", podName).Msg("Streaming pod logs")
		logExecution(stream, l)
	}

	// Log the final job status and mark the execution as failed if no pod succeeded.
	job, err := clientset.BatchV1().Jobs(ns).Get(context.Background(), jobName, metav1.GetOptions{})
	if err != nil {
		l.Error().Err(err).Msg("Failed to get final job status")
		return
	}
	if job.Status.Succeeded > 0 {
		l.Info().Msg("Job succeeded")
	} else {
		msg := fmt.Sprintf("Job failed with %d failed pod(s)", job.Status.Failed)
		cm.failExec(execID, l, msg)
	}
}
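// Usage sketch (hypothetical caller: the exec and logger variables and the type
// assertion are assumptions for illustration; the real wiring between the scheduler
// loop and this Executor lives elsewhere in oc-schedulerd):
//
//	cm := NewContainerMonitor(exec.UUID, exec.PeerID, exec.Duration, exec.ScheduledTime).(*ContainerMonitor)
//	args := cm.PrepareMonitorExec()
//	cm.LaunchMonitor(args, cm.Monitor.ExecutionID, cm.KubeNamespace, logger)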