Compare commits
	
		
			28 Commits
		
	
	
		
			feature/ad
			...
			test-ram
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|  | 411effb000 | ||
|  | 7c913bec0e | ||
|  | bdbbd7697a | ||
|  | 6917295fbd | ||
|  | e1b0ad089c | ||
|  | 483f747754 | ||
|  | 03675d09ae | ||
|  | f3e84a4f43 | ||
|  | eae5474552 | ||
|  | bae9cb2011 | ||
|  | 65b8960703 | ||
|  | 90aa19caeb | ||
|  | dcb3e2b7cc | ||
|  | c871d68333 | ||
|  | 6cf5da787a | ||
|  | fa4db92c92 | ||
|  | ee94c1aa42 | ||
|  | c40b18f1d6 | ||
|  | 2932fb2710 | ||
|  | 2343a5329e | ||
|  | 86fa41a376 | ||
|  | 6ec7a670bd | ||
| 6323d4eed4 | |||
| 93f3806b86 | |||
|  | ade18f1042 | ||
| 83d118fb05 | |||
| f7f0c9c2d2 | |||
| aea7cbd41c | 
							
								
								
									
										2
									
								
								Makefile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Makefile
									
									
									
									
									
								
							| @@ -3,6 +3,8 @@ | |||||||
| build: clean | build: clean | ||||||
| 	go build . | 	go build . | ||||||
|  |  | ||||||
|  | dev: build | ||||||
|  |  | ||||||
| run: | run: | ||||||
| 	./oc-monitord | 	./oc-monitord | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										69
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										69
									
								
								README.md
									
									
									
									
									
								
							| @@ -1,52 +1,26 @@ | |||||||
| # oc-monitor | # oc-monitor | ||||||
|  |  | ||||||
| ## Deploy in k8s (dev) | DO : | ||||||
|  |   make build | ||||||
|  |  | ||||||
| While a registry with all of the OC docker images has not been set-up we can export this image to k3s ctr | ## Summary | ||||||
|  |  | ||||||
| > docker save oc-monitord:latest | sudo k3s ctr images import - | oc-monitord is a daemon which can be run : | ||||||
|  | - as a binary | ||||||
|  | - as a container | ||||||
|  |  | ||||||
| Then in the pod manifest for oc-monitord use :  | It is used to perform several actions regarding the execution of an Open Cloud workflow : | ||||||
|  | - generating a YAML file that can be interpreted by **Argo Workflow** to create and execute pods in a kubernetes environment | ||||||
| ``` | - setting up the different resources needed to execute a workflow over several peers/kubernetes nodes with **Admiralty** : token, secrets, targets and sources | ||||||
| image: docker.io/library/oc-monitord | - creating the workflow and logging the output from  | ||||||
| imagePullPolicy: Never |   - Argo watch, which gives informations about the workflow in general (phase, number of steps executed, status...) | ||||||
| ``` |   - Pods : which are the logs generated by the pods  | ||||||
|  |  | ||||||
| Not doing so will end up in the pod having a `ErrorImagePull` |  | ||||||
|  |  | ||||||
| ## Allow argo to create services |  | ||||||
|  |  | ||||||
| In order for monitord to expose **open cloud services** on the node, we need to give him permission to create **k8s services**. |  | ||||||
|  |  | ||||||
| For that we can update the RBAC configuration for a role already created by argo :  |  | ||||||
|  |  | ||||||
| ### Manually edit the rbac authorization  |  | ||||||
|  |  | ||||||
| > kubectl edit roles.rbac.authorization.k8s.io -n argo argo-role |  | ||||||
|  |  | ||||||
| In rules add a new entry :  |  | ||||||
|  |  | ||||||
| ``` |  | ||||||
| - apiGroups: |  | ||||||
|   - "" |  | ||||||
|   resources:   |  | ||||||
|   - services |  | ||||||
|     verbs:                                                                                                                                                           |  | ||||||
|     - get |  | ||||||
|     - create |  | ||||||
| ``` |  | ||||||
|  |  | ||||||
| ### Patch the rbac authorization with a one liner  |  | ||||||
|  |  | ||||||
| > kubectl patch role argo-role -n argo --type='json' -p='[{"op": "add", "path": "/rules/-", "value": {"apiGroups": [""], "resources": ["services"], "verbs": ["get","create"]}}]' |  | ||||||
|  |  | ||||||
| ### Check wether the modification is effective  |  | ||||||
|  |  | ||||||
| > kubectl auth can-i create services --as=system:serviceaccount:argo:argo -n argo |  | ||||||
|  |  | ||||||
| This command **must return "yes"** |  | ||||||
|  |  | ||||||
|  | To execute, the daemon needs several options :  | ||||||
|  | - **-u** :   | ||||||
|  | - **-m** : | ||||||
|  | - **-d** : | ||||||
|  | - **-e** : | ||||||
|  |  | ||||||
| # Notes features/admiralty-docker | # Notes features/admiralty-docker | ||||||
|  |  | ||||||
| @@ -57,14 +31,17 @@ This command **must return "yes"** | |||||||
|   - decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host. |   - decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host. | ||||||
|  |  | ||||||
|  |  | ||||||
| ## TODO | ## TODO | ||||||
|  |  | ||||||
| - [ ] Allow the front to known on which IP the service are reachable | - [ ] Allow the front to known on which IP the service are reachable | ||||||
|   - currently doing it by using `kubectl get nodes -o wide` |   - currently doing it by using `kubectl get nodes -o wide` | ||||||
|  |  | ||||||
|  | - [ ] Implement writing and reading from S3 bucket/MinIO when a data resource is linked to a compute resource. | ||||||
|  |  | ||||||
| ### Adding ingress handling to support reverse proxing |  | ||||||
|  | ### Adding ingress handling to support reverse proxing | ||||||
|  |  | ||||||
| - Test wether ingress-nginx is running or not | - Test wether ingress-nginx is running or not | ||||||
|   - Do something if not found : stop running and send error log OR start installation |   - Do something if not found : stop running and send error log OR start installation | ||||||
| -  | -  | ||||||
|  |  | ||||||
|   | |||||||
							
								
								
									
										
											BIN
										
									
								
								docs/admiralty_naming_multi_peer.jpg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								docs/admiralty_naming_multi_peer.jpg
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 71 KiB | 
							
								
								
									
										
											BIN
										
									
								
								docs/admiralty_setup_schema.jpg
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								docs/admiralty_setup_schema.jpg
									
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							| After Width: | Height: | Size: 91 KiB | 
							
								
								
									
										4
									
								
								env.env
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								env.env
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,4 @@ | |||||||
|  | KUBERNETES_SERVICE_HOST=192.168.1.169 | ||||||
|  | KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" | ||||||
|  | KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" | ||||||
|  | KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" | ||||||
							
								
								
									
										3
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								go.mod
									
									
									
									
									
								
							| @@ -20,6 +20,7 @@ require ( | |||||||
| 	github.com/golang/protobuf v1.5.4 // indirect | 	github.com/golang/protobuf v1.5.4 // indirect | ||||||
| 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect | 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect | ||||||
| 	github.com/sirupsen/logrus v1.9.3 // indirect | 	github.com/sirupsen/logrus v1.9.3 // indirect | ||||||
|  | 	github.com/ugorji/go/codec v1.1.7 // indirect | ||||||
| 	google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect | 	google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect | ||||||
| 	google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect | 	google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect | ||||||
| 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect | 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect | ||||||
| @@ -27,7 +28,6 @@ require ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d |  | ||||||
| 	github.com/argoproj/argo-workflows/v3 v3.6.4 | 	github.com/argoproj/argo-workflows/v3 v3.6.4 | ||||||
| 	github.com/beorn7/perks v1.0.1 // indirect | 	github.com/beorn7/perks v1.0.1 // indirect | ||||||
| 	github.com/biter777/countries v1.7.5 // indirect | 	github.com/biter777/countries v1.7.5 // indirect | ||||||
| @@ -71,7 +71,6 @@ require ( | |||||||
| 	github.com/robfig/cron v1.2.0 // indirect | 	github.com/robfig/cron v1.2.0 // indirect | ||||||
| 	github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect | 	github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect | ||||||
| 	github.com/smartystreets/goconvey v1.6.4 // indirect | 	github.com/smartystreets/goconvey v1.6.4 // indirect | ||||||
| 	github.com/ugorji/go/codec v1.1.7 // indirect |  | ||||||
| 	github.com/x448/float16 v0.8.4 // indirect | 	github.com/x448/float16 v0.8.4 // indirect | ||||||
| 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect | 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect | ||||||
| 	github.com/xdg-go/scram v1.1.2 // indirect | 	github.com/xdg-go/scram v1.1.2 // indirect | ||||||
|   | |||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							| @@ -3,8 +3,6 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT | |||||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34= | cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34= | ||||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= | cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= | ||||||
| github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | ||||||
| github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= |  | ||||||
| github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= |  | ||||||
| github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= | github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= | ||||||
| github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= | github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= | ||||||
| github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= | github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= | ||||||
|   | |||||||
| @@ -7,6 +7,8 @@ import ( | |||||||
| 	"oc-monitord/tools" | 	"oc-monitord/tools" | ||||||
| 	"oc-monitord/utils" | 	"oc-monitord/utils" | ||||||
| 	"slices" | 	"slices" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/rs/zerolog" | 	"github.com/rs/zerolog" | ||||||
| @@ -106,12 +108,15 @@ func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) { | func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) { | ||||||
| 	var argoWatcher *ArgoWatch | 	var argoWatcher *ArgoWatch | ||||||
| 	var pods []string | 	var pods []string | ||||||
| 	var node wfv1.NodeStatus | 	var node wfv1.NodeStatus | ||||||
|  |  | ||||||
| 	wfl := utils.GetWFLogger("") | 	wfl := utils.GetWFLogger("") | ||||||
|  | 	wfl.Debug().Msg("Starting to log " + wfName) | ||||||
|  |  | ||||||
|  | 	var wg sync.WaitGroup | ||||||
| 	 | 	 | ||||||
| 	for event := range (watcher.ResultChan()) { | 	for event := range (watcher.ResultChan()) { | ||||||
| 		wf, ok := event.Object.(*wfv1.Workflow) | 		wf, ok := event.Object.(*wfv1.Workflow) | ||||||
| @@ -138,7 +143,7 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | |||||||
|  |  | ||||||
| 		newWatcher := ArgoWatch{ | 		newWatcher := ArgoWatch{ | ||||||
| 			Name: node.Name, | 			Name: node.Name, | ||||||
| 			Namespace: executionID, | 			Namespace: namespace, | ||||||
| 			Status: string(node.Phase), | 			Status: string(node.Phase), | ||||||
| 			Created: node.StartedAt.String(), | 			Created: node.StartedAt.String(), | ||||||
| 			Started: node.StartedAt.String(), | 			Started: node.StartedAt.String(), | ||||||
| @@ -163,7 +168,9 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | |||||||
| 			if !slices.Contains(pods,pod.Name){ | 			if !slices.Contains(pods,pod.Name){ | ||||||
| 				pl := wfl.With().Str("pod",  pod.Name).Logger() | 				pl := wfl.With().Str("pod",  pod.Name).Logger() | ||||||
| 				if wfName == pod.Name { pods = append(pods, pod.Name); continue }	// One of the node is the Workflow, the others are the pods so don't try to log on the wf name | 				if wfName == pod.Name { pods = append(pods, pod.Name); continue }	// One of the node is the Workflow, the others are the pods so don't try to log on the wf name | ||||||
| 				go logKubernetesPods(executionID, wfName, pod.Name, pl) | 				pl.Info().Msg("Found a new pod to log : "  + pod.Name) | ||||||
|  | 				wg.Add(1) | ||||||
|  | 				go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) | ||||||
| 				pods = append(pods, pod.Name) | 				pods = append(pods, pod.Name) | ||||||
| 			}  | 			}  | ||||||
| 		} | 		} | ||||||
| @@ -171,6 +178,8 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | |||||||
| 		// Stop listening to the chan when the Workflow is completed or something bad happened | 		// Stop listening to the chan when the Workflow is completed or something bad happened | ||||||
| 		if node.Phase.Completed() { | 		if node.Phase.Completed() { | ||||||
| 			wfl.Info().Msg(wfName + " worflow completed") | 			wfl.Info().Msg(wfName + " worflow completed") | ||||||
|  | 			wg.Wait() | ||||||
|  | 			wfl.Info().Msg(wfName + " exiting") | ||||||
| 			break | 			break | ||||||
| 		} | 		} | ||||||
| 		if node.Phase.FailedOrError() { | 		if node.Phase.FailedOrError() { | ||||||
| @@ -196,7 +205,13 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { | |||||||
| } | } | ||||||
|  |  | ||||||
| // Function needed to be executed as a go thread  | // Function needed to be executed as a go thread  | ||||||
| func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){ | func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ | ||||||
|  | 	defer wg.Done() | ||||||
|  | 	 | ||||||
|  | 	s := strings.Split(podName, ".") | ||||||
|  | 	name := s[0] + "-" + s[1] | ||||||
|  | 	step := s[1] | ||||||
|  | 	 | ||||||
| 	k, err := tools.NewKubernetesTool() | 	k, err := tools.NewKubernetesTool() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		logger.Error().Msg("Could not get Kubernetes tools") | 		logger.Error().Msg("Could not get Kubernetes tools") | ||||||
| @@ -212,8 +227,9 @@ func logKubernetesPods(executionId string, wfName string,podName string, logger | |||||||
| 	scanner := bufio.NewScanner(reader) | 	scanner := bufio.NewScanner(reader) | ||||||
| 	for scanner.Scan() { | 	for scanner.Scan() { | ||||||
| 		log := scanner.Text() | 		log := scanner.Text() | ||||||
| 		podLog := NewArgoPodLog(wfName,podName,log) | 		podLog := NewArgoPodLog(name,step,log) | ||||||
| 		jsonified, _ := json.Marshal(podLog) | 		jsonified, _ := json.Marshal(podLog) | ||||||
| 		logger.Info().Msg(string(jsonified)) | 		logger.Info().Msg(string(jsonified)) | ||||||
| 	} | 	} | ||||||
|  | 	 | ||||||
| } | } | ||||||
| @@ -3,7 +3,6 @@ package logger | |||||||
| import ( | import ( | ||||||
| 	"bufio" | 	"bufio" | ||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" |  | ||||||
| 	"io" | 	"io" | ||||||
| 	"oc-monitord/conf" | 	"oc-monitord/conf" | ||||||
|  |  | ||||||
| @@ -71,7 +70,6 @@ func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) { | |||||||
| 	logger = logs.GetLogger() | 	logger = logs.GetLogger() | ||||||
|  |  | ||||||
| 	logger.Debug().Msg("created wf_logger") | 	logger.Debug().Msg("created wf_logger") | ||||||
| 	fmt.Println("created wf_logger") |  | ||||||
| 	wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() | 	wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() | ||||||
|  |  | ||||||
| 	var current_watch, previous_watch ArgoWatch | 	var current_watch, previous_watch ArgoWatch | ||||||
| @@ -111,7 +109,6 @@ func LogLocalPod(wfName string, pipe io.ReadCloser, steps []string, wg *sync.Wai | |||||||
| 	scanner := bufio.NewScanner(pipe) | 	scanner := bufio.NewScanner(pipe) | ||||||
| 	for scanner.Scan() { | 	for scanner.Scan() { | ||||||
| 		var podLogger zerolog.Logger | 		var podLogger zerolog.Logger | ||||||
| 		fmt.Println("new line") |  | ||||||
| 		wg.Add(1) | 		wg.Add(1) | ||||||
| 		 | 		 | ||||||
| 		line := scanner.Text() | 		line := scanner.Text() | ||||||
|   | |||||||
							
								
								
									
										34
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										34
									
								
								main.go
									
									
									
									
									
								
							| @@ -69,6 +69,9 @@ func main() { | |||||||
| 	logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) | 	logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) | ||||||
| 	logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) | 	logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) | ||||||
| 	exec := u.GetExecution(conf.GetConfig().ExecutionID) | 	exec := u.GetExecution(conf.GetConfig().ExecutionID) | ||||||
|  | 	if exec == nil { | ||||||
|  | 		logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID) | ||||||
|  | 	} | ||||||
| 	conf.GetConfig().WorkflowID = exec.WorkflowID | 	conf.GetConfig().WorkflowID = exec.WorkflowID | ||||||
|  |  | ||||||
| 	logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) | 	logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) | ||||||
| @@ -83,7 +86,6 @@ func main() { | |||||||
|  |  | ||||||
| 	err := new_wf.LoadFrom(conf.GetConfig().WorkflowID, conf.GetConfig().PeerID) | 	err := new_wf.LoadFrom(conf.GetConfig().WorkflowID, conf.GetConfig().PeerID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
|  |  | ||||||
| 		logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") | 		logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -95,7 +97,7 @@ func main() { | |||||||
|  |  | ||||||
| 	argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) | 	argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		logger.Error().Msg(err.Error()) | 		logger.Error().Msg("Error when completing the build of the workflow: " + err.Error()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	workflowName = getContainerName(argoFilePath) | 	workflowName = getContainerName(argoFilePath) | ||||||
| @@ -105,13 +107,13 @@ func main() { | |||||||
|  |  | ||||||
| 	if conf.GetConfig().KubeHost == "" { | 	if conf.GetConfig().KubeHost == "" { | ||||||
| 		// Not in a k8s environment, get conf from parameters | 		// Not in a k8s environment, get conf from parameters | ||||||
| 		fmt.Println("Executes outside of k8s") | 		logger.Info().Msg("Executes outside of k8s") | ||||||
| 		executeOutside(argoFilePath, builder.Workflow) | 		executeOutside(argoFilePath, builder.Workflow) | ||||||
| 	} else { | 	} else { | ||||||
| 		// Executed in a k8s environment | 		// Executed in a k8s environment | ||||||
| 		fmt.Println("Executes inside a k8s") | 		logger.Info().Msg("Executes inside a k8s") | ||||||
| 		// executeInside(exec.GetID(), "argo", argo_file_path, stepMax)  // commenting to use conf.ExecutionID instead of exec.GetID() | 		// executeInside(exec.GetID(), "argo", argo_file_path, stepMax)  // commenting to use conf.ExecutionID instead of exec.GetID() | ||||||
| 		executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath) | 		executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -124,23 +126,25 @@ func executeInside(execID string, ns string, argo_file_path string) { | |||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	name, err := t.CreateArgoWorkflow(argo_file_path, ns) | 	name, err := t.CreateArgoWorkflow(argo_file_path, ns) | ||||||
| 	_ = name  | 	// _ = name  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		logger.Error().Msg("Could not create argo workflow : " + err.Error()) | 		logger.Error().Msg("Could not create argo workflow : " + err.Error()) | ||||||
| 		fmt.Println("CA :" + conf.GetConfig().KubeCA) | 		logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA)) | ||||||
| 		fmt.Println("Cert :" + conf.GetConfig().KubeCert) | 		logger.Info().Msg(fmt.Sprint("Cert :" + conf.GetConfig().KubeCert)) | ||||||
| 		fmt.Println("Data :" + conf.GetConfig().KubeData) | 		logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData)) | ||||||
| 		return | 		return | ||||||
| 	} else { | 	} else { | ||||||
| 		watcher, err := t.GetArgoWatch(execID, workflowName) | 		watcher, err := t.GetArgoWatch(ns, workflowName) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) | 			logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		l.LogKubernetesArgo(name, execID, watcher) | 		l.LogKubernetesArgo(name, ns, watcher) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
| 			logger.Error().Msg("Could not log workflow : " + err.Error()) | 			logger.Error().Msg("Could not log workflow : " + err.Error()) | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
|  | 		logger.Info().Msg("Finished, exiting...") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| } | } | ||||||
| @@ -173,7 +177,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | |||||||
| 	go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) | 	go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) | ||||||
| 	go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg) | 	go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg) | ||||||
|  |  | ||||||
| 	fmt.Println("Starting argo submit") | 	logger.Info().Msg("Starting argo submit") | ||||||
| 	if err := cmdSubmit.Start(); err != nil { | 	if err := cmdSubmit.Start(); err != nil { | ||||||
| 		wf_logger.Error().Msg("Could not start argo submit") | 		wf_logger.Error().Msg("Could not start argo submit") | ||||||
| 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) | 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) | ||||||
| @@ -182,7 +186,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | |||||||
|  |  | ||||||
| 	time.Sleep(5 * time.Second) | 	time.Sleep(5 * time.Second) | ||||||
|  |  | ||||||
| 	fmt.Println("Running argo logs") | 	logger.Info().Msg("Running argo logs") | ||||||
| 	if err := cmdLogs.Run(); err != nil { | 	if err := cmdLogs.Run(); err != nil { | ||||||
| 		wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") | 		wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") | ||||||
| 		 | 		 | ||||||
| @@ -190,7 +194,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) { | |||||||
|  |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	fmt.Println("Waiting argo submit") | 	logger.Info().Msg("Waiting argo submit") | ||||||
| 	if err := cmdSubmit.Wait(); err != nil { | 	if err := cmdSubmit.Wait(); err != nil { | ||||||
| 		wf_logger.Error().Msg("Could not execute argo submit") | 		wf_logger.Error().Msg("Could not execute argo submit") | ||||||
| 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) | 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) | ||||||
| @@ -231,7 +235,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) { | |||||||
|  |  | ||||||
| 	err := parser.Parse(os.Args) | 	err := parser.Parse(os.Args) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		fmt.Println(parser.Usage(err)) | 		logger.Info().Msg(parser.Usage(err)) | ||||||
| 		os.Exit(1) | 		os.Exit(1) | ||||||
| 	} | 	} | ||||||
| 	conf.GetConfig().Logs = "debug" | 	conf.GetConfig().Logs = "debug" | ||||||
|   | |||||||
							
								
								
									
										
											BIN
										
									
								
								oc-monitord
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								oc-monitord
									
									
									
									
									
										Executable file
									
								
							
										
											Binary file not shown.
										
									
								
							| @@ -83,7 +83,8 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er | |||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return "", errors.New("failed to create workflow: " + err.Error()) | 		return "", errors.New("failed to create workflow: " + err.Error()) | ||||||
| 	} | 	} | ||||||
| 	fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns) | 	l := utils.GetLogger() | ||||||
|  | 	l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s\n", createdWf.Name, ns)) | ||||||
| 	return createdWf.Name, nil | 	return createdWf.Name, nil | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -117,15 +118,13 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s | |||||||
| func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){ | func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){ | ||||||
| 	wfl := utils.GetWFLogger("") | 	wfl := utils.GetWFLogger("") | ||||||
| 	wfl.Debug().Msg("Starting argo watch with argo lib") | 	wfl.Debug().Msg("Starting argo watch with argo lib") | ||||||
| 	fmt.Println("metadata.name=oc-monitor-"+wfName + "  in namespace : " + executionId) |  | ||||||
| 	options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName} | 	options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName} | ||||||
| 	fmt.Println(options) |  | ||||||
| 	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options) | 	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client") | 		return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  |  | ||||||
| 	return watcher, nil  | 	return watcher, nil  | ||||||
|  |  | ||||||
| } | } | ||||||
| @@ -133,14 +132,16 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch | |||||||
| func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { | func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { | ||||||
| 	var targetPod v1.Pod | 	var targetPod v1.Pod | ||||||
|  |  | ||||||
|  | 	 | ||||||
| 	pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ | 	pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ | ||||||
|         LabelSelector: "workflows.argoproj.io/workflow="+wfName, | 		LabelSelector: "workflows.argoproj.io/workflow="+wfName, | ||||||
|     }) |     }) | ||||||
|     if err != nil { |     if err != nil { | ||||||
| 		return nil, fmt.Errorf("failed to list pods: " + err.Error()) | 		return nil, fmt.Errorf("failed to list pods: " + err.Error()) | ||||||
|     } |     } | ||||||
|     if len(pods.Items) == 0 { |     if len(pods.Items) == 0 { | ||||||
| 		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName) | 		 | ||||||
|  | 		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns) | ||||||
|     } |     } | ||||||
| 	 | 	 | ||||||
|     for _, pod := range pods.Items { |     for _, pod := range pods.Items { | ||||||
| @@ -172,7 +173,8 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) { | |||||||
| 	 | 	 | ||||||
| 		var initialized bool | 		var initialized bool | ||||||
| 		for _, cond := range pod.Status.Conditions { | 		for _, cond := range pod.Status.Conditions { | ||||||
| 			if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue { | 			// It seems that for remote pods the pod gets the Succeeded status before it has time to display the it is ready to run in .status.conditions,so we added the OR condition | ||||||
|  | 			if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {		 | ||||||
| 				initialized = true | 				initialized = true | ||||||
| 				return | 				return | ||||||
| 			} | 			} | ||||||
|   | |||||||
| @@ -17,11 +17,13 @@ var ( | |||||||
| 	onceLogger 	sync.Once | 	onceLogger 	sync.Once | ||||||
| 	onceWF 		sync.Once | 	onceWF 		sync.Once | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func GetExecution(exec_id string) *workflow_execution.WorkflowExecution { | func GetExecution(exec_id string) *workflow_execution.WorkflowExecution { | ||||||
| 	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id) | 	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id) | ||||||
| 	if res.Code != 200 { | 	if res.Code != 200 { | ||||||
| 		logger := oclib.GetLogger() | 		logger := oclib.GetLogger() | ||||||
| 		logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id) | 		logger.Error().Msg("Error retrieving execution " + exec_id) | ||||||
|  | 		logger.Error().Msg(res.Err) | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	return res.ToWorkflowExecution() | 	return res.ToWorkflowExecution() | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ import ( | |||||||
| 	"encoding/json" | 	"encoding/json" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"net/http" | 	"net/http" | ||||||
|  | 	"oc-monitord/utils" | ||||||
| 	"slices" | 	"slices" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| @@ -21,7 +22,7 @@ type AdmiraltySetter struct { | |||||||
|  |  | ||||||
| func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { | func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { | ||||||
| 	 | 	 | ||||||
| 	logger = logs.GetLogger() | 	logger := logs.GetLogger() | ||||||
|  |  | ||||||
| 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) | 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) | ||||||
| 	if data.Code != 200 { | 	if data.Code != 200 { | ||||||
| @@ -30,42 +31,42 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st | |||||||
| 	} | 	} | ||||||
| 	remotePeer := data.ToPeer() | 	remotePeer := data.ToPeer() | ||||||
|  |  | ||||||
| 	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) | 			data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) | ||||||
| 	if data.Code != 200 { | 			if data.Code != 200 { | ||||||
| 		logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) | 				logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) | ||||||
| 		return fmt.Errorf(data.Err) | 				return fmt.Errorf(data.Err) | ||||||
| 	} | 			} | ||||||
| 	localPeer := data.ToPeer() | 			localPeer := data.ToPeer() | ||||||
|  |  | ||||||
| 	caller := tools.NewHTTPCaller( | 			caller := tools.NewHTTPCaller( | ||||||
| 		map[tools.DataType]map[tools.METHOD]string{ | 				map[tools.DataType]map[tools.METHOD]string{ | ||||||
| 			tools.ADMIRALTY_SOURCE: map[tools.METHOD]string{ | 					tools.ADMIRALTY_SOURCE: { | ||||||
| 				tools.POST :"/:id", | 						tools.POST :"/:id", | ||||||
|  | 					}, | ||||||
|  | 					tools.ADMIRALTY_KUBECONFIG: { | ||||||
|  | 						tools.GET:"/:id", | ||||||
|  | 					}, | ||||||
|  | 					tools.ADMIRALTY_SECRET: { | ||||||
|  | 						tools.POST:"/:id/" + remotePeerID, | ||||||
|  | 					}, | ||||||
|  | 					tools.ADMIRALTY_TARGET: { | ||||||
|  | 						tools.POST:"/:id/" + remotePeerID, | ||||||
|  | 					}, | ||||||
|  | 					tools.ADMIRALTY_NODES: { | ||||||
|  | 						tools.GET:"/:id/" + remotePeerID, | ||||||
|  | 					}, | ||||||
| 			}, | 			}, | ||||||
| 			tools.ADMIRALTY_KUBECONFIG: map[tools.METHOD]string{ | 		) | ||||||
| 				tools.GET:"/:id", |  | ||||||
| 			}, |  | ||||||
| 			tools.ADMIRALTY_SECRET: map[tools.METHOD]string{ |  | ||||||
| 				tools.POST:"/:id", |  | ||||||
| 			}, |  | ||||||
| 			tools.ADMIRALTY_TARGET: map[tools.METHOD]string{ |  | ||||||
| 				tools.POST:"/:id", |  | ||||||
| 			}, |  | ||||||
| 			tools.ADMIRALTY_NODES: map[tools.METHOD]string{ |  | ||||||
| 				tools.GET:"/:id", |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 	) |  | ||||||
| 	 | 	 | ||||||
| 	logger.Info().Msg(" Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id + "\n\n") | 	logger.Info().Msg("\n\n  Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id) | ||||||
| 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) | 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) | ||||||
| 	logger.Info().Msg(" Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id + "\n\n") | 	logger.Info().Msg("\n\n  Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id) | ||||||
| 	kubeconfig := s.getKubeconfig(remotePeer, caller) | 	kubeconfig := s.getKubeconfig(remotePeer, caller) | ||||||
| 	logger.Info().Msg(" Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id + "\n\n") | 	logger.Info().Msg("\n\n  Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id) | ||||||
| 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) | 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) | ||||||
| 	logger.Info().Msg(" Creating the Admiralty Target on " + localPeerID + " ns-" + s.Id + "\n\n") | 	logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id ) | ||||||
| 	_ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) | 	_ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) | ||||||
| 	logger.Info().Msg(" Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id + "\n\n") | 	logger.Info().Msg("\n\n  Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id) | ||||||
| 	s.checkNodeStatus(localPeer,caller) | 	s.checkNodeStatus(localPeer,caller) | ||||||
| 	 | 	 | ||||||
| 	return nil | 	return nil | ||||||
| @@ -75,12 +76,14 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle | |||||||
| 	var kubedata map[string]string | 	var kubedata map[string]string | ||||||
| 	_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) | 	_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) | ||||||
| 	if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { | 	if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { | ||||||
| 		fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig") | 		l := utils.GetLogger() | ||||||
|  | 		l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig") | ||||||
| 		panic(0) | 		panic(0) | ||||||
| 	} | 	} | ||||||
| 	err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata) | 	err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		fmt.Println("Something went wrong when unmarshalling data from Get call for kubeconfig") | 		l := utils.GetLogger() | ||||||
|  | 		l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig") | ||||||
| 		panic(0) | 		panic(0) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -88,18 +91,18 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle | |||||||
| } | } | ||||||
|  |  | ||||||
| func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { | func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { | ||||||
|  | 	l := utils.GetLogger() | ||||||
| 	resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) | 	resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		fmt.Println("Error when executing on peer at", peer.Url) | 		l.Error().Msg("Error when executing on peer at" + peer.Url) | ||||||
| 		fmt.Println(err) | 		l.Error().Msg(err.Error()) | ||||||
| 		panic(0) | 		panic(0) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) { | 	if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) { | ||||||
| 		fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode) | 		l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)) | ||||||
| 		if _, ok := caller.LastResults["body"]; ok { | 		if _, ok := caller.LastResults["body"]; ok { | ||||||
| 			logger.Info().Msg(string(caller.LastResults["body"].([]byte))) | 			l.Info().Msg(string(caller.LastResults["body"].([]byte))) | ||||||
| 			// fmt.Println(string(caller.LastResults["body"].([]byte))) |  | ||||||
| 		} | 		} | ||||||
| 		if panicCode { | 		if panicCode { | ||||||
| 			panic(0) | 			panic(0) | ||||||
| @@ -120,14 +123,15 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){ | |||||||
| 		name := metadata.(map[string]interface{})["name"].(string) | 		name := metadata.(map[string]interface{})["name"].(string) | ||||||
| 		s.NodeName = name | 		s.NodeName = name | ||||||
| 	} else { | 	} else { | ||||||
| 		fmt.Println("Could not retrieve data about the recently created node") | 		l := utils.GetLogger() | ||||||
|  | 		l.Error().Msg("Could not retrieve data about the recently created node") | ||||||
| 		panic(0) | 		panic(0) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ | func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ | ||||||
| 	for i := range(5) { | 	for i := range(5) { | ||||||
| 		time.Sleep(5 * time.Second) // let some time for kube to generate the node | 		time.Sleep(10 * time.Second) // let some time for kube to generate the node | ||||||
| 		_ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) | 		_ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) | ||||||
| 		if caller.LastResults["code"] == 200 { | 		if caller.LastResults["code"] == 200 { | ||||||
| 			s.storeNodeName(caller) | 			s.storeNodeName(caller) | ||||||
|   | |||||||
| @@ -26,11 +26,11 @@ import ( | |||||||
| var logger zerolog.Logger | var logger zerolog.Logger | ||||||
|  |  | ||||||
| type ArgoBuilder struct { | type ArgoBuilder struct { | ||||||
| 	OriginWorkflow 	*w.Workflow | 	OriginWorkflow *w.Workflow | ||||||
| 	Workflow       	Workflow | 	Workflow       Workflow | ||||||
| 	Services       	[]*Service | 	Services       []*Service | ||||||
| 	Timeout        	int | 	Timeout        int | ||||||
| 	RemotePeers		[]string | 	RemotePeers    []string | ||||||
| } | } | ||||||
|  |  | ||||||
| type Workflow struct { | type Workflow struct { | ||||||
| @@ -59,13 +59,16 @@ type Spec struct { | |||||||
| 	Volumes    			[]VolumeClaimTemplate 	`yaml:"volumeClaimTemplates,omitempty"` | 	Volumes    			[]VolumeClaimTemplate 	`yaml:"volumeClaimTemplates,omitempty"` | ||||||
| 	Templates  			[]Template            	`yaml:"templates"` | 	Templates  			[]Template            	`yaml:"templates"` | ||||||
| 	Timeout    			int                   	`yaml:"activeDeadlineSeconds,omitempty"` | 	Timeout    			int                   	`yaml:"activeDeadlineSeconds,omitempty"` | ||||||
|  | 	NodeSelector		struct{ | ||||||
|  | 							NodeRole string `yaml:"node-role"` | ||||||
|  | 						} `yaml:"nodeSelector"` | ||||||
| } | } | ||||||
|  |  | ||||||
| // TODO: found on a processing instance linked to storage | // TODO: found on a processing instance linked to storage | ||||||
| // add s3, gcs, azure, etc if needed on a link between processing and storage | // add s3, gcs, azure, etc if needed on a link between processing and storage | ||||||
| func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { | func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { | ||||||
| 	logger = logs.GetLogger() | 	logger = logs.GetLogger() | ||||||
| 	fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items) | 	logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) | ||||||
| 	// handle services by checking if there is only one processing with hostname and port | 	// handle services by checking if there is only one processing with hostname and port | ||||||
| 	firstItems, lastItems, volumes := b.createTemplates(namespace) | 	firstItems, lastItems, volumes := b.createTemplates(namespace) | ||||||
| 	b.createVolumes(volumes) | 	b.createVolumes(volumes) | ||||||
| @@ -73,6 +76,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [ | |||||||
| 	if b.Timeout > 0 { | 	if b.Timeout > 0 { | ||||||
| 		b.Workflow.Spec.Timeout = b.Timeout | 		b.Workflow.Spec.Timeout = b.Timeout | ||||||
| 	} | 	} | ||||||
|  | 	b.Workflow.Spec.NodeSelector.NodeRole = "worker" | ||||||
| 	b.Workflow.Spec.ServiceAccountName = "sa-"+namespace | 	b.Workflow.Spec.ServiceAccountName = "sa-"+namespace | ||||||
| 	b.Workflow.Spec.Entrypoint = "dag" | 	b.Workflow.Spec.Entrypoint = "dag" | ||||||
| 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1" | ||||||
| @@ -90,10 +94,10 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V | |||||||
| 	firstItems := []string{} | 	firstItems := []string{} | ||||||
| 	lastItems := []string{} | 	lastItems := []string{} | ||||||
| 	items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) | 	items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) | ||||||
| 	fmt.Println("Creating templates", len(items)) | 	logger.Info().Msg(fmt.Sprint("Creating templates", len(items))) | ||||||
| 	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { | 	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { | ||||||
| 		instance := item.Processing.GetSelectedInstance() | 		instance := item.Processing.GetSelectedInstance() | ||||||
| 		fmt.Println("Creating template for", item.Processing.GetName(), instance) | 		logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) | ||||||
| 		if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { | 		if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { | ||||||
| 			logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) | 			logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) | ||||||
| 			return firstItems, lastItems, volumes | 			return firstItems, lastItems, volumes | ||||||
| @@ -176,10 +180,12 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string, | |||||||
| 	lastItems []string) ([]VolumeMount, []string, []string) { | 	lastItems []string) ([]VolumeMount, []string, []string) { | ||||||
| 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) | 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) | ||||||
| 	template := &Template{Name: getArgoName(processing.GetName(), id)} | 	template := &Template{Name: getArgoName(processing.GetName(), id)} | ||||||
| 	fmt.Println("Creating template for", template.Name) | 	logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) | ||||||
| 	isReparted, peerId := b.isProcessingReparted(*processing,id) | 	isReparted, peerId := b.isProcessingReparted(*processing, id) | ||||||
| 	template.CreateContainer(processing, b.Workflow.getDag()) | 	template.CreateContainer(processing, b.Workflow.getDag()) | ||||||
|  | 	 | ||||||
| 	if isReparted { | 	if isReparted { | ||||||
|  | 		logger.Debug().Msg("Reparted processing, on " + peerId) | ||||||
| 		b.RemotePeers = append(b.RemotePeers, peerId) | 		b.RemotePeers = append(b.RemotePeers, peerId) | ||||||
| 		template.AddAdmiraltyAnnotations(peerId) | 		template.AddAdmiraltyAnnotations(peerId) | ||||||
| 	} | 	} | ||||||
| @@ -325,7 +331,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { | |||||||
| 	isDeps := false | 	isDeps := false | ||||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||||
| 		if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok { | 		if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok { | ||||||
| 			fmt.Println("Could not find the source of the link", link.Destination.ID) | 			logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID)) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing | 		source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing | ||||||
| @@ -345,7 +351,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { | |||||||
| func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { | func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { | ||||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||||
| 		if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { | 		if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { | ||||||
| 			fmt.Println("Could not find the source of the link", link.Source.ID) | 			logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID)) | ||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
| 		source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing | 		source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing | ||||||
| @@ -367,16 +373,15 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { | |||||||
|  |  | ||||||
| // Verify if a processing resource is attached to another Compute than the one hosting | // Verify if a processing resource is attached to another Compute than the one hosting | ||||||
| // the current Open Cloud instance. If true return the peer ID to contact | // the current Open Cloud instance. If true return the peer ID to contact | ||||||
| func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool,string) { | func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) { | ||||||
| 	computeAttached := b.retrieveProcessingCompute(graphID) | 	computeAttached := b.retrieveProcessingCompute(graphID) | ||||||
| 	if computeAttached == nil { | 	if computeAttached == nil { | ||||||
| 		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID ) | 		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) | ||||||
| 		panic(0) | 		panic(0) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	 |  | ||||||
| 	// Creates an accessor srtictly for Peer Collection | 	// Creates an accessor srtictly for Peer Collection | ||||||
| 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil) | 	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil) | ||||||
| 	if req == nil { | 	if req == nil { | ||||||
| 		fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") | 		fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") | ||||||
| 		return false, "" | 		return false, "" | ||||||
| @@ -391,8 +396,8 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResour | |||||||
|  |  | ||||||
| 	peer := *res.ToPeer() | 	peer := *res.ToPeer() | ||||||
|  |  | ||||||
| 	isNotReparted, _ := peer.IsMySelf() | 	isNotReparted := peer.State == 1 | ||||||
| 	fmt.Println("Result IsMySelf for ", peer.UUID ," : ", isNotReparted) | 	logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) | ||||||
|  |  | ||||||
| 	return !isNotReparted, peer.UUID | 	return !isNotReparted, peer.UUID | ||||||
| } | } | ||||||
| @@ -401,12 +406,12 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu | |||||||
| 	for _, link := range b.OriginWorkflow.Graph.Links { | 	for _, link := range b.OriginWorkflow.Graph.Links { | ||||||
| 		// If a link contains the id of the processing | 		// If a link contains the id of the processing | ||||||
| 		var oppositeId string | 		var oppositeId string | ||||||
| 		if link.Source.ID == graphID{ | 		if link.Source.ID == graphID { | ||||||
| 			oppositeId = link.Destination.ID | 			oppositeId = link.Destination.ID | ||||||
| 		} else if(link.Destination.ID == graphID){ | 		} else if link.Destination.ID == graphID { | ||||||
| 			oppositeId = link.Source.ID | 			oppositeId = link.Source.ID | ||||||
| 		} | 		} | ||||||
| 		fmt.Println("OppositeId : ", oppositeId) | 		 | ||||||
| 		if oppositeId != "" { | 		if oppositeId != "" { | ||||||
| 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) | 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) | ||||||
| 			if dt == oclib.COMPUTE_RESOURCE { | 			if dt == oclib.COMPUTE_RESOURCE { | ||||||
| @@ -421,23 +426,21 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu | |||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  |  | ||||||
| // Execute the last actions once the YAML file for the Argo Workflow is created | // Execute the last actions once the YAML file for the Argo Workflow is created | ||||||
| func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { | ||||||
| 	fmt.Println("DEV :: Completing build") | 	logger.Info().Msg(fmt.Sprint("DEV :: Completing build")) | ||||||
| 	setter := AdmiraltySetter{Id: executionsId} | 	setter := AdmiraltySetter{Id: executionsId} | ||||||
| 	// Setup admiralty for each node | 	// Setup admiralty for each node | ||||||
| 	for _, peer := range b.RemotePeers { | 	for _, peer := range b.RemotePeers { | ||||||
| 		fmt.Println("DEV :: Launching Admiralty Setup for ", peer) | 		logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) | ||||||
| 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) | 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Update the name of the admiralty node to use  | 	// Update the name of the admiralty node to use  | ||||||
| 	for _, template := range b.Workflow.Spec.Templates { | 	for _, template := range b.Workflow.Spec.Templates { | ||||||
| 		if len(template.Metadata.Annotations) > 0 { | 		if len(template.Metadata.Annotations) > 0 { | ||||||
| 			if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { | 			if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { | ||||||
| 				fmt.Println(resp) | 				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + peerId + "-" + conf.GetConfig().ExecutionID | ||||||
| 				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID |  | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -14,7 +14,7 @@ type WorflowDB struct { | |||||||
|  |  | ||||||
| // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter | // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter | ||||||
| func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error { | func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error { | ||||||
| 	fmt.Println("Loading workflow from " + workflow_id) | 	logger.Info().Msg("Loading workflow from " + workflow_id) | ||||||
| 	var err error | 	var err error | ||||||
| 	if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil { | 	if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil { | ||||||
| 		return err | 		return err | ||||||
| @@ -27,7 +27,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | |||||||
| 	logger := oclib.GetLogger() | 	logger := oclib.GetLogger() | ||||||
|  |  | ||||||
| 	lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id) | 	lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id) | ||||||
| 	fmt.Println("ERR", lib_data.Code, lib_data.Err) | 	logger.Info().Msg(fmt.Sprint("ERR", lib_data.Code, lib_data.Err)) | ||||||
| 	if lib_data.Code != 200 { | 	if lib_data.Code != 200 { | ||||||
| 		logger.Error().Msg("Error loading the graph") | 		logger.Error().Msg("Error loading the graph") | ||||||
| 		return workflow, errors.New(lib_data.Err) | 		return workflow, errors.New(lib_data.Err) | ||||||
| @@ -43,7 +43,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | |||||||
|  |  | ||||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { | func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { | ||||||
| 	logger := oclib.GetLogger() | 	logger := oclib.GetLogger() | ||||||
| 	fmt.Println("Exporting to Argo", w.Workflow) | 	logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow)) | ||||||
| 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | ||||||
| 		return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") | 		return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||||
| 	} | 	} | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user