Compare commits
	
		
			5 Commits
		
	
	
		
			feature/mu
			...
			services_d
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2f9b2fb464 | |||
| d1dab992cf | |||
| 52278609bd | |||
| ed48fc0a55 | |||
| 03ad8c084d | 
							
								
								
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,18 @@ | ||||
| { | ||||
|     // Use IntelliSense to learn about possible attributes. | ||||
|     // Hover to view descriptions of existing attributes. | ||||
|     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 | ||||
|     "version": "0.2.0", | ||||
|     "configurations": [ | ||||
|         { | ||||
|             "name": "Launch Package", | ||||
|             "type": "go", | ||||
|             "request": "launch", | ||||
|             "mode": "auto", | ||||
|             "program": "${fileDirname}", | ||||
|             "args": ["-e","e3b7772e-dc9f-4bc1-9f5d-533e23e6cd57" ,"-u","http://127.0.0.1:3100","-m" ,"mongodb://127.0.0.1:27017","-d","DC_myDC"], | ||||
|             "env": {"OCMONITOR_LOKIURL":"http://127.0.0.1:3100","OCMONITOR_WORKFLOW":"8d0f1814-b5ca-436c-ba3f-116d99198fd2","KUBERNETES_SERVICE_HOST":"","OCMONITOR_MONGOURL":"mongodb://127.0.0.1:27017","OCMONITOR_DATABASE":"DC_myDC","test_service":"true"} | ||||
|         } | ||||
|     ] | ||||
| } | ||||
|  | ||||
							
								
								
									
										70
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										70
									
								
								README.md
									
									
									
									
									
								
							| @@ -47,6 +47,76 @@ In rules add a new entry : | ||||
|  | ||||
| This command **must return "yes"** | ||||
|  | ||||
| ## Allow services to be joined with reverse proxy | ||||
|  | ||||
| Since the development has been realised in a K3S environment, we will use the lightweight solution provided by **traefik**.  | ||||
|  | ||||
| We need to install **metallb** to expose our cluster to the exterior and allow packets to reach traefik. | ||||
|  | ||||
| ### Deploy traefik and metallb | ||||
|  | ||||
| - Make sure that helm is installed, else visit : https://helm.sh/docs/intro/install/  | ||||
|  | ||||
| - Add the repositories for traefik and metallb | ||||
| > helm repo add metallb https://metallb.github.io/metallb  | ||||
| > helm repo add traefik https://helm.traefik.io/traefik | ||||
|  | ||||
| >helm repo update | ||||
|  | ||||
| - Create the namespaces for each | ||||
| > kubectl create ns traefik-ingress | ||||
| > kubectl create ns metallb-system  | ||||
|  | ||||
| - Configure the deployment  | ||||
|  | ||||
| ``` | ||||
| cat > traefik-values.yaml <<EOF | ||||
| globalArguments: | ||||
| deployment: | ||||
|   kind: DaemonSet | ||||
| providers: | ||||
|   kubernetesCRD: | ||||
|     enabled: true | ||||
| service: | ||||
|   type: LoadBalancer | ||||
| ingressRoute: | ||||
|   dashboard: | ||||
|     enabled: false | ||||
| EOF | ||||
| ``` | ||||
|  | ||||
| - Launch the installs | ||||
| > helm upgrade --install metallb metallb/metallb --namespace metallb-system | ||||
|  | ||||
| > helm install --namespace=traefik-ingress traefik traefik/traefik --values=./traefik-values.yaml | ||||
|  | ||||
| ### Configure metallb | ||||
|  | ||||
| ``` | ||||
| cat << 'EOF' | kubectl apply -f - | ||||
| apiVersion: metallb.io/v1beta1 | ||||
| kind: IPAddressPool | ||||
| metadata: | ||||
|   name: default-pool | ||||
|   namespace: metallb-system | ||||
| spec: | ||||
|   addresses: | ||||
|   - 192.168.0.200-192.168.0.250 | ||||
| --- | ||||
| apiVersion: metallb.io/v1beta1 | ||||
| kind: L2Advertisement | ||||
| metadata: | ||||
|   name: default | ||||
|   namespace: metallb-system | ||||
| spec: | ||||
|   ipAddressPools: | ||||
|   - default-pool | ||||
| EOF | ||||
| ``` | ||||
|  | ||||
| - Check that the services created in traefik-ingress have an external IP | ||||
|  | ||||
| > kubectl get service -n traefik-ingress -o wide | ||||
|  | ||||
| ## TODO | ||||
|  | ||||
|   | ||||
							
								
								
									
										30
									
								
								demo_nginx/nginx.conf
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								demo_nginx/nginx.conf
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,30 @@ | ||||
| # nginx.conf | ||||
| user  nginx; | ||||
| worker_processes  auto; | ||||
| error_log  /var/log/nginx/error.log warn; | ||||
| pid        /var/run/nginx.pid; | ||||
| events { | ||||
|   worker_connections  1024; | ||||
| } | ||||
| http { | ||||
|   include       /etc/nginx/mime.types; | ||||
|   default_type  application/octet-stream; | ||||
|   log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ' | ||||
|   '$status $body_bytes_sent "$http_referer" ' | ||||
|   '"$http_user_agent" "$http_x_forwarded_for"'; | ||||
|   access_log  /var/log/nginx/access.log  main; | ||||
|   sendfile        on; | ||||
|   #tcp_nopush     on; | ||||
|    | ||||
|   keepalive_timeout  65; | ||||
|   #gzip  on; | ||||
|   #include /etc/nginx/conf.d/*.conf; | ||||
| server { | ||||
|   listen 80; | ||||
|   location / { | ||||
|     root   /usr/share/nginx/html; | ||||
|     index  index.html index.htm; | ||||
|     try_files $uri $uri/ /index.html; | ||||
|   } | ||||
| } | ||||
| } | ||||
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							| @@ -46,8 +46,6 @@ const localConfigFile = "./conf/local_ocmonitord_conf.json" | ||||
|  | ||||
| func main() { | ||||
|  | ||||
| 	os.Setenv("test_service","true") 	// Only for service demo, delete before merging on main | ||||
|  | ||||
| 	monitorLocal = false | ||||
| 	// Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) | ||||
| 	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { | ||||
|   | ||||
							
								
								
									
										101
									
								
								models/ingress.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										101
									
								
								models/ingress.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,101 @@ | ||||
| package models | ||||
|  | ||||
| import "strconv" | ||||
|  | ||||
| // apiVersion: networking.k8s.io/v1 | ||||
| // kind: Ingress | ||||
| // metadata: | ||||
| //   name: example-ingress | ||||
| //   namespace: argo | ||||
| //   annotations: | ||||
| //     traefik.ingress.kubernetes.io/router.entrypoints: web  # Utilisation de l'entrypoint HTTP standard | ||||
| // spec: | ||||
| //   rules: | ||||
| //   - http: | ||||
| //       paths: | ||||
| //       - path: /dtf | ||||
| //         pathType: Prefix | ||||
| //         backend: | ||||
| //           service: | ||||
| //             name: workflow-service-qtjk2 | ||||
| //             port: | ||||
| //               number: 80 | ||||
|  | ||||
| var ingress_manifest = &Manifest{ | ||||
| 	ApiVersion: "networking.k8s.io/v1", | ||||
| 	Kind: "Ingress", | ||||
| 	Metadata: Metadata{ | ||||
| 		GenerateName: "ingress-argo-", | ||||
| 	}, | ||||
| } | ||||
|  | ||||
| type Ingress struct { | ||||
| 	ApiVersion string      `yaml:"apiVersion,omitempty"` | ||||
| 	Kind       string      `yaml:"kind,omitempty"` | ||||
| 	Metadata   Metadata    `yaml:"metadata,omitempty"` | ||||
| 	Spec       IngressSpec `yaml:"spec,omitempty"` | ||||
| } | ||||
|  | ||||
|  | ||||
|  | ||||
| type IngressSpec struct { | ||||
| 	Rules []Rule `yaml:"rules,omitempty"` | ||||
| } | ||||
|  | ||||
| type Rule struct { | ||||
| 	HTTP HTTP `yaml:"http,omitempty"` | ||||
| } | ||||
|  | ||||
| type HTTP struct { | ||||
| 	Paths []Path `yaml:"paths,omitempty"` | ||||
| } | ||||
|  | ||||
| type Path struct { | ||||
| 	Path     string  `yaml:"path,omitempty"` | ||||
| 	PathType string  `yaml:"pathType,omitempty"` | ||||
| 	Backend  Backend `yaml:"backend,omitempty"` | ||||
| } | ||||
|  | ||||
| type Backend struct { | ||||
| 	ServiceName string `yaml:"serviceName,omitempty"` | ||||
| 	ServicePort int64  `yaml:"servicePort,omitempty"` | ||||
| } | ||||
|  | ||||
| func NewIngress(contract map[string]map[string]string, serviceName string) Ingress { | ||||
| 	new_ingr := Ingress{ | ||||
| 		ApiVersion: "networking.k8s.io/v1", | ||||
| 		Kind:       "Ingress", | ||||
| 		Metadata: Metadata{ | ||||
| 			GenerateName: "ingress-argo-", | ||||
| 		}, | ||||
| 		Spec: IngressSpec{ | ||||
| 			Rules: []Rule{ | ||||
| 				{ | ||||
| 					HTTP: HTTP{ | ||||
| 						Paths: []Path{}, | ||||
| 						}, | ||||
| 					}, | ||||
| 				}, | ||||
| 			}, | ||||
| 		} | ||||
|  | ||||
| 	for port_to_reverse_str, translations := range contract{ | ||||
| 		 | ||||
| 		port, _ := strconv.ParseInt(port_to_reverse_str,10,64) | ||||
|  | ||||
| 		port_reverse := Path{ | ||||
| 			Path:     translations["reverse"], | ||||
| 			PathType: "Prefix", | ||||
| 			Backend: Backend{ | ||||
| 				ServiceName: serviceName, | ||||
| 				ServicePort:port, | ||||
| 			}, | ||||
| 		} | ||||
|  | ||||
| 		new_ingr.Spec.Rules[0].HTTP.Paths = append(new_ingr.Spec.Rules[0].HTTP.Paths, port_reverse) | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	return new_ingr | ||||
| } | ||||
|  | ||||
							
								
								
									
										12
									
								
								models/k8s_manifest.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								models/k8s_manifest.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,12 @@ | ||||
| package models | ||||
|  | ||||
| type Manifest struct { | ||||
| 	ApiVersion string   `yaml:"apiVersion,omitempty"` | ||||
| 	Kind       string   `yaml:"kind"` | ||||
| 	Metadata   Metadata `yaml:"metadata,omitempty"` | ||||
| } | ||||
|  | ||||
| type Metadata struct { | ||||
|     Name      		string	`yaml:"name"` | ||||
|     GenerateName	string	`yaml:"generateName"` | ||||
| } | ||||
| @@ -10,16 +10,11 @@ type ServiceResource struct { | ||||
| } | ||||
|  | ||||
| type Service struct { | ||||
|     APIVersion string            `yaml:"apiVersion"` | ||||
|     Kind       string            `yaml:"kind"` | ||||
|     Metadata   Metadata          `yaml:"metadata"` | ||||
|     Manifest | ||||
|     Spec       ServiceSpec       `yaml:"spec"` | ||||
| } | ||||
|  | ||||
| type Metadata struct { | ||||
|     Name      string            `yaml:"name"` | ||||
|      | ||||
| } | ||||
|  | ||||
|  | ||||
| // ServiceSpec is the specification of the Kubernetes Service | ||||
| type ServiceSpec struct { | ||||
|   | ||||
							
								
								
									
										11
									
								
								traefik-values.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								traefik-values.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,11 @@ | ||||
| globalArguments: | ||||
| deployment: | ||||
|   kind: DaemonSet | ||||
| providers: | ||||
|   kubernetesCRD: | ||||
|     enabled: true | ||||
| service: | ||||
|   type: LoadBalancer | ||||
| ingressRoute: | ||||
|   dashboard: | ||||
|     enabled: false | ||||
| @@ -5,14 +5,17 @@ | ||||
| package workflow_builder | ||||
|  | ||||
| import ( | ||||
| 	"oc-monitord/models" | ||||
| 	. "oc-monitord/models" | ||||
| 	"os" | ||||
| 	"slices" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/processing" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| 	"github.com/nwtgck/go-fakelish" | ||||
| @@ -22,23 +25,26 @@ import ( | ||||
|  | ||||
| var logger zerolog.Logger | ||||
|  | ||||
| type ServiceExposure int  | ||||
| const ( | ||||
| 	PAT 	ServiceExposure = iota | ||||
| 	Reverse | ||||
| 	Both | ||||
| ) | ||||
|  | ||||
| type ArgoBuilder struct { | ||||
| 	OriginWorkflow 	w.Workflow | ||||
| 	Workflow       	Workflow | ||||
| 	Services		*Service | ||||
| 	Services		[]Service | ||||
| 	Timeout        	int | ||||
| } | ||||
|  | ||||
| type Workflow struct { | ||||
| 	ApiVersion string `yaml:"apiVersion"` | ||||
| 	Kind       string `yaml:"kind"` | ||||
| 	Metadata   struct { | ||||
| 		Name string `yaml:"name"` | ||||
| 	} `yaml:"metadata"` | ||||
| 	Spec Spec `yaml:"spec,omitempty"` | ||||
| 	Manifest | ||||
| 	Spec ArgoSpec `yaml:"spec,omitempty"` | ||||
| } | ||||
|  | ||||
| type Spec struct { | ||||
| type ArgoSpec struct { | ||||
| 	Entrypoint string                `yaml:"entrypoint"` | ||||
| 	Arguments  []Parameter           `yaml:"arguments,omitempty"` | ||||
| 	Volumes    []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` | ||||
| @@ -46,13 +52,12 @@ type Spec struct { | ||||
| 	Timeout    int                   `yaml:"activeDeadlineSeconds,omitempty"` | ||||
| } | ||||
|  | ||||
|  | ||||
|  | ||||
| func (b *ArgoBuilder) CreateDAG() (string, error) { | ||||
| 	 | ||||
| 	// handle services by checking if there is only one processing with hostname and port | ||||
| 	 | ||||
| 	b.createNginxVolumes() | ||||
| 	 | ||||
| 	 | ||||
| 		 | ||||
| 	b.createTemplates() | ||||
| 	b.createDAGstep() | ||||
| 	b.createVolumes() | ||||
| @@ -62,7 +67,7 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { | ||||
| 		b.Workflow.Spec.Timeout = b.Timeout | ||||
| 	} | ||||
| 	b.Workflow.Spec.Entrypoint = "dag" | ||||
| 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | ||||
| 	b.Workflow.Manifest.ApiVersion = "argoproj.io/v1alpha1" | ||||
| 	b.Workflow.Kind = "Workflow" | ||||
| 	random_name := generateWfName() | ||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||
| @@ -91,6 +96,7 @@ func (b *ArgoBuilder) createTemplates() { | ||||
| 		var command string | ||||
| 		var args string | ||||
| 		var env string | ||||
| 		var serv models.Service | ||||
|  | ||||
| 		comp_res := comp.Processing | ||||
|  | ||||
| @@ -114,24 +120,39 @@ func (b *ArgoBuilder) createTemplates() { | ||||
| 		new_temp := Template{Name: argo_name, Container: temp_container} | ||||
| 		new_temp.Inputs.Parameters = inputs_container | ||||
| 		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name | ||||
| 		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx  | ||||
| 		 | ||||
| 		if (b.isService(comp.ID)){ | ||||
| 			serv := b.CreateService(comp) | ||||
| 			b.createService(serv, argo_name, comp.ID) | ||||
| 			new_temp.Metadata.Labels = make(map[string]string) | ||||
| 			new_temp.Metadata.Labels["app"] = "oc-service"		// Construct the template for the k8s service and add a link in graph between k8s service and processing | ||||
|  | ||||
| 			serv_type := getServiceExposure(*comp.Processing) | ||||
| 			if serv_type == PAT || serv_type == Both{ | ||||
| 				serv = b.CreateKubeService(comp, NodePort) | ||||
| 				b.addKubeServiceToWorkflow(serv, argo_name, comp.ID) | ||||
| 				new_temp.Metadata.Labels = make(map[string]string) | ||||
| 				new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"]		// Construct the template for the k8s service and add a link in graph between k8s service and processing | ||||
| 				b.addServiceToArgo(serv) | ||||
| 				ingress := b.CreateIngress(comp,serv) | ||||
| 				b.addIngressToWorfklow(ingress, argo_name, comp.ID) | ||||
|  | ||||
| 			} | ||||
| 			if serv_type == Reverse || serv_type == Both{ | ||||
| 				serv = b.CreateKubeService(comp, ClusterIP) | ||||
| 				// create ingress by passing the service and the processing (or reverse) | ||||
| 				b.addKubeServiceToWorkflow(serv, argo_name, comp.ID) | ||||
| 				new_temp.Metadata.Labels = make(map[string]string) | ||||
| 				new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"]		// Construct the template for the k8s service and add a link in graph between k8s service and processing | ||||
| 				b.addServiceToArgo(serv) | ||||
| 			} | ||||
|  | ||||
| 			 | ||||
| 			// if err != nil { | ||||
| 			// 	// TODO | ||||
| 			// } | ||||
| 		} | ||||
| 		 | ||||
| 		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	if b.Services != nil { | ||||
| 		b.addServiceToArgo() | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| @@ -153,8 +174,9 @@ func (b *ArgoBuilder) createDAGstep() { | ||||
| 		new_dag.Tasks = append(new_dag.Tasks, step) | ||||
| 	} | ||||
|  | ||||
| 	if b.Services != nil { | ||||
| 		new_dag.Tasks = append(new_dag.Tasks, Task{Name:"workflow-service-pod", Template: "workflow-service-pod"}) | ||||
| 	for i, _ := range b.Services { | ||||
| 		name := "workflow-service-pod-"+strconv.Itoa(i + 1) | ||||
| 		new_dag.Tasks = append(new_dag.Tasks, Task{Name: name , Template: name}) | ||||
| 	} | ||||
|  | ||||
| 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) | ||||
| @@ -170,15 +192,6 @@ func (b *ArgoBuilder) createVolumes() { | ||||
| 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) | ||||
| } | ||||
|  | ||||
| // For demo purposes, until we implement the use of storage ressources | ||||
| func (b *ArgoBuilder) createNginxVolumes() { | ||||
| 	new_volume := VolumeClaimTemplate{} | ||||
| 	new_volume.Metadata.Name = "nginx-demo" | ||||
| 	new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} | ||||
| 	new_volume.Spec.Resources.Requests.Storage = "1Gi" | ||||
|  | ||||
| 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) | ||||
| } | ||||
|  | ||||
|  | ||||
| func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { | ||||
| @@ -319,14 +332,29 @@ func (b *ArgoBuilder) isService(id string) bool{ | ||||
| 	return is_exposed | ||||
| } | ||||
|  | ||||
| func getServiceExposure(service processing.ProcessingResource) ServiceExposure{ | ||||
| 	var exposure_type ServiceExposure | ||||
|  | ||||
| func (b *ArgoBuilder) addLabel(name string, id string) { | ||||
| 	argo_name := getArgoName(name,id) | ||||
| 	for _, template := range b.Workflow.Spec.Templates{ | ||||
| 		if template.Name == argo_name{ | ||||
| 			template.Metadata.Labels["app"] = "service-workflow"			 | ||||
| 			return | ||||
| 		} | ||||
| 	contract := getExposeContract(service.ResourceModel.Model["expose"]) | ||||
| 	_, pat := contract["PAT"] | ||||
| 	_, reverse := contract["reverse"] | ||||
| 	 | ||||
| 	if pat && reverse { | ||||
| 		exposure_type=  Both | ||||
| 	} | ||||
| 	if pat { | ||||
| 		exposure_type = PAT | ||||
| 	} | ||||
| 	if reverse{ | ||||
| 		exposure_type = Reverse  | ||||
| 	} | ||||
|  | ||||
| 	return exposure_type | ||||
| 	 | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) CreateIngress(processing processing.ProcessingResource, service Service) Ingress{ | ||||
| 	contract := getExposeContract(processing.ResourceModel.Model["expose"]) | ||||
| 	new_ingress := models.NewIngress(contract,service.Metadata.Name) | ||||
| 	return new_ingress | ||||
| } | ||||
| @@ -7,11 +7,19 @@ import ( | ||||
|  | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | ||||
| 	"github.com/nwtgck/go-fakelish" | ||||
| 	"go.mongodb.org/mongo-driver/bson" | ||||
| 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||
| 	"gopkg.in/yaml.v3" | ||||
| ) | ||||
|  | ||||
| type ServiceType string  | ||||
|  | ||||
| const ( | ||||
| 	NodePort	ServiceType = "NodePort" | ||||
| 	ClusterIP	ServiceType = "ClusterIP" | ||||
| ) | ||||
|  | ||||
| // TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code | ||||
| func getExposeContract(expose resource_model.Model) map[string]map[string]string { | ||||
| 	contract := make(map[string]map[string]string,0) | ||||
| @@ -40,7 +48,7 @@ func getExposeContract(expose resource_model.Model) map[string]map[string]string | ||||
| } | ||||
|  | ||||
|  | ||||
| func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ | ||||
| func (b *ArgoBuilder) CreateKubeService(processing graph.GraphItem, service_type ServiceType) models.Service{ | ||||
| 	 | ||||
| 	// model { | ||||
| 	// 	Type : "dict", | ||||
| @@ -57,23 +65,24 @@ func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ | ||||
| 	// } | ||||
| 	 | ||||
|  | ||||
| 	new_service := models.Service{APIVersion: "v1",  | ||||
| 							Kind: "Service",  | ||||
| 							Metadata: models.Metadata{ | ||||
| 								Name: "workflow-service" , | ||||
| 							},  | ||||
| 							Spec: models.ServiceSpec{ | ||||
| 								Selector: map[string]string{"app": "oc-service"}, | ||||
| 								Ports: []models.ServicePort{ | ||||
| 								}, | ||||
| 								Type: "NodePort", | ||||
| 							}, | ||||
| 					} | ||||
| 	 | ||||
| 	new_service := models.Service{ | ||||
| 		Manifest: models.Manifest{ | ||||
| 			ApiVersion: "v1",  | ||||
| 			Kind: "Service",  | ||||
| 			Metadata: models.Metadata{ | ||||
| 				Name: "workflow-service-"+ processing.Processing.Name + "-" + processing.ID , | ||||
| 				}, | ||||
| 			}, | ||||
| 			Spec: models.ServiceSpec{ | ||||
| 				Selector: map[string]string{"app": "service-" + fakelish.GenerateFakeWord(5, 8)}, | ||||
| 				Ports: []models.ServicePort{ | ||||
| 				}, | ||||
| 				Type: string(service_type), | ||||
| 			}, | ||||
| 	} | ||||
|  | ||||
| 	completeServicePorts(&new_service, processing) | ||||
| 	yamlified, _ := yaml.Marshal(new_service) | ||||
| 	x := string(yamlified) | ||||
| 	_ = x | ||||
| 	 | ||||
| 	return new_service | ||||
| } | ||||
|  | ||||
| @@ -90,7 +99,7 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { | ||||
| 			return | ||||
| 		} | ||||
|  | ||||
| 		 | ||||
| 		// This condition allows us to create NodePort if PAT is filled or a ClusterIP that only expose port 80 and 443 (for Ingress) | ||||
| 		if _, ok := translation_dict["PAT"]; ok{ | ||||
| 			port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64) | ||||
| 			if err != nil { | ||||
| @@ -98,8 +107,6 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { | ||||
| 				return | ||||
| 			} | ||||
| 			 | ||||
|  | ||||
|  | ||||
| 			new_port_translation := models.ServicePort{ | ||||
| 				Name: strings.ToLower(processing.Processing.Name) + processing.ID, | ||||
| 				Port:  port_translation-30000, | ||||
| @@ -108,32 +115,52 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { | ||||
| 				Protocol: "TCP", | ||||
| 			} | ||||
| 			service.Spec.Ports = append(service.Spec.Ports, new_port_translation) | ||||
| 		}  | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createService(service models.Service, processing_name string, processing_id string) { | ||||
| 	if b.Services != nil{ | ||||
| 		b.Services.Spec.Ports = append(b.Services.Spec.Ports, service.Spec.Ports...) | ||||
| 		}else { | ||||
| 			b.Services = &service | ||||
| 		} | ||||
| 		} else { | ||||
| 			port_spec := []models.ServicePort{ | ||||
| 				models.ServicePort{ | ||||
| 					Name: strings.ToLower(processing.Processing.Name) + processing.ID + "-80", | ||||
| 					Port:  80, | ||||
| 					TargetPort: 80, | ||||
| 					Protocol: "TCP", | ||||
| 			}, | ||||
| 			models.ServicePort{ | ||||
| 				Name: strings.ToLower(processing.Processing.Name) + processing.ID + "-443", | ||||
| 				Port:  443, | ||||
| 				TargetPort: 443, | ||||
| 				Protocol: "TCP", | ||||
| 			}, | ||||
| 		 | ||||
| 	b.addLabel(processing_name,processing_id) | ||||
| 		} | ||||
| 		service.Spec.Ports = append(service.Spec.Ports,port_spec...) | ||||
|  | ||||
| 		} | ||||
| 	} | ||||
| 	return | ||||
|  | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) addServiceToArgo() error { | ||||
| 	service_manifest, err := yaml.Marshal(b.Services) | ||||
| // The k8s service passed as the parameter only expose one port because it is the result of CreateService() | ||||
| // we check if this port is already exposed by a service in the workflow and we proceed to the creation of a new service OR  | ||||
| // add the port to the list of port exposed by an existing if portAlreadyExposed is false  | ||||
| func (b *ArgoBuilder) addKubeServiceToWorkflow(service models.Service, processing_name string, processing_id string) (label string) { | ||||
| 	if exposed, service_available_port := b.portAlreadyExposed(service.Spec.Ports[0].TargetPort); exposed && service_available_port != nil{ | ||||
| 		// The port you want to expose is already exposed by all the existing services | ||||
| 		service_available_port.Spec.Ports = append(service_available_port.Spec.Ports, service.Spec.Ports...) | ||||
| 		return service_available_port.Spec.Selector["app"] | ||||
| 	}  | ||||
|  | ||||
| 	b.Services = append(b.Services, service) | ||||
| 	return service.Spec.Selector["app"]	 | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) addServiceToArgo(service models.Service) error { | ||||
| 	service_manifest, err := yaml.Marshal(service) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not marshal service manifest") | ||||
| 		return err | ||||
| 	} | ||||
| 	 | ||||
| 	service_template := models.Template{Name: "workflow-service-pod",  | ||||
| 	service_template := models.Template{Name: "workflow-service-pod-" + strconv.Itoa(len(b.Services)),  | ||||
| 								Resource: models.ServiceResource{ | ||||
| 									Action: "create",  | ||||
| 									SuccessCondition: "status.succeeded > 0", | ||||
| @@ -145,4 +172,44 @@ func (b *ArgoBuilder) addServiceToArgo() error { | ||||
| 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
|  | ||||
| func (b *ArgoBuilder) addLabel(name string, id string) { | ||||
| 	argo_name := getArgoName(name,id) | ||||
| 	for _, template := range b.Workflow.Spec.Templates{ | ||||
| 		if template.Name == argo_name{ | ||||
| 			template.Metadata.Labels["app"] = "service-workflow"			 | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) portAlreadyExposed(port int64) (exposed bool, service *models.Service ){ | ||||
| 	// For all already existing k8s services, test if the port in parameter is already exposed and returns the first service that doesn't yet expose this port  | ||||
| 	for _, s := range b.Services { | ||||
| 		i := 0 | ||||
| 		port_exposed := false | ||||
| 		for !port_exposed { | ||||
| 			if s.Spec.Ports[i].TargetPort == port { | ||||
| 				port_exposed = true | ||||
| 			} | ||||
| 		} | ||||
| 		if !port_exposed { | ||||
| 			return false, &s | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	return true, nil | ||||
| } | ||||
|  | ||||
| // If contract has a reverse value | ||||
| 	// Test if service already exist for the argo service | ||||
| 		// Yes : create ingress, associate it with the existing kube service (name + exposed port on service)  | ||||
| 		// No :  | ||||
| 			// - create template for a service that expose the port of the argo service (pod) | ||||
| 			// - store the name of the service and its port exposed | ||||
| 			// - create template for an ingress : manifest must contain the path, name of the service and port exposed on the service | ||||
| func (b *ArgoBuilder) addIngress(){ | ||||
|  | ||||
| } | ||||
		Reference in New Issue
	
	Block a user