diff --git a/main.go b/main.go index 07e1599..2c566c9 100644 --- a/main.go +++ b/main.go @@ -1,57 +1,90 @@ package main import ( + "bufio" + "encoding/json" + "fmt" + "io" "log" "os" + "os/exec" + "regexp" + "strings" "oc-monitor/conf" + "oc-monitor/models" + "oc-monitor/workflow_builder" + oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/logs" "github.com/akamensky/argparse" + "github.com/google/uuid" "github.com/goraz/onion" "github.com/rs/zerolog" ) -var logger zerolog.Logger -var parser argparse.Parser +var logger zerolog.Logger +var wf_logger zerolog.Logger +var parser argparse.Parser +var monitorLocal bool +var workflowName string const defaultConfigFile = "/etc/oc/ocmonitor_conf.json" const localConfigFile = "./conf/ocmonitor_conf.json" func main() { - + monitorLocal = false // Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) if os.Getenv("KUBERNETES_SERVICE_HOST") == ""{ // Not in a k8s environment, get conf from parameters + fmt.Println("Executes outside of k8s") parser = *argparse.NewParser("oc-monitor","Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") loadConfig(false, &parser) - } else { - // Executed in a k8s environment + } else { + // Executed in a k8s environment + fmt.Println("Executes inside a k8s") + monitorLocal = true loadConfig(true,nil) } + logs.SetAppName("oc-monitor") logger = logs.CreateLogger("oc-monitor", conf.GetConfig().LokiURL) + logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) - logger.Debug().Msg("Filename : " + conf.GetConfig().ArgoFile) - logger.Debug().Msg("Container Name : " + conf.GetConfig().ContainerName) + logger.Debug().Msg("Workflow executed : " + conf.GetConfig().WorkflowID) + + oclib.SetConfig(conf.GetConfig().MongoUrl,conf.GetConfig().Database) + oclib.Init("oc-monitor") - // Wait for the argo file to be copied to the pod - wf_found := false - for(!wf_found){ - if _, err := os.Stat("./workflows/" + conf.GetConfig().ArgoFile); err == nil { - wf_found = true - } + // create argo + new_wf := workflow_builder.WorflowDB{} + + err := new_wf.LoadFrom(conf.GetConfig().WorkflowID) + if err != nil { + logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") } - logger.Debug().Msg("Submitting the argo workflow : " + conf.GetConfig().ArgoFile) + + argo_file_path, err := new_wf.ExportToArgo() + if err != nil { + logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID ) + logger.Error().Msg(err.Error()) + } + logger.Debug().Msg("Created :" + argo_file_path) + + workflowName = getContainerName(argo_file_path) + + wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id",conf.GetConfig().WorkflowID).Logger() + wf_logger.Debug().Msg("Testing argo name") + + + executeWorkflow(argo_file_path) + + // logger.Info().Msg(string(output)) + // // Initialize LokiLogger // lokiLogger := NewLokiLogger("http://localhost:3100/loki/api/v1/push") // Replace with your Loki URL // Run the Argo command - // cmd := exec.Command("argo", "submit", "your-workflow.yaml") - // output, err := cmd.CombinedOutput() - // if err != nil { - // log.Fatalf("failed to run Argo command: %v", err) - // } // logger.Info().Msg(string(output)) // // Send logs to Loki @@ -62,11 +95,112 @@ func main() { log.Println("Logs sent to Loki successfully.") } +func executeWorkflow(argo_file_path string) { + var stdout, stderr io.ReadCloser + var err error + + cmd := exec.Command("argo", "submit", "--watch", "argo_workflows/"+argo_file_path, "--serviceaccount=argo", "-n", "argo") + if stdout, err = cmd.StdoutPipe(); err != nil{ + wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) + return + } + + if stderr, err = cmd.StderrPipe(); err != nil{ + wf_logger.Error().Msg("Could not retrieve stderrpipe " + err.Error()) + return + } + + if err := cmd.Start(); err != nil { + panic(err) + } + + go logWorkflow(stdout) + go logWorkflow(stderr) + + + if err := cmd.Wait(); err != nil { + wf_logger.Error().Msg("Could not execute argo submit") + } + + // Function to log the logs generated by the pods + // logPods(need to retrieve the name of the workflow) { + // executes "argo logs nameWorkflow" + // returns the output to a logging method + //} +} + +// We could improve this function by creating an object with the same attribute as the output +// and only send a new log if the current object has different values than the previous +func logWorkflow(pipe io.ReadCloser) { + var current_watch, previous_watch models.ArgoWatch + + watch_output := make([]string,0) + scanner := bufio.NewScanner(pipe) + for scanner.Scan() { + log := scanner.Text() + watch_output = append(watch_output, log) + + if(strings.HasPrefix(log, "Progress:")){ + current_watch = *models.NewArgoLogs(watch_output) + if(!current_watch.Equals(previous_watch)){ + jsonified, err := json.Marshal(current_watch) + if err != nil { + logger.Error().Msg("Could not create watch log") + } + wf_logger.Info().Msg(string(jsonified)) + previous_watch = current_watch + } + } + } +} + func loadConfig(is_k8s bool, parser *argparse.Parser){ var o *onion.Onion - logger = logs.CreateLogger("oc-monitor","") + o = initOnion(o) + + conf.GetConfig().OcCatalogUrl = o.GetStringDefault("oc-catalog","https://localhost:8087") + + + // These variables can only be retrieved in the onion + // Variables that don't depend on the environmen (from conf file), can be loaded after + // We can't use underscore in the env variable names because it's the delimitor with OCMONITOR too + setConf(is_k8s, o, parser) + + if (!IsValidUUID(conf.GetConfig().WorkflowID)){ + logger.Fatal().Msg("Provided ID is not an UUID") + } +} + +func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { + if is_k8s { + + conf.GetConfig().LokiURL = o.GetStringDefault("lokiurl", "http://127.0.0.1:3100") + conf.GetConfig().WorkflowID = o.GetString("workflow") + conf.GetConfig().LokiURL = o.GetStringDefault("mongourl", "mongodb://127.0.0.1:27017") + conf.GetConfig().Database = o.GetStringDefault("database", "DC_myDC") + } else { + url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) + workflow := parser.String("w", "workflow", &argparse.Options{Required: true, Help: "Name of the workflow to request from oc-catalog API"}) + mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Help: "URL to reach the MongoDB"}) + db := parser.String("d", "database", &argparse.Options{Required: true, Help: "Name of the database to query in MongoDB"}) + err := parser.Parse(os.Args) + if err != nil { + fmt.Println(parser.Usage(err)) + os.Exit(1) + } + + conf.GetConfig().LokiURL = *url + conf.GetConfig().WorkflowID = *workflow + conf.GetConfig().MongoUrl = *mongo + conf.GetConfig().Database = *db + + } +} + +func initOnion(o *onion.Onion) *onion.Onion { + logger = logs.CreateLogger("oc-monitor", "") configFile := "" l3 := onion.NewEnvLayerPrefix("_", "OCMONITOR") @@ -90,26 +224,18 @@ func loadConfig(is_k8s bool, parser *argparse.Parser){ } else if l2 == nil { o = onion.New(l1, l3) } + return o +} - // These variables can only be retrieved in the onion - // Variables that don't depend on the environmen (from conf file), can be loaded after - if (is_k8s){ - // We can't use underscore in the env variable names because it's the delimitor with OCMONITOR too - conf.GetConfig().LokiURL = o.GetStringDefault("lokiurl", "http://127.0.0.1:3100") - conf.GetConfig().ArgoFile = o.GetString("argofile") - conf.GetConfig().ContainerName = o.GetString("containername") - } else{ - url := parser.String("u", "url", &argparse.Options{Required: true,Default: "http://127.0.0.1:3100"}) - conf.GetConfig().LokiURL = *url - file := parser.String("f", "file", &argparse.Options{Required: true}) - conf.GetConfig().ArgoFile = *file - name := parser.String("n", "name", &argparse.Options{Required: true}) - conf.GetConfig().ContainerName = *name +func IsValidUUID(u string) bool { + _, err := uuid.Parse(u) + return err == nil + } - err := parser.Parse(os.Args) - if err != nil { - logger.Fatal().Msg(parser.Usage(err)) - } + func getContainerName(argo_file string) string { + regex := "([a-zA-Z]+-[a-zA-Z]+)" + re := regexp.MustCompile(regex) - } -} \ No newline at end of file + container_name := re.FindString(argo_file) + return container_name +} diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 9d49481..b19006f 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -9,7 +9,6 @@ import ( "oc-monitor/logger" . "oc-monitor/models" "os" - "slices" "strings" "time" @@ -21,7 +20,6 @@ import ( type ArgoBuilder struct { graph graph.Graph - branches [][]int Workflow Workflow } @@ -42,7 +40,6 @@ type Spec struct { } func (b *ArgoBuilder) CreateDAG() (string, error) { - fmt.Println("list of branches : ", b.branches) b.createTemplates() b.createDAGstep() @@ -52,7 +49,7 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" random_name := generateWfName() - b.Workflow.Metadata.GenerateName = "oc-test-" + random_name + b.Workflow.Metadata.GenerateName = "oc-monitor-" + random_name yamlified, err := yaml.Marshal(b.Workflow) if err != nil { @@ -147,21 +144,15 @@ func (b *ArgoBuilder) createVolumes() { } func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { - var dependencies_id []string for _, link := range b.graph.Links { - source := b.graph.Items[link.Source.ID].Processing // Instead of searching for the AbstractResource we load the Processing pointer and test if it's nil to know if the item is a processing - if current_computing_id == link.Destination.ID && source != nil && !slices.Contains(dependencies_id, link.Source.ID) { - dependencies_id = append(dependencies_id, link.Source.ID) + source := b.graph.Items[link.Source.ID].Processing + if current_computing_id == link.Destination.ID && source != nil { + dependency_name := getArgoName(source.GetName(), link.Source.ID) + dependencies = append(dependencies, dependency_name) } } - for _, dependency := range dependencies_id { - source := b.graph.Items[dependency].Processing - dependency_name := getArgoName(source.GetName(), dependency) - dependencies = append(dependencies, dependency_name) - } - return } @@ -217,9 +208,7 @@ func getComputingArgs(user_input []string, command string) (list_args []string) return } - for _, arg := range user_input { - list_args = append(list_args, arg) - } + list_args = append(list_args, user_input...) return } @@ -312,4 +301,5 @@ func getStringValue(comp resource_model.AbstractResource, key string) string { return res.(string) } return "" -} \ No newline at end of file +} + diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go index ef65832..0844fe8 100644 --- a/workflow_builder/graph.go +++ b/workflow_builder/graph.go @@ -3,7 +3,6 @@ package workflow_builder import ( "errors" "fmt" - "maps" "oc-monitor/logger" models "oc-monitor/models" @@ -18,7 +17,6 @@ import ( type WorflowDB struct { workflow_name string // used to test if the graph has been instatiated, private so can only be set by a graph's method graph *graph.Graph - links map[int]graph.GraphLink ws models.HttpQuery } @@ -31,7 +29,6 @@ func (w *WorflowDB) LoadFrom(workflow_id string) error { } w.graph = new_wf.Graph - w.links = w.getLinks() w.workflow_name = new_wf.Name @@ -57,43 +54,13 @@ func (w *WorflowDB) getWorkflow( workflow_id string) (workflow *workflow.Workflo } - -func (w *WorflowDB) getLinks() map[int]graph.GraphLink { - links := make(map[int]graph.GraphLink) - for i, link := range(w.graph.Links) { - links[i] = link - } - - return links -} - func (w *WorflowDB) ExportToArgo() (string, error) { if len(w.workflow_name) == 0 || &w.graph == nil { return "",fmt.Errorf("can't export a graph that has not been loaded yet") } - end_links := make(map[int]graph.GraphLink) - for i, link := range w.links { - if (!w.isDCLink(i) && !w.isSource(link.Destination.ID,i)){ - end_links[i] = link - } - } - // index_list := make([]int, len(w.links)) - // list_branches := make([][]string,0) - list_branches := w.getListBranches(end_links, nil,nil) - - // for _, branch := range list_branches{ - // str := "" - // for _, link := range branch{ - // str = str + " --> " + w.getComponentName(w.graph.Links[link].Source) + " linked with " + w.getComponentName(w.links[link].Destination) - // } - - // fmt.Println(str) - // } - - fmt.Println("Identified branches : ", list_branches) - argo_builder := ArgoBuilder{graph : *w.graph, branches: list_branches} + argo_builder := ArgoBuilder{graph : *w.graph} filename, err := argo_builder.CreateDAG() if err != nil { logger.Logger.Error().Msg("Could not create the argo file for " + w.workflow_name) @@ -102,46 +69,6 @@ func (w *WorflowDB) ExportToArgo() (string, error) { return filename, nil } -// Return a list containing the IDs of each link that make up a branch in the graph -func (w *WorflowDB) getListBranches(end_links map[int]graph.GraphLink, unvisited_links_list map[int]graph.GraphLink, current_branch []int) (list_branches [][]int) { - - if current_branch == nil { - current_branch = make([]int, 0) - } - - if unvisited_links_list == nil { - unvisited_links_list = make(map[int]graph.GraphLink,len(w.graph.Links)) - maps.Copy(unvisited_links_list,w.links) - fmt.Println(unvisited_links_list) - } - - for link_id, _ := range end_links { - j := link_id - new_branches := make([][]int,0) - - previous_index := w.getPreviousLink(j, unvisited_links_list) - if len(previous_index) == 0 { - list_branches = append(list_branches, []int{link_id}) - } - - for _, id_link := range previous_index { - current_branch = append([]int{link_id},current_branch...) - delete(unvisited_links_list, link_id) - // create a new branch for each previous link, appending the current path to this node to the created branch - new_end_link := make(map[int]graph.GraphLink,0) - new_end_link[id_link] = w.links[id_link] - new_branches = w.getListBranches(new_end_link,unvisited_links_list,current_branch) - - for _, new_branch := range new_branches{ - current_branch = append(new_branch,link_id) - list_branches = append(list_branches, current_branch) - } - } - } - - return -} - func (w *WorflowDB) ExportToHelm(id string) error { return nil @@ -150,7 +77,7 @@ func (w *WorflowDB) ExportToHelm(id string) error { // Return if it exists a link where Destination is the same as comp_id func (w *WorflowDB) isDestination(comp_id string,link_id int) bool { - for i, link := range w.links{ + for i, link := range w.graph.Links{ if(i !=link_id && link.Destination.ID == comp_id){ return true } @@ -163,7 +90,7 @@ func (w *WorflowDB) isDestination(comp_id string,link_id int) bool { // Return if it exists a link where Source is the same as comp_id func (w *WorflowDB) isSource(comp_id string, link_id int) bool { - for i, link := range w.links{ + for i, link := range w.graph.Links{ if(i !=link_id && link.Source.ID == comp_id && !w.isDCLink(i)){ return true } @@ -173,20 +100,6 @@ func (w *WorflowDB) isSource(comp_id string, link_id int) bool { } -// Returns an index number if their is a link in w.links -// with the same Destination id that the Source id in w.links[linkIndex] -// or nil if not - -func (w *WorflowDB) getPreviousLink(link_id int,map_link map[int]graph.GraphLink) (previous_id []int) { - for k, link := range map_link{ - if(k != link_id && link.Destination == w.links[link_id].Source){ - previous_id = append(previous_id, k) - } - } - - return -} - // returns either computing, data or storage @@ -228,7 +141,7 @@ func (w *WorflowDB) getComponentByType(compType string, link graph.GraphLink) (i } func (w *WorflowDB) isDCLink(link_id int) bool { - link := w.links[link_id] + link := w.graph.Links[link_id] dest := w.graph.Items[link.Destination.ID] dest_id := dest.GetAbstractRessource().GetID()