Divided the execution between local and container and created an interface responsible for preparing and launching the execution

This commit is contained in:
pb 2025-04-25 11:14:54 +02:00
parent b43cb6d758
commit 90fa0b8edd
5 changed files with 237 additions and 52 deletions

View File

@ -8,18 +8,18 @@ import (
) )
type Config struct { type Config struct {
MonitorPath string MonitorPath string
MongoUrl string MongoUrl string
DBName string DBName string
Logs string Logs string
LokiUrl string LokiUrl string
NatsUrl string NatsUrl string
Mode string Mode string
KubeHost string KubeHost string
KubePort string KubePort string
KubeCA string KubeCA string
KubeCert string KubeCert string
KubeData string KubeData string
} }
var instance *Config var instance *Config

View File

@ -0,0 +1,132 @@
package daemons
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"oc-schedulerd/conf"
"github.com/rs/zerolog"
)
type ContainerMonitor struct {
Monitor LocalMonitor
KubeCA string
KubeCert string
KubeData string
KubeHost string
KubePort string
}
func NewContainerMonitor(executionsId string, peerId string, duration int) (Executor){
return &ContainerMonitor{
Monitor: LocalMonitor{
ExecutionID: executionsId,
PeerID: peerId,
Duration: duration,
LokiUrl: conf.GetConfig().LokiUrl,
MongoUrl: conf.GetConfig().MongoUrl,
DBName: conf.GetConfig().DBName,
},
KubeCA: conf.GetConfig().KubeCA,
KubeCert: conf.GetConfig().KubeCert,
KubeData: conf.GetConfig().KubeData,
KubeHost: conf.GetConfig().KubeHost,
KubePort: conf.GetConfig().KubePort,
}
}
func (cm *ContainerMonitor) PrepareMonitorExec() []string {
args := []string{
"-e", cm.Monitor.ExecutionID,
"-p", cm.Monitor.PeerID,
"-u", cm.Monitor.LokiUrl,
"-m", cm.Monitor.MongoUrl,
"-d", cm.Monitor.DBName,
"-M", "kubernetes",
"-H", cm.KubeHost,
"-P", cm.KubePort,
"-C", cm.KubeCert,
"-D", cm.KubeData,
"-c", cm.KubeCA,
}
if cm.Monitor.Duration > 0 {
args = append(args, "-t", fmt.Sprintf("%d", cm.Monitor.Duration))
}
return args
}
// Contact the docker's API at the KubeHost's URL to :
// - Check if the image exists
// - Create the container
// - Start the container
func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) {
var containerID string
imageName := "oc-monitord"
url := "http://" + conf.GetConfig().KubeHost + ":2375"
resp, err := http.Get(url + "/images/" + imageName + "/json")
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error())
}
if resp.StatusCode != http.StatusOK {
d, _ := io.ReadAll(resp.Body)
l.Fatal().Msg("Couldn't find the oc-monitord image : " + string(d))
}
dataCreation := map[string]interface{}{"Image": imageName, "Cmd" : args}
byteData, err := json.Marshal(dataCreation)
if err != nil {
l.Fatal().Msg("Error when contacting the creating request body : " + err.Error())
}
r, _ := http.NewRequest("POST",url + "/containers/create", bytes.NewBuffer(byteData))
r.Header.Add("Content-Type","application/json")
resp, err = http.DefaultClient.Do(r)
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error())
}
if resp.StatusCode == 201 {
var d map[string]interface{}
b, err := io.ReadAll(resp.Body)
if err != nil {
l.Fatal().Msg(err.Error())
}
err = json.Unmarshal(b, &d)
if err != nil {
l.Fatal().Msg(err.Error())
}
containerID = d["Id"].(string)
} else {
d, _ := io.ReadAll(resp.Body)
l.Fatal().Msg("Error when creating the container on " + conf.GetConfig().KubeHost + "\n " + string(d))
}
resp, err = http.Post( url + "/containers/" + containerID + "/start", "", nil)
if err != nil {
l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error())
}
if resp.StatusCode >= 300 {
d, _ := io.ReadAll(resp.Body)
l.Fatal().Msg("Error when starting the container on " + conf.GetConfig().KubeHost + "\n " + string(d))
}
l.Info().Msg("Started container " + containerID)
// we can add logging with GET /containers/id/logs?stdout=true&follow=true
// logExecution(stdoutMonitord, l)
}

View File

@ -1,12 +1,10 @@
package daemons package daemons
import ( import (
"bufio"
"fmt" "fmt"
"oc-schedulerd/conf" "oc-schedulerd/conf"
"os/exec" "os/exec"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
@ -14,50 +12,62 @@ type LocalMonitor struct {
ExecutionID string ExecutionID string
PeerID string PeerID string
Duration int Duration int
Logger zerolog.Logger LokiUrl string
MongoUrl string
DBName string
} }
func (lm *LocalMonitor) LaunchLocalMonitor() { func NewLocalMonitor(executionsId string, peerId string, duration int) (Executor){
if lm.ExecutionID == "" { return &LocalMonitor{
lm.Logger.Error().Msg("Missing parameter in LocalMonitor") ExecutionID: executionsId,
PeerID: peerId,
Duration: duration,
LokiUrl: conf.GetConfig().LokiUrl,
MongoUrl: conf.GetConfig().MongoUrl,
DBName: conf.GetConfig().DBName,
} }
lm.execKube()
} }
func (lm *LocalMonitor) execKube() { // func (lm *LocalMonitor) LaunchLocalMonitor() {
// if lm.ExecutionID == "" {
// lm.Logger.Error().Msg("Missing parameter in LocalMonitor")
// }
l := oclib.GetLogger() // }
func (lm *LocalMonitor) PrepareMonitorExec() []string {
args := []string{ args := []string{
"-e", lm.ExecutionID, "-p", lm.PeerID, "-u", conf.GetConfig().LokiUrl, "-m", conf.GetConfig().MongoUrl, "-e", lm.ExecutionID,
"-d", conf.GetConfig().DBName, "-p", lm.PeerID,
"-u", lm.LokiUrl,
"-m", lm.MongoUrl,
"-d", lm.DBName,
} }
if conf.GetConfig().Mode == "kubernetes" {
args = append(args, []string{"-M", conf.GetConfig().Mode, "-H", conf.GetConfig().KubeHost, "-P", conf.GetConfig().KubePort,
"-C", conf.GetConfig().KubeCert, "-D", conf.GetConfig().KubeData, "-c", conf.GetConfig().KubeCA}...)
}
if lm.Duration > 0 { if lm.Duration > 0 {
args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) args = append(args, "-t", fmt.Sprintf("%d", lm.Duration))
} }
return args
}
func (lm *LocalMonitor) LaunchMonitor(args []string, l zerolog.Logger) {
cmd := exec.Command(conf.GetConfig().MonitorPath, args...) cmd := exec.Command(conf.GetConfig().MonitorPath, args...)
fmt.Printf("Command : %v\n", cmd) fmt.Printf("Command : %v\n", cmd)
stdoutMonitord, err := cmd.StdoutPipe(); stdoutMonitord, err := cmd.StdoutPipe()
if err != nil { if err != nil {
l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error()) l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error())
return
} }
err = cmd.Start() err = cmd.Start()
if err != nil { if err != nil {
lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) l.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error())
} }
scanner := bufio.NewScanner(stdoutMonitord) logExecution(stdoutMonitord, l)
for scanner.Scan() {
output := scanner.Text()
l.Debug().Msg(output)
}
} }

View File

@ -2,7 +2,7 @@ package daemons
import ( import (
"fmt" "fmt"
"os" "oc-schedulerd/conf"
"time" "time"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
@ -38,22 +38,44 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
func (em *ExecutionManager) executeExecution(Execution *workflow_execution.WorkflowExecution) { func (em *ExecutionManager) executeExecution(Execution *workflow_execution.WorkflowExecution) {
// start execution // start execution
// create the yaml that describes the pod : filename, path/url to Loki // create the yaml that describes the pod : filename, path/url to Loki
exec_method := os.Getenv("MONITOR_METHOD") var executor Executor
// exec_method := os.Getenv("MONITOR_METHOD")
logger := oclib.GetLogger() logger := oclib.GetLogger()
if exec_method == "k8s" { duration := 0
logger.Error().Msg("TODO : executing oc-monitor in a k8s") if Execution.EndDate != nil {
} else { duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds())
logger.Debug().Msg("Executing oc-monitor localy")
duration := 0
if Execution.EndDate != nil {
duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds())
}
monitor := LocalMonitor{
Logger: logger,
Duration: duration,
ExecutionID: Execution.ExecutionsID,
PeerID: Execution.CreatorID,
}
monitor.LaunchLocalMonitor()
} }
if conf.GetConfig().Mode == "local" {
executor = NewLocalMonitor(Execution.ExecutionsID, Execution.CreatorID, duration)
}
if conf.GetConfig().Mode == "container" {
executor = NewContainerMonitor(Execution.ExecutionsID, Execution.CreatorID, duration)
}
if executor == nil {
logger.Fatal().Msg("Could not create logger")
}
args := executor.PrepareMonitorExec()
executor.LaunchMonitor(args,logger)
// if exec_method == "k8s" {
// logger.Error().Msg("TODO : executing oc-monitor in a k8s")
// } else {
// logger.Debug().Msg("Executing oc-monitor localy")
// duration := 0
// if Execution.EndDate != nil {
// duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds())
// }
// monitor := LocalMonitor{
// Logger: logger,
// Duration: duration,
// ExecutionID: Execution.ExecutionsID,
// PeerID: Execution.CreatorID,
// LokiUrl: conf.GetConfig().LokiUrl,
// }
// monitor.LaunchLocalMonitor()
// }
} }

21
daemons/interface.go Normal file
View File

@ -0,0 +1,21 @@
package daemons
import (
"bufio"
"io"
"github.com/rs/zerolog"
)
type Executor interface {
PrepareMonitorExec() []string
LaunchMonitor(args []string, l zerolog.Logger)
}
func logExecution(reader io.ReadCloser, l zerolog.Logger) {
scanner := bufio.NewScanner(reader)
for scanner.Scan() {
output := scanner.Text()
l.Debug().Msg(output)
}
}