Compare commits
	
		
			5 Commits
		
	
	
		
			ade18f1042
			...
			services_d
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 2f9b2fb464 | |||
| d1dab992cf | |||
| 52278609bd | |||
| ed48fc0a55 | |||
| 03ad8c084d | 
							
								
								
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@@ -0,0 +1,18 @@
 | 
			
		||||
{
 | 
			
		||||
    // Use IntelliSense to learn about possible attributes.
 | 
			
		||||
    // Hover to view descriptions of existing attributes.
 | 
			
		||||
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
 | 
			
		||||
    "version": "0.2.0",
 | 
			
		||||
    "configurations": [
 | 
			
		||||
        {
 | 
			
		||||
            "name": "Launch Package",
 | 
			
		||||
            "type": "go",
 | 
			
		||||
            "request": "launch",
 | 
			
		||||
            "mode": "auto",
 | 
			
		||||
            "program": "${fileDirname}",
 | 
			
		||||
            "args": ["-e","e3b7772e-dc9f-4bc1-9f5d-533e23e6cd57" ,"-u","http://127.0.0.1:3100","-m" ,"mongodb://127.0.0.1:27017","-d","DC_myDC"],
 | 
			
		||||
            "env": {"OCMONITOR_LOKIURL":"http://127.0.0.1:3100","OCMONITOR_WORKFLOW":"8d0f1814-b5ca-436c-ba3f-116d99198fd2","KUBERNETES_SERVICE_HOST":"","OCMONITOR_MONGOURL":"mongodb://127.0.0.1:27017","OCMONITOR_DATABASE":"DC_myDC","test_service":"true"}
 | 
			
		||||
        }
 | 
			
		||||
    ]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										70
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										70
									
								
								README.md
									
									
									
									
									
								
							@@ -47,6 +47,76 @@ In rules add a new entry :
 | 
			
		||||
 | 
			
		||||
This command **must return "yes"**
 | 
			
		||||
 | 
			
		||||
## Allow services to be joined with reverse proxy
 | 
			
		||||
 | 
			
		||||
Since the development has been realised in a K3S environment, we will use the lightweight solution provided by **traefik**. 
 | 
			
		||||
 | 
			
		||||
We need to install **metallb** to expose our cluster to the exterior and allow packets to reach traefik.
 | 
			
		||||
 | 
			
		||||
### Deploy traefik and metallb
 | 
			
		||||
 | 
			
		||||
- Make sure that helm is installed, else visit : https://helm.sh/docs/intro/install/ 
 | 
			
		||||
 | 
			
		||||
- Add the repositories for traefik and metallb
 | 
			
		||||
> helm repo add metallb https://metallb.github.io/metallb 
 | 
			
		||||
> helm repo add traefik https://helm.traefik.io/traefik
 | 
			
		||||
 | 
			
		||||
>helm repo update
 | 
			
		||||
 | 
			
		||||
- Create the namespaces for each
 | 
			
		||||
> kubectl create ns traefik-ingress
 | 
			
		||||
> kubectl create ns metallb-system 
 | 
			
		||||
 | 
			
		||||
- Configure the deployment 
 | 
			
		||||
 | 
			
		||||
```
 | 
			
		||||
cat > traefik-values.yaml <<EOF
 | 
			
		||||
globalArguments:
 | 
			
		||||
deployment:
 | 
			
		||||
  kind: DaemonSet
 | 
			
		||||
providers:
 | 
			
		||||
  kubernetesCRD:
 | 
			
		||||
    enabled: true
 | 
			
		||||
service:
 | 
			
		||||
  type: LoadBalancer
 | 
			
		||||
ingressRoute:
 | 
			
		||||
  dashboard:
 | 
			
		||||
    enabled: false
 | 
			
		||||
EOF
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
- Launch the installs
 | 
			
		||||
> helm upgrade --install metallb metallb/metallb --namespace metallb-system
 | 
			
		||||
 | 
			
		||||
> helm install --namespace=traefik-ingress traefik traefik/traefik --values=./traefik-values.yaml
 | 
			
		||||
 | 
			
		||||
### Configure metallb
 | 
			
		||||
 | 
			
		||||
```
 | 
			
		||||
cat << 'EOF' | kubectl apply -f -
 | 
			
		||||
apiVersion: metallb.io/v1beta1
 | 
			
		||||
kind: IPAddressPool
 | 
			
		||||
metadata:
 | 
			
		||||
  name: default-pool
 | 
			
		||||
  namespace: metallb-system
 | 
			
		||||
spec:
 | 
			
		||||
  addresses:
 | 
			
		||||
  - 192.168.0.200-192.168.0.250
 | 
			
		||||
---
 | 
			
		||||
apiVersion: metallb.io/v1beta1
 | 
			
		||||
kind: L2Advertisement
 | 
			
		||||
metadata:
 | 
			
		||||
  name: default
 | 
			
		||||
  namespace: metallb-system
 | 
			
		||||
spec:
 | 
			
		||||
  ipAddressPools:
 | 
			
		||||
  - default-pool
 | 
			
		||||
EOF
 | 
			
		||||
```
 | 
			
		||||
 | 
			
		||||
- Check that the services created in traefik-ingress have an external IP
 | 
			
		||||
 | 
			
		||||
> kubectl get service -n traefik-ingress -o wide
 | 
			
		||||
 | 
			
		||||
## TODO
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										30
									
								
								demo_nginx/nginx.conf
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								demo_nginx/nginx.conf
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,30 @@
 | 
			
		||||
# nginx.conf
 | 
			
		||||
user  nginx;
 | 
			
		||||
worker_processes  auto;
 | 
			
		||||
error_log  /var/log/nginx/error.log warn;
 | 
			
		||||
pid        /var/run/nginx.pid;
 | 
			
		||||
events {
 | 
			
		||||
  worker_connections  1024;
 | 
			
		||||
}
 | 
			
		||||
http {
 | 
			
		||||
  include       /etc/nginx/mime.types;
 | 
			
		||||
  default_type  application/octet-stream;
 | 
			
		||||
  log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
 | 
			
		||||
  '$status $body_bytes_sent "$http_referer" '
 | 
			
		||||
  '"$http_user_agent" "$http_x_forwarded_for"';
 | 
			
		||||
  access_log  /var/log/nginx/access.log  main;
 | 
			
		||||
  sendfile        on;
 | 
			
		||||
  #tcp_nopush     on;
 | 
			
		||||
  
 | 
			
		||||
  keepalive_timeout  65;
 | 
			
		||||
  #gzip  on;
 | 
			
		||||
  #include /etc/nginx/conf.d/*.conf;
 | 
			
		||||
server {
 | 
			
		||||
  listen 80;
 | 
			
		||||
  location / {
 | 
			
		||||
    root   /usr/share/nginx/html;
 | 
			
		||||
    index  index.html index.htm;
 | 
			
		||||
    try_files $uri $uri/ /index.html;
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										2
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								main.go
									
									
									
									
									
								
							@@ -46,8 +46,6 @@ const localConfigFile = "./conf/local_ocmonitord_conf.json"
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
 | 
			
		||||
	os.Setenv("test_service","true") 	// Only for service demo, delete before merging on main
 | 
			
		||||
 | 
			
		||||
	monitorLocal = false
 | 
			
		||||
	// Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets)
 | 
			
		||||
	if os.Getenv("KUBERNETES_SERVICE_HOST") == "" {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										101
									
								
								models/ingress.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										101
									
								
								models/ingress.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,101 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
import "strconv"
 | 
			
		||||
 | 
			
		||||
// apiVersion: networking.k8s.io/v1
 | 
			
		||||
// kind: Ingress
 | 
			
		||||
// metadata:
 | 
			
		||||
//   name: example-ingress
 | 
			
		||||
//   namespace: argo
 | 
			
		||||
//   annotations:
 | 
			
		||||
//     traefik.ingress.kubernetes.io/router.entrypoints: web  # Utilisation de l'entrypoint HTTP standard
 | 
			
		||||
// spec:
 | 
			
		||||
//   rules:
 | 
			
		||||
//   - http:
 | 
			
		||||
//       paths:
 | 
			
		||||
//       - path: /dtf
 | 
			
		||||
//         pathType: Prefix
 | 
			
		||||
//         backend:
 | 
			
		||||
//           service:
 | 
			
		||||
//             name: workflow-service-qtjk2
 | 
			
		||||
//             port:
 | 
			
		||||
//               number: 80
 | 
			
		||||
 | 
			
		||||
var ingress_manifest = &Manifest{
 | 
			
		||||
	ApiVersion: "networking.k8s.io/v1",
 | 
			
		||||
	Kind: "Ingress",
 | 
			
		||||
	Metadata: Metadata{
 | 
			
		||||
		GenerateName: "ingress-argo-",
 | 
			
		||||
	},
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Ingress struct {
 | 
			
		||||
	ApiVersion string      `yaml:"apiVersion,omitempty"`
 | 
			
		||||
	Kind       string      `yaml:"kind,omitempty"`
 | 
			
		||||
	Metadata   Metadata    `yaml:"metadata,omitempty"`
 | 
			
		||||
	Spec       IngressSpec `yaml:"spec,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
type IngressSpec struct {
 | 
			
		||||
	Rules []Rule `yaml:"rules,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Rule struct {
 | 
			
		||||
	HTTP HTTP `yaml:"http,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type HTTP struct {
 | 
			
		||||
	Paths []Path `yaml:"paths,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Path struct {
 | 
			
		||||
	Path     string  `yaml:"path,omitempty"`
 | 
			
		||||
	PathType string  `yaml:"pathType,omitempty"`
 | 
			
		||||
	Backend  Backend `yaml:"backend,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Backend struct {
 | 
			
		||||
	ServiceName string `yaml:"serviceName,omitempty"`
 | 
			
		||||
	ServicePort int64  `yaml:"servicePort,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func NewIngress(contract map[string]map[string]string, serviceName string) Ingress {
 | 
			
		||||
	new_ingr := Ingress{
 | 
			
		||||
		ApiVersion: "networking.k8s.io/v1",
 | 
			
		||||
		Kind:       "Ingress",
 | 
			
		||||
		Metadata: Metadata{
 | 
			
		||||
			GenerateName: "ingress-argo-",
 | 
			
		||||
		},
 | 
			
		||||
		Spec: IngressSpec{
 | 
			
		||||
			Rules: []Rule{
 | 
			
		||||
				{
 | 
			
		||||
					HTTP: HTTP{
 | 
			
		||||
						Paths: []Path{},
 | 
			
		||||
						},
 | 
			
		||||
					},
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
	for port_to_reverse_str, translations := range contract{
 | 
			
		||||
		
 | 
			
		||||
		port, _ := strconv.ParseInt(port_to_reverse_str,10,64)
 | 
			
		||||
 | 
			
		||||
		port_reverse := Path{
 | 
			
		||||
			Path:     translations["reverse"],
 | 
			
		||||
			PathType: "Prefix",
 | 
			
		||||
			Backend: Backend{
 | 
			
		||||
				ServiceName: serviceName,
 | 
			
		||||
				ServicePort:port,
 | 
			
		||||
			},
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		new_ingr.Spec.Rules[0].HTTP.Paths = append(new_ingr.Spec.Rules[0].HTTP.Paths, port_reverse)
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return new_ingr
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										12
									
								
								models/k8s_manifest.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								models/k8s_manifest.go
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,12 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
type Manifest struct {
 | 
			
		||||
	ApiVersion string   `yaml:"apiVersion,omitempty"`
 | 
			
		||||
	Kind       string   `yaml:"kind"`
 | 
			
		||||
	Metadata   Metadata `yaml:"metadata,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Metadata struct {
 | 
			
		||||
    Name      		string	`yaml:"name"`
 | 
			
		||||
    GenerateName	string	`yaml:"generateName"`
 | 
			
		||||
}
 | 
			
		||||
@@ -10,16 +10,11 @@ type ServiceResource struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Service struct {
 | 
			
		||||
    APIVersion string            `yaml:"apiVersion"`
 | 
			
		||||
    Kind       string            `yaml:"kind"`
 | 
			
		||||
    Metadata   Metadata          `yaml:"metadata"`
 | 
			
		||||
    Manifest
 | 
			
		||||
    Spec       ServiceSpec       `yaml:"spec"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Metadata struct {
 | 
			
		||||
    Name      string            `yaml:"name"`
 | 
			
		||||
    
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
// ServiceSpec is the specification of the Kubernetes Service
 | 
			
		||||
type ServiceSpec struct {
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										11
									
								
								traefik-values.yaml
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								traefik-values.yaml
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,11 @@
 | 
			
		||||
globalArguments:
 | 
			
		||||
deployment:
 | 
			
		||||
  kind: DaemonSet
 | 
			
		||||
providers:
 | 
			
		||||
  kubernetesCRD:
 | 
			
		||||
    enabled: true
 | 
			
		||||
service:
 | 
			
		||||
  type: LoadBalancer
 | 
			
		||||
ingressRoute:
 | 
			
		||||
  dashboard:
 | 
			
		||||
    enabled: false
 | 
			
		||||
@@ -5,14 +5,17 @@
 | 
			
		||||
package workflow_builder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"oc-monitord/models"
 | 
			
		||||
	. "oc-monitord/models"
 | 
			
		||||
	"os"
 | 
			
		||||
	"slices"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	oclib "cloud.o-forge.io/core/oc-lib"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/resource_model"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/resources/processing"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
 | 
			
		||||
	w "cloud.o-forge.io/core/oc-lib/models/workflow"
 | 
			
		||||
	"github.com/nwtgck/go-fakelish"
 | 
			
		||||
@@ -22,23 +25,26 @@ import (
 | 
			
		||||
 | 
			
		||||
var logger zerolog.Logger
 | 
			
		||||
 | 
			
		||||
type ServiceExposure int 
 | 
			
		||||
const (
 | 
			
		||||
	PAT 	ServiceExposure = iota
 | 
			
		||||
	Reverse
 | 
			
		||||
	Both
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ArgoBuilder struct {
 | 
			
		||||
	OriginWorkflow 	w.Workflow
 | 
			
		||||
	Workflow       	Workflow
 | 
			
		||||
	Services		*Service
 | 
			
		||||
	Services		[]Service
 | 
			
		||||
	Timeout        	int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Workflow struct {
 | 
			
		||||
	ApiVersion string `yaml:"apiVersion"`
 | 
			
		||||
	Kind       string `yaml:"kind"`
 | 
			
		||||
	Metadata   struct {
 | 
			
		||||
		Name string `yaml:"name"`
 | 
			
		||||
	} `yaml:"metadata"`
 | 
			
		||||
	Spec Spec `yaml:"spec,omitempty"`
 | 
			
		||||
	Manifest
 | 
			
		||||
	Spec ArgoSpec `yaml:"spec,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Spec struct {
 | 
			
		||||
type ArgoSpec struct {
 | 
			
		||||
	Entrypoint string                `yaml:"entrypoint"`
 | 
			
		||||
	Arguments  []Parameter           `yaml:"arguments,omitempty"`
 | 
			
		||||
	Volumes    []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
 | 
			
		||||
@@ -46,13 +52,12 @@ type Spec struct {
 | 
			
		||||
	Timeout    int                   `yaml:"activeDeadlineSeconds,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) CreateDAG() (string, error) {
 | 
			
		||||
	
 | 
			
		||||
	// handle services by checking if there is only one processing with hostname and port
 | 
			
		||||
	
 | 
			
		||||
	b.createNginxVolumes()
 | 
			
		||||
	
 | 
			
		||||
	
 | 
			
		||||
		
 | 
			
		||||
	b.createTemplates()
 | 
			
		||||
	b.createDAGstep()
 | 
			
		||||
	b.createVolumes()
 | 
			
		||||
@@ -62,7 +67,7 @@ func (b *ArgoBuilder) CreateDAG() (string, error) {
 | 
			
		||||
		b.Workflow.Spec.Timeout = b.Timeout
 | 
			
		||||
	}
 | 
			
		||||
	b.Workflow.Spec.Entrypoint = "dag"
 | 
			
		||||
	b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
 | 
			
		||||
	b.Workflow.Manifest.ApiVersion = "argoproj.io/v1alpha1"
 | 
			
		||||
	b.Workflow.Kind = "Workflow"
 | 
			
		||||
	random_name := generateWfName()
 | 
			
		||||
	b.Workflow.Metadata.Name = "oc-monitor-" + random_name
 | 
			
		||||
@@ -91,6 +96,7 @@ func (b *ArgoBuilder) createTemplates() {
 | 
			
		||||
		var command string
 | 
			
		||||
		var args string
 | 
			
		||||
		var env string
 | 
			
		||||
		var serv models.Service
 | 
			
		||||
 | 
			
		||||
		comp_res := comp.Processing
 | 
			
		||||
 | 
			
		||||
@@ -114,24 +120,39 @@ func (b *ArgoBuilder) createTemplates() {
 | 
			
		||||
		new_temp := Template{Name: argo_name, Container: temp_container}
 | 
			
		||||
		new_temp.Inputs.Parameters = inputs_container
 | 
			
		||||
		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name
 | 
			
		||||
		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx 
 | 
			
		||||
		
 | 
			
		||||
		if (b.isService(comp.ID)){
 | 
			
		||||
			serv := b.CreateService(comp)
 | 
			
		||||
			b.createService(serv, argo_name, comp.ID)
 | 
			
		||||
			new_temp.Metadata.Labels = make(map[string]string)
 | 
			
		||||
			new_temp.Metadata.Labels["app"] = "oc-service"		// Construct the template for the k8s service and add a link in graph between k8s service and processing
 | 
			
		||||
 | 
			
		||||
			serv_type := getServiceExposure(*comp.Processing)
 | 
			
		||||
			if serv_type == PAT || serv_type == Both{
 | 
			
		||||
				serv = b.CreateKubeService(comp, NodePort)
 | 
			
		||||
				b.addKubeServiceToWorkflow(serv, argo_name, comp.ID)
 | 
			
		||||
				new_temp.Metadata.Labels = make(map[string]string)
 | 
			
		||||
				new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"]		// Construct the template for the k8s service and add a link in graph between k8s service and processing
 | 
			
		||||
				b.addServiceToArgo(serv)
 | 
			
		||||
				ingress := b.CreateIngress(comp,serv)
 | 
			
		||||
				b.addIngressToWorfklow(ingress, argo_name, comp.ID)
 | 
			
		||||
 | 
			
		||||
			}
 | 
			
		||||
			if serv_type == Reverse || serv_type == Both{
 | 
			
		||||
				serv = b.CreateKubeService(comp, ClusterIP)
 | 
			
		||||
				// create ingress by passing the service and the processing (or reverse)
 | 
			
		||||
				b.addKubeServiceToWorkflow(serv, argo_name, comp.ID)
 | 
			
		||||
				new_temp.Metadata.Labels = make(map[string]string)
 | 
			
		||||
				new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"]		// Construct the template for the k8s service and add a link in graph between k8s service and processing
 | 
			
		||||
				b.addServiceToArgo(serv)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			
 | 
			
		||||
			// if err != nil {
 | 
			
		||||
			// 	// TODO
 | 
			
		||||
			// }
 | 
			
		||||
		}
 | 
			
		||||
		
 | 
			
		||||
		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp)
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if b.Services != nil {
 | 
			
		||||
		b.addServiceToArgo()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -153,8 +174,9 @@ func (b *ArgoBuilder) createDAGstep() {
 | 
			
		||||
		new_dag.Tasks = append(new_dag.Tasks, step)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if b.Services != nil {
 | 
			
		||||
		new_dag.Tasks = append(new_dag.Tasks, Task{Name:"workflow-service-pod", Template: "workflow-service-pod"})
 | 
			
		||||
	for i, _ := range b.Services {
 | 
			
		||||
		name := "workflow-service-pod-"+strconv.Itoa(i + 1)
 | 
			
		||||
		new_dag.Tasks = append(new_dag.Tasks, Task{Name: name , Template: name})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag})
 | 
			
		||||
@@ -170,15 +192,6 @@ func (b *ArgoBuilder) createVolumes() {
 | 
			
		||||
	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// For demo purposes, until we implement the use of storage ressources
 | 
			
		||||
func (b *ArgoBuilder) createNginxVolumes() {
 | 
			
		||||
	new_volume := VolumeClaimTemplate{}
 | 
			
		||||
	new_volume.Metadata.Name = "nginx-demo"
 | 
			
		||||
	new_volume.Spec.AccessModes = []string{"ReadWriteOnce"}
 | 
			
		||||
	new_volume.Spec.Resources.Requests.Storage = "1Gi"
 | 
			
		||||
 | 
			
		||||
	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) {
 | 
			
		||||
@@ -319,14 +332,29 @@ func (b *ArgoBuilder) isService(id string) bool{
 | 
			
		||||
	return is_exposed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getServiceExposure(service processing.ProcessingResource) ServiceExposure{
 | 
			
		||||
	var exposure_type ServiceExposure
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) addLabel(name string, id string) {
 | 
			
		||||
	argo_name := getArgoName(name,id)
 | 
			
		||||
	for _, template := range b.Workflow.Spec.Templates{
 | 
			
		||||
		if template.Name == argo_name{
 | 
			
		||||
			template.Metadata.Labels["app"] = "service-workflow"			
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	contract := getExposeContract(service.ResourceModel.Model["expose"])
 | 
			
		||||
	_, pat := contract["PAT"]
 | 
			
		||||
	_, reverse := contract["reverse"]
 | 
			
		||||
	
 | 
			
		||||
	if pat && reverse {
 | 
			
		||||
		exposure_type=  Both
 | 
			
		||||
	}
 | 
			
		||||
	if pat {
 | 
			
		||||
		exposure_type = PAT
 | 
			
		||||
	}
 | 
			
		||||
	if reverse{
 | 
			
		||||
		exposure_type = Reverse 
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return exposure_type
 | 
			
		||||
	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) CreateIngress(processing processing.ProcessingResource, service Service) Ingress{
 | 
			
		||||
	contract := getExposeContract(processing.ResourceModel.Model["expose"])
 | 
			
		||||
	new_ingress := models.NewIngress(contract,service.Metadata.Name)
 | 
			
		||||
	return new_ingress
 | 
			
		||||
}
 | 
			
		||||
@@ -7,11 +7,19 @@ import (
 | 
			
		||||
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/resource_model"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
 | 
			
		||||
	"github.com/nwtgck/go-fakelish"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/bson"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/bson/primitive"
 | 
			
		||||
	"gopkg.in/yaml.v3"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ServiceType string 
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	NodePort	ServiceType = "NodePort"
 | 
			
		||||
	ClusterIP	ServiceType = "ClusterIP"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code
 | 
			
		||||
func getExposeContract(expose resource_model.Model) map[string]map[string]string {
 | 
			
		||||
	contract := make(map[string]map[string]string,0)
 | 
			
		||||
@@ -40,7 +48,7 @@ func getExposeContract(expose resource_model.Model) map[string]map[string]string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{
 | 
			
		||||
func (b *ArgoBuilder) CreateKubeService(processing graph.GraphItem, service_type ServiceType) models.Service{
 | 
			
		||||
	
 | 
			
		||||
	// model {
 | 
			
		||||
	// 	Type : "dict",
 | 
			
		||||
@@ -57,23 +65,24 @@ func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{
 | 
			
		||||
	// }
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	new_service := models.Service{APIVersion: "v1", 
 | 
			
		||||
							Kind: "Service", 
 | 
			
		||||
							Metadata: models.Metadata{
 | 
			
		||||
								Name: "workflow-service" ,
 | 
			
		||||
							}, 
 | 
			
		||||
							Spec: models.ServiceSpec{
 | 
			
		||||
								Selector: map[string]string{"app": "oc-service"},
 | 
			
		||||
								Ports: []models.ServicePort{
 | 
			
		||||
								},
 | 
			
		||||
								Type: "NodePort",
 | 
			
		||||
							},
 | 
			
		||||
					}
 | 
			
		||||
	
 | 
			
		||||
	new_service := models.Service{
 | 
			
		||||
		Manifest: models.Manifest{
 | 
			
		||||
			ApiVersion: "v1", 
 | 
			
		||||
			Kind: "Service", 
 | 
			
		||||
			Metadata: models.Metadata{
 | 
			
		||||
				Name: "workflow-service-"+ processing.Processing.Name + "-" + processing.ID ,
 | 
			
		||||
				},
 | 
			
		||||
			},
 | 
			
		||||
			Spec: models.ServiceSpec{
 | 
			
		||||
				Selector: map[string]string{"app": "service-" + fakelish.GenerateFakeWord(5, 8)},
 | 
			
		||||
				Ports: []models.ServicePort{
 | 
			
		||||
				},
 | 
			
		||||
				Type: string(service_type),
 | 
			
		||||
			},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	completeServicePorts(&new_service, processing)
 | 
			
		||||
	yamlified, _ := yaml.Marshal(new_service)
 | 
			
		||||
	x := string(yamlified)
 | 
			
		||||
	_ = x
 | 
			
		||||
	
 | 
			
		||||
	return new_service
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -90,7 +99,7 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) {
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		
 | 
			
		||||
		// This condition allows us to create NodePort if PAT is filled or a ClusterIP that only expose port 80 and 443 (for Ingress)
 | 
			
		||||
		if _, ok := translation_dict["PAT"]; ok{
 | 
			
		||||
			port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
@@ -98,8 +107,6 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) {
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
			new_port_translation := models.ServicePort{
 | 
			
		||||
				Name: strings.ToLower(processing.Processing.Name) + processing.ID,
 | 
			
		||||
				Port:  port_translation-30000,
 | 
			
		||||
@@ -108,32 +115,52 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) {
 | 
			
		||||
				Protocol: "TCP",
 | 
			
		||||
			}
 | 
			
		||||
			service.Spec.Ports = append(service.Spec.Ports, new_port_translation)
 | 
			
		||||
		} 
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) createService(service models.Service, processing_name string, processing_id string) {
 | 
			
		||||
	if b.Services != nil{
 | 
			
		||||
		b.Services.Spec.Ports = append(b.Services.Spec.Ports, service.Spec.Ports...)
 | 
			
		||||
		}else {
 | 
			
		||||
			b.Services = &service
 | 
			
		||||
		}
 | 
			
		||||
		} else {
 | 
			
		||||
			port_spec := []models.ServicePort{
 | 
			
		||||
				models.ServicePort{
 | 
			
		||||
					Name: strings.ToLower(processing.Processing.Name) + processing.ID + "-80",
 | 
			
		||||
					Port:  80,
 | 
			
		||||
					TargetPort: 80,
 | 
			
		||||
					Protocol: "TCP",
 | 
			
		||||
			},
 | 
			
		||||
			models.ServicePort{
 | 
			
		||||
				Name: strings.ToLower(processing.Processing.Name) + processing.ID + "-443",
 | 
			
		||||
				Port:  443,
 | 
			
		||||
				TargetPort: 443,
 | 
			
		||||
				Protocol: "TCP",
 | 
			
		||||
			},
 | 
			
		||||
		
 | 
			
		||||
	b.addLabel(processing_name,processing_id)
 | 
			
		||||
		}
 | 
			
		||||
		service.Spec.Ports = append(service.Spec.Ports,port_spec...)
 | 
			
		||||
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) addServiceToArgo() error {
 | 
			
		||||
	service_manifest, err := yaml.Marshal(b.Services)
 | 
			
		||||
// The k8s service passed as the parameter only expose one port because it is the result of CreateService()
 | 
			
		||||
// we check if this port is already exposed by a service in the workflow and we proceed to the creation of a new service OR 
 | 
			
		||||
// add the port to the list of port exposed by an existing if portAlreadyExposed is false 
 | 
			
		||||
func (b *ArgoBuilder) addKubeServiceToWorkflow(service models.Service, processing_name string, processing_id string) (label string) {
 | 
			
		||||
	if exposed, service_available_port := b.portAlreadyExposed(service.Spec.Ports[0].TargetPort); exposed && service_available_port != nil{
 | 
			
		||||
		// The port you want to expose is already exposed by all the existing services
 | 
			
		||||
		service_available_port.Spec.Ports = append(service_available_port.Spec.Ports, service.Spec.Ports...)
 | 
			
		||||
		return service_available_port.Spec.Selector["app"]
 | 
			
		||||
	} 
 | 
			
		||||
 | 
			
		||||
	b.Services = append(b.Services, service)
 | 
			
		||||
	return service.Spec.Selector["app"]	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) addServiceToArgo(service models.Service) error {
 | 
			
		||||
	service_manifest, err := yaml.Marshal(service)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Error().Msg("Could not marshal service manifest")
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
	service_template := models.Template{Name: "workflow-service-pod", 
 | 
			
		||||
	service_template := models.Template{Name: "workflow-service-pod-" + strconv.Itoa(len(b.Services)), 
 | 
			
		||||
								Resource: models.ServiceResource{
 | 
			
		||||
									Action: "create", 
 | 
			
		||||
									SuccessCondition: "status.succeeded > 0",
 | 
			
		||||
@@ -145,4 +172,44 @@ func (b *ArgoBuilder) addServiceToArgo() error {
 | 
			
		||||
	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template)
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) addLabel(name string, id string) {
 | 
			
		||||
	argo_name := getArgoName(name,id)
 | 
			
		||||
	for _, template := range b.Workflow.Spec.Templates{
 | 
			
		||||
		if template.Name == argo_name{
 | 
			
		||||
			template.Metadata.Labels["app"] = "service-workflow"			
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) portAlreadyExposed(port int64) (exposed bool, service *models.Service ){
 | 
			
		||||
	// For all already existing k8s services, test if the port in parameter is already exposed and returns the first service that doesn't yet expose this port 
 | 
			
		||||
	for _, s := range b.Services {
 | 
			
		||||
		i := 0
 | 
			
		||||
		port_exposed := false
 | 
			
		||||
		for !port_exposed {
 | 
			
		||||
			if s.Spec.Ports[i].TargetPort == port {
 | 
			
		||||
				port_exposed = true
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		if !port_exposed {
 | 
			
		||||
			return false, &s
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// If contract has a reverse value
 | 
			
		||||
	// Test if service already exist for the argo service
 | 
			
		||||
		// Yes : create ingress, associate it with the existing kube service (name + exposed port on service) 
 | 
			
		||||
		// No : 
 | 
			
		||||
			// - create template for a service that expose the port of the argo service (pod)
 | 
			
		||||
			// - store the name of the service and its port exposed
 | 
			
		||||
			// - create template for an ingress : manifest must contain the path, name of the service and port exposed on the service
 | 
			
		||||
func (b *ArgoBuilder) addIngress(){
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user