package tools

import (
	"context"
	"errors"
	"fmt"
	"io"
	"os"
	"time"

	"oc-monitord/conf"
	"oc-monitord/utils"

	wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
	"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
	"github.com/google/uuid"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

type KubernetesTools struct {
	Set          *kubernetes.Clientset
	VersionedSet *versioned.Clientset
}

// NewKubernetesTool builds the Kubernetes and Argo clients from the host,
// port and TLS material found in the monitor's configuration.
func NewKubernetesTool() (Tool, error) {
	config := &rest.Config{
		Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
		TLSClientConfig: rest.TLSClientConfig{
			CAData:   []byte(conf.GetConfig().KubeCA),
			CertData: []byte(conf.GetConfig().KubeCert),
			KeyData:  []byte(conf.GetConfig().KubeData),
		},
	}

	// Create the standard clientset and the Argo (versioned) clientset.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, errors.New("error creating Kubernetes client: " + err.Error())
	}
	clientset2, err := versioned.NewForConfig(config)
	if err != nil {
		return nil, errors.New("error creating Kubernetes versioned client: " + err.Error())
	}

	return &KubernetesTools{
		Set:          clientset,
		VersionedSet: clientset2,
	}, nil
}

// CreateArgoWorkflow reads a workflow manifest from disk, decodes it and
// submits it to the given namespace. It returns the generated workflow name.
func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) {
	// Read the workflow YAML file.
	workflowYAML, err := os.ReadFile(path)
	if err != nil {
		return "", err
	}

	// Decode the YAML into a Workflow struct.
	scheme := runtime.NewScheme()
	_ = wfv1.AddToScheme(scheme)
	codecs := serializer.NewCodecFactory(scheme)
	decode := codecs.UniversalDeserializer().Decode

	obj, _, err := decode(workflowYAML, nil, nil)
	if err != nil {
		return "", errors.New("failed to decode YAML: " + err.Error())
	}
	workflow, ok := obj.(*wfv1.Workflow)
	if !ok {
		return "", errors.New("decoded object is not a Workflow")
	}

	// Create the workflow in the target namespace.
	createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.TODO(), workflow, metav1.CreateOptions{})
	if err != nil {
		return "", errors.New("failed to create workflow: " + err.Error())
	}

	l := utils.GetLogger()
	l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s\n", createdWf.Name, ns))
	return createdWf.Name, nil
}

// CreateAccessSecret stores the given credentials in an Opaque secret with a
// generated name in the given namespace and returns that name.
func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) {
	// The Data field takes raw bytes; client-go base64-encodes them on the
	// wire, so encoding them here would double-encode the values.
	secretData := map[string][]byte{
		"access-key": []byte(login),
		"secret-key": []byte(password),
	}

	// Define the Secret object.
	name := uuid.New().String()
	secret := &v1.Secret{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: ns,
		},
		Type: v1.SecretTypeOpaque,
		Data: secretData,
	}

	// Create the Secret in Kubernetes, in the same namespace as the object.
	_, err := k.Set.CoreV1().Secrets(ns).Create(context.TODO(), secret, metav1.CreateOptions{})
	if err != nil {
		return "", errors.New("error creating secret: " + err.Error())
	}
	return name, nil
}

// GetArgoWatch opens a watch on the workflow named oc-monitor-<wfName> in the
// execution's namespace, mirroring `argo watch`.
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error) {
	wfl := utils.GetWFLogger("")
	wfl.Debug().Msg("Starting argo watch with argo lib")

	options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-" + wfName}
	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options)
	if err != nil {
		return nil, errors.New("error executing 'argo watch " + wfName + " -n " + executionId + "' with the ArgoprojV1alpha1 client: " + err.Error())
	}

	return watcher, nil
}

// GetPodLogger streams the logs of the pod that runs the given workflow node.
func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
	var targetPod v1.Pod
	pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
		LabelSelector: "workflows.argoproj.io/workflow=" + wfName,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list pods: %w", err)
	}
	if len(pods.Items) == 0 {
		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow=%s in namespace %s", wfName, ns)
	}

	for _, pod := range pods.Items {
		if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
			targetPod = pod
			break
		}
	}
	if targetPod.Name == "" {
		return nil, fmt.Errorf("no pod annotated with workflows.argoproj.io/node-name=%s found in namespace %s", nodeName, ns)
	}

	// The k8s API returns an error if we request logs while the containers are
	// not initialized yet, so wait for the pod to be ready first.
	k.testPodReady(targetPod, ns)

	// `kubectl logs` for these pods calls
	// /api/v1/namespaces/NAMESPACE/pods/oc-monitor-PODNAME/log?container=main,
	// so the "main" container is requested explicitly here as well.
	req, err := k.Set.CoreV1().Pods(ns).GetLogs(targetPod.Name, &v1.PodLogOptions{Follow: true, Container: "main"}).Stream(context.Background())
	if err != nil {
		return nil, fmt.Errorf("error when trying to get logs for %s: %w", targetPod.Name, err)
	}

	return req, nil
}

// testPodReady polls the pod until it is Ready or has already Succeeded.
func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
	for {
		current, err := k.Set.CoreV1().Pods(ns).Get(context.Background(), pod.Name, metav1.GetOptions{})
		if err != nil {
			wfl := utils.GetWFLogger("")
			wfl.Error().Msg("Error fetching pod: " + err.Error() + "\n")
			return
		}

		// Remote pods can reach the Succeeded phase before PodReady ever shows
		// up in .status.conditions, so a Succeeded phase also counts as ready.
		if current.Status.Phase == v1.PodSucceeded {
			return
		}
		for _, cond := range current.Status.Conditions {
			if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
				return
			}
		}

		time.Sleep(2 * time.Second) // avoid hammering the API
	}
}
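
// The sketch below is not part of the original file: it is a minimal,
// hypothetical example of how the helpers above can be chained together to
// submit a workflow and stream the logs of one of its nodes. The namespace
// ("demo"), manifest path and node name are placeholder assumptions, and the
// error handling is kept deliberately simple.
func exampleSubmitAndStream() error {
	tool, err := NewKubernetesTool()
	if err != nil {
		return err
	}
	k, ok := tool.(*KubernetesTools)
	if !ok {
		return errors.New("unexpected Tool implementation")
	}

	// Submit a workflow manifest (hypothetical path and namespace).
	wfName, err := k.CreateArgoWorkflow("/tmp/workflow.yaml", "demo")
	if err != nil {
		return err
	}

	// Stream the logs of one node of that workflow to stdout
	// ("step-one" is a hypothetical node name).
	logs, err := k.GetPodLogger("demo", wfName, "step-one")
	if err != nil {
		return err
	}
	defer logs.Close()

	_, err = io.Copy(os.Stdout, logs)
	return err
}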