package tools import ( "context" "encoding/base64" "errors" "fmt" "io" "oc-monitord/conf" "oc-monitord/utils" "os" "time" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" "github.com/google/uuid" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) type KubernetesTools struct { Set *kubernetes.Clientset VersionedSet *versioned.Clientset } func NewKubernetesTool() (Tool, error) { // Load Kubernetes config (from ~/.kube/config) config := &rest.Config{ Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort, TLSClientConfig: rest.TLSClientConfig{ CAData: []byte(conf.GetConfig().KubeCA), CertData: []byte(conf.GetConfig().KubeCert), KeyData: []byte(conf.GetConfig().KubeData), }, } // Create clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { return nil, errors.New("Error creating Kubernetes client: " + err.Error()) } clientset2, err := versioned.NewForConfig(config) if err != nil { return nil, errors.New("Error creating Kubernetes versionned client: " + err.Error()) } return &KubernetesTools{ Set: clientset, VersionedSet: clientset2, }, nil } func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) { // Read workflow YAML file workflowYAML, err := os.ReadFile(path) if err != nil { return "", err } // Decode the YAML into a Workflow struct scheme := runtime.NewScheme() _ = wfv1.AddToScheme(scheme) codecs := serializer.NewCodecFactory(scheme) decode := codecs.UniversalDeserializer().Decode obj, _, err := decode(workflowYAML, nil, nil) if err != nil { return "", errors.New("failed to decode YAML: " + err.Error()) } workflow, ok := obj.(*wfv1.Workflow) if !ok { return "", errors.New("decoded object is not a Workflow") } // Create the workflow in the "argo" namespace createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.TODO(), workflow, metav1.CreateOptions{}) if err != nil { return "", errors.New("failed to create workflow: " + err.Error()) } fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns) return createdWf.Name, nil } func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) { // Namespace where the secret will be created namespace := "default" // Encode the secret data (Kubernetes requires base64-encoded values) secretData := map[string][]byte{ "access-key": []byte(base64.StdEncoding.EncodeToString([]byte(login))), "secret-key": []byte(base64.StdEncoding.EncodeToString([]byte(password))), } // Define the Secret object name := uuid.New().String() secret := &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: ns, }, Type: v1.SecretTypeOpaque, Data: secretData, } // Create the Secret in Kubernetes _, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) if err != nil { return "", errors.New("Error creating secret: " + err.Error()) } return name, nil } func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){ wfl := utils.GetWFLogger("") wfl.Debug().Msg("Starting argo watch with argo lib") fmt.Println("metadata.name=oc-monitor-"+wfName + " in namespace : " + executionId) options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName} fmt.Println(options) watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options) if err != nil { return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client") } return watcher, nil } func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { var targetPod v1.Pod pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ LabelSelector: "workflows.argoproj.io/workflow="+wfName, }) if err != nil { return nil, fmt.Errorf("failed to list pods: " + err.Error()) } if len(pods.Items) == 0 { return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName) } for _, pod := range pods.Items { if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName { targetPod = pod } } // k8s API throws an error if we try getting logs while the container are not initialized, so we repeat status check there k.testPodReady(targetPod, ns) // When using kubec logs for a pod we see it contacts /api/v1/namespaces/NAMESPACE/pods/oc-monitor-PODNAME/log?container=main so we add this container: main to the call req, err := k.Set.CoreV1().Pods(ns).GetLogs(targetPod.Name, &v1.PodLogOptions{Follow: true, Container: "main"}). Stream(context.Background()) if err != nil { return nil, fmt.Errorf(" Error when trying to get logs for " + targetPod.Name + " : " + err.Error()) } return req, nil } func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) { for { pod, err := k.Set.CoreV1().Pods(ns).Get(context.Background(), pod.Name, metav1.GetOptions{}) if err != nil { wfl := utils.GetWFLogger("") wfl.Error().Msg("Error fetching pod: " + err.Error() + "\n") break } var initialized bool for _, cond := range pod.Status.Conditions { if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { initialized = true return } } if initialized { return } time.Sleep(2 * time.Second) // avoid hammering the API } }