Execution workflow execute change

This commit is contained in:
mr
2026-02-25 13:20:44 +01:00
parent 921ee900ce
commit 84f6af6e44
2 changed files with 76 additions and 115 deletions

View File

@@ -11,6 +11,8 @@ import (
"sync" "sync"
"time" "time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@@ -93,22 +95,21 @@ func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.L
a.Started = time.Now() a.Started = time.Now()
} }
type ArgoPodLog struct { type ArgoPodLog struct {
PodName string PodName string
Step string Step string
Message string Message string
} }
func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
return ArgoPodLog{ return ArgoPodLog{
PodName: name, PodName: name,
Step: step, Step: step,
Message: msg, Message: msg,
} }
} }
func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) { func LogKubernetesArgo(wfName string, execID string, namespace string, watcher watch.Interface) {
var argoWatcher *ArgoWatch var argoWatcher *ArgoWatch
var pods []string var pods []string
var node wfv1.NodeStatus var node wfv1.NodeStatus
@@ -118,14 +119,14 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
var wg sync.WaitGroup var wg sync.WaitGroup
for event := range (watcher.ResultChan()) { for event := range watcher.ResultChan() {
wf, ok := event.Object.(*wfv1.Workflow) wf, ok := event.Object.(*wfv1.Workflow)
if !ok { if !ok {
wfl.Error().Msg("unexpected type") wfl.Error().Msg("unexpected type")
continue continue
} }
if len(wf.Status.Nodes) == 0 { if len(wf.Status.Nodes) == 0 {
wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it
continue continue
} }
@@ -133,22 +134,22 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
// Retrieving the Status for the main node, which is named after the workflow // Retrieving the Status for the main node, which is named after the workflow
if node, ok = wf.Status.Nodes[wfName]; !ok { if node, ok = wf.Status.Nodes[wfName]; !ok {
bytified, _ := json.MarshalIndent(wf.Status.Nodes,"","\t") bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t")
wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified)) wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
} }
now := time.Now() now := time.Now()
start, _ := time.Parse(time.RFC3339, node.StartedAt.String() ) start, _ := time.Parse(time.RFC3339, node.StartedAt.String())
duration := now.Sub(start) duration := now.Sub(start)
newWatcher := ArgoWatch{ newWatcher := ArgoWatch{
Name: node.Name, Name: node.Name,
Namespace: namespace, Namespace: namespace,
Status: string(node.Phase), Status: string(node.Phase),
Created: node.StartedAt.String(), Created: node.StartedAt.String(),
Started: node.StartedAt.String(), Started: node.StartedAt.String(),
Progress: string(node.Progress), Progress: string(node.Progress),
Duration: duration.String(), Duration: duration.String(),
Conditions: conditions, Conditions: conditions,
} }
@@ -156,7 +157,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
argoWatcher = &newWatcher argoWatcher = &newWatcher
} }
if !newWatcher.Equals(argoWatcher){ if !newWatcher.Equals(argoWatcher) {
jsonified, _ := json.Marshal(newWatcher) jsonified, _ := json.Marshal(newWatcher)
wfl.Info().Msg(string(jsonified)) wfl.Info().Msg(string(jsonified))
argoWatcher = &newWatcher argoWatcher = &newWatcher
@@ -164,11 +165,14 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
// I don't think we need to use WaitGroup here, because the loop itself // I don't think we need to use WaitGroup here, because the loop itself
// acts as a blocking process for the main thread, because Argo watch never closes the channel // acts as a blocking process for the main thread, because Argo watch never closes the channel
for _, pod := range wf.Status.Nodes{ for _, pod := range wf.Status.Nodes {
if !slices.Contains(pods,pod.Name){ if !slices.Contains(pods, pod.Name) {
pl := wfl.With().Str("pod", pod.Name).Logger() pl := wfl.With().Str("pod", pod.Name).Logger()
if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the nodes is the Workflow, the others are the pods so don't try to log on the wf name if wfName == pod.Name {
pl.Info().Msg("Found a new pod to log : " + pod.Name) pods = append(pods, pod.Name)
continue
} // One of the nodes is the Workflow, the others are the pods so don't try to log on the wf name
pl.Info().Msg("Found a new pod to log : " + pod.Name)
wg.Add(1) wg.Add(1)
go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
pods = append(pods, pod.Name) pods = append(pods, pod.Name)
@@ -180,10 +184,16 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
wfl.Info().Msg(wfName + " workflow completed") wfl.Info().Msg(wfName + " workflow completed")
wg.Wait() wg.Wait()
wfl.Info().Msg(wfName + " exiting") wfl.Info().Msg(wfName + " exiting")
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.SUCCESS.EnumIndex(),
}, execID)
break break
} }
if node.Phase.FailedOrError() { if node.Phase.FailedOrError() {
wfl.Error().Msg(wfName + " has failed, please refer to the logs") wfl.Error().Msg(wfName + " has failed, please refer to the logs")
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
wfl.Error().Msg(node.Message) wfl.Error().Msg(node.Message)
break break
} }
@@ -205,7 +215,7 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
} }
// Function needed to be executed as a go thread // Function needed to be executed as a go thread
func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
s := strings.Split(podName, ".") s := strings.Split(podName, ".")
@@ -227,7 +237,7 @@ func logKubernetesPods(executionId string, wfName string,podName string, logger
scanner := bufio.NewScanner(reader) scanner := bufio.NewScanner(reader)
for scanner.Scan() { for scanner.Scan() {
log := scanner.Text() log := scanner.Text()
podLog := NewArgoPodLog(name,step,log) podLog := NewArgoPodLog(name, step, log)
jsonified, _ := json.Marshal(podLog) jsonified, _ := json.Marshal(podLog)
logger.Info().Msg(string(jsonified)) logger.Info().Msg(string(jsonified))
} }

97
main.go
View File

@@ -1,16 +1,11 @@
package main package main
import ( import (
"bufio"
"encoding/base64" "encoding/base64"
"fmt" "fmt"
"io"
"os" "os"
"os/exec"
"regexp" "regexp"
"strings" "strings"
"sync"
"time"
"oc-monitord/conf" "oc-monitord/conf"
l "oc-monitord/logger" l "oc-monitord/logger"
@@ -21,6 +16,7 @@ import (
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/booking" "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
@@ -53,7 +49,7 @@ func main() {
os.Setenv("test_service", "true") // Only for service demo, delete before merging on main os.Setenv("test_service", "true") // Only for service demo, delete before merging on main
parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database")
loadConfig(false, &parser) loadConfig(&parser)
oclib.InitDaemon("oc-monitord") oclib.InitDaemon("oc-monitord")
logger = u.GetLogger() logger = u.GetLogger()
@@ -63,6 +59,10 @@ func main() {
exec := u.GetExecution(conf.GetConfig().ExecutionID) exec := u.GetExecution(conf.GetConfig().ExecutionID)
if exec == nil { if exec == nil {
logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID) logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, conf.GetConfig().ExecutionID)
return
} }
conf.GetConfig().WorkflowID = exec.WorkflowID conf.GetConfig().WorkflowID = exec.WorkflowID
@@ -85,29 +85,36 @@ func main() {
if err != nil { if err != nil {
logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID)
logger.Error().Msg(err.Error()) logger.Error().Msg(err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, exec.GetID())
return
} }
argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID)
if err != nil { if err != nil {
logger.Error().Msg("Error when completing the build of the workflow: " + err.Error()) logger.Error().Msg("Error when completing the build of the workflow: " + err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, exec.GetID())
return
} }
workflowName = getContainerName(argoFilePath) workflowName = getContainerName(argoFilePath)
if conf.GetConfig().KubeHost == "" { if conf.GetConfig().KubeHost == "" {
// Not in a k8s environment, get conf from parameters // Not in a k8s environment, get conf from parameters
logger.Info().Msg("Executes outside of k8s") panic("can't exec with no kube for argo deployment")
executeOutside(argoFilePath, builder.Workflow)
} else { } else {
// Executed in a k8s environment // Executed in a k8s environment
logger.Info().Msg("Executes inside a k8s") logger.Info().Msg("Executes inside a k8s")
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
executeInside(exec.ExecutionsID, argoFilePath) executeInside(exec.ExecutionsID, exec.GetID(), argoFilePath)
} }
} }
// So far we only log the output from // So far we only log the output from
func executeInside(ns string, argo_file_path string) { func executeInside(ns string, execID string, argo_file_path string) {
t, err := tools2.NewService(conf.GetConfig().Mode) t, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil { if err != nil {
logger.Error().Msg("Could not create KubernetesTool") logger.Error().Msg("Could not create KubernetesTool")
@@ -126,84 +133,28 @@ func executeInside(ns string, argo_file_path string) {
watcher, err := t.GetArgoWatch(ns, workflowName) watcher, err := t.GetArgoWatch(ns, workflowName)
if err != nil { if err != nil {
logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) logger.Error().Msg("Could not retrieve Watcher : " + err.Error())
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
"state": enum.FAILURE.EnumIndex(),
}, execID)
} }
l.LogKubernetesArgo(name, ns, watcher) l.LogKubernetesArgo(name, execID, ns, watcher)
if err != nil {
logger.Error().Msg("Could not log workflow : " + err.Error())
}
logger.Info().Msg("Finished, exiting...") logger.Info().Msg("Finished, exiting...")
} }
} }
func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { func loadConfig(parser *argparse.Parser) {
var stdoutSubmit, stderrSubmit io.ReadCloser
var stdoutLogs, stderrLogs io.ReadCloser
var wg sync.WaitGroup
var err error
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID)
cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID)
if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
return
}
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color")
if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error())
return
}
var steps []string
for _, template := range workflow.Spec.Templates {
steps = append(steps, template.Name)
}
go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg)
go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg)
logger.Info().Msg("Starting argo submit")
if err := cmdSubmit.Start(); err != nil {
wf_logger.Error().Msg("Could not start argo submit")
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
updateStatus("fatal", "")
}
time.Sleep(5 * time.Second)
logger.Info().Msg("Running argo logs")
if err := cmdLogs.Run(); err != nil {
wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text())
}
logger.Info().Msg("Waiting argo submit")
if err := cmdSubmit.Wait(); err != nil {
wf_logger.Error().Msg("Could not execute argo submit")
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
updateStatus("fatal", "")
}
wg.Wait()
}
func loadConfig(is_k8s bool, parser *argparse.Parser) {
var o *onion.Onion var o *onion.Onion
o = initOnion(o) o = initOnion(o)
setConf(is_k8s, o, parser) setConf(parser)
// if !IsValidUUID(conf.GetConfig().ExecutionID) { // if !IsValidUUID(conf.GetConfig().ExecutionID) {
// logger.Fatal().Msg("Provided ID is not an UUID") // logger.Fatal().Msg("Provided ID is not an UUID")
// } // }
} }
func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { func setConf(parser *argparse.Parser) {
url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"})
mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"}) mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})