diff --git a/logger/local_argo_logs.go b/logger/local_argo_logs.go
index 95dcdfb..7d3e3c5 100644
--- a/logger/local_argo_logs.go
+++ b/logger/local_argo_logs.go
@@ -3,7 +3,6 @@ package logger
 import (
 	"bufio"
 	"encoding/json"
-	"fmt"
 	"io"
 
 	"oc-monitord/conf"
@@ -71,7 +70,6 @@ func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) {
 	logger = logs.GetLogger()
 	logger.Debug().Msg("created wf_logger")
-	fmt.Println("created wf_logger")
 	wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
 
 	var current_watch, previous_watch ArgoWatch
 
@@ -111,7 +109,6 @@ func LogLocalPod(wfName string, pipe io.ReadCloser, steps []string, wg *sync.Wai
 	scanner := bufio.NewScanner(pipe)
 	for scanner.Scan() {
 		var podLogger zerolog.Logger
-		fmt.Println("new line")
 		wg.Add(1)
 
 		line := scanner.Text()
diff --git a/main.go b/main.go
index 264416e..115881f 100644
--- a/main.go
+++ b/main.go
@@ -106,11 +106,11 @@ func main() {
 
 	if conf.GetConfig().KubeHost == "" {
 		// Not in a k8s environment, get conf from parameters
-		fmt.Println("Executes outside of k8s")
+		logger.Info().Msg("Executes outside of k8s")
 		executeOutside(argoFilePath, builder.Workflow)
 	} else {
 		// Executed in a k8s environment
-		fmt.Println("Executes inside a k8s")
+		logger.Info().Msg("Executes inside a k8s")
 		// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
 		executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath)
 	}
@@ -128,9 +128,9 @@ func executeInside(execID string, ns string, argo_file_path string) {
 	_ = name
 	if err != nil {
 		logger.Error().Msg("Could not create argo workflow : " + err.Error())
-		fmt.Println("CA :" + conf.GetConfig().KubeCA)
-		fmt.Println("Cert :" + conf.GetConfig().KubeCert)
-		fmt.Println("Data :" + conf.GetConfig().KubeData)
+		logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
+		logger.Info().Msg(fmt.Sprint("Cert :" + conf.GetConfig().KubeCert))
+		logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData))
 		return
 	} else {
 		watcher, err := t.GetArgoWatch(execID, workflowName)
@@ -174,7 +174,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 	go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg)
 	go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg)
 
-	fmt.Println("Starting argo submit")
+	logger.Info().Msg("Starting argo submit")
 	if err := cmdSubmit.Start(); err != nil {
 		wf_logger.Error().Msg("Could not start argo submit")
 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
@@ -183,7 +183,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 
 	time.Sleep(5 * time.Second)
 
-	fmt.Println("Running argo logs")
+	logger.Info().Msg("Running argo logs")
 	if err := cmdLogs.Run(); err != nil {
 		wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
 	}
@@ -191,7 +191,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 
 
-	fmt.Println("Waiting argo submit")
+	logger.Info().Msg("Waiting argo submit")
 	if err := cmdSubmit.Wait(); err != nil {
 		wf_logger.Error().Msg("Could not execute argo submit")
 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
 
@@ -232,7 +232,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
 
 	err := parser.Parse(os.Args)
 	if err != nil {
-		fmt.Println(parser.Usage(err))
+		logger.Info().Msg(parser.Usage(err))
 		os.Exit(1)
 	}
 	conf.GetConfig().Logs = "debug"
diff --git a/tools/kubernetes.go b/tools/kubernetes.go
index 1825cf7..a5bfe44 100644
--- a/tools/kubernetes.go
+++ b/tools/kubernetes.go
@@ -83,7 +83,8 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er
 
 	if err != nil {
 		return "", errors.New("failed to create workflow: " + err.Error())
 	}
-	fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns)
+	l := utils.GetLogger()
+	l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s", createdWf.Name, ns))
 	return createdWf.Name, nil
 }
@@ -117,9 +118,7 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch
 
 	wfl := utils.GetWFLogger("")
 	wfl.Debug().Msg("Starting argo watch with argo lib")
-	fmt.Println("metadata.name=oc-monitor-"+wfName + " in namespace : " + executionId)
 	options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
-	fmt.Println(options)
 	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options)
 	if err != nil {
 		return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client")
diff --git a/workflow_builder/admiralty_setter.go b/workflow_builder/admiralty_setter.go
index 4628695..70743b5 100644
--- a/workflow_builder/admiralty_setter.go
+++ b/workflow_builder/admiralty_setter.go
@@ -4,6 +4,7 @@
 import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"oc-monitord/utils"
 	"slices"
 	"time"
@@ -75,12 +76,14 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle
 	var kubedata map[string]string
 	_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true)
 	if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 {
-		fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig")
+		l := utils.GetLogger()
+		l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig")
 		panic(0)
 	}
 	err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata)
 	if err != nil {
-		fmt.Println("Something went wrong when unmarshalling data from Get call for kubeconfig")
+		l := utils.GetLogger()
+		l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig")
 		panic(0)
 	}
 
@@ -88,18 +91,18 @@
 }
 
 func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution {
+	l := utils.GetLogger()
 	resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller)
 	if err != nil {
-		fmt.Println("Error when executing on peer at", peer.Url)
-		fmt.Println(err)
+		l.Error().Msg("Error when executing on peer at " + peer.Url)
+		l.Error().Msg(err.Error())
 		panic(0)
 	}
 	if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) {
-		fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)
+		l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode))
 		if _, ok := caller.LastResults["body"]; ok {
-			logger.Info().Msg(string(caller.LastResults["body"].([]byte)))
-			// fmt.Println(string(caller.LastResults["body"].([]byte)))
+			l.Info().Msg(string(caller.LastResults["body"].([]byte)))
 		}
 		if panicCode {
 			panic(0)
 		}
@@ -120,7 +123,8 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){
 		name := metadata.(map[string]interface{})["name"].(string)
 		s.NodeName = name
 	} else {
-		fmt.Println("Could not retrieve data about the recently created node")
+		l := utils.GetLogger()
+		l.Error().Msg("Could not retrieve data about the recently created node")
 		panic(0)
 	}
 }
diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go
index f29a505..b536160 100644
--- a/workflow_builder/argo_builder.go
+++ b/workflow_builder/argo_builder.go
@@ -65,7 +65,7 @@ type Spec struct {
 // add s3, gcs, azure, etc if needed on a link between processing and storage
 func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
 	logger = logs.GetLogger()
-	fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
+	logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
 	// handle services by checking if there is only one processing with hostname and port
 	firstItems, lastItems, volumes := b.createTemplates(namespace)
 	b.createVolumes(volumes)
@@ -90,10 +90,10 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
 	firstItems := []string{}
 	lastItems := []string{}
 	items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
-	fmt.Println("Creating templates", len(items))
+	logger.Info().Msg(fmt.Sprint("Creating templates ", len(items)))
 	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
 		instance := item.Processing.GetSelectedInstance()
-		fmt.Println("Creating template for", item.Processing.GetName(), instance)
+		logger.Info().Msg(fmt.Sprint("Creating template for ", item.Processing.GetName(), " ", instance))
 		if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
 			logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
 			return firstItems, lastItems, volumes
@@ -176,7 +176,7 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, lastItems []string) ([]VolumeMount, []string, []string) {
 
 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
 	template := &Template{Name: getArgoName(processing.GetName(), id)}
-	fmt.Println("Creating template for", template.Name)
+	logger.Info().Msg(fmt.Sprint("Creating template for ", template.Name))
 	isReparted, peerId := b.isProcessingReparted(*processing, id)
 	template.CreateContainer(processing, b.Workflow.getDag())
 	if isReparted {
@@ -325,7 +325,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
 	isDeps := false
 	for _, link := range b.OriginWorkflow.Graph.Links {
 		if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok {
-			fmt.Println("Could not find the source of the link", link.Destination.ID)
+			logger.Info().Msg(fmt.Sprint("Could not find the source of the link ", link.Destination.ID))
 			continue
 		}
 		source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing
@@ -345,7 +345,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
 func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) {
 	for _, link := range b.OriginWorkflow.Graph.Links {
 		if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok {
-			fmt.Println("Could not find the source of the link", link.Source.ID)
+			logger.Info().Msg(fmt.Sprint("Could not find the source of the link ", link.Source.ID))
 			continue
 		}
 		source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
@@ -391,7 +391,7 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResour
 	peer := *res.ToPeer()
 
 	isNotReparted := peer.State == 1
-	fmt.Println("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)
+	logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
 
 	return !isNotReparted, peer.UUID
 }
@@ -405,7 +405,7 @@
 		} else if link.Destination.ID == graphID {
 			oppositeId = link.Source.ID
 		}
-		fmt.Println("OppositeId : ", oppositeId)
+
 		if oppositeId != "" {
 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
 			if dt == oclib.COMPUTE_RESOURCE {
@@ -422,19 +422,18 @@
 // Execute the last actions once the YAML file for the Argo Workflow is created
 func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
-	fmt.Println("DEV :: Completing build")
+	logger.Info().Msg("DEV :: Completing build")
 	setter := AdmiraltySetter{Id: executionsId}
 
 	// Setup admiralty for each node
 	for _, peer := range b.RemotePeers {
-		fmt.Println("DEV :: Launching Admiralty Setup for ", peer)
+		logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
 	}
 
 	// Update the name of the admiralty node to use
 	for _, template := range b.Workflow.Spec.Templates {
 		if len(template.Metadata.Annotations) > 0 {
-			if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
-				fmt.Println(resp)
+			if _, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
 				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID
 			}
 		}
diff --git a/workflow_builder/graph.go b/workflow_builder/graph.go
index 6cb688e..edc0fc3 100644
--- a/workflow_builder/graph.go
+++ b/workflow_builder/graph.go
@@ -14,7 +14,7 @@ type WorflowDB struct {
 // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter
 func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error {
-	fmt.Println("Loading workflow from " + workflow_id)
+	logger.Info().Msg("Loading workflow from " + workflow_id)
 	var err error
 	if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil {
 		return err
 	}
@@ -27,7 +27,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
 	logger := oclib.GetLogger()
 	lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id)
-	fmt.Println("ERR", lib_data.Code, lib_data.Err)
+	logger.Info().Msg(fmt.Sprint("ERR ", lib_data.Code, " ", lib_data.Err))
 	if lib_data.Code != 200 {
 		logger.Error().Msg("Error loading the graph")
 		return workflow, errors.New(lib_data.Err)
 	}
@@ -43,7 +43,7 @@
 func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) {
 	logger := oclib.GetLogger()
 
-	fmt.Println("Exporting to Argo", w.Workflow)
+	logger.Info().Msg(fmt.Sprint("Exporting to Argo ", w.Workflow))
 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
 		return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet")
 	}