oc-monitord/tools/kubernetes.go

187 lines
5.7 KiB
Go
Raw Normal View History

2025-02-14 12:00:29 +01:00
package tools
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"oc-monitord/conf"
2025-02-17 16:54:25 +01:00
"oc-monitord/utils"
2025-02-14 12:00:29 +01:00
"os"
2025-02-17 16:54:25 +01:00
"time"
2025-02-14 12:00:29 +01:00
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/google/uuid"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/watch"
2025-02-14 12:00:29 +01:00
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
type KubernetesTools struct {
Set *kubernetes.Clientset
VersionedSet *versioned.Clientset
}
func NewKubernetesTool() (Tool, error) {
// Load Kubernetes config (from ~/.kube/config)
config := &rest.Config{
Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(conf.GetConfig().KubeCA),
CertData: []byte(conf.GetConfig().KubeCert),
KeyData: []byte(conf.GetConfig().KubeData),
},
}
// Create clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, errors.New("Error creating Kubernetes client: " + err.Error())
}
clientset2, err := versioned.NewForConfig(config)
if err != nil {
return nil, errors.New("Error creating Kubernetes versionned client: " + err.Error())
}
2025-02-14 12:00:29 +01:00
return &KubernetesTools{
Set: clientset,
VersionedSet: clientset2,
}, nil
}
2025-02-17 16:54:25 +01:00
func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) {
2025-02-14 12:00:29 +01:00
// Read workflow YAML file
workflowYAML, err := os.ReadFile(path)
if err != nil {
2025-02-17 16:54:25 +01:00
return "", err
2025-02-14 12:00:29 +01:00
}
// Decode the YAML into a Workflow struct
scheme := runtime.NewScheme()
_ = wfv1.AddToScheme(scheme)
codecs := serializer.NewCodecFactory(scheme)
decode := codecs.UniversalDeserializer().Decode
obj, _, err := decode(workflowYAML, nil, nil)
if err != nil {
2025-02-17 16:54:25 +01:00
return "", errors.New("failed to decode YAML: " + err.Error())
2025-02-14 12:00:29 +01:00
}
workflow, ok := obj.(*wfv1.Workflow)
if !ok {
2025-02-17 16:54:25 +01:00
return "", errors.New("decoded object is not a Workflow")
2025-02-14 12:00:29 +01:00
}
// Create the workflow in the "argo" namespace
createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.TODO(), workflow, metav1.CreateOptions{})
2025-02-14 12:00:29 +01:00
if err != nil {
2025-02-17 16:54:25 +01:00
return "", errors.New("failed to create workflow: " + err.Error())
2025-02-14 12:00:29 +01:00
}
fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns)
2025-02-17 16:54:25 +01:00
return createdWf.Name, nil
2025-02-14 12:00:29 +01:00
}
func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) {
// Namespace where the secret will be created
namespace := "default"
// Encode the secret data (Kubernetes requires base64-encoded values)
secretData := map[string][]byte{
"access-key": []byte(base64.StdEncoding.EncodeToString([]byte(login))),
"secret-key": []byte(base64.StdEncoding.EncodeToString([]byte(password))),
}
// Define the Secret object
name := uuid.New().String()
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Type: v1.SecretTypeOpaque,
Data: secretData,
}
// Create the Secret in Kubernetes
_, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
2025-02-14 12:00:29 +01:00
if err != nil {
return "", errors.New("Error creating secret: " + err.Error())
}
return name, nil
}
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
wfl := utils.GetWFLogger("")
wfl.Debug().Msg("Starting argo watch with argo lib")
fmt.Println("metadata.name=oc-monitor-"+wfName + " in namespace : " + executionId)
options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
fmt.Println(options)
watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options)
if err != nil {
return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client")
}
return watcher, nil
}
func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
var targetPod v1.Pod
pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
LabelSelector: "workflows.argoproj.io/workflow="+wfName,
})
if err != nil {
return nil, fmt.Errorf("failed to list pods: " + err.Error())
}
if len(pods.Items) == 0 {
return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName)
}
for _, pod := range pods.Items {
if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
targetPod = pod
}
}
// k8s API throws an error if we try getting logs while the container are not initialized, so we repeat status check there
k.testPodReady(targetPod, ns)
// When using kubec logs for a pod we see it contacts /api/v1/namespaces/NAMESPACE/pods/oc-monitor-PODNAME/log?container=main so we add this container: main to the call
req, err := k.Set.CoreV1().Pods(ns).GetLogs(targetPod.Name, &v1.PodLogOptions{Follow: true, Container: "main"}). Stream(context.Background())
if err != nil {
return nil, fmt.Errorf(" Error when trying to get logs for " + targetPod.Name + " : " + err.Error())
}
return req, nil
}
func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
for {
pod, err := k.Set.CoreV1().Pods(ns).Get(context.Background(), pod.Name, metav1.GetOptions{})
if err != nil {
wfl := utils.GetWFLogger("")
wfl.Error().Msg("Error fetching pod: " + err.Error() + "\n")
break
}
var initialized bool
for _, cond := range pod.Status.Conditions {
if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
initialized = true
return
}
}
if initialized {
return
}
time.Sleep(2 * time.Second) // avoid hammering the API
}
}