Compare commits

..

25 Commits

Author SHA1 Message Date
pb
b372c10ab0 changed how peerId and namespace are concatenated to name admiralty resources 2025-05-16 11:09:57 +02:00
pb
fd6186c6df updated how we search for nodes 2025-05-15 10:28:55 +02:00
pb
5069b3455a updated how we search for nodes 2025-05-15 10:27:28 +02:00
pb
cb2e4f6028 added the right naming convention to kubeConfigSecret in target 2025-05-15 09:37:35 +02:00
pb
35facf1b74 added :peer to admiralty routes to create peer related resources 2025-05-13 16:33:48 +02:00
pb
24e0137444 shortened how targets are named 2025-05-12 15:00:56 +02:00
pb
ba940bfc80 changed the way kube manifest are applyied 2025-05-12 12:23:38 +02:00
pb
063d57d9e7 updated comments and logs 2025-05-12 12:21:12 +02:00
pb
484c742c31 corrected a typo from a copy/pasted log line 2025-05-06 18:10:53 +02:00
pb
cc3b2a6cfc uncommenting createNamespace method 2025-05-06 18:10:09 +02:00
pb
8e8d0d3e01 added a new parameter to the /admiralty/targets route to specify the peerId of the peer targeted, allowing to name differently peers targeted in a namespace 2025-05-05 16:13:49 +02:00
pb
03f81c66f9 Changed name of the method to create source to be coherent with the one to create target 2025-05-05 15:51:39 +02:00
pb
be721059e5 Merge branch 'main' of https://cloud.o-forge.io/core/oc-datacenter 2025-04-29 11:55:15 +02:00
pb
aa42f5f49c debug some typo 2025-04-11 15:45:28 +02:00
pb
98c54eb080 typo when passing gvr for target 2025-04-11 15:34:43 +02:00
pb
afe442d17f refactored the way we apply Source and Target with dynamic client 2025-04-11 15:24:47 +02:00
pb
46b7713404 corrected Apply target 2025-04-11 12:10:54 +02:00
pb
d5ad32e2e4 [NEED REFACTORING] added DynamicClient constructor to make API calls on CDRs 2025-04-11 12:00:21 +02:00
pb
e4ecb8c1db replaced k8s go-client create with apply for the creation of the Secret 2025-04-10 15:48:08 +02:00
pb
cca59faeab when creating source or target returns a 409, don't return an error. Post should be replaced by Put but not working 2025-04-10 14:22:37 +02:00
pb
2cf8923d95 more logs 2025-04-08 10:31:29 +02:00
pb
47ed1b4562 added the label multicluster-scheduler=enabled when creating namespace 2025-04-08 10:31:29 +02:00
pb
063f47c87b Merge branch 'main' of https://cloud.o-forge.io/core/oc-datacenter into feature/admiralty 2025-04-04 18:03:18 +02:00
pb
4bfb16cba6 added node to the returned data when it was found 2025-04-01 11:46:32 +02:00
pb
b08e6a1e70 corrected the options in bee run 2025-03-14 16:58:37 +01:00
6 changed files with 218 additions and 118 deletions

View File

@ -30,7 +30,7 @@ RUN export CGO_ENABLED=0 && \
COPY . . COPY . .
RUN sed -i '/replace/d' go.mod RUN sed -i '/replace/d' go.mod
RUN if [ ! -f swagger/index.html ]; then timeout 15 bee run --gendoc=true --downdoc=true; fi RUN if [ ! -f swagger/index.html ]; then timeout 15 bee run -gendoc=true -downdoc=true; fi
RUN bee generate routers RUN bee generate routers
RUN bee generate docs RUN bee generate docs
RUN bee pack RUN bee pack

View File

@ -10,10 +10,13 @@ import (
"slices" "slices"
"time" "time"
oclib "cloud.o-forge.io/core/oc-lib"
beego "github.com/beego/beego/v2/server/web" beego "github.com/beego/beego/v2/server/web"
jwt "github.com/golang-jwt/jwt/v5" jwt "github.com/golang-jwt/jwt/v5"
"gopkg.in/yaml.v2" "gopkg.in/yaml.v2"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
) )
type KubeInfo struct { type KubeInfo struct {
@ -117,12 +120,12 @@ func (c *AdmiraltyController) GetOneTarget() {
c.ServeJSON() c.ServeJSON()
} }
// @Title CreateSource // @Title CreateAdmiraltySource
// @Description Create an Admiralty Source on remote cluster // @Description Create an Admiralty Source on remote cluster
// @Param execution path string true "execution id of the workflow" // @Param execution path string true "execution id of the workflow"
// @Success 201 // @Success 201
// @router /source/:execution [post] // @router /source/:execution [post]
func (c *AdmiraltyController) CreateSource() { func (c *AdmiraltyController) CreateAdmiraltySource() {
execution := c.Ctx.Input.Param(":execution") execution := c.Ctx.Input.Param(":execution")
fmt.Println("execution :: ", execution) fmt.Println("execution :: ", execution)
@ -139,6 +142,12 @@ func (c *AdmiraltyController) CreateSource() {
res, err := serv.CreateAdmiraltySource(c.Ctx.Request.Context(),execution) res, err := serv.CreateAdmiraltySource(c.Ctx.Request.Context(),execution)
if err != nil { if err != nil {
if apierrors.IsAlreadyExists(err) {
c.Ctx.Output.SetStatus(409)
c.Data["json"] = map[string]string{"info" : "A source already exists for this namespace : " + execution}
c.ServeJSON()
return
}
// change code to 500 // change code to 500
c.Ctx.Output.SetStatus(500) c.Ctx.Output.SetStatus(500)
c.Data["json"] = map[string]string{"error": err.Error()} c.Data["json"] = map[string]string{"error": err.Error()}
@ -159,13 +168,21 @@ func (c *AdmiraltyController) CreateSource() {
// @Title CreateAdmiraltyTarget // @Title CreateAdmiraltyTarget
// @Description Create an Admiralty Target in the namespace associated to the executionID // @Description Create an Admiralty Target in the namespace associated to the executionID
// @Param execution path string true "execution id of the workflow" // @Param execution path string true "execution id of the workflow"
// @Param peer path string true "peerId of the peer the target points to"
// @Success 201 // @Success 201
// @router /target/:execution [post] // @router /target/:execution/:peer [post]
func (c *AdmiraltyController) CreateAdmiraltyTarget(){ func (c *AdmiraltyController) CreateAdmiraltyTarget(){
var data map[string]interface{} var data map[string]interface{}
execution := c.Ctx.Input.Param(":execution") execution := c.Ctx.Input.Param(":execution")
peerId := c.Ctx.Input.Param(":peer")
if execution == "" || peerId == "" {
c.Ctx.Output.SetStatus(400)
c.Data["json"] = map[string]string{"error" : "parameters can be empty " + "execution: " + execution + " peer: " + peerId}
c.ServeJSON()
return
}
serv, err := infrastructure.NewService() serv, err := infrastructure.NewService()
if err != nil { if err != nil {
@ -176,7 +193,7 @@ func (c *AdmiraltyController) CreateAdmiraltyTarget(){
return return
} }
resp, err := serv.CreateAdmiraltyTarget(c.Ctx.Request.Context(),execution) resp, err := serv.CreateAdmiraltyTarget(c.Ctx.Request.Context(),execution, peerId)
if err != nil { if err != nil {
// change code to 500 // change code to 500
c.Ctx.Output.SetStatus(500) c.Ctx.Output.SetStatus(500)
@ -210,14 +227,16 @@ func (c *AdmiraltyController) CreateAdmiraltyTarget(){
// @Title GetKubeSecret // @Title GetKubeSecret
// @Description Retrieve the secret created from a Kubeconfig that will be associated to an Admiralty Target // @Description Retrieve the secret created from a Kubeconfig that will be associated to an Admiralty Target
// @Param execution path string true "execution id of the workflow" // @Param execution path string true "execution id of the workflow"
// @Param peer path string true "UUID of the peer to which the resource is linked"
// @Success 200 // @Success 200
// @router /secret/:execution [get] // @router /secret/:execution/:peer [get]
func(c *AdmiraltyController) GetKubeSecret() { func(c *AdmiraltyController) GetKubeSecret() {
var data map[string]interface{} var data map[string]interface{}
execution := c.Ctx.Input.Param(":execution") execution := c.Ctx.Input.Param(":execution")
peerId := c.Ctx.Input.Param(":peer")
serv, err := infrastructure.NewService() serv, err := infrastructure.NewService()
if err != nil { if err != nil {
@ -228,7 +247,7 @@ func(c *AdmiraltyController) GetKubeSecret() {
return return
} }
resp, err := serv.GetKubeconfigSecret(c.Ctx.Request.Context(),execution) resp, err := serv.GetKubeconfigSecret(c.Ctx.Request.Context(),execution, peerId)
if err != nil { if err != nil {
// change code to 500 // change code to 500
c.Ctx.Output.SetStatus(500) c.Ctx.Output.SetStatus(500)
@ -260,9 +279,10 @@ func(c *AdmiraltyController) GetKubeSecret() {
// @Description Creat a secret from a Kubeconfig that will be associated to an Admiralty Target // @Description Creat a secret from a Kubeconfig that will be associated to an Admiralty Target
// @Param execution path string true "execution id of the workflow" // @Param execution path string true "execution id of the workflow"
// @Param peer path string true "UUID of the peer to which the resource is linked"
// @Param kubeconfig body controllers.RemoteKubeconfig true "Kubeconfig to use when creating secret" // @Param kubeconfig body controllers.RemoteKubeconfig true "Kubeconfig to use when creating secret"
// @Success 201 // @Success 201
// @router /secret/:execution [post] // @router /secret/:execution/:peer [post]
func (c *AdmiraltyController) CreateKubeSecret() { func (c *AdmiraltyController) CreateKubeSecret() {
var kubeconfig RemoteKubeconfig var kubeconfig RemoteKubeconfig
var respData map[string]interface{} var respData map[string]interface{}
@ -279,9 +299,8 @@ func (c *AdmiraltyController) CreateKubeSecret() {
return return
} }
execution := c.Ctx.Input.Param(":execution") execution := c.Ctx.Input.Param(":execution")
peerId := c.Ctx.Input.Param(":peer")
serv, err := infrastructure.NewService() serv, err := infrastructure.NewService()
if err != nil { if err != nil {
@ -292,7 +311,7 @@ func (c *AdmiraltyController) CreateKubeSecret() {
return return
} }
resp, err := serv.CreateKubeconfigSecret(c.Ctx.Request.Context(),*kubeconfig.Data,execution) resp, err := serv.CreateKubeconfigSecret(c.Ctx.Request.Context(),*kubeconfig.Data,execution, peerId)
if err != nil { if err != nil {
// change code to 500 // change code to 500
c.Ctx.Output.SetStatus(500) c.Ctx.Output.SetStatus(500)
@ -311,13 +330,13 @@ func (c *AdmiraltyController) CreateKubeSecret() {
// @name GetAdmiraltyNodes // @name GetAdmiraltyNodes
// @description Allows user to test if an admiralty connection has already been established : Target and valid Secret set up on the local host and Source set up on remote host // @description Allows user to test if an admiralty connection has already been established : Target and valid Secret set up on the local host and Source set up on remote host
// @Param execution path string true "execution id of the workflow" // @Param execution path string true "execution id of the workflow"
// @Param peer path string true "UUID of the peer to which the resource is linked"
// @Success 200 // @Success 200
// @router /node/:execution [get] // @router /node/:execution/:peer [get]
func (c *AdmiraltyController) GetNodeReady(){ func (c *AdmiraltyController) GetNodeReady(){
var secret v1.Secret var secret v1.Secret
execution := c.Ctx.Input.Param(":execution") execution := c.Ctx.Input.Param(":execution")
peerId := c.Ctx.Input.Param(":peer")
serv, err := infrastructure.NewService() serv, err := infrastructure.NewService()
if err != nil { if err != nil {
@ -328,7 +347,7 @@ func (c *AdmiraltyController) GetNodeReady(){
return return
} }
node, err := serv.GetOneNode(c.Ctx.Request.Context(),execution) node, err := serv.GetOneNode(c.Ctx.Request.Context(),execution, peerId)
if err != nil { if err != nil {
// change code to 500 // change code to 500
c.Ctx.Output.SetStatus(500) c.Ctx.Output.SetStatus(500)
@ -339,13 +358,15 @@ func (c *AdmiraltyController) GetNodeReady(){
if node == nil { if node == nil {
c.Ctx.Output.SetStatus(404) c.Ctx.Output.SetStatus(404)
c.Data["json"] = map[string]string{ c.Data["json"] = map[string]string{
"error" : "the node for " + execution + " can't be found, make sure both target and source resources are set up on local and remote hosts", "node" : "the node for " + execution + " can't be found, make sure both target and source resources are set up on local and remote hosts",
} }
c.ServeJSON() c.ServeJSON()
return return
} }
resp, err := serv.GetKubeconfigSecret(c.Ctx.Request.Context(),execution)
resp, err := serv.GetKubeconfigSecret(c.Ctx.Request.Context(),execution, peerId)
if err != nil { if err != nil {
// change code to 500 // change code to 500
c.Ctx.Output.SetStatus(500) c.Ctx.Output.SetStatus(500)
@ -359,7 +380,6 @@ func (c *AdmiraltyController) GetNodeReady(){
c.ServeJSON() c.ServeJSON()
return return
} }
// Extract JWT token RS265 encoded // Extract JWT token RS265 encoded
var editedKubeconfig map[string]interface{} var editedKubeconfig map[string]interface{}
@ -393,14 +413,15 @@ func (c *AdmiraltyController) GetNodeReady(){
} }
if *isExpired { if *isExpired {
c.Data["json"] = map[string]string{ c.Data["json"] = map[string]interface{}{
"token" : "token in the secret is expired and must be regenerated", "token" : "token in the secret is expired and must be regenerated",
"node": node,
} }
c.Ctx.Output.SetStatus(410) c.Ctx.Output.SetStatus(410)
c.ServeJSON() c.ServeJSON()
} }
c.Data["json"] = map[string]bool{"ok": true} c.Data["json"] = map[string]interface{}{"node": node,"token": true}
c.ServeJSON() c.ServeJSON()
} }
@ -426,6 +447,8 @@ func retrieveTokenFromKonfig(editedKubeconfig map[string]interface{}) (string,er
} }
func isTokenExpired(token string) (*bool, error){ func isTokenExpired(token string) (*bool, error){
logger := oclib.GetLogger()
t, _, err := new(jwt.Parser).ParseUnverified(token, jwt.MapClaims{}) t, _, err := new(jwt.Parser).ParseUnverified(token, jwt.MapClaims{})
if err != nil { if err != nil {
fmt.Println("couldn't decode token") fmt.Println("couldn't decode token")
@ -437,7 +460,11 @@ func isTokenExpired(token string) (*bool, error){
fmt.Println("Error while checking token's expiration time") fmt.Println("Error while checking token's expiration time")
return nil, err return nil, err
} }
fmt.Println("Expiration date : " + expiration.UTC().Format("2006-01-02T15:04:05"))
logger.Debug().Msg("Expiration date : " + expiration.UTC().Format("2006-01-02T15:04:05"))
logger.Debug().Msg(fmt.Sprint("Now : ", time.Now().Unix()))
logger.Debug().Msg(fmt.Sprint("Token : ", expiration.Unix()))
expired := expiration.Unix() < time.Now().Unix() expired := expiration.Unix() < time.Now().Unix()

View File

@ -247,9 +247,9 @@ func (o *BookingController) Post() {
return return
} }
/*if err := o.createNamespace(resp.ExecutionsID); err != nil { if err := o.createNamespace(resp.ExecutionsID); err != nil {
fmt.Println(err.Error()) fmt.Println(err.Error())
}*/ }
o.Data["json"] = map[string]interface{}{ o.Data["json"] = map[string]interface{}{
"data": []interface{}{b}, "data": []interface{}{b},

View File

@ -16,11 +16,11 @@ type Infrastructure interface {
CreateRoleBinding(ctx context.Context, ns string, roleBinding string, role string) error CreateRoleBinding(ctx context.Context, ns string, roleBinding string, role string) error
CreateRole(ctx context.Context, ns string, role string, groups [][]string, resources [][]string, verbs [][]string) error CreateRole(ctx context.Context, ns string, role string, groups [][]string, resources [][]string, verbs [][]string) error
GetTargets(ctx context.Context) ([]string,error) GetTargets(ctx context.Context) ([]string,error)
CreateAdmiraltySource(context context.Context,executionId string) ([]byte, error) CreateAdmiraltySource(context context.Context, executionId string) ([]byte, error)
CreateKubeconfigSecret(context context.Context,kubeconfig string, executionId string) ([]byte, error) CreateKubeconfigSecret(context context.Context, kubeconfig string, executionId string, peerId string) ([]byte, error)
GetKubeconfigSecret(context context.Context,executionId string) ([]byte, error) GetKubeconfigSecret(context context.Context, executionId string, peerId string) ([]byte, error)
CreateAdmiraltyTarget(context context.Context,executionId string)([]byte,error) CreateAdmiraltyTarget(context context.Context, executionId string, peerId string)([]byte,error)
GetOneNode(context context.Context,executionID string) (*v1.Node, error) GetOneNode(context context.Context, executionID string, peerId string) (*v1.Node, error)
} }
var _service = map[string]func() (Infrastructure, error){ var _service = map[string]func() (Infrastructure, error){

View File

@ -1,13 +1,11 @@
package infrastructure package infrastructure
import ( import (
"bytes"
"context" "context"
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"html/template"
"oc-datacenter/conf" "oc-datacenter/conf"
"strings" "strings"
@ -16,12 +14,40 @@ import (
rbacv1 "k8s.io/api/rbac/v1" rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
apply "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
) )
var gvrSources = schema.GroupVersionResource{Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "sources"}
var gvrTargets = schema.GroupVersionResource{Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "targets"}
type KubernetesService struct { type KubernetesService struct {
Set *kubernetes.Clientset Set *kubernetes.Clientset
}
func NewDynamicClient() (*dynamic.DynamicClient, error) {
config := &rest.Config{
Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(conf.GetConfig().KubeCA),
CertData: []byte(conf.GetConfig().KubeCert),
KeyData: []byte(conf.GetConfig().KubeData),
},
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, errors.New("Error creating Dynamic client: " + err.Error())
}
if dynamicClient == nil {
return nil, errors.New("Error creating Dynamic client: dynamicClient is nil")
}
return dynamicClient, nil
} }
func NewKubernetesService() (Infrastructure, error) { func NewKubernetesService() (Infrastructure, error) {
@ -33,6 +59,7 @@ func NewKubernetesService() (Infrastructure, error) {
KeyData: []byte(conf.GetConfig().KubeData), KeyData: []byte(conf.GetConfig().KubeData),
}, },
} }
// Create clientset // Create clientset
clientset, err := kubernetes.NewForConfig(config) clientset, err := kubernetes.NewForConfig(config)
fmt.Println("NewForConfig", clientset, err) fmt.Println("NewForConfig", clientset, err)
@ -43,6 +70,7 @@ func NewKubernetesService() (Infrastructure, error) {
return nil, errors.New("Error creating Kubernetes client: clientset is nil") return nil, errors.New("Error creating Kubernetes client: clientset is nil")
} }
return &KubernetesService{ return &KubernetesService{
Set: clientset, Set: clientset,
}, nil }, nil
@ -82,6 +110,9 @@ func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) erro
namespace := &v1.Namespace{ namespace := &v1.Namespace{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: ns, Name: ns,
Labels: map[string]string{
"multicluster-scheduler":"enabled",
},
}, },
} }
// Create the namespace // Create the namespace
@ -249,8 +280,8 @@ func (k *KubernetesService) GetTargets(ctx context.Context) ([]string, error) {
// - have declared the same namespace as the one where the pods are created in the local cluster // - have declared the same namespace as the one where the pods are created in the local cluster
// //
// - have delcared a serviceAccount with sufficient permission to create pods // - have delcared a serviceAccount with sufficient permission to create pods
func (k *KubernetesService) CreateAdmiraltyTarget(context context.Context, executionId string) ([]byte, error) { func (k *KubernetesService) CreateAdmiraltyTarget(context context.Context, executionId string, peerId string) ([]byte, error) {
exists, err := k.GetKubeconfigSecret(context, executionId) exists, err := k.GetKubeconfigSecret(context, executionId, peerId)
if err != nil { if err != nil {
fmt.Println("Error verifying kube-secret before creating target") fmt.Println("Error verifying kube-secret before creating target")
return nil, err return nil, err
@ -260,33 +291,32 @@ func (k *KubernetesService) CreateAdmiraltyTarget(context context.Context, execu
fmt.Println("Target needs to be binded to a secret in namespace ", executionId) fmt.Println("Target needs to be binded to a secret in namespace ", executionId)
return nil, nil // Maybe we could create a wrapper for errors and add more info to have return nil, nil // Maybe we could create a wrapper for errors and add more info to have
} }
targetName := "target-" + getConcatenatedName(peerId,executionId)
target := map[string]interface{}{
"apiVersion": "multicluster.admiralty.io/v1alpha1",
"kind": "Target",
"metadata": map[string]interface{}{
"name": targetName,
"namespace": executionId,
"labels": map[string]interface{}{
"peer": peerId,
},
},
"spec": map[string]interface{}{
"kubeconfigSecret": map[string]string{
"name" : "kube-secret-"+ getConcatenatedName(peerId, executionId),
},
},
}
var targetManifest string res, err := dynamicClientApply(executionId, targetName, gvrTargets, context, target)
var tpl bytes.Buffer
tmpl, err := template.New("target").
Parse("{\"apiVersion\": \"multicluster.admiralty.io/v1alpha1\", \"kind\": \"Target\", \"metadata\": {\"name\": \"target-{{.ExecutionId}}\"}, \"spec\": { \"kubeconfigSecret\" :{\"name\": \"kube-secret-{{.ExecutionId}}\"}} }")
if err != nil { if err != nil {
fmt.Println("Error creating the template for the target Manifest") return nil, errors.New("Error when trying to apply Target definition :" + err.Error())
return nil, err
} }
err = tmpl.Execute(&tpl, map[string]string{"ExecutionId": executionId}) return res, nil
targetManifest = tpl.String()
resp, err := postCDRapiKube(
*k.Set,
context,
"/apis/multicluster.admiralty.io/v1alpha1/namespaces/"+executionId+"/targets",
[]byte(targetManifest),
map[string]string{"fieldManager": "kubectl-client-side-apply"},
map[string]string{"fieldValidation": "Strict"},
)
if err != nil {
fmt.Println("Error trying to create a Source on remote cluster : ", err, " : ", resp)
return nil, err
}
return resp, nil
} }
// Admiralty Source allows a cluster to receive pods from a remote cluster // Admiralty Source allows a cluster to receive pods from a remote cluster
@ -297,40 +327,32 @@ func (k *KubernetesService) CreateAdmiraltyTarget(context context.Context, execu
// This method is temporary to implement the use of Admiralty, but must be edited // This method is temporary to implement the use of Admiralty, but must be edited
// to rather contact the oc-datacenter from the remote cluster to create the source // to rather contact the oc-datacenter from the remote cluster to create the source
// locally and retrieve the token for the serviceAccount // locally and retrieve the token for the serviceAccount
func (k *KubernetesService) CreateAdmiraltySource(context context.Context, executionId string) ([]byte, error) { func (k *KubernetesService) CreateAdmiraltySource(context context.Context,executionId string) ([]byte, error) {
var sourceManifest string
var tpl bytes.Buffer source := map[string]interface{}{
tmpl, err := template.New("source"). "apiVersion": "multicluster.admiralty.io/v1alpha1",
Parse("{\"apiVersion\": \"multicluster.admiralty.io/v1alpha1\", \"kind\": \"Source\", \"metadata\": {\"name\": \"source-{{.ExecutionId}}\"}, \"spec\": {\"serviceAccountName\": \"sa-{{.ExecutionId}}\"} }") "kind": "Source",
"metadata": map[string]interface{}{
"name": "source-"+executionId,
"namespace": executionId,
},
"spec": map[string]interface{}{
"serviceAccountName": "sa-"+executionId,
},
}
res, err := dynamicClientApply(executionId, "source-" + executionId,gvrSources, context, source)
if err != nil { if err != nil {
fmt.Println("Error creating the template for the source Manifest") return nil, errors.New("Error when trying to apply Source definition :" + err.Error())
return nil, err
} }
err = tmpl.Execute(&tpl, map[string]string{"ExecutionId": executionId}) return res, nil
sourceManifest = tpl.String()
resp, err := postCDRapiKube(
*k.Set,
context,
"/apis/multicluster.admiralty.io/v1alpha1/namespaces/"+executionId+"/sources",
[]byte(sourceManifest),
map[string]string{"fieldManager": "kubectl-client-side-apply"},
map[string]string{"fieldValidation": "Strict"},
)
// We can add more info to the log with the content of resp if not nil
if err != nil {
fmt.Println("Error trying to create a Source on remote cluster : ", err, " : ", resp)
return nil, err
}
return resp, nil
} }
// Create a secret from a kubeconfing. Use it to create the secret binded to an Admiralty // Create a secret from a kubeconfing. Use it to create the secret binded to an Admiralty
// target, which must contain the serviceAccount's token value // target, which must contain the serviceAccount's token value
func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kubeconfig string, executionId string) ([]byte, error) { func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kubeconfig string, executionId string, peerId string) ([]byte, error) {
config, err := base64.StdEncoding.DecodeString(kubeconfig) config, err := base64.StdEncoding.DecodeString(kubeconfig)
// config, err := base64.RawStdEncoding.DecodeString(kubeconfig) // config, err := base64.RawStdEncoding.DecodeString(kubeconfig)
if err != nil { if err != nil {
@ -339,33 +361,38 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
return nil, err return nil, err
} }
secretManifest := &v1.Secret{ secretApplyConfig := apply.Secret("kube-secret-" + getConcatenatedName(peerId, executionId),
ObjectMeta: metav1.ObjectMeta{ executionId).
Name: "kube-secret-" + executionId, WithData(map[string][]byte{
Namespace: executionId, "config": config,
}, },
Data: map[string][]byte{ )
"config": config,
},
}
// exists, err := k.GetKubeconfigSecret(context,executionId)
// if err != nil {
// fmt.Println("Error verifying if kube secret exists in namespace ", executionId)
// return nil, err
// }
// if exists != nil {
// fmt.Println("kube-secret already exists in namespace", executionId)
// fmt.Println("Overriding existing kube-secret with a newer resource")
// // TODO : implement DeleteKubeConfigSecret(executionID)
// deleted, err := k.DeleteKubeConfigSecret(executionId)
// _ = deleted
// _ = err
// }
exists, err := k.GetKubeconfigSecret(context, executionId)
if err != nil {
fmt.Println("Error verifying if kube secret exists in namespace ", executionId)
return nil, err
}
if exists != nil {
fmt.Println("kube-secret already exists in namespace", executionId)
fmt.Println("Overriding existing kube-secret with a newer resource")
// TODO : implement DeleteKubeConfigSecret(executionID)
deleted, err := k.DeleteKubeConfigSecret(executionId)
_ = deleted
_ = err
}
resp, err := k.Set.CoreV1(). resp, err := k.Set.CoreV1().
Secrets(executionId). Secrets(executionId).
Create(context, secretManifest, metav1.CreateOptions{}) Apply(context,
secretApplyConfig,
metav1.ApplyOptions{
FieldManager: "admiralty-manager",
})
if err != nil { if err != nil {
fmt.Println("Error while trying to contact API to get secret kube-secret-" + executionId) fmt.Println("Error while trying to contact API to get secret kube-secret-" + executionId)
fmt.Println(err) fmt.Println(err)
@ -381,10 +408,10 @@ func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kube
return data, nil return data, nil
} }
func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executionId string) ([]byte, error) { func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executionId string, peerId string) ([]byte, error) {
resp, err := k.Set.CoreV1(). resp, err := k.Set.CoreV1().
Secrets(executionId). Secrets(executionId).
Get(context, "kube-secret-"+executionId, metav1.GetOptions{}) Get(context, "kube-secret-"+ getConcatenatedName(peerId, executionId), metav1.GetOptions{})
if err != nil { if err != nil {
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
@ -425,7 +452,40 @@ func getCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string
return resp, nil return resp, nil
} }
func postCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string, body []byte, params ...map[string]string) ([]byte, error) { func dynamicClientApply(executionId string, resourceName string, resourceDefinition schema.GroupVersionResource, ctx context.Context, object map[string]interface{}) ([]byte, error) {
cli, err := NewDynamicClient()
if err != nil {
return nil, errors.New("Could not retrieve dynamic client when creating Admiralty Source : " + err.Error())
}
res, err := cli.Resource(resourceDefinition).
Namespace(executionId).
Apply(ctx,
resourceName,
&unstructured.Unstructured{Object: object},
metav1.ApplyOptions{
FieldManager: "kubectl-client-side-apply",
},
)
if err != nil {
o, err := json.Marshal(object)
fmt.Println("Error from k8s API when applying " + fmt.Sprint(string(o)) + " to " + gvrSources.String() + " : " , err)
return nil,err
}
// We can add more info to the log with the content of resp if not nil
resByte, err := json.Marshal(res)
if err != nil {
// fmt.Println("Error trying to create a Source on remote cluster : ", err , " : ", res)
return nil, err
}
return resByte, nil
}
func putCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string, body []byte, params ...map[string]string) ([]byte, error){
req := client.RESTClient(). req := client.RESTClient().
Post(). Post().
AbsPath(path). AbsPath(path).
@ -450,7 +510,9 @@ func postCDRapiKube(client kubernetes.Clientset, ctx context.Context, path strin
// Returns the Kubernetes' Node object corresponding to the executionID if it exists on this host // Returns the Kubernetes' Node object corresponding to the executionID if it exists on this host
// //
// The node is created when an admiralty Target (on host) can connect to an admiralty Source (on remote) // The node is created when an admiralty Target (on host) can connect to an admiralty Source (on remote)
func (k *KubernetesService) GetOneNode(context context.Context, executionID string) (*v1.Node, error) { func (k *KubernetesService) GetOneNode(context context.Context, executionID string, peerId string) (*v1.Node, error) {
concatenatedName := getConcatenatedName(peerId, executionID)
res, err := k.Set.CoreV1(). res, err := k.Set.CoreV1().
Nodes(). Nodes().
List( List(
@ -464,10 +526,21 @@ func (k *KubernetesService) GetOneNode(context context.Context, executionID stri
} }
for _, node := range res.Items { for _, node := range res.Items {
if isNode := strings.Contains(node.Name, "admiralty-"+executionID+"-target-"+executionID+"-"); isNode { if isNode := strings.Contains(node.Name, "admiralty-"+ executionID +"-target-"+ concatenatedName + "-"); isNode {
return &node, nil return &node, nil
} }
} }
return nil, nil return nil, nil
} }
// Returns a concatenation of the peerId and namespace in order for
// kubernetes ressources to have a unique name, under 63 characters
// and yet identify which peer they are created for
func getConcatenatedName(peerId string, namespace string) string {
s := strings.Split(namespace, "-")[:2]
n := s[0] + "-" + s[1]
return peerId + "-" + n
}

View File

@ -19,7 +19,7 @@ func init() {
beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"], beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"],
beego.ControllerComments{ beego.ControllerComments{
Method: "GetNodeReady", Method: "GetNodeReady",
Router: `/node/:execution`, Router: `/node/:execution/:peer`,
AllowHTTPMethods: []string{"get"}, AllowHTTPMethods: []string{"get"},
MethodParams: param.Make(), MethodParams: param.Make(),
Filters: nil, Filters: nil,
@ -28,7 +28,7 @@ func init() {
beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"], beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"],
beego.ControllerComments{ beego.ControllerComments{
Method: "GetKubeSecret", Method: "GetKubeSecret",
Router: `/secret/:execution`, Router: `/secret/:execution/:peer`,
AllowHTTPMethods: []string{"get"}, AllowHTTPMethods: []string{"get"},
MethodParams: param.Make(), MethodParams: param.Make(),
Filters: nil, Filters: nil,
@ -37,7 +37,7 @@ func init() {
beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"], beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"],
beego.ControllerComments{ beego.ControllerComments{
Method: "CreateKubeSecret", Method: "CreateKubeSecret",
Router: `/secret/:execution`, Router: `/secret/:execution/:peer`,
AllowHTTPMethods: []string{"post"}, AllowHTTPMethods: []string{"post"},
MethodParams: param.Make(), MethodParams: param.Make(),
Filters: nil, Filters: nil,
@ -45,7 +45,7 @@ func init() {
beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"], beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"],
beego.ControllerComments{ beego.ControllerComments{
Method: "CreateSource", Method: "CreateAdmiraltySource",
Router: `/source/:execution`, Router: `/source/:execution`,
AllowHTTPMethods: []string{"post"}, AllowHTTPMethods: []string{"post"},
MethodParams: param.Make(), MethodParams: param.Make(),
@ -55,7 +55,7 @@ func init() {
beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"], beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"] = append(beego.GlobalControllerRouter["oc-datacenter/controllers:AdmiraltyController"],
beego.ControllerComments{ beego.ControllerComments{
Method: "CreateAdmiraltyTarget", Method: "CreateAdmiraltyTarget",
Router: `/target/:execution`, Router: `/target/:execution/:peer`,
AllowHTTPMethods: []string{"post"}, AllowHTTPMethods: []string{"post"},
MethodParams: param.Make(), MethodParams: param.Make(),
Filters: nil, Filters: nil,