From 98b6cdaae5fba7f0f477a0909d7d93cea830b433 Mon Sep 17 00:00:00 2001 From: pb Date: Wed, 7 Aug 2024 17:24:09 +0200 Subject: [PATCH] logging workflow execution --- conf/conf.go | 1 + go.mod | 8 ++-- go.sum | 12 +++++ main.go | 131 ++++++++++++++++++++++++++++++++++++++++----------- 4 files changed, 120 insertions(+), 32 deletions(-) diff --git a/conf/conf.go b/conf/conf.go index 9133eb8..24eef5c 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -4,6 +4,7 @@ import "sync" type Config struct { LokiURL string + ExecutionID string WorkflowID string OcCatalogUrl string MongoUrl string diff --git a/go.mod b/go.mod index 506c29a..c525bbf 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-monitor go 1.22.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20240805134753-17f62b649523 + cloud.o-forge.io/core/oc-lib v0.0.0-20240807134103-0ec80473ccf7 github.com/akamensky/argparse v1.4.0 github.com/goraz/onion v0.1.3 github.com/nats-io/nats-server/v2 v2.10.18 @@ -35,9 +35,9 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.mongodb.org/mongo-driver v1.16.0 // indirect - golang.org/x/crypto v0.25.0 // indirect - golang.org/x/net v0.27.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/net v0.28.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.23.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/text v0.17.0 // indirect ) diff --git a/go.sum b/go.sum index b780dd1..b2a7e93 100644 --- a/go.sum +++ b/go.sum @@ -28,6 +28,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20240801145026-b125c9be0152 h1:eV8M9cjPpyzG9 cloud.o-forge.io/core/oc-lib v0.0.0-20240801145026-b125c9be0152/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= cloud.o-forge.io/core/oc-lib v0.0.0-20240805134753-17f62b649523 h1:/gN4169dtvbyi3+oLfHTMe8RlfX+P4VrV+1nfAThS+k= cloud.o-forge.io/core/oc-lib v0.0.0-20240805134753-17f62b649523/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= +cloud.o-forge.io/core/oc-lib v0.0.0-20240807111844-fe78927e739c h1:mt9c6vvW9sYhcIA5TM7YMHz5ItVYliatfWFPC4Gs/HM= +cloud.o-forge.io/core/oc-lib v0.0.0-20240807111844-fe78927e739c/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= +cloud.o-forge.io/core/oc-lib v0.0.0-20240807131622-6df71bde1d5e h1:LIZ2Mxwd9NQD2B6O07LZ1Zc7l6/eOhY9iOcFyr6DGDQ= +cloud.o-forge.io/core/oc-lib v0.0.0-20240807131622-6df71bde1d5e/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= +cloud.o-forge.io/core/oc-lib v0.0.0-20240807134103-0ec80473ccf7 h1:Q9fFnvEf0XzQvnCZ815wTRQ6zP/efU9RUcKXgcDoCng= +cloud.o-forge.io/core/oc-lib v0.0.0-20240807134103-0ec80473ccf7/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= @@ -149,6 +155,8 @@ golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.25.0 h1:ypSNr+bnYL2YhwoMt2zPxHFmbAN1KZs/njMG3hxUp30= golang.org/x/crypto v0.25.0/go.mod h1:T+wALwcMOSE0kXgUAnPAHqTLW+XHgcELELW8VaDgm/M= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -157,6 +165,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= @@ -186,6 +196,8 @@ golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= diff --git a/main.go b/main.go index 2c566c9..67c67c4 100644 --- a/main.go +++ b/main.go @@ -5,18 +5,22 @@ import ( "encoding/json" "fmt" "io" - "log" "os" "os/exec" "regexp" "strings" + "sync" + "time" "oc-monitor/conf" "oc-monitor/models" "oc-monitor/workflow_builder" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/logs" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "github.com/akamensky/argparse" "github.com/google/uuid" "github.com/goraz/onion" @@ -29,6 +33,7 @@ var parser argparse.Parser var monitorLocal bool var workflowName string +const namespace = "-n argo" const defaultConfigFile = "/etc/oc/ocmonitor_conf.json" const localConfigFile = "./conf/ocmonitor_conf.json" @@ -51,12 +56,15 @@ func main() { logger = logs.CreateLogger("oc-monitor", conf.GetConfig().LokiURL) logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) - logger.Debug().Msg("Workflow executed : " + conf.GetConfig().WorkflowID) + logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) oclib.SetConfig(conf.GetConfig().MongoUrl,conf.GetConfig().Database) oclib.Init("oc-monitor") - // create argo + wf_id := retrieveWorkflowId(conf.GetConfig().ExecutionID) + conf.GetConfig().WorkflowID = wf_id + + // // create argo new_wf := workflow_builder.WorflowDB{} err := new_wf.LoadFrom(conf.GetConfig().WorkflowID) @@ -73,33 +81,35 @@ func main() { workflowName = getContainerName(argo_file_path) - wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id",conf.GetConfig().WorkflowID).Logger() + wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id",conf.GetConfig().WorkflowID).Str("workflow_execution_id",conf.GetConfig().ExecutionID).Logger() wf_logger.Debug().Msg("Testing argo name") executeWorkflow(argo_file_path) + + fmt.Println("Logs sent to Loki successfully.") +} - // logger.Info().Msg(string(output)) +func retrieveWorkflowId(exec_id string) string { - // // Initialize LokiLogger - // lokiLogger := NewLokiLogger("http://localhost:3100/loki/api/v1/push") // Replace with your Loki URL + res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec_id) - // Run the Argo command + if res.Code != 200 { + logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) + return "" + } - // logger.Info().Msg(string(output)) - // // Send logs to Loki - // if err := lokiLogger.Log(`{job="argo"}`, string(output)); err != nil { - // log.Fatalf("failed to send logs to Loki: %v", err) - // } + wf_exec := res.ToWorkflowExecution() - log.Println("Logs sent to Loki successfully.") + return wf_exec.WorkflowID } func executeWorkflow(argo_file_path string) { var stdout, stderr io.ReadCloser + // var stderr io.ReadCloser var err error - cmd := exec.Command("argo", "submit", "--watch", "argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo") + cmd := exec.Command("argo", "submit", "--watch", "argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n","argo") if stdout, err = cmd.StdoutPipe(); err != nil{ wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) return @@ -114,24 +124,28 @@ func executeWorkflow(argo_file_path string) { panic(err) } - go logWorkflow(stdout) - go logWorkflow(stderr) + var wg sync.WaitGroup + + go logWorkflow(stdout, &wg) + go logWorkflow(stderr,&wg) + + + time.Sleep(time.Second * 1) + go logPods(workflowName) if err := cmd.Wait(); err != nil { wf_logger.Error().Msg("Could not execute argo submit") + wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderr).Text()) + // updateStatus(exec_id, FATAL) } - // Function to log the logs generated by the pods - // logPods(need to retrieve the name of the workflow) { - // executes "argo logs nameWorkflow" - // returns the output to a logging method - //} + wg.Wait() } // We could improve this function by creating an object with the same attribute as the output // and only send a new log if the current object has different values than the previous -func logWorkflow(pipe io.ReadCloser) { +func logWorkflow(pipe io.ReadCloser, wg *sync.WaitGroup) { var current_watch, previous_watch models.ArgoWatch watch_output := make([]string,0) @@ -142,18 +156,56 @@ func logWorkflow(pipe io.ReadCloser) { if(strings.HasPrefix(log, "Progress:")){ current_watch = *models.NewArgoLogs(watch_output) + fmt.Println("Status : " + current_watch.Status) + workflowName = current_watch.Name if(!current_watch.Equals(previous_watch)){ + wg.Add(1) + defer wg.Done() + if(current_watch.Status == "Succeeded"){ + fmt.Print() + } + checkStatus(current_watch.Status, previous_watch.Status) jsonified, err := json.Marshal(current_watch) if err != nil { logger.Error().Msg("Could not create watch log") } wf_logger.Info().Msg(string(jsonified)) previous_watch = current_watch + current_watch = models.ArgoWatch{} } } } } +// Debug, no logs sent +func logPods(workflow_name string){ + var stderr io.ReadCloser + var err error + + cmd := exec.Command("argo","logs",workflow_name, "-n", "argo") + + if stderr, err = cmd.StderrPipe(); err != nil{ + wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error()) + return + } + + if err := cmd.Start(); err != nil { + panic(err) + } + + scanner := bufio.NewScanner(stderr) + for scanner.Scan() { + log := scanner.Text() + // fmt.Println(log) + wf_logger.Info().Msg(log) + } + + if err := cmd.Wait(); err != nil { + wf_logger.Error().Msg("Could not execute argo logs") + } + +} + func loadConfig(is_k8s bool, parser *argparse.Parser){ var o *onion.Onion @@ -168,7 +220,7 @@ func loadConfig(is_k8s bool, parser *argparse.Parser){ // We can't use underscore in the env variable names because it's the delimitor with OCMONITOR too setConf(is_k8s, o, parser) - if (!IsValidUUID(conf.GetConfig().WorkflowID)){ + if (!IsValidUUID(conf.GetConfig().ExecutionID)){ logger.Fatal().Msg("Provided ID is not an UUID") } } @@ -177,12 +229,12 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { if is_k8s { conf.GetConfig().LokiURL = o.GetStringDefault("lokiurl", "http://127.0.0.1:3100") - conf.GetConfig().WorkflowID = o.GetString("workflow") - conf.GetConfig().LokiURL = o.GetStringDefault("mongourl", "mongodb://127.0.0.1:27017") + conf.GetConfig().ExecutionID = o.GetString("workflow") + conf.GetConfig().MongoUrl = o.GetStringDefault("mongourl", "mongodb://127.0.0.1:27017") conf.GetConfig().Database = o.GetStringDefault("database", "DC_myDC") } else { url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) - workflow := parser.String("w", "workflow", &argparse.Options{Required: true, Help: "Name of the workflow to request from oc-catalog API"}) + workflow := parser.String("w", "workflow", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Help: "URL to reach the MongoDB"}) db := parser.String("d", "database", &argparse.Options{Required: true, Help: "Name of the database to query in MongoDB"}) err := parser.Parse(os.Args) @@ -192,7 +244,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { } conf.GetConfig().LokiURL = *url - conf.GetConfig().WorkflowID = *workflow + conf.GetConfig().ExecutionID = *workflow conf.GetConfig().MongoUrl = *mongo conf.GetConfig().Database = *db @@ -239,3 +291,26 @@ func IsValidUUID(u string) bool { container_name := re.FindString(argo_file) return container_name } + +// Uses the ArgoWatch object to update status of the workflow execution object +func checkStatus(current string, previous string) { + if current != previous { + updateStatus(current) + } +} + +func updateStatus(status string) { + exec_id := conf.GetConfig().ExecutionID + + wf_exec := &workflow_execution.WorkflowExecution{} + wf_exec.ArgoStatusToState(status) + + serialized := wf_exec.Serialize() + res := oclib.UpdateOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),serialized, exec_id) + + if res.Code != 200 { + logger.Error().Msg("Could not update status for workflow execution " + exec_id) + } + + fmt.Printf("status argo : %s /nstatus db : %s",status,serialized["state"]) +} \ No newline at end of file