From 84f6af6e448b3d18e276acc25e2b40d5c4ee838b Mon Sep 17 00:00:00 2001 From: mr Date: Wed, 25 Feb 2026 13:20:44 +0100 Subject: [PATCH] Execution workflow execute change --- logger/argo_logs.go | 94 +++++++++++++++++++++++-------------------- main.go | 97 +++++++++++---------------------------------- 2 files changed, 76 insertions(+), 115 deletions(-) diff --git a/logger/argo_logs.go b/logger/argo_logs.go index 64fd6b8..b0a3f8e 100644 --- a/logger/argo_logs.go +++ b/logger/argo_logs.go @@ -11,6 +11,8 @@ import ( "sync" "time" + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/common/enum" "github.com/rs/zerolog" "k8s.io/apimachinery/pkg/watch" @@ -54,7 +56,7 @@ func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs { } } -// An object to monitor and log the output of an argo submit +// An object to monitor and log the output of an argo submit type ArgoLogs struct { Name string Namespace string @@ -93,22 +95,21 @@ func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.L a.Started = time.Now() } - type ArgoPodLog struct { - PodName string - Step string - Message string + PodName string + Step string + Message string } func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { return ArgoPodLog{ PodName: name, - Step: step, + Step: step, Message: msg, } } -func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) { +func LogKubernetesArgo(wfName string, execID string, namespace string, watcher watch.Interface) { var argoWatcher *ArgoWatch var pods []string var node wfv1.NodeStatus @@ -117,38 +118,38 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) wfl.Debug().Msg("Starting to log " + wfName) var wg sync.WaitGroup - - for event := range (watcher.ResultChan()) { + + for event := range watcher.ResultChan() { wf, ok := event.Object.(*wfv1.Workflow) if !ok { wfl.Error().Msg("unexpected type") continue } if len(wf.Status.Nodes) == 0 { - wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it + wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it continue } - - conditions := retrieveCondition(wf) + + conditions := retrieveCondition(wf) // Retrieving the Status for the main node, which is named after the workflow if node, ok = wf.Status.Nodes[wfName]; !ok { - bytified, _ := json.MarshalIndent(wf.Status.Nodes,"","\t") + bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t") wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified)) } now := time.Now() - start, _ := time.Parse(time.RFC3339, node.StartedAt.String() ) + start, _ := time.Parse(time.RFC3339, node.StartedAt.String()) duration := now.Sub(start) newWatcher := ArgoWatch{ - Name: node.Name, - Namespace: namespace, - Status: string(node.Phase), - Created: node.StartedAt.String(), - Started: node.StartedAt.String(), - Progress: string(node.Progress), - Duration: duration.String(), + Name: node.Name, + Namespace: namespace, + Status: string(node.Phase), + Created: node.StartedAt.String(), + Started: node.StartedAt.String(), + Progress: string(node.Progress), + Duration: duration.String(), Conditions: conditions, } @@ -156,23 +157,26 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) argoWatcher = &newWatcher } - if !newWatcher.Equals(argoWatcher){ + if !newWatcher.Equals(argoWatcher) { jsonified, _ := json.Marshal(newWatcher) wfl.Info().Msg(string(jsonified)) argoWatcher = &newWatcher } - - // I don't think we need to use WaitGroup here, because the loop itself + + // I don't think we need to use WaitGroup here, because the loop itself // acts as blocking process for the main thread, because Argo watch never closes the channel - for _, pod := range wf.Status.Nodes{ - if !slices.Contains(pods,pod.Name){ - pl := wfl.With().Str("pod", pod.Name).Logger() - if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name - pl.Info().Msg("Found a new pod to log : " + pod.Name) + for _, pod := range wf.Status.Nodes { + if !slices.Contains(pods, pod.Name) { + pl := wfl.With().Str("pod", pod.Name).Logger() + if wfName == pod.Name { + pods = append(pods, pod.Name) + continue + } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name + pl.Info().Msg("Found a new pod to log : " + pod.Name) wg.Add(1) go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) pods = append(pods, pod.Name) - } + } } // Stop listening to the chan when the Workflow is completed or something bad happened @@ -180,11 +184,17 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) wfl.Info().Msg(wfName + " worflow completed") wg.Wait() wfl.Info().Msg(wfName + " exiting") + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + "state": enum.SUCCESS.EnumIndex(), + }, execID) break } if node.Phase.FailedOrError() { wfl.Error().Msg(wfName + "has failed, please refer to the logs") - wfl.Error().Msg(node.Message) + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + "state": enum.FAILURE.EnumIndex(), + }, execID) + wfl.Error().Msg(node.Message) break } } @@ -200,36 +210,36 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { } } - return - + return + } -// Function needed to be executed as a go thread -func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ +// Function needed to be executed as a go thread +func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger, wg *sync.WaitGroup) { defer wg.Done() - + s := strings.Split(podName, ".") name := s[0] + "-" + s[1] step := s[1] - + k, err := tools.NewKubernetesTool() if err != nil { logger.Error().Msg("Could not get Kubernetes tools") return } - + reader, err := k.GetPodLogger(executionId, wfName, podName) if err != nil { logger.Error().Msg(err.Error()) return } - + scanner := bufio.NewScanner(reader) for scanner.Scan() { log := scanner.Text() - podLog := NewArgoPodLog(name,step,log) + podLog := NewArgoPodLog(name, step, log) jsonified, _ := json.Marshal(podLog) logger.Info().Msg(string(jsonified)) } - -} \ No newline at end of file + +} diff --git a/main.go b/main.go index ce8c804..8a25828 100644 --- a/main.go +++ b/main.go @@ -1,16 +1,11 @@ package main import ( - "bufio" "encoding/base64" "fmt" - "io" "os" - "os/exec" "regexp" "strings" - "sync" - "time" "oc-monitord/conf" l "oc-monitord/logger" @@ -21,6 +16,7 @@ import ( "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" @@ -53,7 +49,7 @@ func main() { os.Setenv("test_service", "true") // Only for service demo, delete before merging on main parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") - loadConfig(false, &parser) + loadConfig(&parser) oclib.InitDaemon("oc-monitord") logger = u.GetLogger() @@ -63,6 +59,10 @@ func main() { exec := u.GetExecution(conf.GetConfig().ExecutionID) if exec == nil { logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID) + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + "state": enum.FAILURE.EnumIndex(), + }, conf.GetConfig().ExecutionID) + return } conf.GetConfig().WorkflowID = exec.WorkflowID @@ -85,29 +85,36 @@ func main() { if err != nil { logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg(err.Error()) + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + "state": enum.FAILURE.EnumIndex(), + }, exec.GetID()) + return } argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) if err != nil { logger.Error().Msg("Error when completing the build of the workflow: " + err.Error()) + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + "state": enum.FAILURE.EnumIndex(), + }, exec.GetID()) + return } workflowName = getContainerName(argoFilePath) if conf.GetConfig().KubeHost == "" { // Not in a k8s environment, get conf from parameters - logger.Info().Msg("Executes outside of k8s") - executeOutside(argoFilePath, builder.Workflow) + panic("can't exec with no kube for argo deployment") } else { // Executed in a k8s environment logger.Info().Msg("Executes inside a k8s") // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() - executeInside(exec.ExecutionsID, argoFilePath) + executeInside(exec.ExecutionsID, exec.GetID(), argoFilePath) } } // So far we only log the output from -func executeInside(ns string, argo_file_path string) { +func executeInside(ns string, execID string, argo_file_path string) { t, err := tools2.NewService(conf.GetConfig().Mode) if err != nil { logger.Error().Msg("Could not create KubernetesTool") @@ -126,84 +133,28 @@ func executeInside(ns string, argo_file_path string) { watcher, err := t.GetArgoWatch(ns, workflowName) if err != nil { logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) + oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ + "state": enum.FAILURE.EnumIndex(), + }, execID) } - l.LogKubernetesArgo(name, ns, watcher) - if err != nil { - logger.Error().Msg("Could not log workflow : " + err.Error()) - } - + l.LogKubernetesArgo(name, execID, ns, watcher) logger.Info().Msg("Finished, exiting...") } } -func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { - var stdoutSubmit, stderrSubmit io.ReadCloser - var stdoutLogs, stderrLogs io.ReadCloser - var wg sync.WaitGroup - var err error - - logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID) - - cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID) - if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil { - wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) - return - } - - cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color") - if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil { - wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error()) - return - } - - var steps []string - for _, template := range workflow.Spec.Templates { - steps = append(steps, template.Name) - } - - go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) - go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg) - - logger.Info().Msg("Starting argo submit") - if err := cmdSubmit.Start(); err != nil { - wf_logger.Error().Msg("Could not start argo submit") - wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) - updateStatus("fatal", "") - } - - time.Sleep(5 * time.Second) - - logger.Info().Msg("Running argo logs") - if err := cmdLogs.Run(); err != nil { - wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") - - wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text()) - - } - - logger.Info().Msg("Waiting argo submit") - if err := cmdSubmit.Wait(); err != nil { - wf_logger.Error().Msg("Could not execute argo submit") - wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) - updateStatus("fatal", "") - } - - wg.Wait() -} - -func loadConfig(is_k8s bool, parser *argparse.Parser) { +func loadConfig(parser *argparse.Parser) { var o *onion.Onion o = initOnion(o) - setConf(is_k8s, o, parser) + setConf(parser) // if !IsValidUUID(conf.GetConfig().ExecutionID) { // logger.Fatal().Msg("Provided ID is not an UUID") // } } -func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { +func setConf(parser *argparse.Parser) { url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"}) execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})