Docker OC-MONITORD

This commit is contained in:
mr 2025-02-17 16:54:25 +01:00
parent 34547e8b2f
commit 91a87fbc4d
7 changed files with 244 additions and 150 deletions

212
main.go
View File

@ -2,18 +2,20 @@ package main
import ( import (
"bufio" "bufio"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
"os" "os"
"os/exec" "os/exec"
"regexp" "regexp"
"strconv" "slices"
"strings" "strings"
"sync" "sync"
"oc-monitord/conf" "oc-monitord/conf"
"oc-monitord/models" "oc-monitord/models"
u "oc-monitord/utils"
"oc-monitord/workflow_builder" "oc-monitord/workflow_builder"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
@ -49,19 +51,10 @@ const defaultConfigFile = "/etc/oc/ocmonitord_conf.json"
const localConfigFile = "./conf/local_ocmonitord_conf.json" const localConfigFile = "./conf/local_ocmonitord_conf.json"
func main() { func main() {
os.Setenv("test_service", "true") // Only for service demo, delete before merging on main
// Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) os.Setenv("test_service", "true") // Only for service demo, delete before merging on main
if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database")
// Not in a k8s environment, get conf from parameters loadConfig(false, &parser)
fmt.Println("Executes outside of k8s")
parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database")
loadConfig(false, &parser)
} else {
// Executed in a k8s environment
fmt.Println("Executes inside a k8s")
loadConfig(true, nil)
}
oclib.InitDaemon("oc-monitord") oclib.InitDaemon("oc-monitord")
oclib.SetConfig( oclib.SetConfig(
@ -76,7 +69,7 @@ func main() {
logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
exec := getExecution(conf.GetConfig().ExecutionID) exec := u.GetExecution(conf.GetConfig().ExecutionID)
conf.GetConfig().WorkflowID = exec.WorkflowID conf.GetConfig().WorkflowID = exec.WorkflowID
logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
@ -106,33 +99,36 @@ func main() {
wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
wf_logger.Debug().Msg("Testing argo name") wf_logger.Debug().Msg("Testing argo name")
if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { if conf.GetConfig().KubeHost == "" {
// Not in a k8s environment, get conf from parameters // Not in a k8s environment, get conf from parameters
fmt.Println("Executes outside of k8s") fmt.Println("Executes outside of k8s")
executeInside(argo_file_path, stepMax) executeOutside(argo_file_path, stepMax)
} else { } else {
// Executed in a k8s environment // Executed in a k8s environment
fmt.Println("Executes inside a k8s") fmt.Println("Executes inside a k8s")
loadConfig(true, nil) executeInside(exec.GetID(), "argo", argo_file_path, stepMax)
} }
} }
func getExecution(exec_id string) *workflow_execution.WorkflowExecution {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id)
if res.Code != 200 {
logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
return nil
}
return res.ToWorkflowExecution()
}
// So far we only log the output from // So far we only log the output from
func executeInside(argo_file_path string, stepMax int) { func executeInside(execID string, ns string, argo_file_path string, stepMax int) {
t, err := tools2.NewService(conf.GetConfig().Mode) t, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil { if err != nil {
logger.Error().Msg("Could not create KubernetesTool") logger.Error().Msg("Could not create KubernetesTool")
} }
t.CreateArgoWorkflow(argo_file_path) name, err := t.CreateArgoWorkflow(argo_file_path, ns)
t.LogWorkflow("argo", workflowName, argo_file_path, stepMax, logWorkflow) if err != nil {
logger.Error().Msg("Could not create argo workflow : " + err.Error())
} else {
split := strings.Split(argo_file_path, "_")
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger)
err := t.LogWorkflow(execID, ns, name, argo_file_path, stepMax, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, logWorkflow)
if err != nil {
logger.Error().Msg("Could not log workflow : " + err.Error())
}
}
} }
func executeOutside(argo_file_path string, stepMax int) { func executeOutside(argo_file_path string, stepMax int) {
@ -140,7 +136,7 @@ func executeOutside(argo_file_path string, stepMax int) {
var stdout, stderr io.ReadCloser var stdout, stderr io.ReadCloser
// var stderr io.ReadCloser // var stderr io.ReadCloser
var err error var err error
cmd := exec.Command("argo", "submit", "--log", "./argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo") cmd := exec.Command("argo", "submit", "--log", argo_file_path, "--serviceaccount=argo", "-n", "argo")
if stdout, err = cmd.StdoutPipe(); err != nil { if stdout, err = cmd.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
return return
@ -149,7 +145,11 @@ func executeOutside(argo_file_path string, stepMax int) {
panic(err) panic(err)
} }
var wg sync.WaitGroup var wg sync.WaitGroup
go logWorkflow(argo_file_path, stepMax, stdout, &wg) split := strings.Split(argo_file_path, "_")
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger)
argoLogs.IsStreaming = true
go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg)
if err := cmd.Wait(); err != nil { if err := cmd.Wait(); err != nil {
wf_logger.Error().Msg("Could not execute argo submit") wf_logger.Error().Msg("Could not execute argo submit")
@ -161,33 +161,53 @@ func executeOutside(argo_file_path string, stepMax int) {
// We could improve this function by creating an object with the same attribute as the output // We could improve this function by creating an object with the same attribute as the output
// and only send a new log if the current object has different values than the previous // and only send a new log if the current object has different values than the previous
func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup) { func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser,
var current_watch, previous_watch *models.ArgoWatch current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch,
split := strings.Split(argo_file_path, "_") argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup) {
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
watch_output := make([]string, 0)
scanner := bufio.NewScanner(pipe) scanner := bufio.NewScanner(pipe)
count := 0
see := ""
seeit := 0
for scanner.Scan() { for scanner.Scan() {
log := scanner.Text() log := scanner.Text()
watch_output = append(watch_output, log) if strings.Contains(log, "capturing logs") && count == 0 {
if strings.Contains(log, "Progress:") { if !argoLogs.IsStreaming {
current_watch = argoLogs.StopStepRecording(watch_output) wg.Add(1)
watch_output = []string{} }
} else if strings.Contains(log, "sub-process exited") { seeit++
current_watch = argoLogs.StopStepRecording(watch_output) } else if count == 0 {
if argoLogs.IsStreaming {
continue
} else {
break
}
} }
if current_watch != nil && !current_watch.Equals(previous_watch) && current_watch.Name != "" { if count == 1 {
wg.Add(1) see = log
checkStatus(current_watch, previous_watch) if slices.Contains(argoLogs.Seen, see) && !argoLogs.IsStreaming {
wg.Done()
seeit--
break
}
}
if !slices.Contains(current_watch.Logs, log) {
current_watch.Logs = append(current_watch.Logs, log)
}
count++
if strings.Contains(log, "sub-process exited") {
current_watch = argoLogs.StopStepRecording(current_watch)
argoLogs.Seen = append(argoLogs.Seen, see)
if checkStatus(current_watch, previous_watch, argoLogs) {
count = 0
if !argoLogs.IsStreaming {
wg.Done()
}
seeit--
}
jsonified, err := json.Marshal(current_watch) jsonified, err := json.Marshal(current_watch)
if err != nil { if err != nil {
logger.Error().Msg("Could not create watch log") logger.Error().Msg("Could not create watch log")
} }
if strings.Contains(strings.ToLower(strings.Join(current_watch.Logs, " ")), "error") || strings.Contains(strings.ToLower(strings.ToLower(strings.Join(current_watch.Logs, " "))), "err") {
current_watch.Status = "Failed"
}
if current_watch.Status == "Failed" { if current_watch.Status == "Failed" {
wf_logger.Error().Msg(string(jsonified)) wf_logger.Error().Msg(string(jsonified))
} else { } else {
@ -195,8 +215,6 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *syn
} }
previous_watch = current_watch previous_watch = current_watch
current_watch = &models.ArgoWatch{} current_watch = &models.ArgoWatch{}
watch_output = []string{}
wg.Done()
} }
} }
} }
@ -212,44 +230,50 @@ func loadConfig(is_k8s bool, parser *argparse.Parser) {
} }
func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
if is_k8s { url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"})
conf.GetConfig().LokiURL = o.GetStringDefault("lokiurl", "http://127.0.0.1:3100") mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
i, err := strconv.Atoi(o.GetString("timeout")) execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})
if err == nil { peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
conf.GetConfig().Timeout = i mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"})
} else { db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"})
logger.Error().Msg("Could not parse timeout, using default value") timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"})
}
conf.GetConfig().ExecutionID = o.GetString("workflow")
conf.GetConfig().PeerID = o.GetString("peer")
mongo := o.GetStringDefault("mongourl", "mongodb://127.0.0.1:27017")
db := o.GetStringDefault("database", "DC_myDC")
conf.GetConfig().MongoURL = mongo ca := parser.String("c", "ca", &argparse.Options{Required: false, Default: "", Help: "CA file for the Kubernetes cluster"})
conf.GetConfig().Database = db cert := parser.String("C", "cert", &argparse.Options{Required: false, Default: "", Help: "Cert file for the Kubernetes cluster"})
} else { data := parser.String("D", "data", &argparse.Options{Required: false, Default: "", Help: "Data file for the Kubernetes cluster"})
url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"})
mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "kubernetes", Help: "Mode of the execution"}) host := parser.String("H", "host", &argparse.Options{Required: false, Default: "", Help: "Host for the Kubernetes cluster"})
execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) port := parser.String("P", "port", &argparse.Options{Required: false, Default: "6443", Help: "Port for the Kubernetes cluster"})
peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"}) err := parser.Parse(os.Args)
db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"}) if err != nil {
timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"}) fmt.Println(parser.Usage(err))
err := parser.Parse(os.Args) os.Exit(1)
if err != nil {
fmt.Println(parser.Usage(err))
os.Exit(1)
}
conf.GetConfig().Logs = "debug"
conf.GetConfig().LokiURL = *url
conf.GetConfig().MongoURL = *mongo
conf.GetConfig().Database = *db
conf.GetConfig().Timeout = *timeout
conf.GetConfig().Mode = *mode
conf.GetConfig().ExecutionID = *execution
conf.GetConfig().PeerID = *peer
} }
conf.GetConfig().Logs = "debug"
conf.GetConfig().LokiURL = *url
conf.GetConfig().MongoURL = *mongo
conf.GetConfig().Database = *db
conf.GetConfig().Timeout = *timeout
conf.GetConfig().Mode = *mode
conf.GetConfig().ExecutionID = *execution
conf.GetConfig().PeerID = *peer
conf.GetConfig().KubeHost = *host
conf.GetConfig().KubePort = *port
decoded, err := base64.StdEncoding.DecodeString(*ca)
if err == nil {
conf.GetConfig().KubeCA = string(decoded)
}
decoded, err = base64.StdEncoding.DecodeString(*cert)
if err == nil {
conf.GetConfig().KubeCert = string(decoded)
}
decoded, err = base64.StdEncoding.DecodeString(*data)
if err == nil {
conf.GetConfig().KubeData = string(decoded)
}
} }
func initOnion(o *onion.Onion) *onion.Onion { func initOnion(o *onion.Onion) *onion.Onion {
@ -294,14 +318,24 @@ func getContainerName(argo_file string) string {
} }
// Uses the ArgoWatch object to update status of the workflow execution object // Uses the ArgoWatch object to update status of the workflow execution object
func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch) { func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch, argoLogs *models.ArgoLogs) bool {
if previous != nil && current.Status != previous.Status { if previous == nil || current.Status != previous.Status || argoLogs.IsStreaming {
argoLogs.StepCount += 1
if len(current.Logs) > 0 { if len(current.Logs) > 0 {
updateStatus(current.Status, current.Logs[0]) newLogs := []string{}
for _, log := range current.Logs {
if !slices.Contains(argoLogs.Logs, log) {
newLogs = append(newLogs, log)
}
}
updateStatus(current.Status, strings.Join(newLogs, "\n"))
current.Logs = newLogs
argoLogs.Logs = append(argoLogs.Logs, newLogs...)
} else { } else {
updateStatus(current.Status, "") updateStatus(current.Status, "")
} }
} }
return previous == nil || current.Status != previous.Status || argoLogs.IsStreaming
} }
func updateStatus(status string, log string) { func updateStatus(status string, log string) {

View File

@ -1,12 +1,14 @@
package models package models
import ( import (
"encoding/json"
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/acarl005/stripansi" "github.com/acarl005/stripansi"
"github.com/rs/zerolog"
) )
type ArgoWatch struct { type ArgoWatch struct {
@ -41,6 +43,7 @@ func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs {
StepCount: 0, StepCount: 0,
StepMax: stepMax, StepMax: stepMax,
stop: false, stop: false,
Seen: []string{},
} }
} }
@ -52,19 +55,42 @@ type ArgoLogs struct {
StepMax int StepMax int
stop bool stop bool
Started time.Time Started time.Time
Seen []string
Logs []string
IsStreaming bool
} }
func (a *ArgoLogs) StartStepRecording() { func (a *ArgoLogs) NewWatch() *ArgoWatch {
return &ArgoWatch{
Name: a.Name,
Namespace: a.Namespace,
Status: "Pending",
Created: a.CreatedDate,
Started: a.Started.Format("2006-01-02 15:04:05"),
Conditions: Conditions{
PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
Completed: a.StepCount == a.StepMax,
},
Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
Duration: "0s",
Logs: []string{},
}
}
func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) {
jsonified, _ := json.Marshal(current_watch)
logger.Info().Msg(string(jsonified))
a.StepCount += 1 a.StepCount += 1
a.Started = time.Now() a.Started = time.Now()
} }
func (a *ArgoLogs) StopStepRecording(inputs []string) *ArgoWatch { func (a *ArgoLogs) StopStepRecording(current *ArgoWatch) *ArgoWatch {
fn := strings.Split(a.Name, "_") fn := strings.Split(a.Name, "_")
logs := []string{} logs := []string{}
err := false err := false
end := "" end := ""
for _, input := range inputs { for _, input := range current.Logs {
line := strings.TrimSpace(input) line := strings.TrimSpace(input)
if line == "" || !strings.Contains(line, fn[0]) || !strings.Contains(line, ":") { if line == "" || !strings.Contains(line, fn[0]) || !strings.Contains(line, ":") {
continue continue
@ -107,22 +133,13 @@ func (a *ArgoLogs) StopStepRecording(inputs []string) *ArgoWatch {
timeE, _ := time.Parse("2006-01-02T15:04:05", end) timeE, _ := time.Parse("2006-01-02T15:04:05", end)
duration = timeE.Sub(a.Started).Seconds() duration = timeE.Sub(a.Started).Seconds()
} }
argo := &ArgoWatch{ current.Conditions = Conditions{
Name: a.Name, PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
Namespace: a.Namespace, Completed: a.StepCount == a.StepMax,
Status: status,
Created: a.CreatedDate,
Started: a.Started.Format("2006-01-02 15:04:05"),
Conditions: Conditions{
PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
Completed: a.StepCount == a.StepMax,
},
Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
Duration: fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s"),
Logs: logs,
} }
if !argo.Completed { current.Progress = fmt.Sprintf("%v/%v", a.StepCount, a.StepMax)
a.StartStepRecording() current.Duration = fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s")
}
return argo current.Status = status
return current
} }

Binary file not shown.

View File

@ -3,14 +3,17 @@ package tools
import ( import (
"errors" "errors"
"io" "io"
"oc-monitord/models"
"sync" "sync"
) )
type Tool interface { type Tool interface {
CreateArgoWorkflow(path string) error CreateArgoWorkflow(path string, ns string) (string, error)
CreateAccessSecret(ns string, login string, password string) (string, error) CreateAccessSecret(ns string, login string, password string) (string, error)
LogWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch,
logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup)) error argoLogs *models.ArgoLogs, seen []string,
logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch,
argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error
} }
var _service = map[string]func() (Tool, error){ var _service = map[string]func() (Tool, error){

View File

@ -7,9 +7,13 @@ import (
"fmt" "fmt"
"io" "io"
"oc-monitord/conf" "oc-monitord/conf"
"oc-monitord/models"
"oc-monitord/utils"
"os" "os"
"sync" "sync"
"time"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/google/uuid" "github.com/google/uuid"
@ -54,57 +58,75 @@ func NewKubernetesTool() (Tool, error) {
}, nil }, nil
} }
func (k *KubernetesTools) LogWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, func (k *KubernetesTools) LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs,
logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup)) error { seen []string, logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error {
exec := utils.GetExecution(execID)
if exec == nil {
return errors.New("Could not retrieve workflow ID from execution ID " + execID)
}
if exec.State == enum.FAILURE || exec.State == enum.SUCCESS {
return nil
}
k.logWorkflow(namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc)
return k.LogWorkflow(execID, namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc)
}
func (k *KubernetesTools) logWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs,
seen []string,
logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error {
// List pods related to the Argo workflow // List pods related to the Argo workflow
labelSelector := fmt.Sprintf("workflows.argoproj.io/workflow=%s", workflowName) labelSelector := fmt.Sprintf("workflows.argoproj.io/workflow=%s", workflowName)
pods, err := k.Set.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ for retries := 0; retries < 10; retries++ { // Retry for up to ~20 seconds
LabelSelector: labelSelector, // List workflow pods
}) wfPods, err := k.Set.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
if err != nil { LabelSelector: labelSelector,
panic(fmt.Sprintf("failed to list pods: %v", err)) })
} if err != nil {
return err
if len(pods.Items) == 0 {
return errors.New("no pods found for the workflow")
}
var wg *sync.WaitGroup
// Stream logs from all matching pods
wg.Add(len(pods.Items))
for _, pod := range pods.Items {
for _, container := range pod.Spec.Containers {
fmt.Printf("streaming logs for Pod: %s, Container: %s\n", pod.Name, container.Name)
go k.streamLogs(namespace, pod.Name, container.Name, argoFilePath, stepMax, wg, logFunc)
} }
// If we found pods, stream logs
if len(wfPods.Items) > 0 {
var wg sync.WaitGroup
// Stream logs from all matching pods
for _, pod := range wfPods.Items {
for _, container := range pod.Spec.Containers {
wg.Add(1)
go k.streamLogs(namespace, pod.Name, container.Name, argoFilePath, stepMax, &wg, current_watch, previous_watch, argoLogs, seen, logFunc)
}
}
wg.Wait()
return nil
}
time.Sleep(2 * time.Second) // Wait before retrying
} }
wg.Wait() return errors.New("no pods found for the workflow")
return nil
} }
// Function to stream logs // Function to stream logs
func (k *KubernetesTools) streamLogs(namespace string, podName string, containerName string, func (k *KubernetesTools) streamLogs(namespace string, podName string, containerName string,
argoFilePath string, stepMax int, wg *sync.WaitGroup, argoFilePath string, stepMax int, wg *sync.WaitGroup, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string,
logFunc func(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup)) { logFunc func(argo_file_path string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) {
req := k.Set.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ req := k.Set.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Container: containerName, Container: containerName, // Main container
Follow: true, // Equivalent to -f flag in kubectl logs Follow: true, // Equivalent to -f flag in kubectl logs
}) })
defer wg.Done()
// Open stream // Open stream
stream, err := req.Stream(context.TODO()) stream, err := req.Stream(context.Background())
if err != nil { if err != nil {
fmt.Printf("Error opening log stream for pod %s: %v\n", podName, err)
return return
} }
defer stream.Close() defer stream.Close()
logFunc(argoFilePath, stepMax, stream, wg) var internalWg sync.WaitGroup
logFunc(argoFilePath, stepMax, stream, current_watch, previous_watch, argoLogs, seen, &internalWg)
internalWg.Wait()
} }
func (k *KubernetesTools) CreateArgoWorkflow(path string) error { func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) {
// Read workflow YAML file // Read workflow YAML file
workflowYAML, err := os.ReadFile(path) workflowYAML, err := os.ReadFile(path)
if err != nil { if err != nil {
return err return "", err
} }
// Decode the YAML into a Workflow struct // Decode the YAML into a Workflow struct
scheme := runtime.NewScheme() scheme := runtime.NewScheme()
@ -114,21 +136,21 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string) error {
obj, _, err := decode(workflowYAML, nil, nil) obj, _, err := decode(workflowYAML, nil, nil)
if err != nil { if err != nil {
return errors.New("failed to decode YAML: " + err.Error()) return "", errors.New("failed to decode YAML: " + err.Error())
} }
workflow, ok := obj.(*wfv1.Workflow) workflow, ok := obj.(*wfv1.Workflow)
if !ok { if !ok {
return errors.New("decoded object is not a Workflow") return "", errors.New("decoded object is not a Workflow")
} }
// Create the workflow in the "argo" namespace // Create the workflow in the "argo" namespace
createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows("argo").Create(context.TODO(), workflow, metav1.CreateOptions{}) createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.Background(), workflow, metav1.CreateOptions{})
if err != nil { if err != nil {
return errors.New("failed to create workflow: " + err.Error()) return "", errors.New("failed to create workflow: " + err.Error())
} }
fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, "argo") fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, "argo")
return nil return createdWf.Name, nil
} }
func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) { func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) {
@ -151,7 +173,7 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s
Data: secretData, Data: secretData,
} }
// Create the Secret in Kubernetes // Create the Secret in Kubernetes
_, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) _, err := k.Set.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{})
if err != nil { if err != nil {
return "", errors.New("Error creating secret: " + err.Error()) return "", errors.New("Error creating secret: " + err.Error())
} }

18
utils/utils.go Normal file
View File

@ -0,0 +1,18 @@
package utils
import (
"oc-monitord/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
)
// GetExecution loads the workflow execution identified by exec_id through an
// oclib WORKFLOW_EXECUTION request issued with the configured peer ID.
//
// It returns the execution converted from the response when the response code
// is 200. For any other code it logs an error through the oclib logger and
// returns nil, so callers must check the result before dereferencing it.
func GetExecution(exec_id string) *workflow_execution.WorkflowExecution {
	request := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil)
	response := request.LoadOne(exec_id)
	if response.Code == 200 {
		return response.ToWorkflowExecution()
	}
	// The logger is kept in a local variable so the level method is called on
	// an addressable value.
	logger := oclib.GetLogger()
	logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
	return nil
}

View File

@ -92,7 +92,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []st
logger.Error().Msg("Could not write the yaml file") logger.Error().Msg("Could not write the yaml file")
return "", 0, firstItems, lastItems, err return "", 0, firstItems, lastItems, err
} }
return file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil return workflows_dir + file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
} }
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) { func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {