Build up Clean Schedulerd + Lib Kube

This commit is contained in:
mr
2026-02-25 13:19:46 +01:00
parent 142a81197b
commit dbc41f0326
9 changed files with 315 additions and 193 deletions

View File

@@ -1,24 +1,30 @@
package daemons
import (
"bytes"
"encoding/json"
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"oc-schedulerd/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"github.com/rs/zerolog"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type ContainerMonitor struct {
Monitor LocalMonitor
KubeCA string
KubeCert string
KubeData string
KubeHost string
KubePort string
Monitor LocalMonitor
KubeCA string
KubeCert string
KubeData string
KubeHost string
KubePort string
KubeNamespace string
KubeImage string
}
func NewContainerMonitor(UUID string, peerId string, duration int) Executor {
@@ -31,11 +37,13 @@ func NewContainerMonitor(UUID string, peerId string, duration int) Executor {
MongoUrl: oclib.GetConfig().MongoUrl,
DBName: oclib.GetConfig().MongoDatabase,
},
KubeCA: conf.GetConfig().KubeCA,
KubeCert: conf.GetConfig().KubeCert,
KubeData: conf.GetConfig().KubeData,
KubeHost: conf.GetConfig().KubeHost,
KubePort: conf.GetConfig().KubePort,
KubeCA: conf.GetConfig().KubeCA,
KubeCert: conf.GetConfig().KubeCert,
KubeData: conf.GetConfig().KubeData,
KubeHost: conf.GetConfig().KubeHost,
KubePort: conf.GetConfig().KubePort,
KubeNamespace: conf.GetConfig().KubeNamespace,
KubeImage: conf.GetConfig().KubeImage,
}
}
@@ -60,88 +68,76 @@ func (cm *ContainerMonitor) PrepareMonitorExec() []string {
}
return args
}
// Contact the docker's API at the KubeHost's URL to :
// - Check if the image exists
// - Create the container
// - Start the container
func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) {
func (cm *ContainerMonitor) failExec(execID string, l zerolog.Logger, msg string) {
l.Error().Msg(msg)
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
}
var containerID string
imageName := "oc-monitord"
url := "http://" + cm.KubeHost + ":2375"
func (cm *ContainerMonitor) LaunchMonitor(args []string, execID string, l zerolog.Logger) {
resp, err := http.Get(url + "/images/" + imageName + "/json")
ca, err := base64.StdEncoding.DecodeString(cm.KubeCA)
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error())
cm.failExec(execID, l, "Failed to decode KubeCA: "+err.Error())
return
}
if resp.StatusCode != http.StatusOK {
d, _ := io.ReadAll(resp.Body)
l.Fatal().Msg("Couldn't find the oc-monitord image : " + string(d))
}
dataCreation := map[string]interface{}{"Image": imageName, "Cmd": args}
byteData, err := json.Marshal(dataCreation)
cert, err := base64.StdEncoding.DecodeString(cm.KubeCert)
if err != nil {
l.Fatal().Msg("Error when contacting the creating request body : " + err.Error())
cm.failExec(execID, l, "Failed to decode KubeCert: "+err.Error())
return
}
r, _ := http.NewRequest("POST", url+"/containers/create", bytes.NewBuffer(byteData))
r.Header.Add("Content-Type", "application/json")
resp, err = http.DefaultClient.Do(r)
key, err := base64.StdEncoding.DecodeString(cm.KubeData)
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error())
}
if resp.StatusCode == 201 {
var d map[string]interface{}
b, err := io.ReadAll(resp.Body)
if err != nil {
l.Fatal().Msg(err.Error())
}
err = json.Unmarshal(b, &d)
if err != nil {
l.Fatal().Msg(err.Error())
}
containerID = d["Id"].(string)
} else {
d, _ := io.ReadAll(resp.Body)
l.Fatal().Msg("Error when creating the container on " + url + "\n " + string(d))
}
networkName := "oc"
dataNetwork, _ := json.Marshal(map[string]string{"Container": containerID})
r, _ = http.NewRequest("POST", url+"/networks/"+networkName+"/connect", bytes.NewBuffer(dataNetwork))
r.Header.Add("Content-Type", "application/json")
resp, err = http.DefaultClient.Do(r)
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error())
}
if resp.StatusCode != 200 {
d, _ := io.ReadAll(resp.Body)
l.Error().Msg("Error when adding container to the network : " + string(d))
cm.failExec(execID, l, "Failed to decode KubeData: "+err.Error())
return
}
resp, err = http.Post(url+"/containers/"+containerID+"/start", "", nil)
cfg := &rest.Config{
Host: "https://" + cm.KubeHost + ":" + cm.KubePort,
TLSClientConfig: rest.TLSClientConfig{
CAData: ca,
CertData: cert,
KeyData: key,
},
}
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error())
cm.failExec(execID, l, "Failed to build Kubernetes client: "+err.Error())
return
}
if resp.StatusCode >= 300 {
d, _ := io.ReadAll(resp.Body)
l.Fatal().Msg("Error when starting the container on " + url + "\n " + string(d))
backoffLimit := int32(0)
job := &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "oc-monitord-" + execID,
Namespace: cm.KubeNamespace,
},
Spec: batchv1.JobSpec{
BackoffLimit: &backoffLimit,
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyNever,
Containers: []corev1.Container{
{
Name: "oc-monitord",
Image: cm.KubeImage,
Args: args,
},
},
},
},
},
}
l.Info().Msg("Started container " + containerID)
// we can add logging with GET /containers/id/logs?stdout=true&follow=true
_, err = clientset.BatchV1().Jobs(cm.KubeNamespace).Create(context.Background(), job, metav1.CreateOptions{})
if err != nil {
cm.failExec(execID, l, "Failed to create Kubernetes Job: "+err.Error())
return
}
// logExecution(stdoutMonitord, l)
l.Info().Msg("Started Kubernetes Job oc-monitord-" + execID)
}

View File

@@ -6,6 +6,7 @@ import (
"os/exec"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"github.com/rs/zerolog"
)
@@ -53,19 +54,26 @@ func (lm *LocalMonitor) PrepareMonitorExec() []string {
return args
}
func (lm *LocalMonitor) LaunchMonitor(args []string, l zerolog.Logger) {
func (lm *LocalMonitor) LaunchMonitor(args []string, execID string, l zerolog.Logger) {
cmd := exec.Command(conf.GetConfig().MonitorPath, args...)
fmt.Printf("Command : %v\n", cmd)
stdoutMonitord, err := cmd.StdoutPipe()
if err != nil {
l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
return
}
err = cmd.Start()
if err != nil {
l.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
return
}
logExecution(stdoutMonitord, l)
}

View File

@@ -1,7 +1,6 @@
package daemons
import (
"oc-schedulerd/conf"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
@@ -79,37 +78,16 @@ func (em *ExecutionManager) executeExecution(execution *workflow_execution.Workf
duration = int(execution.EndDate.Sub(execution.ExecDate).Seconds())
}
if conf.GetConfig().Mode == "local" {
executor = NewLocalMonitor(execution.UUID, execution.CreatorID, duration)
}
if conf.GetConfig().Mode == "container" {
executor = NewContainerMonitor(execution.UUID, execution.CreatorID, duration)
}
executor = NewContainerMonitor(execution.UUID, execution.CreatorID, duration)
if executor == nil {
logger.Fatal().Msg("Could not create executor")
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execution.GetID())
return
}
args := executor.PrepareMonitorExec()
executor.LaunchMonitor(args, logger)
// if exec_method == "k8s" {
// logger.Error().Msg("TODO : executing oc-monitor in a k8s")
// } else {
// logger.Debug().Msg("Executing oc-monitor localy")
// duration := 0
// if Execution.EndDate != nil {
// duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds())
// }
// monitor := LocalMonitor{
// Logger: logger,
// Duration: duration,
// ExecutionID: Execution.ExecutionsID,
// PeerID: Execution.CreatorID,
// LokiUrl: conf.GetConfig().LokiUrl,
// }
// monitor.LaunchLocalMonitor()
// }
executor.LaunchMonitor(args, execution.GetID(), logger)
}

View File

@@ -9,7 +9,7 @@ import (
type Executor interface {
PrepareMonitorExec() []string
LaunchMonitor(args []string, l zerolog.Logger)
LaunchMonitor(args []string, execID string, l zerolog.Logger)
}
func logExecution(reader io.ReadCloser, l zerolog.Logger) {
@@ -18,4 +18,4 @@ func logExecution(reader io.ReadCloser, l zerolog.Logger) {
output := scanner.Text()
l.Debug().Msg(output)
}
}
}

View File

@@ -9,7 +9,6 @@ import (
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
@@ -25,7 +24,7 @@ type ScheduledExecution struct {
func (sb *ScheduledExecution) DeleteSchedules(resp tools.NATSResponse) {
var m map[string]string
json.Unmarshal(resp.Payload, m)
json.Unmarshal(resp.Payload, &m)
Executions.Mu.Lock()
defer Executions.Mu.Unlock()
delete(sb.Execs, m["id"])
@@ -38,6 +37,9 @@ func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution.
fmt.Println("Adding "+exec.UUID, !sb.execIsSet(exec))
if !sb.execIsSet(exec) {
sb.Execs[exec.UUID] = *exec
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.SCHEDULED.EnumIndex(),
}, exec.GetID())
}
}
}
@@ -91,18 +93,19 @@ func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list
func (s *ScheduleManager) ExecuteWorkflow(resp tools.NATSResponse) {
var m map[string]string
json.Unmarshal(resp.Payload, &m)
res := resources.WorkflowResource{}
access := res.GetAccessor(&tools.APIRequest{})
if d, code, err := access.LoadOne(m["workflow_id"]); code == 200 && err == nil {
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW), nil)
if d := access.LoadOne(fmt.Sprintf("%v", m["workflow_id"])); d.Err == "" {
eventExec := &workflow_execution.WorkflowExecution{
WorkflowID: d.GetID(),
WorkflowID: d.Data.GetID(),
ExecDate: time.Now(),
ExecutionsID: uuid.New().String(),
State: enum.SCHEDULED,
}
Executions.AddSchedules([]*workflow_execution.WorkflowExecution{eventExec}, s.Logger)
exec := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).StoreOne(eventExec.Serialize(eventExec))
if execc := exec.ToWorkflowExecution(); execc != nil {
Executions.AddSchedules([]*workflow_execution.WorkflowExecution{execc}, s.Logger)
}
}
}
func (s *ScheduleManager) GetNextScheduledWorkflows(_ tools.NATSResponse) {