package infrastructure import ( "bytes" "context" "encoding/base64" "encoding/json" "errors" "fmt" "html/template" "oc-datacenter/conf" "strings" authv1 "k8s.io/api/authentication/v1" v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) type KubernetesService struct { Set *kubernetes.Clientset } func NewKubernetesService() (Infrastructure, error) { config := &rest.Config{ Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort, TLSClientConfig: rest.TLSClientConfig{ CAData: []byte(conf.GetConfig().KubeCA), CertData: []byte(conf.GetConfig().KubeCert), KeyData: []byte(conf.GetConfig().KubeData), }, } // Create clientset clientset, err := kubernetes.NewForConfig(config) fmt.Println("NewForConfig", clientset, err) if err != nil { return nil, errors.New("Error creating Kubernetes client: " + err.Error()) } if clientset == nil { return nil, errors.New("Error creating Kubernetes client: clientset is nil") } return &KubernetesService{ Set: clientset, }, nil } func NewRemoteKubernetesService(url string, ca string, cert string, key string) (Infrastructure, error) { decodedCa, _ := base64.StdEncoding.DecodeString(ca) decodedCert, _ := base64.StdEncoding.DecodeString(cert) decodedKey, _ := base64.StdEncoding.DecodeString(key) config := &rest.Config{ Host: url + ":6443", TLSClientConfig: rest.TLSClientConfig{ CAData: decodedCa, CertData: decodedCert, KeyData: decodedKey, }, } // Create clientset clientset, err := kubernetes.NewForConfig(config) fmt.Println("NewForConfig", clientset, err) if err != nil { return nil, errors.New("Error creating Kubernetes client: " + err.Error()) } if clientset == nil { return nil, errors.New("Error creating Kubernetes client: clientset is nil") } return &KubernetesService{ Set: clientset, }, nil } func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) error { // Define the namespace fmt.Println("ExecutionID in CreateNamespace() : ", ns) namespace := &v1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: ns, }, } // Create the namespace fmt.Println("Creating namespace...", k.Set) if _, err := k.Set.CoreV1().Namespaces().Create(ctx, namespace, metav1.CreateOptions{}); err != nil { return errors.New("Error creating namespace: " + err.Error()) } fmt.Println("Namespace created successfully!") return nil } func (k *KubernetesService) CreateServiceAccount(ctx context.Context, ns string) error { // Create the ServiceAccount object serviceAccount := &v1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: "sa-" + ns, Namespace: ns, }, } // Create the ServiceAccount in the specified namespace _, err := k.Set.CoreV1().ServiceAccounts(ns).Create(ctx, serviceAccount, metav1.CreateOptions{}) if err != nil { return errors.New("Failed to create ServiceAccount: " + err.Error()) } return nil } func (k *KubernetesService) CreateRole(ctx context.Context, ns string, role string, groups [][]string, resources [][]string, verbs [][]string) error { // Create the Role object if len(groups) != len(resources) || len(resources) != len(verbs) { return errors.New("Invalid input: groups, resources, and verbs must have the same length") } rules := []rbacv1.PolicyRule{} for i, group := range groups { rules = append(rules, rbacv1.PolicyRule{ APIGroups: group, Resources: resources[i], Verbs: verbs[i], }) } r := &rbacv1.Role{ ObjectMeta: metav1.ObjectMeta{ Name: role, Namespace: ns, }, Rules: rules, } // Create the Role in the specified namespace _, err := k.Set.RbacV1().Roles(ns).Create(ctx, r, metav1.CreateOptions{}) if err != nil { return errors.New("Failed to create Role: " + err.Error()) } return nil } func (k *KubernetesService) CreateRoleBinding(ctx context.Context, ns string, roleBinding string, role string) error { // Create the RoleBinding object rb := &rbacv1.RoleBinding{ ObjectMeta: metav1.ObjectMeta{ Name: roleBinding, Namespace: ns, }, Subjects: []rbacv1.Subject{ { Kind: "ServiceAccount", Name: "sa-" + ns, Namespace: ns, }, }, RoleRef: rbacv1.RoleRef{ Kind: "Role", Name: role, APIGroup: "rbac.authorization.k8s.io", }, } // Create the RoleBinding in the specified namespace _, err := k.Set.RbacV1().RoleBindings(ns).Create(ctx, rb, metav1.CreateOptions{}) if err != nil { return errors.New("Failed to create RoleBinding: " + err.Error()) } return nil } func (k *KubernetesService) DeleteNamespace(ctx context.Context, ns string) error { // Delete the namespace if err := k.Set.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{}); err != nil { return errors.New("Error deleting namespace: " + err.Error()) } fmt.Println("Namespace deleted successfully!") return nil } // Returns the string representing the token generated for the serviceAccount // in the namespace identified by the value `ns` with the name sa-`ns`, which is valid for // `duration` seconds func (k *KubernetesService) GenerateToken(ctx context.Context, ns string, duration int) (string, error) { // Define TokenRequest (valid for 1 hour) d := int64(duration) tokenRequest := &authv1.TokenRequest{ Spec: authv1.TokenRequestSpec{ ExpirationSeconds: &d, // 1 hour validity }, } // Generate the token token, err := k.Set.CoreV1(). ServiceAccounts(ns). CreateToken(ctx, "sa-"+ns, tokenRequest, metav1.CreateOptions{}) if err != nil { return "", errors.New("Failed to create token for ServiceAccount: " + err.Error()) } return token.Status.Token, nil } // Needs refactoring : // - Retrieving the metada (in a method that Unmarshall the part of the json in a metadata object) func (k *KubernetesService) GetTargets(ctx context.Context) ([]string,error){ var listTargets []string resp, err := getCDRapiKube(*k.Set, ctx,"/apis/multicluster.admiralty.io/v1alpha1/targets") if err != nil { return nil,err } fmt.Println(string(resp)) var targetDict map[string]interface{} err = json.Unmarshal(resp,&targetDict) if err != nil { fmt.Println("TODO: handle the error when unmarshalling k8s API response") return nil, err } b, _ := json.MarshalIndent(targetDict,""," ") fmt.Println(string(b)) data := targetDict["items"].([]interface{}) for _, item := range data { var metadata metav1.ObjectMeta item := item.(map[string]interface{}) byteMetada, err := json.Marshal(item["metadata"]) if err != nil { fmt.Println("Error while Marshalling metadata field") return nil,err } err = json.Unmarshal(byteMetada,&metadata) if err != nil { fmt.Println("Error while Unmarshalling metadata field to the library object") return nil,err } listTargets = append(listTargets, metadata.Name) } return listTargets,nil } // Admiralty Target allows a cluster to deploy pods to remote cluster // // The remote cluster must : // // - have declared a Source resource // // - have declared the same namespace as the one where the pods are created in the local cluster // // - have delcared a serviceAccount with sufficient permission to create pods func (k *KubernetesService) CreateAdmiraltyTarget(context context.Context,executionId string)([]byte,error){ exists, err := k.GetKubeconfigSecret(context,executionId) if err != nil { fmt.Println("Error verifying kube-secret before creating target") return nil, err } if exists == nil { fmt.Println("Target needs to be binded to a secret in namespace ",executionId) return nil, nil // Maybe we could create a wrapper for errors and add more info to have } var targetManifest string var tpl bytes.Buffer tmpl, err := template.New("target"). Parse("{\"apiVersion\": \"multicluster.admiralty.io/v1alpha1\", \"kind\": \"Target\", \"metadata\": {\"name\": \"target-{{.ExecutionId}}\"}, \"spec\": { \"kubeconfigSecret\" :{\"name\": \"kube-secret-{{.ExecutionId}}\"}} }") if err != nil { fmt.Println("Error creating the template for the target Manifest") return nil, err } err = tmpl.Execute(&tpl, map[string]string{"ExecutionId":executionId}) targetManifest = tpl.String() resp, err := postCDRapiKube( *k.Set, context, "/apis/multicluster.admiralty.io/v1alpha1/namespaces/"+ executionId +"/targets", []byte(targetManifest), map[string]string{"fieldManager":"kubectl-client-side-apply"}, map[string]string{"fieldValidation":"Strict"}, ) if err != nil { fmt.Println("Error trying to create a Source on remote cluster : ", err , " : ", resp) return nil, err } return resp, nil } // Admiralty Source allows a cluster to receive pods from a remote cluster // // The source must be associated to a serviceAccount, which will execute the pods locally. // This serviceAccount must have sufficient permission to create and patch pods // // This method is temporary to implement the use of Admiralty, but must be edited // to rather contact the oc-datacenter from the remote cluster to create the source // locally and retrieve the token for the serviceAccount func (k *KubernetesService) CreateAdmiraltySource(context context.Context,executionId string) ([]byte, error) { var sourceManifest string var tpl bytes.Buffer tmpl, err := template.New("source"). Parse("{\"apiVersion\": \"multicluster.admiralty.io/v1alpha1\", \"kind\": \"Source\", \"metadata\": {\"name\": \"source-{{.ExecutionId}}\"}, \"spec\": {\"serviceAccountName\": \"sa-{{.ExecutionId}}\"} }") if err != nil { fmt.Println("Error creating the template for the source Manifest") return nil, err } err = tmpl.Execute(&tpl, map[string]string{"ExecutionId":executionId}) sourceManifest = tpl.String() resp, err := postCDRapiKube( *k.Set, context, "/apis/multicluster.admiralty.io/v1alpha1/namespaces/"+ executionId +"/sources", []byte(sourceManifest), map[string]string{"fieldManager":"kubectl-client-side-apply"}, map[string]string{"fieldValidation":"Strict"}, ) // We can add more info to the log with the content of resp if not nil if err != nil { fmt.Println("Error trying to create a Source on remote cluster : ", err , " : ", resp) return nil, err } return resp, nil } // Create a secret from a kubeconfing. Use it to create the secret binded to an Admiralty // target, which must contain the serviceAccount's token value func (k *KubernetesService) CreateKubeconfigSecret(context context.Context,kubeconfig string, executionId string) ([]byte, error) { config, err := base64.StdEncoding.DecodeString(kubeconfig) // config, err := base64.RawStdEncoding.DecodeString(kubeconfig) if err != nil { fmt.Println("Error while encoding kubeconfig") fmt.Println(err) return nil, err } secretManifest := &v1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: "kube-secret-" + executionId, Namespace: executionId, }, Data: map[string][]byte{ "config": config, }, } exists, err := k.GetKubeconfigSecret(context,executionId) if err != nil { fmt.Println("Error verifying if kube secret exists in namespace ", executionId) return nil, err } if exists != nil { fmt.Println("kube-secret already exists in namespace", executionId) fmt.Println("Overriding existing kube-secret with a newer resource") // TODO : implement DeleteKubeConfigSecret(executionID) deleted, err := k.DeleteKubeConfigSecret(executionId) _ = deleted _ = err } resp, err := k.Set.CoreV1(). Secrets(executionId). Create(context,secretManifest,metav1.CreateOptions{}) if err != nil { fmt.Println("Error while trying to contact API to get secret kube-secret-"+executionId) fmt.Println(err) return nil, err } data, err := json.Marshal(resp) if err != nil { fmt.Println("Couldn't marshal resp from : ", data) fmt.Println(err) return nil, err } return data, nil } func (k *KubernetesService) GetKubeconfigSecret(context context.Context,executionId string) ([]byte, error) { resp, err := k.Set.CoreV1(). Secrets(executionId). Get(context,"kube-secret-"+executionId,metav1.GetOptions{}) if err != nil { if(apierrors.IsNotFound(err)){ fmt.Println("kube-secret not found for execution", executionId) return nil, nil } fmt.Println("Error while trying to contact API to get secret kube-secret-"+executionId) fmt.Println(err) return nil, err } data, err := json.Marshal(resp) if err != nil { fmt.Println("Couldn't marshal resp from : ", data) fmt.Println(err) return nil, err } return data, nil } func (k *KubernetesService) DeleteKubeConfigSecret(executionID string) ([]byte, error){ return []byte{}, nil } func getCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string) ([]byte,error) { resp, err := client.RESTClient().Get(). AbsPath(path). DoRaw(ctx) // from https://stackoverflow.com/questions/60764908/how-to-access-kubernetes-crd-using-client-go if err != nil { fmt.Println("Error from k8s API when getting " + path + " : " , err) return nil,err } return resp, nil } func postCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string, body []byte, params ...map[string]string) ([]byte, error){ req := client.RESTClient(). Post(). AbsPath(path). Body(body) for _, param := range params { for k,v := range param { req = req.Param(k,v) } } resp, err := req.DoRaw(ctx) if err != nil { fmt.Println("Error from k8s API when posting " + string(body) + " to " + path + " : " , err) return nil,err } return resp, nil } // Returns the Kubernetes' Node object corresponding to the executionID if it exists on this host // // The node is created when an admiralty Target (on host) can connect to an admiralty Source (on remote) func (k *KubernetesService) GetOneNode(context context.Context,executionID string) (*v1.Node, error) { res, err := k.Set.CoreV1(). Nodes(). List( context, metav1.ListOptions{}, ) if err != nil { fmt.Println("Error getting the list of nodes from k8s API") fmt.Println(err) return nil, err } for _, node := range res.Items { if isNode := strings.Contains(node.Name,"admiralty-"+executionID+"-target-"+executionID+"-"); isNode { return &node, nil } } return nil, nil }