diff --git a/main.go b/main.go index 6787182..46f0513 100644 --- a/main.go +++ b/main.go @@ -2,18 +2,20 @@ package main import ( "bufio" + "encoding/base64" "encoding/json" "fmt" "io" "os" "os/exec" "regexp" - "strconv" + "slices" "strings" "sync" "oc-monitord/conf" "oc-monitord/models" + u "oc-monitord/utils" "oc-monitord/workflow_builder" oclib "cloud.o-forge.io/core/oc-lib" @@ -49,19 +51,10 @@ const defaultConfigFile = "/etc/oc/ocmonitord_conf.json" const localConfigFile = "./conf/local_ocmonitord_conf.json" func main() { - os.Setenv("test_service", "true") // Only for service demo, delete before merging on main - // Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) - if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { - // Not in a k8s environment, get conf from parameters - fmt.Println("Executes outside of k8s") - parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") - loadConfig(false, &parser) - } else { - // Executed in a k8s environment - fmt.Println("Executes inside a k8s") - loadConfig(true, nil) - } + os.Setenv("test_service", "true") // Only for service demo, delete before merging on main + parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") + loadConfig(false, &parser) oclib.InitDaemon("oc-monitord") oclib.SetConfig( @@ -76,7 +69,7 @@ func main() { logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) - exec := getExecution(conf.GetConfig().ExecutionID) + exec := u.GetExecution(conf.GetConfig().ExecutionID) conf.GetConfig().WorkflowID = exec.WorkflowID logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) @@ -106,33 +99,36 @@ func main() { wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() wf_logger.Debug().Msg("Testing argo name") - if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { + if conf.GetConfig().KubeHost == "" { // Not in a k8s environment, get conf from parameters fmt.Println("Executes outside of k8s") - executeInside(argo_file_path, stepMax) + executeOutside(argo_file_path, stepMax) } else { // Executed in a k8s environment fmt.Println("Executes inside a k8s") - loadConfig(true, nil) + executeInside(exec.GetID(), "argo", argo_file_path, stepMax) } } -func getExecution(exec_id string) *workflow_execution.WorkflowExecution { - res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id) - if res.Code != 200 { - logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) - return nil - } - return res.ToWorkflowExecution() -} // So far we only log the output from -func executeInside(argo_file_path string, stepMax int) { +func executeInside(execID string, ns string, argo_file_path string, stepMax int) { t, err := tools2.NewService(conf.GetConfig().Mode) if err != nil { logger.Error().Msg("Could not create KubernetesTool") } - t.CreateArgoWorkflow(argo_file_path) - t.LogWorkflow("argo", workflowName, argo_file_path, stepMax, logWorkflow) + name, err := t.CreateArgoWorkflow(argo_file_path, ns) + if err != nil { + logger.Error().Msg("Could not create argo workflow : " + err.Error()) + } else { + split := strings.Split(argo_file_path, "_") + argoLogs := models.NewArgoLogs(split[0], "argo", stepMax) + argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger) + err := t.LogWorkflow(execID, ns, name, argo_file_path, stepMax, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, logWorkflow) + if err != nil { + logger.Error().Msg("Could not log workflow : " + err.Error()) + } + } + } func executeOutside(argo_file_path string, stepMax int) { @@ -140,7 +136,7 @@ func executeOutside(argo_file_path string, stepMax int) { var stdout, stderr io.ReadCloser // var stderr io.ReadCloser var err error - cmd := exec.Command("argo", "submit", "--log", "./argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo") + cmd := exec.Command("argo", "submit", "--log", argo_file_path, "--serviceaccount=argo", "-n", "argo") if stdout, err = cmd.StdoutPipe(); err != nil { wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) return @@ -149,7 +145,11 @@ func executeOutside(argo_file_path string, stepMax int) { panic(err) } var wg sync.WaitGroup - go logWorkflow(argo_file_path, stepMax, stdout, &wg) + split := strings.Split(argo_file_path, "_") + argoLogs := models.NewArgoLogs(split[0], "argo", stepMax) + argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger) + argoLogs.IsStreaming = true + go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg) if err := cmd.Wait(); err != nil { wf_logger.Error().Msg("Could not execute argo submit") @@ -161,33 +161,53 @@ func executeOutside(argo_file_path string, stepMax int) { // We could improve this function by creating an object with the same attribute as the output // and only send a new log if the current object has different values than the previous -func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup) { - var current_watch, previous_watch *models.ArgoWatch - split := strings.Split(argo_file_path, "_") - argoLogs := models.NewArgoLogs(split[0], "argo", stepMax) - - watch_output := make([]string, 0) +func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, + current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, + argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup) { scanner := bufio.NewScanner(pipe) + count := 0 + see := "" + seeit := 0 for scanner.Scan() { log := scanner.Text() - watch_output = append(watch_output, log) - if strings.Contains(log, "Progress:") { - current_watch = argoLogs.StopStepRecording(watch_output) - watch_output = []string{} - } else if strings.Contains(log, "sub-process exited") { - current_watch = argoLogs.StopStepRecording(watch_output) + if strings.Contains(log, "capturing logs") && count == 0 { + if !argoLogs.IsStreaming { + wg.Add(1) + } + seeit++ + } else if count == 0 { + if argoLogs.IsStreaming { + continue + } else { + break + } } - if current_watch != nil && !current_watch.Equals(previous_watch) && current_watch.Name != "" { - wg.Add(1) - checkStatus(current_watch, previous_watch) + if count == 1 { + see = log + if slices.Contains(argoLogs.Seen, see) && !argoLogs.IsStreaming { + wg.Done() + seeit-- + break + } + } + if !slices.Contains(current_watch.Logs, log) { + current_watch.Logs = append(current_watch.Logs, log) + } + count++ + if strings.Contains(log, "sub-process exited") { + current_watch = argoLogs.StopStepRecording(current_watch) + argoLogs.Seen = append(argoLogs.Seen, see) + if checkStatus(current_watch, previous_watch, argoLogs) { + count = 0 + if !argoLogs.IsStreaming { + wg.Done() + } + seeit-- + } jsonified, err := json.Marshal(current_watch) if err != nil { logger.Error().Msg("Could not create watch log") } - if strings.Contains(strings.ToLower(strings.Join(current_watch.Logs, " ")), "error") || strings.Contains(strings.ToLower(strings.ToLower(strings.Join(current_watch.Logs, " "))), "err") { - current_watch.Status = "Failed" - } - if current_watch.Status == "Failed" { wf_logger.Error().Msg(string(jsonified)) } else { @@ -195,8 +215,6 @@ func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *syn } previous_watch = current_watch current_watch = &models.ArgoWatch{} - watch_output = []string{} - wg.Done() } } } @@ -212,44 +230,50 @@ func loadConfig(is_k8s bool, parser *argparse.Parser) { } func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { - if is_k8s { - conf.GetConfig().LokiURL = o.GetStringDefault("lokiurl", "http://127.0.0.1:3100") - i, err := strconv.Atoi(o.GetString("timeout")) - if err == nil { - conf.GetConfig().Timeout = i - } else { - logger.Error().Msg("Could not parse timeout, using default value") - } - conf.GetConfig().ExecutionID = o.GetString("workflow") - conf.GetConfig().PeerID = o.GetString("peer") - mongo := o.GetStringDefault("mongourl", "mongodb://127.0.0.1:27017") - db := o.GetStringDefault("database", "DC_myDC") + url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) + mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"}) + execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) + peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"}) + mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"}) + db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"}) + timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"}) - conf.GetConfig().MongoURL = mongo - conf.GetConfig().Database = db - } else { - url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) - mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "kubernetes", Help: "Mode of the execution"}) - execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) - peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"}) - mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"}) - db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"}) - timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"}) - err := parser.Parse(os.Args) - if err != nil { - fmt.Println(parser.Usage(err)) - os.Exit(1) - } - conf.GetConfig().Logs = "debug" - conf.GetConfig().LokiURL = *url - conf.GetConfig().MongoURL = *mongo - conf.GetConfig().Database = *db - conf.GetConfig().Timeout = *timeout - conf.GetConfig().Mode = *mode - conf.GetConfig().ExecutionID = *execution - conf.GetConfig().PeerID = *peer + ca := parser.String("c", "ca", &argparse.Options{Required: false, Default: "", Help: "CA file for the Kubernetes cluster"}) + cert := parser.String("C", "cert", &argparse.Options{Required: false, Default: "", Help: "Cert file for the Kubernetes cluster"}) + data := parser.String("D", "data", &argparse.Options{Required: false, Default: "", Help: "Data file for the Kubernetes cluster"}) + + host := parser.String("H", "host", &argparse.Options{Required: false, Default: "", Help: "Host for the Kubernetes cluster"}) + port := parser.String("P", "port", &argparse.Options{Required: false, Default: "6443", Help: "Port for the Kubernetes cluster"}) + + err := parser.Parse(os.Args) + if err != nil { + fmt.Println(parser.Usage(err)) + os.Exit(1) } + conf.GetConfig().Logs = "debug" + conf.GetConfig().LokiURL = *url + conf.GetConfig().MongoURL = *mongo + conf.GetConfig().Database = *db + conf.GetConfig().Timeout = *timeout + conf.GetConfig().Mode = *mode + conf.GetConfig().ExecutionID = *execution + conf.GetConfig().PeerID = *peer + conf.GetConfig().KubeHost = *host + conf.GetConfig().KubePort = *port + + decoded, err := base64.StdEncoding.DecodeString(*ca) + if err == nil { + conf.GetConfig().KubeCA = string(decoded) + } + decoded, err = base64.StdEncoding.DecodeString(*cert) + if err == nil { + conf.GetConfig().KubeCert = string(decoded) + } + decoded, err = base64.StdEncoding.DecodeString(*data) + if err == nil { + conf.GetConfig().KubeData = string(decoded) + } } func initOnion(o *onion.Onion) *onion.Onion { @@ -294,14 +318,24 @@ func getContainerName(argo_file string) string { } // Uses the ArgoWatch object to update status of the workflow execution object -func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch) { - if previous != nil && current.Status != previous.Status { +func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch, argoLogs *models.ArgoLogs) bool { + if previous == nil || current.Status != previous.Status || argoLogs.IsStreaming { + argoLogs.StepCount += 1 if len(current.Logs) > 0 { - updateStatus(current.Status, current.Logs[0]) + newLogs := []string{} + for _, log := range current.Logs { + if !slices.Contains(argoLogs.Logs, log) { + newLogs = append(newLogs, log) + } + } + updateStatus(current.Status, strings.Join(newLogs, "\n")) + current.Logs = newLogs + argoLogs.Logs = append(argoLogs.Logs, newLogs...) } else { updateStatus(current.Status, "") } } + return previous == nil || current.Status != previous.Status || argoLogs.IsStreaming } func updateStatus(status string, log string) { diff --git a/models/argo_logs.go b/models/argo_logs.go index 534adac..1796813 100644 --- a/models/argo_logs.go +++ b/models/argo_logs.go @@ -1,12 +1,14 @@ package models import ( + "encoding/json" "fmt" "strconv" "strings" "time" "github.com/acarl005/stripansi" + "github.com/rs/zerolog" ) type ArgoWatch struct { @@ -41,6 +43,7 @@ func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs { StepCount: 0, StepMax: stepMax, stop: false, + Seen: []string{}, } } @@ -52,19 +55,42 @@ type ArgoLogs struct { StepMax int stop bool Started time.Time + Seen []string + Logs []string + IsStreaming bool } -func (a *ArgoLogs) StartStepRecording() { +func (a *ArgoLogs) NewWatch() *ArgoWatch { + return &ArgoWatch{ + Name: a.Name, + Namespace: a.Namespace, + Status: "Pending", + Created: a.CreatedDate, + Started: a.Started.Format("2006-01-02 15:04:05"), + Conditions: Conditions{ + PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, + Completed: a.StepCount == a.StepMax, + }, + Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax), + Duration: "0s", + Logs: []string{}, + } + +} + +func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) { + jsonified, _ := json.Marshal(current_watch) + logger.Info().Msg(string(jsonified)) a.StepCount += 1 a.Started = time.Now() } -func (a *ArgoLogs) StopStepRecording(inputs []string) *ArgoWatch { +func (a *ArgoLogs) StopStepRecording(current *ArgoWatch) *ArgoWatch { fn := strings.Split(a.Name, "_") logs := []string{} err := false end := "" - for _, input := range inputs { + for _, input := range current.Logs { line := strings.TrimSpace(input) if line == "" || !strings.Contains(line, fn[0]) || !strings.Contains(line, ":") { continue @@ -107,22 +133,13 @@ func (a *ArgoLogs) StopStepRecording(inputs []string) *ArgoWatch { timeE, _ := time.Parse("2006-01-02T15:04:05", end) duration = timeE.Sub(a.Started).Seconds() } - argo := &ArgoWatch{ - Name: a.Name, - Namespace: a.Namespace, - Status: status, - Created: a.CreatedDate, - Started: a.Started.Format("2006-01-02 15:04:05"), - Conditions: Conditions{ - PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, - Completed: a.StepCount == a.StepMax, - }, - Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax), - Duration: fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s"), - Logs: logs, + current.Conditions = Conditions{ + PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax, + Completed: a.StepCount == a.StepMax, } - if !argo.Completed { - a.StartStepRecording() - } - return argo + current.Progress = fmt.Sprintf("%v/%v", a.StepCount, a.StepMax) + current.Duration = fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s") + + current.Status = status + return current } diff --git a/oc-monitord b/oc-monitord index 4379cde..7506b41 100755 Binary files a/oc-monitord and b/oc-monitord differ diff --git a/tools/interface.go b/tools/interface.go index 99c3a4a..2b356d8 100644 --- a/tools/interface.go +++ b/tools/interface.go @@ -3,14 +3,17 @@ package tools import ( "errors" "io" + "oc-monitord/models" "sync" ) type Tool interface { - CreateArgoWorkflow(path string) error + CreateArgoWorkflow(path string, ns string) (string, error) CreateAccessSecret(ns string, login string, password string) (string, error) - LogWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, - logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup)) error + LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, + argoLogs *models.ArgoLogs, seen []string, + logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, + argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error } var _service = map[string]func() (Tool, error){ diff --git a/tools/kubernetes.go b/tools/kubernetes.go index 687f389..df68cc7 100644 --- a/tools/kubernetes.go +++ b/tools/kubernetes.go @@ -7,9 +7,13 @@ import ( "fmt" "io" "oc-monitord/conf" + "oc-monitord/models" + "oc-monitord/utils" "os" "sync" + "time" + "cloud.o-forge.io/core/oc-lib/models/common/enum" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/google/uuid" @@ -54,57 +58,75 @@ func NewKubernetesTool() (Tool, error) { }, nil } -func (k *KubernetesTools) LogWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, - logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup)) error { +func (k *KubernetesTools) LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, + seen []string, logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error { + exec := utils.GetExecution(execID) + if exec == nil { + return errors.New("Could not retrieve workflow ID from execution ID " + execID) + } + if exec.State == enum.FAILURE || exec.State == enum.SUCCESS { + return nil + } + k.logWorkflow(namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc) + return k.LogWorkflow(execID, namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc) +} + +func (k *KubernetesTools) logWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, + seen []string, + logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error { // List pods related to the Argo workflow labelSelector := fmt.Sprintf("workflows.argoproj.io/workflow=%s", workflowName) - pods, err := k.Set.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{ - LabelSelector: labelSelector, - }) - if err != nil { - panic(fmt.Sprintf("failed to list pods: %v", err)) - } - - if len(pods.Items) == 0 { - return errors.New("no pods found for the workflow") - } - var wg *sync.WaitGroup - // Stream logs from all matching pods - wg.Add(len(pods.Items)) - for _, pod := range pods.Items { - for _, container := range pod.Spec.Containers { - fmt.Printf("streaming logs for Pod: %s, Container: %s\n", pod.Name, container.Name) - go k.streamLogs(namespace, pod.Name, container.Name, argoFilePath, stepMax, wg, logFunc) + for retries := 0; retries < 10; retries++ { // Retry for up to ~20 seconds + // List workflow pods + wfPods, err := k.Set.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return err } + // If we found pods, stream logs + if len(wfPods.Items) > 0 { + var wg sync.WaitGroup + // Stream logs from all matching pods + for _, pod := range wfPods.Items { + for _, container := range pod.Spec.Containers { + wg.Add(1) + go k.streamLogs(namespace, pod.Name, container.Name, argoFilePath, stepMax, &wg, current_watch, previous_watch, argoLogs, seen, logFunc) + } + } + wg.Wait() + return nil + } + time.Sleep(2 * time.Second) // Wait before retrying } - wg.Wait() - return nil + return errors.New("no pods found for the workflow") } // Function to stream logs func (k *KubernetesTools) streamLogs(namespace string, podName string, containerName string, - argoFilePath string, stepMax int, wg *sync.WaitGroup, - logFunc func(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup)) { + argoFilePath string, stepMax int, wg *sync.WaitGroup, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, + logFunc func(argo_file_path string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) { req := k.Set.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{ - Container: containerName, - Follow: true, // Equivalent to -f flag in kubectl logs + Container: containerName, // Main container + Follow: true, // Equivalent to -f flag in kubectl logs }) - + defer wg.Done() // Open stream - stream, err := req.Stream(context.TODO()) + stream, err := req.Stream(context.Background()) if err != nil { - fmt.Printf("Error opening log stream for pod %s: %v\n", podName, err) return } defer stream.Close() - logFunc(argoFilePath, stepMax, stream, wg) + var internalWg sync.WaitGroup + logFunc(argoFilePath, stepMax, stream, current_watch, previous_watch, argoLogs, seen, &internalWg) + internalWg.Wait() } -func (k *KubernetesTools) CreateArgoWorkflow(path string) error { +func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) { // Read workflow YAML file workflowYAML, err := os.ReadFile(path) if err != nil { - return err + return "", err } // Decode the YAML into a Workflow struct scheme := runtime.NewScheme() @@ -114,21 +136,21 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string) error { obj, _, err := decode(workflowYAML, nil, nil) if err != nil { - return errors.New("failed to decode YAML: " + err.Error()) + return "", errors.New("failed to decode YAML: " + err.Error()) } workflow, ok := obj.(*wfv1.Workflow) if !ok { - return errors.New("decoded object is not a Workflow") + return "", errors.New("decoded object is not a Workflow") } // Create the workflow in the "argo" namespace - createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows("argo").Create(context.TODO(), workflow, metav1.CreateOptions{}) + createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.Background(), workflow, metav1.CreateOptions{}) if err != nil { - return errors.New("failed to create workflow: " + err.Error()) + return "", errors.New("failed to create workflow: " + err.Error()) } fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, "argo") - return nil + return createdWf.Name, nil } func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) { @@ -151,7 +173,7 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s Data: secretData, } // Create the Secret in Kubernetes - _, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) + _, err := k.Set.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{}) if err != nil { return "", errors.New("Error creating secret: " + err.Error()) } diff --git a/utils/utils.go b/utils/utils.go new file mode 100644 index 0000000..1222230 --- /dev/null +++ b/utils/utils.go @@ -0,0 +1,18 @@ +package utils + +import ( + "oc-monitord/conf" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" +) + +func GetExecution(exec_id string) *workflow_execution.WorkflowExecution { + res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id) + if res.Code != 200 { + logger := oclib.GetLogger() + logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) + return nil + } + return res.ToWorkflowExecution() +} diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 8b29ee7..2b146eb 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -92,7 +92,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []st logger.Error().Msg("Could not write the yaml file") return "", 0, firstItems, lastItems, err } - return file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil + return workflows_dir + file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {