Compare commits
	
		
			32 Commits
		
	
	
		
			feature/ad
			...
			feature/mu
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 04ab15cb09 | |||
| 2b002152a4 | |||
| 7fa115c5e1 | |||
| 91f421af1e | |||
|  | 24bbe81638 | ||
|  | 7c913bec0e | ||
|  | bdbbd7697a | ||
|  | 6917295fbd | ||
|  | e1b0ad089c | ||
|  | 483f747754 | ||
|  | 03675d09ae | ||
|  | f3e84a4f43 | ||
|  | eae5474552 | ||
|  | bae9cb2011 | ||
|  | 65b8960703 | ||
|  | 90aa19caeb | ||
|  | dcb3e2b7cc | ||
|  | c871d68333 | ||
|  | 6cf5da787a | ||
|  | fa4db92c92 | ||
|  | ee94c1aa42 | ||
|  | c40b18f1d6 | ||
|  | 2932fb2710 | ||
|  | 2343a5329e | ||
|  | 86fa41a376 | ||
|  | 6ec7a670bd | ||
| 6323d4eed4 | |||
| 93f3806b86 | |||
|  | ade18f1042 | ||
| 83d118fb05 | |||
| f7f0c9c2d2 | |||
| aea7cbd41c | 
							
								
								
									
										7
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										7
									
								
								Makefile
									
									
									
									
									
								
							| @@ -10,7 +10,7 @@ clean: | ||||
| 	rm -rf oc-monitord | ||||
|  | ||||
| docker: | ||||
| 	DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile . | ||||
| 	DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile . --build-arg=HOST=$(HOST) | ||||
| 	docker tag oc/oc-monitord:0.0.1 oc/oc-monitord:latest | ||||
| 	docker tag oc/oc-monitord:0.0.1 oc-monitord:latest | ||||
|  | ||||
| @@ -20,6 +20,11 @@ publish-kind: | ||||
| publish-registry: | ||||
| 	@echo "TODO" | ||||
|  | ||||
| docker-deploy: | ||||
| 	docker compose up -d | ||||
|  | ||||
| run-docker: docker publish-kind publish-registry docker-deploy | ||||
|  | ||||
| all: docker publish-kind publish-registry | ||||
|  | ||||
| .PHONY: build run clean docker publish-kind publish-registry | ||||
							
								
								
									
										71
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										71
									
								
								README.md
									
									
									
									
									
								
							| @@ -1,52 +1,26 @@ | ||||
| # oc-monitor | ||||
|  | ||||
| ## Deploy in k8s (dev) | ||||
| DO : | ||||
|   make build | ||||
|  | ||||
| While a registry with all of the OC docker images has not been set-up we can export this image to k3s ctr | ||||
| ## Summary | ||||
|  | ||||
| > docker save oc-monitord:latest | sudo k3s ctr images import - | ||||
| oc-monitord is a daemon which can be run : | ||||
| - as a binary | ||||
| - as a container | ||||
|  | ||||
| Then in the pod manifest for oc-monitord use :  | ||||
|  | ||||
| ``` | ||||
| image: docker.io/library/oc-monitord | ||||
| imagePullPolicy: Never | ||||
| ``` | ||||
|  | ||||
| Not doing so will end up in the pod having a `ErrorImagePull` | ||||
|  | ||||
| ## Allow argo to create services | ||||
|  | ||||
| In order for monitord to expose **open cloud services** on the node, we need to give him permission to create **k8s services**. | ||||
|  | ||||
| For that we can update the RBAC configuration for a role already created by argo :  | ||||
|  | ||||
| ### Manually edit the rbac authorization  | ||||
|  | ||||
| > kubectl edit roles.rbac.authorization.k8s.io -n argo argo-role | ||||
|  | ||||
| In rules add a new entry :  | ||||
|  | ||||
| ``` | ||||
| - apiGroups: | ||||
|   - "" | ||||
|   resources:   | ||||
|   - services | ||||
|     verbs:                                                                                                                                                           | ||||
|     - get | ||||
|     - create | ||||
| ``` | ||||
|  | ||||
| ### Patch the rbac authorization with a one liner  | ||||
|  | ||||
| > kubectl patch role argo-role -n argo --type='json' -p='[{"op": "add", "path": "/rules/-", "value": {"apiGroups": [""], "resources": ["services"], "verbs": ["get","create"]}}]' | ||||
|  | ||||
| ### Check wether the modification is effective  | ||||
|  | ||||
| > kubectl auth can-i create services --as=system:serviceaccount:argo:argo -n argo | ||||
|  | ||||
| This command **must return "yes"** | ||||
| It is used to perform several actions regarding the execution of an Open Cloud workflow : | ||||
| - generating a YAML file that can be interpreted by **Argo Workflow** to create and execute pods in a kubernetes environment | ||||
| - setting up the different resources needed to execute a workflow over several peers/kubernetes nodes with **Admiralty** : token, secrets, targets and sources | ||||
| - creating the workflow and logging the output from  | ||||
|   - Argo watch, which gives informations about the workflow in general (phase, number of steps executed, status...) | ||||
|   - Pods : which are the logs generated by the pods  | ||||
|  | ||||
| To execute, the daemon needs several options :  | ||||
| - **-u** :   | ||||
| - **-m** : | ||||
| - **-d** : | ||||
| - **-e** : | ||||
|  | ||||
| # Notes features/admiralty-docker | ||||
|  | ||||
| @@ -57,14 +31,17 @@ This command **must return "yes"** | ||||
|   - decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host. | ||||
|  | ||||
|  | ||||
| ## TODO | ||||
| ## TODO | ||||
|  | ||||
| - [ ] Allow the front to known on which IP the service are reachable | ||||
| - [ ] Allow the front to known on which IP the service are reachable | ||||
|   - currently doing it by using `kubectl get nodes -o wide` | ||||
|  | ||||
| - [ ] Implement writing and reading from S3 bucket/MinIO when a data resource is linked to a compute resource. | ||||
|  | ||||
| ### Adding ingress handling to support reverse proxing | ||||
|  | ||||
| ### Adding ingress handling to support reverse proxing | ||||
|  | ||||
| - Test wether ingress-nginx is running or not | ||||
|   - Do something if not found : stop running and send error log OR start installation | ||||
| -  | ||||
| -  | ||||
|  | ||||
|   | ||||
| @@ -9,6 +9,7 @@ type Config struct { | ||||
| 	NatsURL     string | ||||
| 	ExecutionID string | ||||
| 	PeerID      string | ||||
| 	Groups      []string | ||||
| 	Timeout     int | ||||
| 	WorkflowID  string | ||||
| 	Logs        string | ||||
| @@ -18,7 +19,7 @@ type Config struct { | ||||
| 	KubeCA      string | ||||
| 	KubeCert    string | ||||
| 	KubeData    string | ||||
| 	ArgoHost	string 		// when executed in a container will replace addresses with "localhost" in their url | ||||
| 	ArgoHost    string // when executed in a container will replace addresses with "localhost" in their url | ||||
| } | ||||
|  | ||||
| var instance *Config | ||||
|   | ||||
							
								
								
									
										
											BIN
										
									
								
								docs/admiralty_naming_multi_peer.jpg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								docs/admiralty_naming_multi_peer.jpg
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 71 KiB | 
							
								
								
									
										
											BIN
										
									
								
								docs/admiralty_setup_schema.jpg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								docs/admiralty_setup_schema.jpg
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 91 KiB | 
							
								
								
									
										4
									
								
								env.env
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								env.env
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | ||||
| KUBERNETES_SERVICE_HOST=192.168.1.169 | ||||
| KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" | ||||
| KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" | ||||
| KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" | ||||
							
								
								
									
										5
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										5
									
								
								go.mod
									
									
									
									
									
								
							| @@ -5,7 +5,7 @@ go 1.23.1 | ||||
| toolchain go1.23.3 | ||||
|  | ||||
| require ( | ||||
| 	cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 | ||||
| 	cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06 | ||||
| 	github.com/akamensky/argparse v1.4.0 | ||||
| 	github.com/google/uuid v1.6.0 | ||||
| 	github.com/goraz/onion v0.1.3 | ||||
| @@ -20,6 +20,7 @@ require ( | ||||
| 	github.com/golang/protobuf v1.5.4 // indirect | ||||
| 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect | ||||
| 	github.com/sirupsen/logrus v1.9.3 // indirect | ||||
| 	github.com/ugorji/go/codec v1.1.7 // indirect | ||||
| 	google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect | ||||
| 	google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect | ||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect | ||||
| @@ -27,7 +28,6 @@ require ( | ||||
| ) | ||||
|  | ||||
| require ( | ||||
| 	github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d | ||||
| 	github.com/argoproj/argo-workflows/v3 v3.6.4 | ||||
| 	github.com/beorn7/perks v1.0.1 // indirect | ||||
| 	github.com/biter777/countries v1.7.5 // indirect | ||||
| @@ -71,7 +71,6 @@ require ( | ||||
| 	github.com/robfig/cron v1.2.0 // indirect | ||||
| 	github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect | ||||
| 	github.com/smartystreets/goconvey v1.6.4 // indirect | ||||
| 	github.com/ugorji/go/codec v1.1.7 // indirect | ||||
| 	github.com/x448/float16 v0.8.4 // indirect | ||||
| 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect | ||||
| 	github.com/xdg-go/scram v1.1.2 // indirect | ||||
|   | ||||
							
								
								
									
										22
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										22
									
								
								go.sum
									
									
									
									
									
								
							| @@ -1,10 +1,28 @@ | ||||
| cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= | ||||
| cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f h1:esLB0EAn8IuOChW35kcBrPaN80z4A4yYyz1mXT45GQo= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c h1:k2y+ocElqwUK5yzyCf3rWrDUzPWbds4MbtG58+Szos0= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157 h1:853UvpMOM1QuWLrr/V8biDS8IcQcqHvoJsOT4epxDng= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e h1:Z5vLv+Wzzz58abmHRnovoqbkVlKHuC8u8/RLv7FjtZw= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637 h1:YiZbn6KmjgZ62uM+kH95Snd2nQliDKDnGMAxRr/VoUw= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624064953-2c8dcbe93d14 h1:iCTrYc2+W2BFLOupRK1sD6sOgsK4NIs6WMC+4LiWCaY= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624064953-2c8dcbe93d14/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624093207-3fdf5c3ebf29 h1:JitS1izRltTyOaWnvXnmYywHj0napsL6y0nBYiWUCNo= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624093207-3fdf5c3ebf29/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624095852-147c7bc3a1d5 h1:0eV0E3kBZkOyoAurRmP9h4eHmFrZajOxSqoBgM3l3dk= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624095852-147c7bc3a1d5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06 h1:+RSv62uIC7wsmibsp1XTanQMNznNeOGgPpfhb6ZHT4c= | ||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= | ||||
| github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | ||||
| github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= | ||||
| github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= | ||||
| github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= | ||||
| github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= | ||||
| github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= | ||||
|   | ||||
| @@ -7,6 +7,8 @@ import ( | ||||
| 	"oc-monitord/tools" | ||||
| 	"oc-monitord/utils" | ||||
| 	"slices" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/rs/zerolog" | ||||
| @@ -106,13 +108,16 @@ func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) { | ||||
| func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) { | ||||
| 	var argoWatcher *ArgoWatch | ||||
| 	var pods []string | ||||
| 	var node wfv1.NodeStatus | ||||
|  | ||||
| 	wfl := utils.GetWFLogger("") | ||||
| 	wfl.Debug().Msg("Starting to log " + wfName) | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
| 	 | ||||
| 	for event := range (watcher.ResultChan()) { | ||||
| 		wf, ok := event.Object.(*wfv1.Workflow) | ||||
| 		if !ok { | ||||
| @@ -120,7 +125,7 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
| 			continue | ||||
| 		} | ||||
| 		if len(wf.Status.Nodes) == 0 { | ||||
| 			wfl.Debug().Msg("No node status yet")	// The first output of the channel doesn't contain Nodes so we skip it | ||||
| 			wfl.Info().Msg("No node status yet")	// The first output of the channel doesn't contain Nodes so we skip it | ||||
| 			continue | ||||
| 		} | ||||
| 		 | ||||
| @@ -138,7 +143,7 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
|  | ||||
| 		newWatcher := ArgoWatch{ | ||||
| 			Name: node.Name, | ||||
| 			Namespace: executionID, | ||||
| 			Namespace: namespace, | ||||
| 			Status: string(node.Phase), | ||||
| 			Created: node.StartedAt.String(), | ||||
| 			Started: node.StartedAt.String(), | ||||
| @@ -163,7 +168,9 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
| 			if !slices.Contains(pods,pod.Name){ | ||||
| 				pl := wfl.With().Str("pod",  pod.Name).Logger() | ||||
| 				if wfName == pod.Name { pods = append(pods, pod.Name); continue }	// One of the node is the Workflow, the others are the pods so don't try to log on the wf name | ||||
| 				go logKubernetesPods(executionID, wfName, pod.Name, pl) | ||||
| 				pl.Info().Msg("Found a new pod to log : "  + pod.Name) | ||||
| 				wg.Add(1) | ||||
| 				go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) | ||||
| 				pods = append(pods, pod.Name) | ||||
| 			}  | ||||
| 		} | ||||
| @@ -171,6 +178,8 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
| 		// Stop listening to the chan when the Workflow is completed or something bad happened | ||||
| 		if node.Phase.Completed() { | ||||
| 			wfl.Info().Msg(wfName + " worflow completed") | ||||
| 			wg.Wait() | ||||
| 			wfl.Info().Msg(wfName + " exiting") | ||||
| 			break | ||||
| 		} | ||||
| 		if node.Phase.FailedOrError() { | ||||
| @@ -196,24 +205,31 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { | ||||
| } | ||||
|  | ||||
| // Function needed to be executed as a go thread  | ||||
| func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){ | ||||
| func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ | ||||
| 	defer wg.Done() | ||||
| 	 | ||||
| 	s := strings.Split(podName, ".") | ||||
| 	name := s[0] + "-" + s[1] | ||||
| 	step := s[1] | ||||
| 	 | ||||
| 	k, err := tools.NewKubernetesTool() | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not get Kubernetes tools") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	reader, err := k.GetPodLogger(executionId, wfName, podName) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg(err.Error()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	scanner := bufio.NewScanner(reader) | ||||
| 	for scanner.Scan() { | ||||
| 		log := scanner.Text() | ||||
| 		podLog := NewArgoPodLog(wfName,podName,log) | ||||
| 		podLog := NewArgoPodLog(name,step,log) | ||||
| 		jsonified, _ := json.Marshal(podLog) | ||||
| 		logger.Info().Msg(string(jsonified)) | ||||
| 	} | ||||
| 	 | ||||
| } | ||||
| @@ -3,7 +3,6 @@ package logger | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"oc-monitord/conf" | ||||
|  | ||||
| @@ -71,7 +70,6 @@ func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) { | ||||
| 	logger = logs.GetLogger() | ||||
|  | ||||
| 	logger.Debug().Msg("created wf_logger") | ||||
| 	fmt.Println("created wf_logger") | ||||
| 	wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() | ||||
|  | ||||
| 	var current_watch, previous_watch ArgoWatch | ||||
| @@ -111,7 +109,6 @@ func LogLocalPod(wfName string, pipe io.ReadCloser, steps []string, wg *sync.Wai | ||||
| 	scanner := bufio.NewScanner(pipe) | ||||
| 	for scanner.Scan() { | ||||
| 		var podLogger zerolog.Logger | ||||
| 		fmt.Println("new line") | ||||
| 		wg.Add(1) | ||||
| 		 | ||||
| 		line := scanner.Text() | ||||
|   | ||||
							
								
								
									
										79
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										79
									
								
								main.go
									
									
									
									
									
								
							| @@ -14,6 +14,7 @@ import ( | ||||
|  | ||||
| 	"oc-monitord/conf" | ||||
| 	l "oc-monitord/logger" | ||||
| 	"oc-monitord/models" | ||||
| 	u "oc-monitord/utils" | ||||
| 	"oc-monitord/workflow_builder" | ||||
|  | ||||
| @@ -53,7 +54,7 @@ func main() { | ||||
|  | ||||
| 	os.Setenv("test_service", "true") // Only for service demo, delete before merging on main | ||||
| 	parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") | ||||
| 	loadConfig(false, &parser) | ||||
| 	setConf(&parser) | ||||
| 	oclib.InitDaemon("oc-monitord") | ||||
|  | ||||
| 	oclib.SetConfig( | ||||
| @@ -67,11 +68,15 @@ func main() { | ||||
| 	logger = u.GetLogger() | ||||
|  | ||||
| 	logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) | ||||
| 	logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) | ||||
| 	logger.Info().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) | ||||
| 	exec := u.GetExecution(conf.GetConfig().ExecutionID) | ||||
| 	if exec == nil { | ||||
| 		logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID) | ||||
| 		return | ||||
| 	} | ||||
| 	conf.GetConfig().WorkflowID = exec.WorkflowID | ||||
|  | ||||
| 	logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) | ||||
| 	logger.Info().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) | ||||
|  | ||||
| 	if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) { | ||||
| 		os.Mkdir("./argo_workflows/", 0755) | ||||
| @@ -83,7 +88,6 @@ func main() { | ||||
|  | ||||
| 	err := new_wf.LoadFrom(conf.GetConfig().WorkflowID, conf.GetConfig().PeerID) | ||||
| 	if err != nil { | ||||
|  | ||||
| 		logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") | ||||
| 	} | ||||
|  | ||||
| @@ -95,76 +99,75 @@ func main() { | ||||
|  | ||||
| 	argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg(err.Error()) | ||||
| 		logger.Error().Msg("Error when completing the build of the workflow: " + err.Error()) | ||||
| 	} | ||||
|  | ||||
| 	workflowName = getContainerName(argoFilePath) | ||||
|  | ||||
| 	wf_logger := u.GetWFLogger(workflowName) | ||||
| 	wf_logger.Debug().Msg("Testing argo name") | ||||
|  | ||||
| 	if conf.GetConfig().KubeHost == "" { | ||||
| 		// Not in a k8s environment, get conf from parameters | ||||
| 		fmt.Println("Executes outside of k8s") | ||||
| 		logger.Info().Msg("Executes outside of k8s") | ||||
| 		executeOutside(argoFilePath, builder.Workflow) | ||||
| 	} else { | ||||
| 		// Executed in a k8s environment | ||||
| 		fmt.Println("Executes inside a k8s") | ||||
| 		logger.Info().Msg("Executes inside a k8s") | ||||
| 		// executeInside(exec.GetID(), "argo", argo_file_path, stepMax)  // commenting to use conf.ExecutionID instead of exec.GetID() | ||||
| 		executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath) | ||||
| 		executeInside(exec.ExecutionsID, argoFilePath) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // So far we only log the output from | ||||
| func executeInside(execID string, ns string, argo_file_path string) { | ||||
| func executeInside(ns string, argo_file_path string) { | ||||
| 	t, err := tools2.NewService(conf.GetConfig().Mode) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not create KubernetesTool") | ||||
| 		return | ||||
| 	} | ||||
| 	 | ||||
|  | ||||
| 	name, err := t.CreateArgoWorkflow(argo_file_path, ns) | ||||
| 	_ = name  | ||||
| 	// _ = name | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not create argo workflow : " + err.Error()) | ||||
| 		fmt.Println("CA :" + conf.GetConfig().KubeCA) | ||||
| 		fmt.Println("Cert :" + conf.GetConfig().KubeCert) | ||||
| 		fmt.Println("Data :" + conf.GetConfig().KubeData) | ||||
| 		logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA)) | ||||
| 		logger.Info().Msg(fmt.Sprint("Cert :" + conf.GetConfig().KubeCert)) | ||||
| 		logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData)) | ||||
| 		return | ||||
| 	} else { | ||||
| 		watcher, err := t.GetArgoWatch(execID, workflowName) | ||||
| 		watcher, err := t.GetArgoWatch(ns, workflowName) | ||||
| 		if err != nil { | ||||
| 			logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) | ||||
| 		} | ||||
|  | ||||
| 		l.LogKubernetesArgo(name, execID, watcher) | ||||
| 		l.LogKubernetesArgo(name, ns, watcher) | ||||
| 		if err != nil { | ||||
| 			logger.Error().Msg("Could not log workflow : " + err.Error()) | ||||
| 		} | ||||
|  | ||||
| 		logger.Info().Msg("Finished, exiting...") | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | ||||
| func executeOutside(argo_file_path string, workflow *models.Workflow) { | ||||
| 	var stdoutSubmit, stderrSubmit io.ReadCloser | ||||
| 	var stdoutLogs, stderrLogs io.ReadCloser | ||||
| 	var wg sync.WaitGroup | ||||
| 	var err error | ||||
|  | ||||
| 	logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID ) | ||||
| 	logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID) | ||||
|  | ||||
| 	cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID) | ||||
| 	if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil { | ||||
| 		wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	 | ||||
| 	cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color") | ||||
|  | ||||
| 	cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color") | ||||
| 	if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil { | ||||
| 		wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	 | ||||
|  | ||||
| 	var steps []string | ||||
| 	for _, template := range workflow.Spec.Templates { | ||||
| 		steps = append(steps, template.Name) | ||||
| @@ -173,7 +176,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | ||||
| 	go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) | ||||
| 	go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg) | ||||
|  | ||||
| 	fmt.Println("Starting argo submit") | ||||
| 	logger.Info().Msg("Starting argo submit") | ||||
| 	if err := cmdSubmit.Start(); err != nil { | ||||
| 		wf_logger.Error().Msg("Could not start argo submit") | ||||
| 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) | ||||
| @@ -182,15 +185,15 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | ||||
|  | ||||
| 	time.Sleep(5 * time.Second) | ||||
|  | ||||
| 	fmt.Println("Running argo logs") | ||||
| 	logger.Info().Msg("Running argo logs") | ||||
| 	if err := cmdLogs.Run(); err != nil { | ||||
| 		wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") | ||||
| 		 | ||||
|  | ||||
| 		wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text()) | ||||
|  | ||||
| 	} | ||||
|  | ||||
| 	fmt.Println("Waiting argo submit") | ||||
| 	logger.Info().Msg("Waiting argo submit") | ||||
| 	if err := cmdSubmit.Wait(); err != nil { | ||||
| 		wf_logger.Error().Msg("Could not execute argo submit") | ||||
| 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) | ||||
| @@ -200,22 +203,13 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | ||||
| 	wg.Wait() | ||||
| } | ||||
|  | ||||
|  | ||||
| func loadConfig(is_k8s bool, parser *argparse.Parser) { | ||||
| 	var o *onion.Onion | ||||
| 	o = initOnion(o) | ||||
| 	setConf(is_k8s, o, parser) | ||||
|  | ||||
| 	// if !IsValidUUID(conf.GetConfig().ExecutionID) { | ||||
| 	// 	logger.Fatal().Msg("Provided ID is not an UUID") | ||||
| 	// } | ||||
| } | ||||
|  | ||||
| func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { | ||||
| func setConf(parser *argparse.Parser) { | ||||
| 	url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"}) | ||||
| 	mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"}) | ||||
| 	execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) | ||||
| 	peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"}) | ||||
| 	groups := parser.String("g", "groups", &argparse.Options{Required: false, Default: "", Help: "Groups of the peer to request from oc-catalog API"}) | ||||
|  | ||||
| 	mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"}) | ||||
| 	db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"}) | ||||
| 	timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"}) | ||||
| @@ -231,7 +225,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { | ||||
|  | ||||
| 	err := parser.Parse(os.Args) | ||||
| 	if err != nil { | ||||
| 		fmt.Println(parser.Usage(err)) | ||||
| 		logger.Info().Msg(parser.Usage(err)) | ||||
| 		os.Exit(1) | ||||
| 	} | ||||
| 	conf.GetConfig().Logs = "debug" | ||||
| @@ -242,7 +236,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { | ||||
| 	conf.GetConfig().Mode = *mode | ||||
| 	conf.GetConfig().ExecutionID = *execution | ||||
| 	conf.GetConfig().PeerID = *peer | ||||
|  | ||||
| 	conf.GetConfig().Groups = strings.Split((*groups), ",") | ||||
| 	conf.GetConfig().KubeHost = *host | ||||
| 	conf.GetConfig().KubePort = *port | ||||
|  | ||||
| @@ -304,7 +298,6 @@ func getContainerName(argo_file string) string { | ||||
| 	return container_name | ||||
| } | ||||
|  | ||||
|  | ||||
| func updateStatus(status string, log string) { | ||||
| 	exec_id := conf.GetConfig().ExecutionID | ||||
|  | ||||
|   | ||||
| @@ -1,5 +1,7 @@ | ||||
| package models | ||||
|  | ||||
| import "gopkg.in/yaml.v3" | ||||
|  | ||||
| type ServiceResource struct { | ||||
| 	Action            string `yaml:"action,omitempty"` | ||||
| 	SuccessCondition  string `yaml:"successCondition,omitempty"` | ||||
| @@ -15,6 +17,24 @@ type Service struct { | ||||
| 	Spec       ServiceSpec `yaml:"spec"` | ||||
| } | ||||
|  | ||||
| func (s *Service) BindToArgo(workflow *Workflow) error { | ||||
| 	service_manifest, err := yaml.Marshal(s) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	service_template := Template{Name: "workflow-service-pod", | ||||
| 		Resource: ServiceResource{ | ||||
| 			Action:            "create", | ||||
| 			SuccessCondition:  "status.succeeded > 0", | ||||
| 			FailureCondition:  "status.failed > 3", | ||||
| 			SetOwnerReference: true, | ||||
| 			Manifest:          string(service_manifest), | ||||
| 		}, | ||||
| 	} | ||||
| 	workflow.Spec.Templates = append(workflow.Spec.Templates, service_template) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| type Metadata struct { | ||||
| 	Name string `yaml:"name"` | ||||
| } | ||||
|   | ||||
| @@ -1,8 +1,15 @@ | ||||
| package models | ||||
|  | ||||
| import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
|  | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/workflow/graph" | ||||
|  | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/common/enum" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/common/models" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| ) | ||||
| @@ -12,11 +19,60 @@ type Parameter struct { | ||||
| 	Value string `yaml:"value,omitempty"` | ||||
| } | ||||
|  | ||||
| type Bounds struct { | ||||
| 	CPU    string `yaml:"cpu,omitempty"` | ||||
| 	Memory string `yaml:"memory,omitempty"` | ||||
| 	GPU    string `yaml:"nvidia.com/gpu,omitempty"` | ||||
| } | ||||
|  | ||||
| func NewBounds() *Bounds { | ||||
| 	return &Bounds{ | ||||
| 		CPU:    "0", | ||||
| 		Memory: "0", | ||||
| 		GPU:    "0", | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (b *Bounds) Set(value float64, what string, isMin bool) bool { | ||||
| 	i := float64(0) | ||||
| 	switch what { | ||||
| 	case "cpu": | ||||
| 		if newI, err := strconv.ParseFloat(b.CPU, 64); err == nil { | ||||
| 			i = newI | ||||
| 		} | ||||
| 	case "ram": | ||||
| 		if newI, err := strconv.ParseFloat(b.Memory, 64); err == nil { | ||||
| 			i = newI | ||||
| 		} | ||||
| 	case "gpu": | ||||
| 		if newI, err := strconv.ParseFloat(b.GPU, 64); err == nil { | ||||
| 			i = newI | ||||
| 		} | ||||
| 	} | ||||
| 	ok := (value > i && !isMin) || (value < i && isMin) | ||||
| 	if ok { | ||||
| 		switch what { | ||||
| 		case "cpu": | ||||
| 			b.CPU = fmt.Sprintf("%f", value) | ||||
| 			return true | ||||
| 		case "ram": | ||||
| 			b.Memory = fmt.Sprintf("%fGi", value) | ||||
| 			return true | ||||
| 		case "gpu": | ||||
| 			b.GPU = fmt.Sprintf("%f", value) | ||||
| 			return true | ||||
| 		} | ||||
| 	} | ||||
| 	return false | ||||
| } | ||||
|  | ||||
| type Container struct { | ||||
| 	Image        string        `yaml:"image"` | ||||
| 	Command      []string      `yaml:"command,omitempty,flow"` | ||||
| 	Args         []string      `yaml:"args,omitempty,flow"` | ||||
| 	VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"` | ||||
| 	Requests     Bounds        `yaml:"requests,omitempty"` | ||||
| 	Limits       Bounds        `yaml:"limits,omitempty"` | ||||
| } | ||||
|  | ||||
| func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount { | ||||
| @@ -37,27 +93,87 @@ func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMoun | ||||
| 	return volumes | ||||
| } | ||||
|  | ||||
| type VolumeMount struct { | ||||
| 	Name      string                     `yaml:"name"` | ||||
| 	MountPath string                     `yaml:"mountPath"` | ||||
| 	Storage   *resources.StorageResource `yaml:"-"` | ||||
| } | ||||
|  | ||||
| type Task struct { | ||||
| 	Name         string   `yaml:"name"` | ||||
| 	Template     string   `yaml:"template"` | ||||
| 	Dependencies []string `yaml:"dependencies,omitempty"` | ||||
| 	Name         string            `yaml:"name"` | ||||
| 	Template     string            `yaml:"template"` | ||||
| 	Dependencies []string          `yaml:"dependencies,omitempty"` | ||||
| 	NodeSelector map[string]string `yaml:"nodeSelector,omitempty"` | ||||
| 	Arguments    struct { | ||||
| 		Parameters []Parameter `yaml:"parameters,omitempty"` | ||||
| 	} `yaml:"arguments,omitempty"` | ||||
| } | ||||
|  | ||||
| func NewTask(processingName string, graphItemID string) *Task { | ||||
| 	unique_name := GetArgoName(processingName, graphItemID) | ||||
| 	return &Task{ | ||||
| 		Name:     unique_name, | ||||
| 		Template: unique_name, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (t *Task) BindToArgo( | ||||
| 	dag *Dag, | ||||
| 	graphItemID string, | ||||
| 	originWf *w.Workflow, | ||||
| 	processing *resources.ProcessingResource, | ||||
| 	firstItems, lastItems []string, | ||||
| ) (*Dag, []string, []string) { | ||||
| 	if instance := processing.GetSelectedInstance(); instance != nil { | ||||
| 		t.addParams(instance.(*resources.ProcessingInstance).Env) | ||||
| 		t.addParams(instance.(*resources.ProcessingInstance).Inputs) | ||||
| 		t.addParams(instance.(*resources.ProcessingInstance).Outputs) | ||||
| 	} | ||||
| 	t.Dependencies = TransformDepsToArgo(originWf.GetDependencies(graphItemID)) | ||||
| 	name := "" | ||||
| 	if originWf.Graph.Items[graphItemID].Processing != nil { | ||||
| 		name = originWf.Graph.Items[graphItemID].Processing.GetName() | ||||
| 	} | ||||
| 	if originWf.Graph.Items[graphItemID].Workflow != nil { | ||||
| 		name = originWf.Graph.Items[graphItemID].Workflow.GetName() | ||||
| 	} | ||||
| 	if len(t.Dependencies) == 0 && name != "" { | ||||
| 		firstItems = append(firstItems, GetArgoName(name, graphItemID)) | ||||
| 	} | ||||
| 	if deps := originWf.IsDependancy(graphItemID); len(deps) == 0 && name != "" { | ||||
| 		lastItems = append(lastItems, GetArgoName(name, graphItemID)) | ||||
| 	} | ||||
| 	dag.Tasks = append(dag.Tasks, *t) | ||||
| 	return dag, firstItems, lastItems | ||||
| } | ||||
|  | ||||
| func (t *Task) addParams(params []models.Param) { | ||||
| 	for _, value := range params { | ||||
| 		t.Arguments.Parameters = append(t.Arguments.Parameters, Parameter{ | ||||
| 			Name:  value.Name, | ||||
| 			Value: value.Value, | ||||
| 		}) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (t *Task) GetDeps(name string) (int, string) { | ||||
| 	for i, deps := range t.Dependencies { | ||||
| 		if strings.Contains(deps, name) { | ||||
| 			return i, deps | ||||
| 		} | ||||
| 	} | ||||
| 	return 0, "" | ||||
| } | ||||
|  | ||||
| type Dag struct { | ||||
| 	Tasks []Task `yaml:"tasks,omitempty"` | ||||
| } | ||||
|  | ||||
| func (d *Dag) GetTask(taskName string) *Task { | ||||
| 	for _, task := range d.Tasks { | ||||
| 		if strings.Contains(task.Name, taskName) { | ||||
| 			return &task | ||||
| 		} | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| type TemplateMetadata struct { | ||||
| 	Labels map[string]string `yaml:"labels,omitempty"` | ||||
| 	Labels      map[string]string `yaml:"labels,omitempty"` | ||||
| 	Annotations map[string]string `yaml:"annotations,omitempty"` | ||||
| } | ||||
|  | ||||
| @@ -66,6 +182,10 @@ type Secret struct { | ||||
| 	Key  string `yaml:"key"` | ||||
| } | ||||
|  | ||||
| func NewSecret(name string, key string) *Secret { | ||||
| 	return &Secret{Name: name, Key: key + "-key"} | ||||
| } | ||||
|  | ||||
| type Key struct { | ||||
| 	Key             string  `yaml:"key"` | ||||
| 	Bucket          string  `yaml:"bucket"` | ||||
| @@ -81,6 +201,59 @@ type Artifact struct { | ||||
| 	S3   *Key   `yaml:"s3,omitempty"` | ||||
| } | ||||
|  | ||||
| func NewArtifact(name string, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) *Artifact { | ||||
| 	if rw.Write { | ||||
| 		name += "-" + rw.Destination + "-input-write" | ||||
| 	} else { | ||||
| 		name = "-" + rw.Destination + "-input-read" | ||||
| 	} | ||||
| 	return &Artifact{ | ||||
| 		Name: name, | ||||
| 		Path: template.ReplacePerEnv(rw.Source, params), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (a *Artifact) BindToArgo(storageType enum.StorageType, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) { | ||||
| 	if rw.Write { | ||||
| 		template.Outputs.Artifacts = append(template.Inputs.Artifacts, *a) | ||||
| 	} else { | ||||
| 		template.Inputs.Artifacts = append(template.Outputs.Artifacts, *a) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (a *Artifact) bindS3(rw graph.StorageProcessingGraphLink, params []models.Param, template Template) { | ||||
| 	a.S3 = &Key{ | ||||
| 		Key:      template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, params), | ||||
| 		Insecure: true, // temporary | ||||
| 	} | ||||
| 	/* sel := storage.GetSelectedInstance() | ||||
| 	if sel != nil { | ||||
| 		if sel.(*resources.StorageResourceInstance).Credentials != nil { | ||||
| 			tool, err := tools2.NewService(conf.GetConfig().Mode) | ||||
| 			if err != nil || tool == nil { | ||||
| 				logger.Error().Msg("Could not create the access secret") | ||||
| 			} else { | ||||
| 				id, err := tool.CreateAccessSecret(namespace, | ||||
| 					sel.(*resources.StorageResourceInstance).Credentials.Login, | ||||
| 					sel.(*resources.StorageResourceInstance).Credentials.Pass) | ||||
| 				if err == nil { | ||||
| 					a.S3.AccessKeySecret = NewSecret(id, "access") | ||||
| 					a.S3.SecretKeySecret = NewSecret(id, "secret") | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		source := sel.(*resources.StorageResourceInstance).Source | ||||
| 		a.S3.Key = strings.ReplaceAll(strings.ReplaceAll(a.S3.Key, source+"/", ""), source, "") | ||||
| 		splits := strings.Split(a.S3.EndPoint, "/") | ||||
| 		if len(splits) > 1 { | ||||
| 			a.S3.Bucket = splits[0] | ||||
| 			a.S3.EndPoint = strings.Join(splits[1:], "/") | ||||
| 		} else { | ||||
| 			a.S3.Bucket = splits[0] | ||||
| 		} | ||||
| 	} */ | ||||
| } | ||||
|  | ||||
| type InOut struct { | ||||
| 	Parameters []Parameter `yaml:"parameters"` | ||||
| 	Artifacts  []Artifact  `yaml:"artifacts,omitempty"` | ||||
| @@ -143,10 +316,43 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string | ||||
|  | ||||
| // Add the metadata that allow Admiralty to pick up an Argo Workflow that needs to be reparted | ||||
| // The value of "clustername" is the peerId, which must be replaced by the node name's for this specific execution | ||||
| func (t *Template) AddAdmiraltyAnnotations(peerId string){ | ||||
| func (t *Template) AddAdmiraltyAnnotations(peerID, namespace string) error { | ||||
| 	if t.Metadata.Annotations == nil { | ||||
| 		t.Metadata.Annotations = make(map[string]string) | ||||
| 	} | ||||
| 	t.Metadata.Annotations["multicluster.admiralty.io/elect"] = "" | ||||
| 	t.Metadata.Annotations["multicluster.admiralty.io/clustername"] = peerId | ||||
| } | ||||
|  | ||||
| 	const key = "admiralty.io/multi-cluster-scheduler" | ||||
|  | ||||
| 	var annotation SchedulerAnnotation | ||||
|  | ||||
| 	// Parse existing annotation if it exists | ||||
| 	if val, ok := t.Metadata.Annotations[key]; ok && val != "" { | ||||
| 		if err := json.Unmarshal([]byte(val), &annotation); err != nil { | ||||
| 			return fmt.Errorf("failed to parse existing scheduler annotation: %w", err) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Add new affinity | ||||
| 	annotation.Affinities = append(annotation.Affinities, affinity{ | ||||
| 		Cluster:   "target-" + peerID + "-" + namespace, | ||||
| 		Namespace: namespace, | ||||
| 	}) | ||||
|  | ||||
| 	// Encode back to JSON | ||||
| 	bytes, err := json.Marshal(annotation) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("failed to encode scheduler annotation: %w", err) | ||||
| 	} | ||||
|  | ||||
| 	t.Metadata.Annotations[key] = string(bytes) | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| type affinity struct { | ||||
| 	Cluster   string `json:"cluster"` | ||||
| 	Namespace string `json:"namespace"` | ||||
| } | ||||
|  | ||||
| type SchedulerAnnotation struct { | ||||
| 	Affinities []affinity `json:"affinities"` | ||||
| } | ||||
|   | ||||
							
								
								
									
										92
									
								
								models/utils.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										92
									
								
								models/utils.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,92 @@ | ||||
| package models | ||||
|  | ||||
| import ( | ||||
| 	"strings" | ||||
|  | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| ) | ||||
|  | ||||
| type WorkflowsDependancies struct { | ||||
| 	FirstWfTasks   map[string][]string | ||||
| 	RelatedWfTasks map[string][]string | ||||
| 	LastWfTasks    map[string][]string | ||||
| } | ||||
|  | ||||
| func NewWorkflowDependancies() *WorkflowsDependancies { | ||||
| 	return &WorkflowsDependancies{ | ||||
| 		FirstWfTasks:   map[string][]string{}, | ||||
| 		RelatedWfTasks: map[string][]string{}, | ||||
| 		LastWfTasks:    map[string][]string{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (w *WorkflowsDependancies) BindFirstTasks(depsFunc func(v string) []w.Deps, dag *Dag) { | ||||
| 	for wfID, firstTasks := range w.FirstWfTasks { | ||||
| 		deps := depsFunc(wfID) | ||||
| 		if task := dag.GetTask(wfID); task != nil && len(deps) > 0 { | ||||
| 			task.Dependencies = append(task.Dependencies, firstTasks...) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (w *WorkflowsDependancies) BindRelatedTasks(dag *Dag) { | ||||
| 	for wfID, relatedWfTasks := range w.RelatedWfTasks { | ||||
| 		for _, dep := range relatedWfTasks { | ||||
| 			if task := dag.GetTask(dep); task != nil { | ||||
| 				index := -1 | ||||
| 				if i, deps := task.GetDeps(wfID); deps != "" { | ||||
| 					index = i | ||||
| 				} | ||||
| 				if index != -1 { | ||||
| 					task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...) | ||||
| 				} | ||||
| 				if w.LastWfTasks[wfID] != nil { | ||||
| 					task.Dependencies = append(task.Dependencies, w.LastWfTasks[wfID]...) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| type Workflow struct { | ||||
| 	ApiVersion string `yaml:"apiVersion"` | ||||
| 	Kind       string `yaml:"kind"` | ||||
| 	Metadata   struct { | ||||
| 		Name string `yaml:"name"` | ||||
| 	} `yaml:"metadata"` | ||||
| 	Spec Spec `yaml:"spec,omitempty"` | ||||
| } | ||||
|  | ||||
| func (b *Workflow) GetDag() *Dag { | ||||
| 	for _, t := range b.Spec.Templates { | ||||
| 		if t.Name == "dag" { | ||||
| 			return t.Dag | ||||
| 		} | ||||
| 	} | ||||
| 	b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}}) | ||||
| 	return b.Spec.Templates[len(b.Spec.Templates)-1].Dag | ||||
| } | ||||
|  | ||||
| type Spec struct { | ||||
| 	ServiceAccountName string                `yaml:"serviceAccountName"` | ||||
| 	Entrypoint         string                `yaml:"entrypoint"` | ||||
| 	Arguments          []Parameter           `yaml:"arguments,omitempty"` | ||||
| 	Volumes            []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` | ||||
| 	Templates          []Template            `yaml:"templates"` | ||||
| 	Timeout            int                   `yaml:"activeDeadlineSeconds,omitempty"` | ||||
| } | ||||
|  | ||||
| func GetArgoName(raw_name string, component_id string) (formatedName string) { | ||||
| 	formatedName = strings.ReplaceAll(raw_name, " ", "-") | ||||
| 	formatedName += "-" + component_id | ||||
| 	formatedName = strings.ToLower(formatedName) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func TransformDepsToArgo(deps []w.Deps) []string { | ||||
| 	argoDeps := []string{} | ||||
| 	for _, dep := range deps { | ||||
| 		argoDeps = append(argoDeps, GetArgoName(dep.Source, dep.Dest)) | ||||
| 	} | ||||
| 	return argoDeps | ||||
| } | ||||
| @@ -1,5 +1,12 @@ | ||||
| package models | ||||
|  | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"strings" | ||||
|  | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| ) | ||||
|  | ||||
| type VolumeClaimTemplate struct { | ||||
| 	Metadata struct { | ||||
| 		Name string `yaml:"name"` | ||||
| @@ -15,3 +22,22 @@ type VolumeSpec struct { | ||||
| 		} `yaml:"requests"` | ||||
| 	} `yaml:"resources"` | ||||
| } | ||||
|  | ||||
| type VolumeMount struct { | ||||
| 	Name      string                     `yaml:"name"` | ||||
| 	MountPath string                     `yaml:"mountPath"` | ||||
| 	Storage   *resources.StorageResource `yaml:"-"` | ||||
| } | ||||
|  | ||||
| func (v *VolumeMount) BindToArgo(workflow *Workflow) { // TODO : one think about remote volume but TG | ||||
| 	index := 0 | ||||
| 	if v.Storage.SelectedInstanceIndex != nil && (*v.Storage.SelectedInstanceIndex) >= 0 { | ||||
| 		index = *v.Storage.SelectedInstanceIndex | ||||
| 	} | ||||
| 	storage := v.Storage.Instances[index] | ||||
| 	new_volume := VolumeClaimTemplate{} | ||||
| 	new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(v.Name), " ", "-") | ||||
| 	new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} | ||||
| 	new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo() | ||||
| 	workflow.Spec.Volumes = append(workflow.Spec.Volumes, new_volume) | ||||
| } | ||||
|   | ||||
							
								
								
									
										
											BIN
										
									
								
								oc-monitord
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								oc-monitord
									
									
									
									
									
										Executable file
									
								
							
										
											Binary file not shown.
										
									
								
							| @@ -83,7 +83,8 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er | ||||
| 	if err != nil { | ||||
| 		return "", errors.New("failed to create workflow: " + err.Error()) | ||||
| 	} | ||||
| 	fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns) | ||||
| 	l := utils.GetLogger() | ||||
| 	l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s\n", createdWf.Name, ns)) | ||||
| 	return createdWf.Name, nil | ||||
| } | ||||
|  | ||||
| @@ -115,16 +116,12 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s | ||||
| } | ||||
|  | ||||
| func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){ | ||||
| 	wfl := utils.GetWFLogger("") | ||||
| 	wfl.Debug().Msg("Starting argo watch with argo lib") | ||||
| 	fmt.Println("metadata.name=oc-monitor-"+wfName + "  in namespace : " + executionId) | ||||
| 	options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName} | ||||
| 	fmt.Println(options) | ||||
| 	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options) | ||||
|  | ||||
| 	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options) | ||||
| 	if err != nil { | ||||
| 		return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client") | ||||
| 	} | ||||
| 	 | ||||
|  | ||||
| 	return watcher, nil  | ||||
|  | ||||
| @@ -133,16 +130,18 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch | ||||
| func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { | ||||
| 	var targetPod v1.Pod | ||||
|  | ||||
| 	 | ||||
| 	pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ | ||||
|         LabelSelector: "workflows.argoproj.io/workflow="+wfName, | ||||
| 		LabelSelector: "workflows.argoproj.io/workflow="+wfName, | ||||
|     }) | ||||
|     if err != nil { | ||||
| 		return nil, fmt.Errorf("failed to list pods: " + err.Error()) | ||||
|     } | ||||
|     if len(pods.Items) == 0 { | ||||
| 		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName) | ||||
| 		 | ||||
| 		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns) | ||||
|     } | ||||
|  | ||||
| 	 | ||||
|     for _, pod := range pods.Items { | ||||
| 		if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName { | ||||
| 			targetPod = pod | ||||
| @@ -172,7 +171,8 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) { | ||||
| 	 | ||||
| 		var initialized bool | ||||
| 		for _, cond := range pod.Status.Conditions { | ||||
| 			if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { | ||||
| 			// It seems that for remote pods the pod gets the Succeeded status before it has time to display the it is ready to run in .status.conditions,so we added the OR condition | ||||
| 			if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {		 | ||||
| 				initialized = true | ||||
| 				return | ||||
| 			} | ||||
|   | ||||
| @@ -17,11 +17,13 @@ var ( | ||||
| 	onceLogger 	sync.Once | ||||
| 	onceWF 		sync.Once | ||||
| ) | ||||
|  | ||||
| func GetExecution(exec_id string) *workflow_execution.WorkflowExecution { | ||||
| 	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id) | ||||
| 	if res.Code != 200 { | ||||
| 		logger := oclib.GetLogger() | ||||
| 		logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) | ||||
| 		logger.Error().Msg("Error retrieving execution " + exec_id) | ||||
| 		logger.Error().Msg(res.Err) | ||||
| 		return nil | ||||
| 	} | ||||
| 	return res.ToWorkflowExecution() | ||||
|   | ||||
| @@ -4,6 +4,7 @@ import ( | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"net/http" | ||||
| 	"oc-monitord/utils" | ||||
| 	"slices" | ||||
| 	"time" | ||||
|  | ||||
| @@ -13,24 +14,23 @@ import ( | ||||
| 	tools "cloud.o-forge.io/core/oc-lib/tools" | ||||
| ) | ||||
|  | ||||
|  | ||||
| type AdmiraltySetter struct { | ||||
| 	Id			string				// ID to identify the execution, correspond to workflow_executions id | ||||
| 	NodeName 	string	// Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} | ||||
| 	Id       string // ID to identify the execution, correspond to workflow_executions id | ||||
| 	NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { | ||||
| 	 | ||||
| 	logger = logs.GetLogger() | ||||
| func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string, remotePeerID string) error { | ||||
|  | ||||
| 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) | ||||
| 	logger := logs.GetLogger() | ||||
|  | ||||
| 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(remotePeerID) | ||||
| 	if data.Code != 200 { | ||||
| 		logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID) | ||||
| 		return fmt.Errorf(data.Err) | ||||
| 	} | ||||
| 	remotePeer := data.ToPeer() | ||||
|  | ||||
| 	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) | ||||
| 	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(localPeerID) | ||||
| 	if data.Code != 200 { | ||||
| 		logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) | ||||
| 		return fmt.Errorf(data.Err) | ||||
| @@ -39,35 +39,35 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st | ||||
|  | ||||
| 	caller := tools.NewHTTPCaller( | ||||
| 		map[tools.DataType]map[tools.METHOD]string{ | ||||
| 			tools.ADMIRALTY_SOURCE: map[tools.METHOD]string{ | ||||
| 				tools.POST :"/:id", | ||||
| 			tools.ADMIRALTY_SOURCE: { | ||||
| 				tools.POST: "/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_KUBECONFIG: map[tools.METHOD]string{ | ||||
| 				tools.GET:"/:id", | ||||
| 			tools.ADMIRALTY_KUBECONFIG: { | ||||
| 				tools.GET: "/:id", | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_SECRET: map[tools.METHOD]string{ | ||||
| 				tools.POST:"/:id", | ||||
| 			tools.ADMIRALTY_SECRET: { | ||||
| 				tools.POST: "/:id/" + remotePeerID, | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_TARGET: map[tools.METHOD]string{ | ||||
| 				tools.POST:"/:id", | ||||
| 			tools.ADMIRALTY_TARGET: { | ||||
| 				tools.POST: "/:id/" + remotePeerID, | ||||
| 			}, | ||||
| 			tools.ADMIRALTY_NODES: map[tools.METHOD]string{ | ||||
| 				tools.GET:"/:id", | ||||
| 			tools.ADMIRALTY_NODES: { | ||||
| 				tools.GET: "/:id/" + remotePeerID, | ||||
| 			}, | ||||
| 		}, | ||||
| 	) | ||||
|  | ||||
| 	logger.Info().Msg(" Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) | ||||
| 	logger.Info().Msg(" Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id + "\n\n") | ||||
| 	logger.Info().Msg("\n\n  Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id) | ||||
| 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) | ||||
| 	logger.Info().Msg("\n\n  Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id) | ||||
| 	kubeconfig := s.getKubeconfig(remotePeer, caller) | ||||
| 	logger.Info().Msg(" Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) | ||||
| 	logger.Info().Msg(" Creating the Admiralty Target on " + localPeerID + " ns-" + s.Id + "\n\n") | ||||
| 	_ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) | ||||
| 	logger.Info().Msg(" Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id + "\n\n") | ||||
| 	s.checkNodeStatus(localPeer,caller) | ||||
| 	 | ||||
| 	logger.Info().Msg("\n\n  Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id) | ||||
| 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller, s.Id, tools.ADMIRALTY_SECRET, tools.POST, kubeconfig, true) | ||||
| 	logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id) | ||||
| 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_TARGET, tools.POST, nil, true) | ||||
| 	logger.Info().Msg("\n\n  Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id) | ||||
| 	s.checkNodeStatus(localPeer, caller) | ||||
|  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| @@ -75,31 +75,33 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle | ||||
| 	var kubedata map[string]string | ||||
| 	_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) | ||||
| 	if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { | ||||
| 		fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig") | ||||
| 		l := utils.GetLogger() | ||||
| 		l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig") | ||||
| 		panic(0) | ||||
| 	} | ||||
| 	err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("Something went wrong when unmarshalling data from Get call for kubeconfig") | ||||
| 		l := utils.GetLogger() | ||||
| 		l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig") | ||||
| 		panic(0) | ||||
| 	} | ||||
|  | ||||
| 	return kubedata | ||||
| } | ||||
|  | ||||
| func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { | ||||
| func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) map[string]interface{} { | ||||
| 	l := utils.GetLogger() | ||||
| 	resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) | ||||
| 	if err != nil { | ||||
| 		fmt.Println("Error when executing on peer at", peer.Url) | ||||
| 		fmt.Println(err) | ||||
| 		l.Error().Msg("Error when executing on peer at" + peer.Url) | ||||
| 		l.Error().Msg(err.Error()) | ||||
| 		panic(0) | ||||
| 	} | ||||
|  | ||||
| 	if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) { | ||||
| 		fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode) | ||||
| 		l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)) | ||||
| 		if _, ok := caller.LastResults["body"]; ok { | ||||
| 			logger.Info().Msg(string(caller.LastResults["body"].([]byte))) | ||||
| 			// fmt.Println(string(caller.LastResults["body"].([]byte))) | ||||
| 			l.Info().Msg(string(caller.LastResults["body"].([]byte))) | ||||
| 		} | ||||
| 		if panicCode { | ||||
| 			panic(0) | ||||
| @@ -109,7 +111,7 @@ func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, | ||||
| 	return resp | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ | ||||
| func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) { | ||||
| 	var data map[string]interface{} | ||||
| 	if resp, ok := caller.LastResults["body"]; ok { | ||||
| 		json.Unmarshal(resp.([]byte), &data) | ||||
| @@ -120,15 +122,16 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ | ||||
| 		name := metadata.(map[string]interface{})["name"].(string) | ||||
| 		s.NodeName = name | ||||
| 	} else { | ||||
| 		fmt.Println("Could not retrieve data about the recently created node") | ||||
| 		l := utils.GetLogger() | ||||
| 		l.Error().Msg("Could not retrieve data about the recently created node") | ||||
| 		panic(0) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ | ||||
| 	for i := range(5) { | ||||
| 		time.Sleep(5 * time.Second) // let some time for kube to generate the node | ||||
| 		_ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) | ||||
| func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller) { | ||||
| 	for i := range 5 { | ||||
| 		time.Sleep(10 * time.Second) // let some time for kube to generate the node | ||||
| 		_ = s.callRemoteExecution(localPeer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_NODES, tools.GET, nil, false) | ||||
| 		if caller.LastResults["code"] == 200 { | ||||
| 			s.storeNodeName(caller) | ||||
| 			return | ||||
| @@ -139,5 +142,5 @@ func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HT | ||||
| 		} | ||||
| 		logger.Info().Msg("Could not verify that node is up. Retrying...") | ||||
| 	} | ||||
| 	 | ||||
| } | ||||
|  | ||||
| } | ||||
|   | ||||
| @@ -5,17 +5,16 @@ | ||||
| package workflow_builder | ||||
|  | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"oc-monitord/conf" | ||||
| 	"oc-monitord/models" | ||||
| 	. "oc-monitord/models" | ||||
| 	tools2 "oc-monitord/tools" | ||||
| 	"os" | ||||
| 	"strings" | ||||
| 	"time" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	"cloud.o-forge.io/core/oc-lib/logs" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/common/enum" | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| 	w "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| 	"github.com/nwtgck/go-fakelish" | ||||
| @@ -26,74 +25,40 @@ import ( | ||||
| var logger zerolog.Logger | ||||
|  | ||||
| type ArgoBuilder struct { | ||||
| 	OriginWorkflow 	*w.Workflow | ||||
| 	Workflow       	Workflow | ||||
| 	Services       	[]*Service | ||||
| 	Timeout        	int | ||||
| 	RemotePeers		[]string | ||||
| } | ||||
|  | ||||
| type Workflow struct { | ||||
| 	ApiVersion string `yaml:"apiVersion"` | ||||
| 	Kind       string `yaml:"kind"` | ||||
| 	Metadata   struct { | ||||
| 		Name string `yaml:"name"` | ||||
| 	} `yaml:"metadata"` | ||||
| 	Spec Spec `yaml:"spec,omitempty"` | ||||
| } | ||||
|  | ||||
| func (b *Workflow) getDag() *Dag { | ||||
| 	for _, t := range b.Spec.Templates { | ||||
| 		if t.Name == "dag" { | ||||
| 			return t.Dag | ||||
| 		} | ||||
| 	} | ||||
| 	b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}}) | ||||
| 	return b.Spec.Templates[len(b.Spec.Templates)-1].Dag | ||||
| } | ||||
|  | ||||
| type Spec struct { | ||||
| 	ServiceAccountName	string					`yaml:"serviceAccountName"` | ||||
| 	Entrypoint 			string                	`yaml:"entrypoint"` | ||||
| 	Arguments  			[]Parameter           	`yaml:"arguments,omitempty"` | ||||
| 	Volumes    			[]VolumeClaimTemplate 	`yaml:"volumeClaimTemplates,omitempty"` | ||||
| 	Templates  			[]Template            	`yaml:"templates"` | ||||
| 	Timeout    			int                   	`yaml:"activeDeadlineSeconds,omitempty"` | ||||
| 	OriginWorkflow *w.Workflow | ||||
| 	Workflow       *models.Workflow | ||||
| 	Services       []*Service | ||||
| 	Timeout        int | ||||
| 	RemotePeers    []string | ||||
| } | ||||
|  | ||||
| // TODO: found on a processing instance linked to storage | ||||
| // add s3, gcs, azure, etc if needed on a link between processing and storage | ||||
| func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { | ||||
| func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []string, error) { | ||||
| 	logger = logs.GetLogger() | ||||
| 	fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items) | ||||
| 	logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) | ||||
| 	// handle services by checking if there is only one processing with hostname and port | ||||
| 	firstItems, lastItems, volumes := b.createTemplates(namespace) | ||||
| 	b.createVolumes(volumes) | ||||
|  | ||||
| 	if b.Timeout > 0 { | ||||
| 		b.Workflow.Spec.Timeout = b.Timeout | ||||
| 	} | ||||
| 	b.Workflow.Spec.ServiceAccountName = "sa-"+namespace | ||||
| 	b.Workflow.Spec.ServiceAccountName = "sa-" + namespace | ||||
| 	b.Workflow.Spec.Entrypoint = "dag" | ||||
| 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | ||||
| 	b.Workflow.Kind = "Workflow" | ||||
| 	if !write { | ||||
| 		return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil | ||||
| 	} | ||||
| 	 | ||||
| 	 | ||||
| 	return  len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil | ||||
| 	return len(b.Workflow.GetDag().Tasks), firstItems, lastItems, nil | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) { | ||||
| 	volumes := []VolumeMount{} | ||||
| func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []models.VolumeMount) { | ||||
| 	volumes := []models.VolumeMount{} | ||||
| 	firstItems := []string{} | ||||
| 	lastItems := []string{} | ||||
| 	items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) | ||||
| 	fmt.Println("Creating templates", len(items)) | ||||
| 	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { | ||||
| 	logger.Info().Msg(fmt.Sprint("Creating templates", len(items))) | ||||
| 	for _, item := range items { | ||||
| 		instance := item.Processing.GetSelectedInstance() | ||||
| 		fmt.Println("Creating template for", item.Processing.GetName(), instance) | ||||
| 		logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) | ||||
| 		if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { | ||||
| 			logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) | ||||
| 			return firstItems, lastItems, volumes | ||||
| @@ -101,87 +66,68 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V | ||||
| 		volumes, firstItems, lastItems = b.createArgoTemplates(namespace, | ||||
| 			item.ID, item.Processing, volumes, firstItems, lastItems) | ||||
| 	} | ||||
| 	firstWfTasks := map[string][]string{} | ||||
| 	latestWfTasks := map[string][]string{} | ||||
| 	relatedWfTasks := map[string][]string{} | ||||
| 	for _, wf := range b.OriginWorkflow.Workflows { | ||||
| 		realWorkflow, code, err := w.NewAccessor(nil).LoadOne(wf) | ||||
| 		if code != 200 { | ||||
| 			logger.Error().Msg("Error loading the workflow : " + err.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 		subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} | ||||
| 		_, fi, li, err := subBuilder.CreateDAG(namespace, false) | ||||
| 		if err != nil { | ||||
| 			logger.Error().Msg("Error creating the subworkflow : " + err.Error()) | ||||
| 			continue | ||||
| 		} | ||||
| 		firstWfTasks[wf] = fi | ||||
| 		if ok, depsOfIds := subBuilder.isArgoDependancy(wf); ok { // IS BEFORE | ||||
| 			latestWfTasks[wf] = li | ||||
| 			relatedWfTasks[wf] = depsOfIds | ||||
| 		} | ||||
| 		subDag := subBuilder.Workflow.getDag() | ||||
| 		d := b.Workflow.getDag() | ||||
| 		d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow | ||||
| 		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) | ||||
| 		b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) | ||||
| 		b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) | ||||
| 		b.Services = append(b.Services, subBuilder.Services...) | ||||
| 	} | ||||
| 	for wfID, depsOfIds := range relatedWfTasks { | ||||
| 		for _, dep := range depsOfIds { | ||||
| 			for _, task := range b.Workflow.getDag().Tasks { | ||||
| 				if strings.Contains(task.Name, dep) { | ||||
| 					index := -1 | ||||
| 					for i, depp := range task.Dependencies { | ||||
| 						if strings.Contains(depp, wfID) { | ||||
| 							index = i | ||||
| 							break | ||||
| 						} | ||||
| 					} | ||||
| 					if index != -1 { | ||||
| 						task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...) | ||||
| 					} | ||||
| 					task.Dependencies = append(task.Dependencies, latestWfTasks[wfID]...) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
| 	for wfID, fi := range firstWfTasks { | ||||
| 		deps := b.getArgoDependencies(wfID) | ||||
| 		if len(deps) > 0 { | ||||
| 			for _, dep := range fi { | ||||
| 				for _, task := range b.Workflow.getDag().Tasks { | ||||
| 					if strings.Contains(task.Name, dep) { | ||||
| 						task.Dependencies = append(task.Dependencies, deps...) | ||||
| 					} | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 	wfDeps := models.NewWorkflowDependancies() | ||||
| 	for _, workflowID := range b.OriginWorkflow.Workflows { | ||||
| 		b.createWorkflowArgoTemplate(workflowID, namespace, wfDeps) | ||||
| 	} | ||||
| 	wfDeps.BindRelatedTasks(b.Workflow.GetDag()) | ||||
| 	wfDeps.BindFirstTasks(b.OriginWorkflow.GetDependencies, b.Workflow.GetDag()) | ||||
|  | ||||
| 	if b.Services != nil { | ||||
| 		dag := b.Workflow.getDag() | ||||
| 		dag := b.Workflow.GetDag() | ||||
| 		dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) | ||||
| 		b.addServiceToArgo() | ||||
| 	} | ||||
| 	return firstItems, lastItems, volumes | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createArgoTemplates(namespace string, | ||||
| func (b *ArgoBuilder) createWorkflowArgoTemplate( | ||||
| 	workflowID string, | ||||
| 	namespace string, | ||||
| 	wfDeps *models.WorkflowsDependancies, | ||||
| ) { | ||||
| 	realWorkflow, code, err := w.NewAccessor(nil).LoadOne(workflowID) | ||||
| 	if code != 200 { | ||||
| 		logger.Error().Msg("Error loading the workflow : " + err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Workflow: &models.Workflow{}, Timeout: b.Timeout} | ||||
| 	_, fi, li, err := subBuilder.CreateDAG(namespace, false) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Error creating the subworkflow : " + err.Error()) | ||||
| 		return | ||||
| 	} | ||||
| 	wfDeps.FirstWfTasks[workflowID] = fi | ||||
| 	if depsOfIds := subBuilder.OriginWorkflow.IsDependancy(workflowID); len(depsOfIds) > 0 { // IS BEFORE | ||||
| 		wfDeps.LastWfTasks[workflowID] = li | ||||
| 		wfDeps.RelatedWfTasks[workflowID] = models.TransformDepsToArgo(depsOfIds) | ||||
| 	} | ||||
| 	subDag := subBuilder.Workflow.GetDag() | ||||
| 	d := b.Workflow.GetDag() | ||||
| 	d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow | ||||
| 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) | ||||
| 	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) | ||||
| 	b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) | ||||
| 	b.Services = append(b.Services, subBuilder.Services...) | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createArgoTemplates( | ||||
| 	namespace string, | ||||
| 	id string, | ||||
| 	processing *resources.ProcessingResource, | ||||
| 	volumes []VolumeMount, | ||||
| 	volumes []models.VolumeMount, | ||||
| 	firstItems []string, | ||||
| 	lastItems []string) ([]VolumeMount, []string, []string) { | ||||
| 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) | ||||
| 	template := &Template{Name: getArgoName(processing.GetName(), id)} | ||||
| 	fmt.Println("Creating template for", template.Name) | ||||
| 	isReparted, peerId := b.isProcessingReparted(*processing,id) | ||||
| 	template.CreateContainer(processing, b.Workflow.getDag()) | ||||
| 	if isReparted { | ||||
| 		b.RemotePeers = append(b.RemotePeers, peerId) | ||||
| 		template.AddAdmiraltyAnnotations(peerId) | ||||
| 	lastItems []string, | ||||
| ) ([]models.VolumeMount, []string, []string) { | ||||
| 	_, firstItems, lastItems = NewTask(processing.Name, id).BindToArgo(b.Workflow.GetDag(), id, b.OriginWorkflow, processing, firstItems, lastItems) | ||||
| 	template := &Template{Name: models.GetArgoName(processing.GetName(), id)} | ||||
| 	logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) | ||||
|  | ||||
| 	template.CreateContainer(processing, b.Workflow.GetDag()) | ||||
| 	if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil { | ||||
| 		logger.Error().Msg(fmt.Sprint("problem to sets up repartition expected %v", err.Error())) | ||||
| 		return volumes, firstItems, lastItems | ||||
| 	} | ||||
| 	// get datacenter from the processing | ||||
| 	if processing.IsService { | ||||
| @@ -189,259 +135,97 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, | ||||
| 		template.Metadata.Labels = make(map[string]string) | ||||
| 		template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing | ||||
| 	} | ||||
| 	related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) | ||||
| 	for _, r := range related { | ||||
| 		storage := r.Node.(*resources.StorageResource) | ||||
| 		for _, linkToStorage := range r.Links { | ||||
| 			for _, rw := range linkToStorage.StorageLinkInfos { | ||||
| 				art := Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} | ||||
| 				if rw.Write { | ||||
| 					art.Name = storage.GetName() + "-" + rw.Destination + "-input-write" | ||||
| 				} else { | ||||
| 					art.Name = storage.GetName() + "-" + rw.Destination + "-input-read" | ||||
| 				} | ||||
| 				if storage.StorageType == enum.S3 { | ||||
| 					art.S3 = &Key{ | ||||
| 						Key:      template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env), | ||||
| 						Insecure: true, // temporary | ||||
| 					} | ||||
| 					sel := storage.GetSelectedInstance() | ||||
| 					if sel != nil { | ||||
| 						if sel.(*resources.StorageResourceInstance).Credentials != nil { | ||||
| 							tool, err := tools2.NewService(conf.GetConfig().Mode) | ||||
| 							if err != nil || tool == nil { | ||||
| 								logger.Error().Msg("Could not create the access secret") | ||||
| 							} else { | ||||
| 								id, err := tool.CreateAccessSecret(namespace, | ||||
| 									sel.(*resources.StorageResourceInstance).Credentials.Login, | ||||
| 									sel.(*resources.StorageResourceInstance).Credentials.Pass) | ||||
| 								if err == nil { | ||||
| 									art.S3.AccessKeySecret = &Secret{ | ||||
| 										Name: id, | ||||
| 										Key:  "access-key", | ||||
| 									} | ||||
| 									art.S3.SecretKeySecret = &Secret{ | ||||
| 										Name: id, | ||||
| 										Key:  "secret-key", | ||||
| 									} | ||||
| 								} | ||||
| 							} | ||||
| 						} | ||||
| 						art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "") | ||||
| 						art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "") | ||||
| 						splits := strings.Split(art.S3.EndPoint, "/") | ||||
| 						if len(splits) > 1 { | ||||
| 							art.S3.Bucket = splits[0] | ||||
| 							art.S3.EndPoint = strings.Join(splits[1:], "/") | ||||
| 						} else { | ||||
| 							art.S3.Bucket = splits[0] | ||||
| 						} | ||||
| 					} | ||||
| 				} | ||||
| 				if rw.Write { | ||||
| 					template.Outputs.Artifacts = append(template.Inputs.Artifacts, art) | ||||
| 				} else { | ||||
| 					template.Inputs.Artifacts = append(template.Outputs.Artifacts, art) | ||||
| 				} | ||||
| 			} | ||||
| 		} | ||||
| 		index := 0 | ||||
| 		if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 { | ||||
| 			index = *storage.SelectedInstanceIndex | ||||
| 		} | ||||
| 		s := storage.Instances[index] | ||||
| 		if s.Local { | ||||
| 			volumes = template.Container.AddVolumeMount(VolumeMount{ | ||||
| 				Name:      strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"), | ||||
| 				MountPath: s.Source, | ||||
| 				Storage:   storage, | ||||
| 			}, volumes) | ||||
| 		} | ||||
| 	} | ||||
| 	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) | ||||
| 	return volumes, firstItems, lastItems | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource, | ||||
| 	firstItems []string, lastItems []string) (*Dag, []string, []string) { | ||||
| 	unique_name := getArgoName(processing.GetName(), graphItemID) | ||||
| 	step := Task{Name: unique_name, Template: unique_name} | ||||
| 	instance := processing.GetSelectedInstance() | ||||
| 	if instance != nil { | ||||
| 		for _, value := range instance.(*resources.ProcessingInstance).Env { | ||||
| 			step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ | ||||
| 				Name:  value.Name, | ||||
| 				Value: value.Value, | ||||
| 			}) | ||||
| 		} | ||||
| 		for _, value := range instance.(*resources.ProcessingInstance).Inputs { | ||||
| 			step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ | ||||
| 				Name:  value.Name, | ||||
| 				Value: value.Value, | ||||
| 			}) | ||||
| 		} | ||||
| 		for _, value := range instance.(*resources.ProcessingInstance).Outputs { | ||||
| 			step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ | ||||
| 				Name:  value.Name, | ||||
| 				Value: value.Value, | ||||
| 			}) | ||||
| 		} | ||||
| 	} | ||||
| 	step.Dependencies = b.getArgoDependencies(graphItemID) | ||||
| 	name := "" | ||||
| 	if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil { | ||||
| 		name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName() | ||||
| 	} | ||||
| 	if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil { | ||||
| 		name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName() | ||||
| 	} | ||||
| 	if len(step.Dependencies) == 0 && name != "" { | ||||
| 		firstItems = append(firstItems, getArgoName(name, graphItemID)) | ||||
| 	} | ||||
| 	if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" { | ||||
| 		lastItems = append(lastItems, getArgoName(name, graphItemID)) | ||||
| 	} | ||||
| 	dag.Tasks = append(dag.Tasks, step) | ||||
| 	return dag, firstItems, lastItems | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG | ||||
| func (b *ArgoBuilder) createVolumes(volumes []models.VolumeMount) { // TODO : one think about remote volume but TG | ||||
| 	for _, volume := range volumes { | ||||
| 		index := 0 | ||||
| 		if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 { | ||||
| 			index = *volume.Storage.SelectedInstanceIndex | ||||
| 		} | ||||
| 		storage := volume.Storage.Instances[index] | ||||
| 		new_volume := VolumeClaimTemplate{} | ||||
| 		new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-") | ||||
| 		new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} | ||||
| 		new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo() | ||||
| 		b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) | ||||
| 		volume.BindToArgo(b.Workflow) | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { | ||||
| 	dependancyOfIDs := []string{} | ||||
| 	isDeps := false | ||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||
| 		if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok { | ||||
| 			fmt.Println("Could not find the source of the link", link.Destination.ID) | ||||
| 			continue | ||||
| 		} | ||||
| 		source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing | ||||
| 		if id == link.Source.ID && source != nil { | ||||
| 			isDeps = true | ||||
| 			dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), link.Destination.ID)) | ||||
| 		} | ||||
| 		wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow | ||||
| 		if id == link.Source.ID && wourceWF != nil { | ||||
| 			isDeps = true | ||||
| 			dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), link.Destination.ID)) | ||||
| 		} | ||||
| 	} | ||||
| 	return isDeps, dependancyOfIDs | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { | ||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||
| 		if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { | ||||
| 			fmt.Println("Could not find the source of the link", link.Source.ID) | ||||
| 			continue | ||||
| 		} | ||||
| 		source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing | ||||
| 		if id == link.Destination.ID && source != nil { | ||||
| 			dependency_name := getArgoName(source.GetName(), link.Source.ID) | ||||
| 			dependencies = append(dependencies, dependency_name) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| 	return | ||||
| } | ||||
|  | ||||
| func getArgoName(raw_name string, component_id string) (formatedName string) { | ||||
| 	formatedName = strings.ReplaceAll(raw_name, " ", "-") | ||||
| 	formatedName += "-" + component_id | ||||
| 	formatedName = strings.ToLower(formatedName) | ||||
| 	return | ||||
| } | ||||
|  | ||||
| // Verify if a processing resource is attached to another Compute than the one hosting | ||||
| // the current Open Cloud instance. If true return the peer ID to contact | ||||
| func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool,string) { | ||||
| 	computeAttached := b.retrieveProcessingCompute(graphID) | ||||
| 	if computeAttached == nil { | ||||
| 		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID ) | ||||
| 		panic(0) | ||||
| func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource, graphID string, template *models.Template, namespace string) error { | ||||
| 	computeAttached := b.OriginWorkflow.GetByRelatedProcessing(processing.GetID(), b.OriginWorkflow.Graph.IsCompute) | ||||
| 	if len(computeAttached) == 0 { | ||||
| 		return errors.New("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	// Creates an accessor srtictly for Peer Collection  | ||||
| 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil) | ||||
| 	if req == nil { | ||||
| 		fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") | ||||
| 		return false, "" | ||||
| 	}  | ||||
|  | ||||
| 	res := req.LoadOne(computeAttached.CreatorID) | ||||
| 	if res.Err != "" { | ||||
| 		fmt.Print("TODO : handle error when requesting PeerID") | ||||
| 		fmt.Print(res.Err) | ||||
| 		return false, "" | ||||
| 	} | ||||
| 	 | ||||
| 	peer := *res.ToPeer() | ||||
|  | ||||
| 	isNotReparted, _ := peer.IsMySelf() | ||||
| 	fmt.Println("Result IsMySelf for ", peer.UUID ," : ", isNotReparted) | ||||
| 	 | ||||
| 	return !isNotReparted, peer.UUID | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { | ||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||
| 		// If a link contains the id of the processing | ||||
| 		var oppositeId string  | ||||
| 		if link.Source.ID == graphID{ | ||||
| 			oppositeId = link.Destination.ID | ||||
| 		} else if(link.Destination.ID == graphID){ | ||||
| 			oppositeId = link.Source.ID | ||||
| 	// Creates an accessor srtictly for Peer Collection | ||||
| 	for _, related := range computeAttached { | ||||
| 		instance := related.Node.GetSelectedInstance().(*resources.ComputeResourceInstance) | ||||
| 		if instance == nil { | ||||
| 			continue | ||||
| 		} | ||||
| 		fmt.Println("OppositeId : ", oppositeId) | ||||
| 		if oppositeId != "" { | ||||
| 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) | ||||
| 			if dt == oclib.COMPUTE_RESOURCE { | ||||
| 				return res.(*resources.ComputeResource) | ||||
| 			} else { | ||||
| 				continue | ||||
| 			} | ||||
| 		partner := instance.GetSelectedPartnership(conf.GetConfig().PeerID, conf.GetConfig().Groups) | ||||
| 		if partner == nil { | ||||
| 			logger.Error().Msg("can't proceed on datacenter because of missing pricing profiles " + related.Node.GetID()) | ||||
| 			continue | ||||
| 		} | ||||
| 		garanteed, allowed := b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), models.NewBounds(), models.NewBounds(), "gpu", partner) | ||||
| 		garanteed, allowed = b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), garanteed, allowed, "cpu", partner) | ||||
| 		garanteed.Set(float64(partner.(*resources.ComputeResourcePartnership).MinGaranteedRAMSize), "ram", false) | ||||
| 		allowed.Set(float64(partner.(*resources.ComputeResourcePartnership).MaxAllowedRAMSize), "ram", false) | ||||
|  | ||||
| 		res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID()) | ||||
| 		if res.Err != "" { | ||||
| 			return errors.New(res.Err) | ||||
| 		} | ||||
|  | ||||
| 		peer := *res.ToPeer() | ||||
|  | ||||
| 		isNotReparted := peer.State == 1 | ||||
| 		logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) | ||||
| 		if !(isNotReparted) { | ||||
| 			logger.Debug().Msg("Reparted processing, on " + peer.UUID) | ||||
| 			b.RemotePeers = append(b.RemotePeers, peer.UUID) | ||||
| 			template.AddAdmiraltyAnnotations(peer.UUID, namespace) | ||||
| 		} | ||||
| 	} | ||||
| 		 | ||||
| 	return nil  | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (b *ArgoBuilder) setResourcesAllowedAndGaranteed(dag *Dag, minbound *models.Bounds, maxbound *models.Bounds, typ string, partner resources.ResourcePartnerITF) (*models.Bounds, *models.Bounds) { | ||||
| 	selector := "" | ||||
| 	values := map[string]float64{} | ||||
| 	if typ == "gpu" { | ||||
| 		values = partner.(*resources.ComputeResourcePartnership).MinGaranteedGPUsMemoryGB | ||||
| 	} else { | ||||
| 		values = partner.(*resources.ComputeResourcePartnership).MinGaranteedCPUsCores | ||||
| 	} | ||||
| 	for name, GPU := range values { | ||||
| 		if minbound.Set(float64(GPU), typ, true) { | ||||
| 			selector = name | ||||
| 		} | ||||
| 	} | ||||
| 	if selector != "" { | ||||
| 		for _, t := range dag.Tasks { | ||||
| 			t.NodeSelector[typ+"-type"] = selector | ||||
| 		} | ||||
| 	} | ||||
| 	if typ == "gpu" { | ||||
| 		values = partner.(*resources.ComputeResourcePartnership).MaxAllowedGPUsMemoryGB | ||||
| 	} else { | ||||
| 		values = partner.(*resources.ComputeResourcePartnership).MaxAllowedCPUsCores | ||||
| 	} | ||||
| 	if max, ok := values[selector]; ok { | ||||
| 		maxbound.Set(float64(max), typ, false) | ||||
| 	} else { | ||||
| 		maxbound.GPU = minbound.GPU | ||||
| 	} | ||||
| 	return minbound, maxbound | ||||
| } | ||||
|  | ||||
| // Execute the last actions once the YAML file for the Argo Workflow is created | ||||
| func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | ||||
| 	fmt.Println("DEV :: Completing build") | ||||
| 	setter := AdmiraltySetter{Id: executionsId} | ||||
| func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) { | ||||
| 	logger.Info().Msg("DEV :: Completing build") | ||||
| 	setter := AdmiraltySetter{Id: namespace} | ||||
| 	// Setup admiralty for each node | ||||
| 	for _, peer := range b.RemotePeers { | ||||
| 		fmt.Println("DEV :: Launching Admiralty Setup for ", peer) | ||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) | ||||
| 		logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) | ||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer) | ||||
| 	} | ||||
|  | ||||
| 	// Update the name of the admiralty node to use  | ||||
| 	for _, template := range b.Workflow.Spec.Templates { | ||||
| 		if len(template.Metadata.Annotations) > 0 { | ||||
| 			if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { | ||||
| 				fmt.Println(resp) | ||||
| 				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID | ||||
| 			} | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// Generate the YAML file | ||||
| 	random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) | ||||
| 	b.Workflow.Metadata.Name = "oc-monitor-" + random_name | ||||
| @@ -449,18 +233,18 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | ||||
| 	yamlified, err := yaml.Marshal(b.Workflow) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not transform object to yaml file") | ||||
| 		return "", err  | ||||
| 		return "", err | ||||
| 	} | ||||
| 	// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss | ||||
| 	current_timestamp := time.Now().Format("02_01_2006_150405") | ||||
| 	file_name := random_name + "_" + current_timestamp + ".yml" | ||||
| 	workflows_dir := "./argo_workflows/" | ||||
| 	err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) | ||||
| 	 | ||||
|  | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not write the yaml file") | ||||
| 		return "", err | ||||
| 	} | ||||
|  | ||||
| 	return workflows_dir + file_name, nil | ||||
| } | ||||
| } | ||||
|   | ||||
| @@ -5,7 +5,6 @@ import ( | ||||
| 	"strings" | ||||
|  | ||||
| 	"cloud.o-forge.io/core/oc-lib/models/resources" | ||||
| 	"gopkg.in/yaml.v3" | ||||
| ) | ||||
|  | ||||
| func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) { | ||||
| @@ -47,20 +46,9 @@ func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, p | ||||
|  | ||||
| func (b *ArgoBuilder) addServiceToArgo() error { | ||||
| 	for _, service := range b.Services { | ||||
| 		service_manifest, err := yaml.Marshal(service) | ||||
| 		if err != nil { | ||||
| 		if err := service.BindToArgo(b.Workflow); err != nil { | ||||
| 			return err | ||||
| 		} | ||||
| 		service_template := models.Template{Name: "workflow-service-pod", | ||||
| 			Resource: models.ServiceResource{ | ||||
| 				Action:            "create", | ||||
| 				SuccessCondition:  "status.succeeded > 0", | ||||
| 				FailureCondition:  "status.failed > 3", | ||||
| 				SetOwnerReference: true, | ||||
| 				Manifest:          string(service_manifest), | ||||
| 			}, | ||||
| 		} | ||||
| 		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|   | ||||
| @@ -3,6 +3,7 @@ package workflow_builder | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"fmt" | ||||
| 	"oc-monitord/models" | ||||
|  | ||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | ||||
| 	workflow "cloud.o-forge.io/core/oc-lib/models/workflow" | ||||
| @@ -14,7 +15,7 @@ type WorflowDB struct { | ||||
|  | ||||
| // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter | ||||
| func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error { | ||||
| 	fmt.Println("Loading workflow from " + workflow_id) | ||||
| 	logger.Info().Msg("Loading workflow from " + workflow_id) | ||||
| 	var err error | ||||
| 	if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil { | ||||
| 		return err | ||||
| @@ -27,7 +28,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | ||||
| 	logger := oclib.GetLogger() | ||||
|  | ||||
| 	lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id) | ||||
| 	fmt.Println("ERR", lib_data.Code, lib_data.Err) | ||||
| 	logger.Info().Msg(fmt.Sprint("ERR", lib_data.Code, lib_data.Err)) | ||||
| 	if lib_data.Code != 200 { | ||||
| 		logger.Error().Msg("Error loading the graph") | ||||
| 		return workflow, errors.New(lib_data.Err) | ||||
| @@ -43,12 +44,12 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | ||||
|  | ||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { | ||||
| 	logger := oclib.GetLogger() | ||||
| 	fmt.Println("Exporting to Argo", w.Workflow) | ||||
| 	logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow)) | ||||
| 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | ||||
| 		return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 	} | ||||
|  | ||||
| 	argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} | ||||
| 	argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Workflow: &models.Workflow{}, Timeout: timeout} | ||||
| 	stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user