Compare commits
	
		
			5 Commits
		
	
	
		
			main
			...
			services_d
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2f9b2fb464 | |||
| d1dab992cf | |||
| 52278609bd | |||
| ed48fc0a55 | |||
| 03ad8c084d | 
							
								
								
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1,18 @@ | |||||||
|  | { | ||||||
|  |     // Use IntelliSense to learn about possible attributes. | ||||||
|  |     // Hover to view descriptions of existing attributes. | ||||||
|  |     // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 | ||||||
|  |     "version": "0.2.0", | ||||||
|  |     "configurations": [ | ||||||
|  |         { | ||||||
|  |             "name": "Launch Package", | ||||||
|  |             "type": "go", | ||||||
|  |             "request": "launch", | ||||||
|  |             "mode": "auto", | ||||||
|  |             "program": "${fileDirname}", | ||||||
|  |             "args": ["-e","e3b7772e-dc9f-4bc1-9f5d-533e23e6cd57" ,"-u","http://127.0.0.1:3100","-m" ,"mongodb://127.0.0.1:27017","-d","DC_myDC"], | ||||||
|  |             "env": {"OCMONITOR_LOKIURL":"http://127.0.0.1:3100","OCMONITOR_WORKFLOW":"8d0f1814-b5ca-436c-ba3f-116d99198fd2","KUBERNETES_SERVICE_HOST":"","OCMONITOR_MONGOURL":"mongodb://127.0.0.1:27017","OCMONITOR_DATABASE":"DC_myDC","test_service":"true"} | ||||||
|  |         } | ||||||
|  |     ] | ||||||
|  | } | ||||||
|  |  | ||||||
							
								
								
									
										70
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										70
									
								
								README.md
									
									
									
									
									
								
							| @@ -47,6 +47,76 @@ In rules add a new entry : | |||||||
|  |  | ||||||
| This command **must return "yes"** | This command **must return "yes"** | ||||||
|  |  | ||||||
|  | ## Allow services to be joined with reverse proxy | ||||||
|  |  | ||||||
|  | Since the development has been realised in a K3S environment, we will use the lightweight solution provided by **traefik**.  | ||||||
|  |  | ||||||
|  | We need to install **metallb** to expose our cluster to the exterior and allow packets to reach traefik. | ||||||
|  |  | ||||||
|  | ### Deploy traefik and metallb | ||||||
|  |  | ||||||
|  | - Make sure that helm is installed, else visit : https://helm.sh/docs/intro/install/  | ||||||
|  |  | ||||||
|  | - Add the repositories for traefik and metallb | ||||||
|  | > helm repo add metallb https://metallb.github.io/metallb  | ||||||
|  | > helm repo add traefik https://helm.traefik.io/traefik | ||||||
|  |  | ||||||
|  | >helm repo update | ||||||
|  |  | ||||||
|  | - Create the namespaces for each | ||||||
|  | > kubectl create ns traefik-ingress | ||||||
|  | > kubectl create ns metallb-system  | ||||||
|  |  | ||||||
|  | - Configure the deployment  | ||||||
|  |  | ||||||
|  | ``` | ||||||
|  | cat > traefik-values.yaml <<EOF | ||||||
|  | globalArguments: | ||||||
|  | deployment: | ||||||
|  |   kind: DaemonSet | ||||||
|  | providers: | ||||||
|  |   kubernetesCRD: | ||||||
|  |     enabled: true | ||||||
|  | service: | ||||||
|  |   type: LoadBalancer | ||||||
|  | ingressRoute: | ||||||
|  |   dashboard: | ||||||
|  |     enabled: false | ||||||
|  | EOF | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | - Launch the installs | ||||||
|  | > helm upgrade --install metallb metallb/metallb --namespace metallb-system | ||||||
|  |  | ||||||
|  | > helm install --namespace=traefik-ingress traefik traefik/traefik --values=./traefik-values.yaml | ||||||
|  |  | ||||||
|  | ### Configure metallb | ||||||
|  |  | ||||||
|  | ``` | ||||||
|  | cat << 'EOF' | kubectl apply -f - | ||||||
|  | apiVersion: metallb.io/v1beta1 | ||||||
|  | kind: IPAddressPool | ||||||
|  | metadata: | ||||||
|  |   name: default-pool | ||||||
|  |   namespace: metallb-system | ||||||
|  | spec: | ||||||
|  |   addresses: | ||||||
|  |   - 192.168.0.200-192.168.0.250 | ||||||
|  | --- | ||||||
|  | apiVersion: metallb.io/v1beta1 | ||||||
|  | kind: L2Advertisement | ||||||
|  | metadata: | ||||||
|  |   name: default | ||||||
|  |   namespace: metallb-system | ||||||
|  | spec: | ||||||
|  |   ipAddressPools: | ||||||
|  |   - default-pool | ||||||
|  | EOF | ||||||
|  | ``` | ||||||
|  |  | ||||||
|  | - Check that the services created in traefik-ingress have an external IP | ||||||
|  |  | ||||||
|  | > kubectl get service -n traefik-ingress -o wide | ||||||
|  |  | ||||||
| ## TODO | ## TODO | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										30
									
								
								demo_nginx/nginx.conf
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								demo_nginx/nginx.conf
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,30 @@ | |||||||
|  | # nginx.conf | ||||||
|  | user  nginx; | ||||||
|  | worker_processes  auto; | ||||||
|  | error_log  /var/log/nginx/error.log warn; | ||||||
|  | pid        /var/run/nginx.pid; | ||||||
|  | events { | ||||||
|  |   worker_connections  1024; | ||||||
|  | } | ||||||
|  | http { | ||||||
|  |   include       /etc/nginx/mime.types; | ||||||
|  |   default_type  application/octet-stream; | ||||||
|  |   log_format  main  '$remote_addr - $remote_user [$time_local] "$request" ' | ||||||
|  |   '$status $body_bytes_sent "$http_referer" ' | ||||||
|  |   '"$http_user_agent" "$http_x_forwarded_for"'; | ||||||
|  |   access_log  /var/log/nginx/access.log  main; | ||||||
|  |   sendfile        on; | ||||||
|  |   #tcp_nopush     on; | ||||||
|  |    | ||||||
|  |   keepalive_timeout  65; | ||||||
|  |   #gzip  on; | ||||||
|  |   #include /etc/nginx/conf.d/*.conf; | ||||||
|  | server { | ||||||
|  |   listen 80; | ||||||
|  |   location / { | ||||||
|  |     root   /usr/share/nginx/html; | ||||||
|  |     index  index.html index.htm; | ||||||
|  |     try_files $uri $uri/ /index.html; | ||||||
|  |   } | ||||||
|  | } | ||||||
|  | } | ||||||
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							| @@ -46,8 +46,6 @@ const localConfigFile = "./conf/local_ocmonitord_conf.json" | |||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
|  |  | ||||||
| 	os.Setenv("test_service","true") 	// Only for service demo, delete before merging on main |  | ||||||
|  |  | ||||||
| 	monitorLocal = false | 	monitorLocal = false | ||||||
| 	// Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) | 	// Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) | ||||||
| 	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { | 	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { | ||||||
|   | |||||||
							
								
								
									
										101
									
								
								models/ingress.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										101
									
								
								models/ingress.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,101 @@ | |||||||
|  | package models | ||||||
|  |  | ||||||
|  | import "strconv" | ||||||
|  |  | ||||||
|  | // apiVersion: networking.k8s.io/v1 | ||||||
|  | // kind: Ingress | ||||||
|  | // metadata: | ||||||
|  | //   name: example-ingress | ||||||
|  | //   namespace: argo | ||||||
|  | //   annotations: | ||||||
|  | //     traefik.ingress.kubernetes.io/router.entrypoints: web  # Utilisation de l'entrypoint HTTP standard | ||||||
|  | // spec: | ||||||
|  | //   rules: | ||||||
|  | //   - http: | ||||||
|  | //       paths: | ||||||
|  | //       - path: /dtf | ||||||
|  | //         pathType: Prefix | ||||||
|  | //         backend: | ||||||
|  | //           service: | ||||||
|  | //             name: workflow-service-qtjk2 | ||||||
|  | //             port: | ||||||
|  | //               number: 80 | ||||||
|  |  | ||||||
|  | var ingress_manifest = &Manifest{ | ||||||
|  | 	ApiVersion: "networking.k8s.io/v1", | ||||||
|  | 	Kind: "Ingress", | ||||||
|  | 	Metadata: Metadata{ | ||||||
|  | 		GenerateName: "ingress-argo-", | ||||||
|  | 	}, | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Ingress struct { | ||||||
|  | 	ApiVersion string      `yaml:"apiVersion,omitempty"` | ||||||
|  | 	Kind       string      `yaml:"kind,omitempty"` | ||||||
|  | 	Metadata   Metadata    `yaml:"metadata,omitempty"` | ||||||
|  | 	Spec       IngressSpec `yaml:"spec,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | type IngressSpec struct { | ||||||
|  | 	Rules []Rule `yaml:"rules,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Rule struct { | ||||||
|  | 	HTTP HTTP `yaml:"http,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type HTTP struct { | ||||||
|  | 	Paths []Path `yaml:"paths,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Path struct { | ||||||
|  | 	Path     string  `yaml:"path,omitempty"` | ||||||
|  | 	PathType string  `yaml:"pathType,omitempty"` | ||||||
|  | 	Backend  Backend `yaml:"backend,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Backend struct { | ||||||
|  | 	ServiceName string `yaml:"serviceName,omitempty"` | ||||||
|  | 	ServicePort int64  `yaml:"servicePort,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func NewIngress(contract map[string]map[string]string, serviceName string) Ingress { | ||||||
|  | 	new_ingr := Ingress{ | ||||||
|  | 		ApiVersion: "networking.k8s.io/v1", | ||||||
|  | 		Kind:       "Ingress", | ||||||
|  | 		Metadata: Metadata{ | ||||||
|  | 			GenerateName: "ingress-argo-", | ||||||
|  | 		}, | ||||||
|  | 		Spec: IngressSpec{ | ||||||
|  | 			Rules: []Rule{ | ||||||
|  | 				{ | ||||||
|  | 					HTTP: HTTP{ | ||||||
|  | 						Paths: []Path{}, | ||||||
|  | 						}, | ||||||
|  | 					}, | ||||||
|  | 				}, | ||||||
|  | 			}, | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 	for port_to_reverse_str, translations := range contract{ | ||||||
|  | 		 | ||||||
|  | 		port, _ := strconv.ParseInt(port_to_reverse_str,10,64) | ||||||
|  |  | ||||||
|  | 		port_reverse := Path{ | ||||||
|  | 			Path:     translations["reverse"], | ||||||
|  | 			PathType: "Prefix", | ||||||
|  | 			Backend: Backend{ | ||||||
|  | 				ServiceName: serviceName, | ||||||
|  | 				ServicePort:port, | ||||||
|  | 			}, | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		new_ingr.Spec.Rules[0].HTTP.Paths = append(new_ingr.Spec.Rules[0].HTTP.Paths, port_reverse) | ||||||
|  |  | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return new_ingr | ||||||
|  | } | ||||||
|  |  | ||||||
							
								
								
									
										12
									
								
								models/k8s_manifest.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								models/k8s_manifest.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,12 @@ | |||||||
|  | package models | ||||||
|  |  | ||||||
|  | type Manifest struct { | ||||||
|  | 	ApiVersion string   `yaml:"apiVersion,omitempty"` | ||||||
|  | 	Kind       string   `yaml:"kind"` | ||||||
|  | 	Metadata   Metadata `yaml:"metadata,omitempty"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | type Metadata struct { | ||||||
|  |     Name      		string	`yaml:"name"` | ||||||
|  |     GenerateName	string	`yaml:"generateName"` | ||||||
|  | } | ||||||
| @@ -10,16 +10,11 @@ type ServiceResource struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type Service struct { | type Service struct { | ||||||
|     APIVersion string            `yaml:"apiVersion"` |     Manifest | ||||||
|     Kind       string            `yaml:"kind"` |  | ||||||
|     Metadata   Metadata          `yaml:"metadata"` |  | ||||||
|     Spec       ServiceSpec       `yaml:"spec"` |     Spec       ServiceSpec       `yaml:"spec"` | ||||||
| } | } | ||||||
|  |  | ||||||
| type Metadata struct { |  | ||||||
|     Name      string            `yaml:"name"` |  | ||||||
|  |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // ServiceSpec is the specification of the Kubernetes Service | // ServiceSpec is the specification of the Kubernetes Service | ||||||
| type ServiceSpec struct { | type ServiceSpec struct { | ||||||
|   | |||||||
							
								
								
									
										11
									
								
								traefik-values.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								traefik-values.yaml
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,11 @@ | |||||||
|  | globalArguments: | ||||||
|  | deployment: | ||||||
|  |   kind: DaemonSet | ||||||
|  | providers: | ||||||
|  |   kubernetesCRD: | ||||||
|  |     enabled: true | ||||||
|  | service: | ||||||
|  |   type: LoadBalancer | ||||||
|  | ingressRoute: | ||||||
|  |   dashboard: | ||||||
|  |     enabled: false | ||||||
| @@ -5,14 +5,17 @@ | |||||||
| package workflow_builder | package workflow_builder | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"oc-monitord/models" | ||||||
| 	. "oc-monitord/models" | 	. "oc-monitord/models" | ||||||
| 	"os" | 	"os" | ||||||
| 	"slices" | 	"slices" | ||||||
|  | 	"strconv" | ||||||
| 	"strings" | 	"strings" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | 	oclib "cloud.o-forge.io/core/oc-lib" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | ||||||
|  | 	"cloud.o-forge.io/core/oc-lib/models/resources/processing" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | ||||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||||
| 	"github.com/nwtgck/go-fakelish" | 	"github.com/nwtgck/go-fakelish" | ||||||
| @@ -22,23 +25,26 @@ import ( | |||||||
|  |  | ||||||
| var logger zerolog.Logger | var logger zerolog.Logger | ||||||
|  |  | ||||||
|  | type ServiceExposure int  | ||||||
|  | const ( | ||||||
|  | 	PAT 	ServiceExposure = iota | ||||||
|  | 	Reverse | ||||||
|  | 	Both | ||||||
|  | ) | ||||||
|  |  | ||||||
| type ArgoBuilder struct { | type ArgoBuilder struct { | ||||||
| 	OriginWorkflow 	w.Workflow | 	OriginWorkflow 	w.Workflow | ||||||
| 	Workflow       	Workflow | 	Workflow       	Workflow | ||||||
| 	Services		*Service | 	Services		[]Service | ||||||
| 	Timeout        	int | 	Timeout        	int | ||||||
| } | } | ||||||
|  |  | ||||||
| type Workflow struct { | type Workflow struct { | ||||||
| 	ApiVersion string `yaml:"apiVersion"` | 	Manifest | ||||||
| 	Kind       string `yaml:"kind"` | 	Spec ArgoSpec `yaml:"spec,omitempty"` | ||||||
| 	Metadata   struct { |  | ||||||
| 		Name string `yaml:"name"` |  | ||||||
| 	} `yaml:"metadata"` |  | ||||||
| 	Spec Spec `yaml:"spec,omitempty"` |  | ||||||
| } | } | ||||||
|  |  | ||||||
| type Spec struct { | type ArgoSpec struct { | ||||||
| 	Entrypoint string                `yaml:"entrypoint"` | 	Entrypoint string                `yaml:"entrypoint"` | ||||||
| 	Arguments  []Parameter           `yaml:"arguments,omitempty"` | 	Arguments  []Parameter           `yaml:"arguments,omitempty"` | ||||||
| 	Volumes    []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` | 	Volumes    []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` | ||||||
| @@ -46,13 +52,12 @@ type Spec struct { | |||||||
| 	Timeout    int                   `yaml:"activeDeadlineSeconds,omitempty"` | 	Timeout    int                   `yaml:"activeDeadlineSeconds,omitempty"` | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
| func (b *ArgoBuilder) CreateDAG() (string, error) { | func (b *ArgoBuilder) CreateDAG() (string, error) { | ||||||
| 	 | 	 | ||||||
| 	// handle services by checking if there is only one processing with hostname and port | 	// handle services by checking if there is only one processing with hostname and port | ||||||
| 		 | 		 | ||||||
| 	b.createNginxVolumes() |  | ||||||
| 	 |  | ||||||
| 	 |  | ||||||
| 	b.createTemplates() | 	b.createTemplates() | ||||||
| 	b.createDAGstep() | 	b.createDAGstep() | ||||||
| 	b.createVolumes() | 	b.createVolumes() | ||||||
| @@ -62,7 +67,7 @@ func (b *ArgoBuilder) CreateDAG() (string, error) { | |||||||
| 		b.Workflow.Spec.Timeout = b.Timeout | 		b.Workflow.Spec.Timeout = b.Timeout | ||||||
| 	} | 	} | ||||||
| 	b.Workflow.Spec.Entrypoint = "dag" | 	b.Workflow.Spec.Entrypoint = "dag" | ||||||
| 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | 	b.Workflow.Manifest.ApiVersion = "argoproj.io/v1alpha1" | ||||||
| 	b.Workflow.Kind = "Workflow" | 	b.Workflow.Kind = "Workflow" | ||||||
| 	random_name := generateWfName() | 	random_name := generateWfName() | ||||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||||
| @@ -91,6 +96,7 @@ func (b *ArgoBuilder) createTemplates() { | |||||||
| 		var command string | 		var command string | ||||||
| 		var args string | 		var args string | ||||||
| 		var env string | 		var env string | ||||||
|  | 		var serv models.Service | ||||||
|  |  | ||||||
| 		comp_res := comp.Processing | 		comp_res := comp.Processing | ||||||
|  |  | ||||||
| @@ -114,24 +120,39 @@ func (b *ArgoBuilder) createTemplates() { | |||||||
| 		new_temp := Template{Name: argo_name, Container: temp_container} | 		new_temp := Template{Name: argo_name, Container: temp_container} | ||||||
| 		new_temp.Inputs.Parameters = inputs_container | 		new_temp.Inputs.Parameters = inputs_container | ||||||
| 		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name | 		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name | ||||||
| 		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx  |  | ||||||
| 		 | 		 | ||||||
| 		if (b.isService(comp.ID)){ | 		if (b.isService(comp.ID)){ | ||||||
| 			serv := b.CreateService(comp) |  | ||||||
| 			b.createService(serv, argo_name, comp.ID) | 			serv_type := getServiceExposure(*comp.Processing) | ||||||
|  | 			if serv_type == PAT || serv_type == Both{ | ||||||
|  | 				serv = b.CreateKubeService(comp, NodePort) | ||||||
|  | 				b.addKubeServiceToWorkflow(serv, argo_name, comp.ID) | ||||||
| 				new_temp.Metadata.Labels = make(map[string]string) | 				new_temp.Metadata.Labels = make(map[string]string) | ||||||
| 			new_temp.Metadata.Labels["app"] = "oc-service"		// Construct the template for the k8s service and add a link in graph between k8s service and processing | 				new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"]		// Construct the template for the k8s service and add a link in graph between k8s service and processing | ||||||
|  | 				b.addServiceToArgo(serv) | ||||||
|  | 				ingress := b.CreateIngress(comp,serv) | ||||||
|  | 				b.addIngressToWorfklow(ingress, argo_name, comp.ID) | ||||||
|  |  | ||||||
|  | 			} | ||||||
|  | 			if serv_type == Reverse || serv_type == Both{ | ||||||
|  | 				serv = b.CreateKubeService(comp, ClusterIP) | ||||||
|  | 				// create ingress by passing the service and the processing (or reverse) | ||||||
|  | 				b.addKubeServiceToWorkflow(serv, argo_name, comp.ID) | ||||||
|  | 				new_temp.Metadata.Labels = make(map[string]string) | ||||||
|  | 				new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"]		// Construct the template for the k8s service and add a link in graph between k8s service and processing | ||||||
|  | 				b.addServiceToArgo(serv) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			 | ||||||
| 			// if err != nil { | 			// if err != nil { | ||||||
| 			// 	// TODO | 			// 	// TODO | ||||||
| 			// } | 			// } | ||||||
| 		} | 		} | ||||||
| 		 | 		 | ||||||
| 		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) | 		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) | ||||||
|  |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if b.Services != nil { |  | ||||||
| 		b.addServiceToArgo() |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -153,8 +174,9 @@ func (b *ArgoBuilder) createDAGstep() { | |||||||
| 		new_dag.Tasks = append(new_dag.Tasks, step) | 		new_dag.Tasks = append(new_dag.Tasks, step) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if b.Services != nil { | 	for i, _ := range b.Services { | ||||||
| 		new_dag.Tasks = append(new_dag.Tasks, Task{Name:"workflow-service-pod", Template: "workflow-service-pod"}) | 		name := "workflow-service-pod-"+strconv.Itoa(i + 1) | ||||||
|  | 		new_dag.Tasks = append(new_dag.Tasks, Task{Name: name , Template: name}) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) | 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) | ||||||
| @@ -170,15 +192,6 @@ func (b *ArgoBuilder) createVolumes() { | |||||||
| 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) | 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) | ||||||
| } | } | ||||||
|  |  | ||||||
| // For demo purposes, until we implement the use of storage ressources |  | ||||||
| func (b *ArgoBuilder) createNginxVolumes() { |  | ||||||
| 	new_volume := VolumeClaimTemplate{} |  | ||||||
| 	new_volume.Metadata.Name = "nginx-demo" |  | ||||||
| 	new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} |  | ||||||
| 	new_volume.Spec.Resources.Requests.Storage = "1Gi" |  | ||||||
|  |  | ||||||
| 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) |  | ||||||
| } |  | ||||||
|  |  | ||||||
|  |  | ||||||
| func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { | func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { | ||||||
| @@ -319,14 +332,29 @@ func (b *ArgoBuilder) isService(id string) bool{ | |||||||
| 	return is_exposed | 	return is_exposed | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func getServiceExposure(service processing.ProcessingResource) ServiceExposure{ | ||||||
|  | 	var exposure_type ServiceExposure | ||||||
|  |  | ||||||
| func (b *ArgoBuilder) addLabel(name string, id string) { | 	contract := getExposeContract(service.ResourceModel.Model["expose"]) | ||||||
| 	argo_name := getArgoName(name,id) | 	_, pat := contract["PAT"] | ||||||
| 	for _, template := range b.Workflow.Spec.Templates{ | 	_, reverse := contract["reverse"] | ||||||
| 		if template.Name == argo_name{ | 	 | ||||||
| 			template.Metadata.Labels["app"] = "service-workflow"			 | 	if pat && reverse { | ||||||
| 			return | 		exposure_type=  Both | ||||||
| 	} | 	} | ||||||
|  | 	if pat { | ||||||
|  | 		exposure_type = PAT | ||||||
| 	} | 	} | ||||||
|  | 	if reverse{ | ||||||
|  | 		exposure_type = Reverse  | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return exposure_type | ||||||
|  | 	 | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func (b *ArgoBuilder) CreateIngress(processing processing.ProcessingResource, service Service) Ingress{ | ||||||
|  | 	contract := getExposeContract(processing.ResourceModel.Model["expose"]) | ||||||
|  | 	new_ingress := models.NewIngress(contract,service.Metadata.Name) | ||||||
|  | 	return new_ingress | ||||||
|  | } | ||||||
| @@ -7,11 +7,19 @@ import ( | |||||||
|  |  | ||||||
| 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | 	"cloud.o-forge.io/core/oc-lib/models/resource_model" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | 	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" | ||||||
|  | 	"github.com/nwtgck/go-fakelish" | ||||||
| 	"go.mongodb.org/mongo-driver/bson" | 	"go.mongodb.org/mongo-driver/bson" | ||||||
| 	"go.mongodb.org/mongo-driver/bson/primitive" | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
| 	"gopkg.in/yaml.v3" | 	"gopkg.in/yaml.v3" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | type ServiceType string  | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	NodePort	ServiceType = "NodePort" | ||||||
|  | 	ClusterIP	ServiceType = "ClusterIP" | ||||||
|  | ) | ||||||
|  |  | ||||||
| // TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code | // TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code | ||||||
| func getExposeContract(expose resource_model.Model) map[string]map[string]string { | func getExposeContract(expose resource_model.Model) map[string]map[string]string { | ||||||
| 	contract := make(map[string]map[string]string,0) | 	contract := make(map[string]map[string]string,0) | ||||||
| @@ -40,7 +48,7 @@ func getExposeContract(expose resource_model.Model) map[string]map[string]string | |||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ | func (b *ArgoBuilder) CreateKubeService(processing graph.GraphItem, service_type ServiceType) models.Service{ | ||||||
| 	 | 	 | ||||||
| 	// model { | 	// model { | ||||||
| 	// 	Type : "dict", | 	// 	Type : "dict", | ||||||
| @@ -57,23 +65,24 @@ func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ | |||||||
| 	// } | 	// } | ||||||
| 	 | 	 | ||||||
|  |  | ||||||
| 	new_service := models.Service{APIVersion: "v1",  | 	new_service := models.Service{ | ||||||
|  | 		Manifest: models.Manifest{ | ||||||
|  | 			ApiVersion: "v1",  | ||||||
| 			Kind: "Service",  | 			Kind: "Service",  | ||||||
| 			Metadata: models.Metadata{ | 			Metadata: models.Metadata{ | ||||||
| 								Name: "workflow-service" , | 				Name: "workflow-service-"+ processing.Processing.Name + "-" + processing.ID , | ||||||
|  | 				}, | ||||||
| 			}, | 			}, | ||||||
| 			Spec: models.ServiceSpec{ | 			Spec: models.ServiceSpec{ | ||||||
| 								Selector: map[string]string{"app": "oc-service"}, | 				Selector: map[string]string{"app": "service-" + fakelish.GenerateFakeWord(5, 8)}, | ||||||
| 				Ports: []models.ServicePort{ | 				Ports: []models.ServicePort{ | ||||||
| 				}, | 				}, | ||||||
| 								Type: "NodePort", | 				Type: string(service_type), | ||||||
| 			}, | 			}, | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	completeServicePorts(&new_service, processing) | 	completeServicePorts(&new_service, processing) | ||||||
| 	yamlified, _ := yaml.Marshal(new_service) | 	 | ||||||
| 	x := string(yamlified) |  | ||||||
| 	_ = x |  | ||||||
| 	return new_service | 	return new_service | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -90,7 +99,7 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { | |||||||
| 			return | 			return | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		 | 		// This condition allows us to create NodePort if PAT is filled or a ClusterIP that only expose port 80 and 443 (for Ingress) | ||||||
| 		if _, ok := translation_dict["PAT"]; ok{ | 		if _, ok := translation_dict["PAT"]; ok{ | ||||||
| 			port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64) | 			port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64) | ||||||
| 			if err != nil { | 			if err != nil { | ||||||
| @@ -98,8 +107,6 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { | |||||||
| 				return | 				return | ||||||
| 			} | 			} | ||||||
| 			 | 			 | ||||||
|  |  | ||||||
|  |  | ||||||
| 			new_port_translation := models.ServicePort{ | 			new_port_translation := models.ServicePort{ | ||||||
| 				Name: strings.ToLower(processing.Processing.Name) + processing.ID, | 				Name: strings.ToLower(processing.Processing.Name) + processing.ID, | ||||||
| 				Port:  port_translation-30000, | 				Port:  port_translation-30000, | ||||||
| @@ -108,32 +115,52 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { | |||||||
| 				Protocol: "TCP", | 				Protocol: "TCP", | ||||||
| 			} | 			} | ||||||
| 			service.Spec.Ports = append(service.Spec.Ports, new_port_translation) | 			service.Spec.Ports = append(service.Spec.Ports, new_port_translation) | ||||||
| 		}  | 		} else { | ||||||
|  | 			port_spec := []models.ServicePort{ | ||||||
|  | 				models.ServicePort{ | ||||||
|  | 					Name: strings.ToLower(processing.Processing.Name) + processing.ID + "-80", | ||||||
|  | 					Port:  80, | ||||||
|  | 					TargetPort: 80, | ||||||
|  | 					Protocol: "TCP", | ||||||
|  | 			}, | ||||||
|  | 			models.ServicePort{ | ||||||
|  | 				Name: strings.ToLower(processing.Processing.Name) + processing.ID + "-443", | ||||||
|  | 				Port:  443, | ||||||
|  | 				TargetPort: 443, | ||||||
|  | 				Protocol: "TCP", | ||||||
|  | 			}, | ||||||
| 		 | 		 | ||||||
| 		} | 		} | ||||||
|  | 		service.Spec.Ports = append(service.Spec.Ports,port_spec...) | ||||||
|  |  | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
| 	return | 	return | ||||||
|  |  | ||||||
| } | } | ||||||
|  |  | ||||||
| func (b *ArgoBuilder) createService(service models.Service, processing_name string, processing_id string) { | // The k8s service passed as the parameter only expose one port because it is the result of CreateService() | ||||||
| 	if b.Services != nil{ | // we check if this port is already exposed by a service in the workflow and we proceed to the creation of a new service OR  | ||||||
| 		b.Services.Spec.Ports = append(b.Services.Spec.Ports, service.Spec.Ports...) | // add the port to the list of port exposed by an existing if portAlreadyExposed is false  | ||||||
| 		}else { | func (b *ArgoBuilder) addKubeServiceToWorkflow(service models.Service, processing_name string, processing_id string) (label string) { | ||||||
| 			b.Services = &service | 	if exposed, service_available_port := b.portAlreadyExposed(service.Spec.Ports[0].TargetPort); exposed && service_available_port != nil{ | ||||||
|  | 		// The port you want to expose is already exposed by all the existing services | ||||||
|  | 		service_available_port.Spec.Ports = append(service_available_port.Spec.Ports, service.Spec.Ports...) | ||||||
|  | 		return service_available_port.Spec.Selector["app"] | ||||||
| 	}  | 	}  | ||||||
|  |  | ||||||
| 	b.addLabel(processing_name,processing_id) | 	b.Services = append(b.Services, service) | ||||||
|  | 	return service.Spec.Selector["app"]	 | ||||||
| } | } | ||||||
|  |  | ||||||
| func (b *ArgoBuilder) addServiceToArgo() error { | func (b *ArgoBuilder) addServiceToArgo(service models.Service) error { | ||||||
| 	service_manifest, err := yaml.Marshal(b.Services) | 	service_manifest, err := yaml.Marshal(service) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		logger.Error().Msg("Could not marshal service manifest") | 		logger.Error().Msg("Could not marshal service manifest") | ||||||
| 		return err | 		return err | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	service_template := models.Template{Name: "workflow-service-pod",  | 	service_template := models.Template{Name: "workflow-service-pod-" + strconv.Itoa(len(b.Services)),  | ||||||
| 								Resource: models.ServiceResource{ | 								Resource: models.ServiceResource{ | ||||||
| 									Action: "create",  | 									Action: "create",  | ||||||
| 									SuccessCondition: "status.succeeded > 0", | 									SuccessCondition: "status.succeeded > 0", | ||||||
| @@ -146,3 +173,43 @@ func (b *ArgoBuilder) addServiceToArgo() error { | |||||||
|  |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
|  | func (b *ArgoBuilder) addLabel(name string, id string) { | ||||||
|  | 	argo_name := getArgoName(name,id) | ||||||
|  | 	for _, template := range b.Workflow.Spec.Templates{ | ||||||
|  | 		if template.Name == argo_name{ | ||||||
|  | 			template.Metadata.Labels["app"] = "service-workflow"			 | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (b *ArgoBuilder) portAlreadyExposed(port int64) (exposed bool, service *models.Service ){ | ||||||
|  | 	// For all already existing k8s services, test if the port in parameter is already exposed and returns the first service that doesn't yet expose this port  | ||||||
|  | 	for _, s := range b.Services { | ||||||
|  | 		i := 0 | ||||||
|  | 		port_exposed := false | ||||||
|  | 		for !port_exposed { | ||||||
|  | 			if s.Spec.Ports[i].TargetPort == port { | ||||||
|  | 				port_exposed = true | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 		if !port_exposed { | ||||||
|  | 			return false, &s | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return true, nil | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // If contract has a reverse value | ||||||
|  | 	// Test if service already exist for the argo service | ||||||
|  | 		// Yes : create ingress, associate it with the existing kube service (name + exposed port on service)  | ||||||
|  | 		// No :  | ||||||
|  | 			// - create template for a service that expose the port of the argo service (pod) | ||||||
|  | 			// - store the name of the service and its port exposed | ||||||
|  | 			// - create template for an ingress : manifest must contain the path, name of the service and port exposed on the service | ||||||
|  | func (b *ArgoBuilder) addIngress(){ | ||||||
|  |  | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user