Monitor With Data Storage + Datas

mr
2024-10-11 13:44:16 +02:00
parent f388351476
commit 310395f869
10 changed files with 532 additions and 491 deletions

main.go

@@ -95,7 +95,7 @@ func main() {
logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
}
argo_file_path, err := new_wf.ExportToArgo(conf.GetConfig().Timeout)
argo_file_path, stepMax, err := new_wf.ExportToArgo(conf.GetConfig().Timeout)
if err != nil {
logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID)
logger.Error().Msg(err.Error())
@@ -107,20 +107,17 @@ func main() {
wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
wf_logger.Debug().Msg("Testing argo name")
executeWorkflow(argo_file_path)
executeWorkflow(argo_file_path, stepMax)
}
// Return the Workflow ID associated to a workflow execution object
func getWorkflowId(exec_id string) string {
res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec_id)
if res.Code != 200 {
logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
return ""
}
wf_exec := res.ToWorkflowExecution()
return wf_exec.WorkflowID
@@ -128,29 +125,22 @@ func getWorkflowId(exec_id string) string {
// So far we only log the output from the command's stdout
func executeWorkflow(argo_file_path string) {
func executeWorkflow(argo_file_path string, stepMax int) {
// var stdout, stderr, stdout_logs, stderr_logs io.ReadCloser
var stdout, stderr io.ReadCloser
// var stderr io.ReadCloser
var err error
cmd := exec.Command("argo", "submit", "--watch", "./argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo")
cmd := exec.Command("argo", "submit", "--log", "./argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo")
fmt.Println(cmd)
if stdout, err = cmd.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
return
}
if stderr, err = cmd.StderrPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error())
return
}
if err := cmd.Start(); err != nil {
panic(err)
}
var wg sync.WaitGroup
go logWorkflow(stdout, &wg)
go logWorkflow(argo_file_path, stepMax, stdout, &wg)
if err := cmd.Wait(); err != nil {
wf_logger.Error().Msg("Could not execute argo submit")
@@ -163,46 +153,43 @@ func executeWorkflow(argo_file_path string) {
// We could improve this function by creating an object with the same attributes as the output
// and only sending a new log if the current object has different values than the previous one
func logWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) {
var current_watch, previous_watch models.ArgoWatch
func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser, wg *sync.WaitGroup) {
var current_watch, previous_watch *models.ArgoWatch
split := strings.Split(argo_file_path, "_")
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
watch_output := make([]string, 0)
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
log := scanner.Text()
watch_output = append(watch_output, log)
if strings.HasPrefix(log, "Progress:") {
current_watch = *models.NewArgoLogs(watch_output)
workflowName = current_watch.Name
if !current_watch.Equals(previous_watch) {
wg.Add(1)
checkStatus(current_watch.Status, previous_watch.Status)
jsonified, err := json.Marshal(current_watch)
if err != nil {
logger.Error().Msg("Could not create watch log")
}
wf_logger.Info().Msg(string(jsonified))
previous_watch = current_watch
current_watch = models.ArgoWatch{}
wg.Done()
if strings.Contains(log, "Progress:") {
current_watch = argoLogs.StopStepRecording(watch_output)
watch_output = []string{}
} else if strings.Contains(log, "sub-process exited") {
current_watch = argoLogs.StopStepRecording(watch_output)
}
if current_watch != nil && !current_watch.Equals(previous_watch) && current_watch.Name != "" {
wg.Add(1)
checkStatus(current_watch, previous_watch)
jsonified, err := json.Marshal(current_watch)
if err != nil {
logger.Error().Msg("Could not create watch log")
}
if current_watch.Status == "Failed" {
wf_logger.Error().Msg(string(jsonified))
} else {
wf_logger.Info().Msg(string(jsonified))
}
previous_watch = current_watch
current_watch = &models.ArgoWatch{}
watch_output = []string{}
wg.Done()
}
}
}
// Debug, no logs sent
func logPods(pipe io.ReadCloser, name string) {
pods_logger = wf_logger.With().Str("pod_name", name).Logger()
scanner := bufio.NewScanner(pipe)
for scanner.Scan() {
log := scanner.Text()
pods_logger.Info().Msg(log)
}
}
func loadConfig(is_k8s bool, parser *argparse.Parser) {
var o *onion.Onion
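The comment above logWorkflow spells out the deduplication idea this hunk implements: only emit a log entry when the parsed watch object differs from the previous one. The models package is not part of this commit, so the following is only a minimal sketch of the comparison logWorkflow relies on, assuming ArgoWatch exposes roughly these fields; the real type may differ.

package models

// Sketch only: the field names below are assumptions, the actual struct lives in the project's models package.
type ArgoWatch struct {
	Name     string
	Status   string
	Progress string
}

// Equals reports whether two snapshots describe the same workflow state, so the
// caller can skip logging when nothing has changed. A nil previous snapshot never
// matches, which lets logWorkflow emit the very first update.
func (a *ArgoWatch) Equals(b *ArgoWatch) bool {
	if b == nil {
		return false
	}
	return a.Name == b.Name && a.Status == b.Status && a.Progress == b.Progress
}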
@@ -298,9 +285,9 @@ func getContainerName(argo_file string) string {
}
// Uses the ArgoWatch object to update the status of the workflow execution object
func checkStatus(current string, previous string) {
if current != previous {
updateStatus(current)
func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch) {
if previous != nil && current.Status != previous.Status {
updateStatus(current.Status)
}
}
@@ -317,6 +304,4 @@ func updateStatus(status string) {
if res.Code != 200 {
logger.Error().Msg("Could not update status for workflow execution " + exec_id)
}
fmt.Printf("status argo : %s\nstatus db : %s", status, serialized["state"])
}
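Taken together, the hunks above thread the new stepMax value from the export step down to the log parser. A condensed view of the call chain after this commit, assembled from the hunks above rather than from the full file:

// main: exporting the workflow now also reports how many steps it contains.
argo_file_path, stepMax, err := new_wf.ExportToArgo(conf.GetConfig().Timeout) // error handling as in the first hunk
executeWorkflow(argo_file_path, stepMax)

// executeWorkflow: stepMax is forwarded to the goroutine that parses the argo output.
go logWorkflow(argo_file_path, stepMax, stdout, &wg)

// logWorkflow: the file name prefix and stepMax seed the step recorder.
split := strings.Split(argo_file_path, "_")
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)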