From 90fa0b8edd79298380e51d4456db801d940ec57c Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 25 Apr 2025 11:14:54 +0200 Subject: [PATCH] Divided the execution between local and container and created an interface responsible for preparing and launching the execution --- conf/conf.go | 24 ++--- daemons/execute_monitor_container.go | 132 +++++++++++++++++++++++++++ daemons/execute_monitor_local.go | 56 +++++++----- daemons/execution_manager.go | 56 ++++++++---- daemons/interface.go | 21 +++++ 5 files changed, 237 insertions(+), 52 deletions(-) create mode 100644 daemons/execute_monitor_container.go create mode 100644 daemons/interface.go diff --git a/conf/conf.go b/conf/conf.go index ed29337..3e03005 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -8,18 +8,18 @@ import ( ) type Config struct { - MonitorPath string - MongoUrl string - DBName string - Logs string - LokiUrl string - NatsUrl string - Mode string - KubeHost string - KubePort string - KubeCA string - KubeCert string - KubeData string + MonitorPath string + MongoUrl string + DBName string + Logs string + LokiUrl string + NatsUrl string + Mode string + KubeHost string + KubePort string + KubeCA string + KubeCert string + KubeData string } var instance *Config diff --git a/daemons/execute_monitor_container.go b/daemons/execute_monitor_container.go new file mode 100644 index 0000000..57b5642 --- /dev/null +++ b/daemons/execute_monitor_container.go @@ -0,0 +1,132 @@ +package daemons + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "oc-schedulerd/conf" + + "github.com/rs/zerolog" +) + +type ContainerMonitor struct { + Monitor LocalMonitor + KubeCA string + KubeCert string + KubeData string + KubeHost string + KubePort string +} + +func NewContainerMonitor(executionsId string, peerId string, duration int) (Executor){ + return &ContainerMonitor{ + Monitor: LocalMonitor{ + ExecutionID: executionsId, + PeerID: peerId, + Duration: duration, + LokiUrl: conf.GetConfig().LokiUrl, + MongoUrl: conf.GetConfig().MongoUrl, + DBName: conf.GetConfig().DBName, + }, + KubeCA: conf.GetConfig().KubeCA, + KubeCert: conf.GetConfig().KubeCert, + KubeData: conf.GetConfig().KubeData, + KubeHost: conf.GetConfig().KubeHost, + KubePort: conf.GetConfig().KubePort, + } +} + +func (cm *ContainerMonitor) PrepareMonitorExec() []string { + + args := []string{ + "-e", cm.Monitor.ExecutionID, + "-p", cm.Monitor.PeerID, + "-u", cm.Monitor.LokiUrl, + "-m", cm.Monitor.MongoUrl, + "-d", cm.Monitor.DBName, + "-M", "kubernetes", + "-H", cm.KubeHost, + "-P", cm.KubePort, + "-C", cm.KubeCert, + "-D", cm.KubeData, + "-c", cm.KubeCA, + } + + if cm.Monitor.Duration > 0 { + args = append(args, "-t", fmt.Sprintf("%d", cm.Monitor.Duration)) + } + + return args + +} + +// Contact the docker's API at the KubeHost's URL to : +// - Check if the image exists +// - Create the container +// - Start the container +func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) { + + var containerID string + imageName := "oc-monitord" + url := "http://" + conf.GetConfig().KubeHost + ":2375" + + resp, err := http.Get(url + "/images/" + imageName + "/json") + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + } + + if resp.StatusCode != http.StatusOK { + d, _ := io.ReadAll(resp.Body) + l.Fatal().Msg("Couldn't find the oc-monitord image : " + string(d)) + } + + dataCreation := map[string]interface{}{"Image": imageName, "Cmd" : args} + byteData, err := json.Marshal(dataCreation) + if err != nil { + l.Fatal().Msg("Error when contacting the creating request body : " + err.Error()) + } + + r, _ := http.NewRequest("POST",url + "/containers/create", bytes.NewBuffer(byteData)) + r.Header.Add("Content-Type","application/json") + resp, err = http.DefaultClient.Do(r) + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + } + + if resp.StatusCode == 201 { + var d map[string]interface{} + + b, err := io.ReadAll(resp.Body) + if err != nil { + l.Fatal().Msg(err.Error()) + } + + err = json.Unmarshal(b, &d) + if err != nil { + l.Fatal().Msg(err.Error()) + } + + containerID = d["Id"].(string) + } else { + d, _ := io.ReadAll(resp.Body) + l.Fatal().Msg("Error when creating the container on " + conf.GetConfig().KubeHost + "\n " + string(d)) + } + + + resp, err = http.Post( url + "/containers/" + containerID + "/start", "", nil) + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + } + + if resp.StatusCode >= 300 { + d, _ := io.ReadAll(resp.Body) + l.Fatal().Msg("Error when starting the container on " + conf.GetConfig().KubeHost + "\n " + string(d)) + } + + l.Info().Msg("Started container " + containerID) + // we can add logging with GET /containers/id/logs?stdout=true&follow=true + + // logExecution(stdoutMonitord, l) +} \ No newline at end of file diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index c2eabf4..33c57bf 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -1,12 +1,10 @@ package daemons import ( - "bufio" "fmt" "oc-schedulerd/conf" "os/exec" - oclib "cloud.o-forge.io/core/oc-lib" "github.com/rs/zerolog" ) @@ -14,50 +12,62 @@ type LocalMonitor struct { ExecutionID string PeerID string Duration int - Logger zerolog.Logger + LokiUrl string + MongoUrl string + DBName string + } -func (lm *LocalMonitor) LaunchLocalMonitor() { - if lm.ExecutionID == "" { - lm.Logger.Error().Msg("Missing parameter in LocalMonitor") +func NewLocalMonitor(executionsId string, peerId string, duration int) (Executor){ + return &LocalMonitor{ + ExecutionID: executionsId, + PeerID: peerId, + Duration: duration, + LokiUrl: conf.GetConfig().LokiUrl, + MongoUrl: conf.GetConfig().MongoUrl, + DBName: conf.GetConfig().DBName, } - lm.execKube() } -func (lm *LocalMonitor) execKube() { +// func (lm *LocalMonitor) LaunchLocalMonitor() { +// if lm.ExecutionID == "" { +// lm.Logger.Error().Msg("Missing parameter in LocalMonitor") +// } - l := oclib.GetLogger() +// } + +func (lm *LocalMonitor) PrepareMonitorExec() []string { args := []string{ - "-e", lm.ExecutionID, "-p", lm.PeerID, "-u", conf.GetConfig().LokiUrl, "-m", conf.GetConfig().MongoUrl, - "-d", conf.GetConfig().DBName, + "-e", lm.ExecutionID, + "-p", lm.PeerID, + "-u", lm.LokiUrl, + "-m", lm.MongoUrl, + "-d", lm.DBName, } - if conf.GetConfig().Mode == "kubernetes" { - args = append(args, []string{"-M", conf.GetConfig().Mode, "-H", conf.GetConfig().KubeHost, "-P", conf.GetConfig().KubePort, - "-C", conf.GetConfig().KubeCert, "-D", conf.GetConfig().KubeData, "-c", conf.GetConfig().KubeCA}...) - } + if lm.Duration > 0 { args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) } + return args +} + +func (lm *LocalMonitor) LaunchMonitor(args []string, l zerolog.Logger) { cmd := exec.Command(conf.GetConfig().MonitorPath, args...) fmt.Printf("Command : %v\n", cmd) - stdoutMonitord, err := cmd.StdoutPipe(); + stdoutMonitord, err := cmd.StdoutPipe() if err != nil { l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error()) - return } err = cmd.Start() if err != nil { - lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) + l.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) } - scanner := bufio.NewScanner(stdoutMonitord) - for scanner.Scan() { - output := scanner.Text() - l.Debug().Msg(output) - } + logExecution(stdoutMonitord, l) } + diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index 352087c..fcb48d4 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -2,7 +2,7 @@ package daemons import ( "fmt" - "os" + "oc-schedulerd/conf" "time" oclib "cloud.o-forge.io/core/oc-lib" @@ -38,22 +38,44 @@ func (em *ExecutionManager) RetrieveNextExecutions() { func (em *ExecutionManager) executeExecution(Execution *workflow_execution.WorkflowExecution) { // start execution // create the yaml that describes the pod : filename, path/url to Loki - exec_method := os.Getenv("MONITOR_METHOD") + var executor Executor + // exec_method := os.Getenv("MONITOR_METHOD") logger := oclib.GetLogger() - if exec_method == "k8s" { - logger.Error().Msg("TODO : executing oc-monitor in a k8s") - } else { - logger.Debug().Msg("Executing oc-monitor localy") - duration := 0 - if Execution.EndDate != nil { - duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds()) - } - monitor := LocalMonitor{ - Logger: logger, - Duration: duration, - ExecutionID: Execution.ExecutionsID, - PeerID: Execution.CreatorID, - } - monitor.LaunchLocalMonitor() + duration := 0 + if Execution.EndDate != nil { + duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds()) } + + if conf.GetConfig().Mode == "local" { + executor = NewLocalMonitor(Execution.ExecutionsID, Execution.CreatorID, duration) + } + + if conf.GetConfig().Mode == "container" { + executor = NewContainerMonitor(Execution.ExecutionsID, Execution.CreatorID, duration) + } + + if executor == nil { + logger.Fatal().Msg("Could not create logger") + } + args := executor.PrepareMonitorExec() + executor.LaunchMonitor(args,logger) + + // if exec_method == "k8s" { + // logger.Error().Msg("TODO : executing oc-monitor in a k8s") + // } else { + // logger.Debug().Msg("Executing oc-monitor localy") + // duration := 0 + // if Execution.EndDate != nil { + // duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds()) + // } + // monitor := LocalMonitor{ + // Logger: logger, + // Duration: duration, + // ExecutionID: Execution.ExecutionsID, + // PeerID: Execution.CreatorID, + // LokiUrl: conf.GetConfig().LokiUrl, + + // } + // monitor.LaunchLocalMonitor() + // } } diff --git a/daemons/interface.go b/daemons/interface.go new file mode 100644 index 0000000..7b5f4fe --- /dev/null +++ b/daemons/interface.go @@ -0,0 +1,21 @@ +package daemons + +import ( + "bufio" + "io" + + "github.com/rs/zerolog" +) + +type Executor interface { + PrepareMonitorExec() []string + LaunchMonitor(args []string, l zerolog.Logger) +} + +func logExecution(reader io.ReadCloser, l zerolog.Logger) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + output := scanner.Text() + l.Debug().Msg(output) + } +} \ No newline at end of file