# Compare commits

28 commits: `feature/ad` ... `test-ram`
| Author | SHA1 | Date |
|---|---|---|
|  | 411effb000 |  |
|  | 7c913bec0e |  |
|  | bdbbd7697a |  |
|  | 6917295fbd |  |
|  | e1b0ad089c |  |
|  | 483f747754 |  |
|  | 03675d09ae |  |
|  | f3e84a4f43 |  |
|  | eae5474552 |  |
|  | bae9cb2011 |  |
|  | 65b8960703 |  |
|  | 90aa19caeb |  |
|  | dcb3e2b7cc |  |
|  | c871d68333 |  |
|  | 6cf5da787a |  |
|  | fa4db92c92 |  |
|  | ee94c1aa42 |  |
|  | c40b18f1d6 |  |
|  | 2932fb2710 |  |
|  | 2343a5329e |  |
|  | 86fa41a376 |  |
|  | 6ec7a670bd |  |
|  | 6323d4eed4 |  |
|  | 93f3806b86 |  |
|  | ade18f1042 |  |
|  | 83d118fb05 |  |
|  | f7f0c9c2d2 |  |
|  | aea7cbd41c |  |
**Makefile** (4 lines changed)

```diff
@@ -3,6 +3,8 @@
 build: clean
 	go build .
 
+dev: build
+
 run:
 	./oc-monitord
 
@@ -22,4 +24,4 @@ publish-registry:
 
 all: docker publish-kind publish-registry
 
-.PHONY: build run clean docker publish-kind publish-registry
+.PHONY: build run clean docker publish-kind publish-registry
```
**README.md** (71 lines changed)

````diff
@@ -1,52 +1,26 @@
 # oc-monitor
 
-## Deploy in k8s (dev)
-DO :
-  make build
-
-While a registry with all of the OC docker images has not been set up, we can export this image to k3s ctr
-
-> docker save oc-monitord:latest | sudo k3s ctr images import -
-
-Then in the pod manifest for oc-monitord use :
-
-```
-image: docker.io/library/oc-monitord
-imagePullPolicy: Never
-```
-
-Not doing so will end up in the pod having a `ErrorImagePull`
-
-## Allow argo to create services
-
-In order for monitord to expose **open cloud services** on the node, we need to give it permission to create **k8s services**.
-
-For that we can update the RBAC configuration for a role already created by argo :
-
-### Manually edit the rbac authorization
-
-> kubectl edit roles.rbac.authorization.k8s.io -n argo argo-role
-
-In rules add a new entry :
-
-```
-- apiGroups:
-  - ""
-  resources:
-  - services
-  verbs:
-  - get
-  - create
-```
-
-### Patch the rbac authorization with a one liner
-
-> kubectl patch role argo-role -n argo --type='json' -p='[{"op": "add", "path": "/rules/-", "value": {"apiGroups": [""], "resources": ["services"], "verbs": ["get","create"]}}]'
-
-### Check whether the modification is effective
-
-> kubectl auth can-i create services --as=system:serviceaccount:argo:argo -n argo
-
-This command **must return "yes"**
+## Summary
+
+oc-monitord is a daemon which can be run:
+- as a binary
+- as a container
+
+It is used to perform several actions regarding the execution of an Open Cloud workflow:
+- generating a YAML file that can be interpreted by **Argo Workflow** to create and execute pods in a Kubernetes environment
+- setting up the different resources needed to execute a workflow over several peers/Kubernetes nodes with **Admiralty**: tokens, secrets, targets and sources
+- creating the workflow and logging the output from
+  - Argo watch, which gives information about the workflow in general (phase, number of steps executed, status...)
+  - Pods: the logs generated by the pods
+
+To execute, the daemon needs several options:
+- **-u** :
+- **-m** :
+- **-d** :
+- **-e** :
 
 # Notes features/admiralty-docker
 
@@ -57,14 +31,17 @@ This command **must return "yes"**
   - decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host.
 
-## TODO
+## TODO
 
-- [ ] Allow the front to know on which IP the services are reachable
+- [ ] Allow the front to know on which IP the services are reachable
   - currently doing it by using `kubectl get nodes -o wide`
 
+- [ ] Implement writing and reading from S3 bucket/MinIO when a data resource is linked to a compute resource.
+
-### Adding ingress handling to support reverse proxying
-
+### Adding ingress handling to support reverse proxying
+
 - Test whether ingress-nginx is running or not
   - Do something if not found: stop running and send error log OR start installation
-
-
````
**docs/admiralty_naming_multi_peer.jpg** (new binary file, 71 KiB; not shown)
**docs/admiralty_setup_schema.jpg** (new binary file, 91 KiB; not shown)
**env.env** (new file, 4 lines)

```diff
@@ -0,0 +1,4 @@
+KUBERNETES_SERVICE_HOST=192.168.1.169
+KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
+KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
+KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
```
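These variables are base64-encoded PEM credentials for the k3s API server. A minimal sketch of how a daemon like oc-monitord could turn them into a client-go configuration follows; the `:6443` port, the `mustDecode` helper, and the assumption that the env file has already been loaded (with quotes stripped) are all illustrative, not taken from the repo:

```go
package main

import (
	"encoding/base64"
	"fmt"
	"os"

	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// mustDecode unwraps one of the base64-encoded PEM blobs (KUBE_CA, KUBE_CERT, KUBE_DATA).
func mustDecode(name string) []byte {
	raw, err := base64.StdEncoding.DecodeString(os.Getenv(name))
	if err != nil {
		panic(err)
	}
	return raw
}

func main() {
	cfg := &rest.Config{
		Host: "https://" + os.Getenv("KUBERNETES_SERVICE_HOST") + ":6443", // port is an assumption (k3s default)
		TLSClientConfig: rest.TLSClientConfig{
			CAData:   mustDecode("KUBE_CA"),   // cluster CA certificate
			CertData: mustDecode("KUBE_CERT"), // client certificate chain
			KeyData:  mustDecode("KUBE_DATA"), // client private key
		},
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}
	fmt.Println("Kubernetes client ready:", clientset != nil)
}
```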
**go.mod** (3 lines changed)

```diff
@@ -20,6 +20,7 @@ require (
 	github.com/golang/protobuf v1.5.4 // indirect
 	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
 	github.com/sirupsen/logrus v1.9.3 // indirect
+	github.com/ugorji/go/codec v1.1.7 // indirect
 	google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
 	google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
 	google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
@@ -27,7 +28,6 @@ require (
 )
 
 require (
-	github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
 	github.com/argoproj/argo-workflows/v3 v3.6.4
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/biter777/countries v1.7.5 // indirect
@@ -71,7 +71,6 @@ require (
 	github.com/robfig/cron v1.2.0 // indirect
 	github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
 	github.com/smartystreets/goconvey v1.6.4 // indirect
-	github.com/ugorji/go/codec v1.1.7 // indirect
 	github.com/x448/float16 v0.8.4 // indirect
 	github.com/xdg-go/pbkdf2 v1.0.0 // indirect
 	github.com/xdg-go/scram v1.1.2 // indirect
```
**go.sum** (2 lines changed)

```diff
@@ -3,8 +3,6 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
 cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34=
 cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
-github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
-github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
 github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
 github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
```
| @@ -7,6 +7,8 @@ import ( | ||||
| 	"oc-monitord/tools" | ||||
| 	"oc-monitord/utils" | ||||
| 	"slices" | ||||
| 	"strings" | ||||
| 	"sync" | ||||
| 	"time" | ||||
|  | ||||
| 	"github.com/rs/zerolog" | ||||
| @@ -106,13 +108,16 @@ func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) { | ||||
| func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) { | ||||
| 	var argoWatcher *ArgoWatch | ||||
| 	var pods []string | ||||
| 	var node wfv1.NodeStatus | ||||
|  | ||||
| 	wfl := utils.GetWFLogger("") | ||||
| 	wfl.Debug().Msg("Starting to log " + wfName) | ||||
|  | ||||
| 	var wg sync.WaitGroup | ||||
| 	 | ||||
| 	for event := range (watcher.ResultChan()) { | ||||
| 		wf, ok := event.Object.(*wfv1.Workflow) | ||||
| 		if !ok { | ||||
| @@ -138,7 +143,7 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
|  | ||||
| 		newWatcher := ArgoWatch{ | ||||
| 			Name: node.Name, | ||||
| 			Namespace: executionID, | ||||
| 			Namespace: namespace, | ||||
| 			Status: string(node.Phase), | ||||
| 			Created: node.StartedAt.String(), | ||||
| 			Started: node.StartedAt.String(), | ||||
| @@ -163,7 +168,9 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
| 			if !slices.Contains(pods,pod.Name){ | ||||
| 				pl := wfl.With().Str("pod",  pod.Name).Logger() | ||||
| 				if wfName == pod.Name { pods = append(pods, pod.Name); continue }	// One of the node is the Workflow, the others are the pods so don't try to log on the wf name | ||||
| 				go logKubernetesPods(executionID, wfName, pod.Name, pl) | ||||
| 				pl.Info().Msg("Found a new pod to log : "  + pod.Name) | ||||
| 				wg.Add(1) | ||||
| 				go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) | ||||
| 				pods = append(pods, pod.Name) | ||||
| 			}  | ||||
| 		} | ||||
| @@ -171,6 +178,8 @@ func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interfac | ||||
| 		// Stop listening to the chan when the Workflow is completed or something bad happened | ||||
| 		if node.Phase.Completed() { | ||||
| 			wfl.Info().Msg(wfName + " worflow completed") | ||||
| 			wg.Wait() | ||||
| 			wfl.Info().Msg(wfName + " exiting") | ||||
| 			break | ||||
| 		} | ||||
| 		if node.Phase.FailedOrError() { | ||||
| @@ -196,24 +205,31 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { | ||||
| } | ||||
|  | ||||
| // Function needed to be executed as a go thread  | ||||
| func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){ | ||||
| func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ | ||||
| 	defer wg.Done() | ||||
| 	 | ||||
| 	s := strings.Split(podName, ".") | ||||
| 	name := s[0] + "-" + s[1] | ||||
| 	step := s[1] | ||||
| 	 | ||||
| 	k, err := tools.NewKubernetesTool() | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg("Could not get Kubernetes tools") | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	reader, err := k.GetPodLogger(executionId, wfName, podName) | ||||
| 	if err != nil { | ||||
| 		logger.Error().Msg(err.Error()) | ||||
| 		return | ||||
| 	} | ||||
|  | ||||
| 	 | ||||
| 	scanner := bufio.NewScanner(reader) | ||||
| 	for scanner.Scan() { | ||||
| 		log := scanner.Text() | ||||
| 		podLog := NewArgoPodLog(wfName,podName,log) | ||||
| 		podLog := NewArgoPodLog(name,step,log) | ||||
| 		jsonified, _ := json.Marshal(podLog) | ||||
| 		logger.Info().Msg(string(jsonified)) | ||||
| 	} | ||||
| 	 | ||||
| } | ||||
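The `sync.WaitGroup` threaded through this hunk is what lets the daemon exit only after every pod's log stream has been drained: each pod logger is registered with `wg.Add(1)` before its goroutine starts, signals `wg.Done()` on return, and the watcher blocks on `wg.Wait()` once the workflow completes. A stripped-down sketch of the same pattern (pod names and the sleep are stand-ins, not repo code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// followPod stands in for logKubernetesPods: it drains one pod's log
// stream and signals the WaitGroup when the stream closes.
func followPod(pod string, wg *sync.WaitGroup) {
	defer wg.Done()
	time.Sleep(50 * time.Millisecond) // pretend to read logs until EOF
	fmt.Println("finished logging", pod)
}

func main() {
	var wg sync.WaitGroup
	for _, pod := range []string{"step-a", "step-b", "step-c"} {
		wg.Add(1) // register before the goroutine starts, as in the diff
		go followPod(pod, &wg)
	}
	wg.Wait() // mirrors the wg.Wait() added before the "exiting" log line
	fmt.Println("workflow completed, safe to exit")
}
```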
| @@ -3,7 +3,6 @@ package logger | ||||
| import ( | ||||
| 	"bufio" | ||||
| 	"encoding/json" | ||||
| 	"fmt" | ||||
| 	"io" | ||||
| 	"oc-monitord/conf" | ||||
|  | ||||
| @@ -71,7 +70,6 @@ func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) { | ||||
| 	logger = logs.GetLogger() | ||||
|  | ||||
| 	logger.Debug().Msg("created wf_logger") | ||||
| 	fmt.Println("created wf_logger") | ||||
| 	wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() | ||||
|  | ||||
| 	var current_watch, previous_watch ArgoWatch | ||||
| @@ -111,7 +109,6 @@ func LogLocalPod(wfName string, pipe io.ReadCloser, steps []string, wg *sync.Wai | ||||
| 	scanner := bufio.NewScanner(pipe) | ||||
| 	for scanner.Scan() { | ||||
| 		var podLogger zerolog.Logger | ||||
| 		fmt.Println("new line") | ||||
| 		wg.Add(1) | ||||
| 		 | ||||
| 		line := scanner.Text() | ||||
|   | ||||
**main.go** (34 lines changed)

```diff
@@ -69,6 +69,9 @@ func main() {
 	logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
 	logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
 	exec := u.GetExecution(conf.GetConfig().ExecutionID)
+	if exec == nil {
+		logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
+	}
 	conf.GetConfig().WorkflowID = exec.WorkflowID
 
 	logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
@@ -83,7 +86,6 @@
 
 	err := new_wf.LoadFrom(conf.GetConfig().WorkflowID, conf.GetConfig().PeerID)
 	if err != nil {
-
 		logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
 	}
 
@@ -95,7 +97,7 @@
 
 	argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID)
 	if err != nil {
-		logger.Error().Msg(err.Error())
+		logger.Error().Msg("Error when completing the build of the workflow: " + err.Error())
 	}
 
 	workflowName = getContainerName(argoFilePath)
@@ -105,13 +107,13 @@
 
 	if conf.GetConfig().KubeHost == "" {
 		// Not in a k8s environment, get conf from parameters
-		fmt.Println("Executes outside of k8s")
+		logger.Info().Msg("Executes outside of k8s")
 		executeOutside(argoFilePath, builder.Workflow)
 	} else {
 		// Executed in a k8s environment
-		fmt.Println("Executes inside a k8s")
+		logger.Info().Msg("Executes inside a k8s")
 		// executeInside(exec.GetID(), "argo", argo_file_path, stepMax)  // commenting to use conf.ExecutionID instead of exec.GetID()
-		executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath)
+		executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath)
 	}
 }
 
@@ -124,23 +126,25 @@ func executeInside(execID string, ns string, argo_file_path string) {
 	}
 
 	name, err := t.CreateArgoWorkflow(argo_file_path, ns)
-	_ = name
+	// _ = name
 	if err != nil {
 		logger.Error().Msg("Could not create argo workflow : " + err.Error())
-		fmt.Println("CA :" + conf.GetConfig().KubeCA)
-		fmt.Println("Cert :" + conf.GetConfig().KubeCert)
-		fmt.Println("Data :" + conf.GetConfig().KubeData)
+		logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
+		logger.Info().Msg(fmt.Sprint("Cert :" + conf.GetConfig().KubeCert))
+		logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData))
 		return
 	} else {
-		watcher, err := t.GetArgoWatch(execID, workflowName)
+		watcher, err := t.GetArgoWatch(ns, workflowName)
 		if err != nil {
 			logger.Error().Msg("Could not retrieve Watcher : " + err.Error())
 		}
 
-		l.LogKubernetesArgo(name, execID, watcher)
+		l.LogKubernetesArgo(name, ns, watcher)
 		if err != nil {
 			logger.Error().Msg("Could not log workflow : " + err.Error())
 		}
+
+		logger.Info().Msg("Finished, exiting...")
 	}
 
 }
@@ -173,7 +177,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
 	go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg)
 	go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg)
 
-	fmt.Println("Starting argo submit")
+	logger.Info().Msg("Starting argo submit")
 	if err := cmdSubmit.Start(); err != nil {
 		wf_logger.Error().Msg("Could not start argo submit")
 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
@@ -182,7 +186,7 @@
 
 	time.Sleep(5 * time.Second)
 
-	fmt.Println("Running argo logs")
+	logger.Info().Msg("Running argo logs")
 	if err := cmdLogs.Run(); err != nil {
 		wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
 
@@ -190,7 +194,7 @@
 	}
 
-	fmt.Println("Waiting argo submit")
+	logger.Info().Msg("Waiting argo submit")
 	if err := cmdSubmit.Wait(); err != nil {
 		wf_logger.Error().Msg("Could not execute argo submit")
 		wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
@@ -231,7 +235,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
 
 	err := parser.Parse(os.Args)
 	if err != nil {
-		fmt.Println(parser.Usage(err))
+		logger.Info().Msg(parser.Usage(err))
 		os.Exit(1)
 	}
 	conf.GetConfig().Logs = "debug"
```
**oc-monitord** (new executable binary file; not shown)
```diff
@@ -83,7 +83,8 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er
 	if err != nil {
 		return "", errors.New("failed to create workflow: " + err.Error())
 	}
-	fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns)
+	l := utils.GetLogger()
+	l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s\n", createdWf.Name, ns))
 	return createdWf.Name, nil
 }
 
@@ -117,14 +118,12 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s
 func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
 	wfl := utils.GetWFLogger("")
 	wfl.Debug().Msg("Starting argo watch with argo lib")
-	fmt.Println("metadata.name=oc-monitor-"+wfName + "  in namespace : " + executionId)
 	options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
-	fmt.Println(options)
-	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options)
-
+	watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options)
 	if err != nil {
 		return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client")
 	}
 
 	return watcher, nil
 
@@ -133,16 +132,18 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch
 func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
 	var targetPod v1.Pod
 
 	pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
-        LabelSelector: "workflows.argoproj.io/workflow="+wfName,
+		LabelSelector: "workflows.argoproj.io/workflow="+wfName,
     })
     if err != nil {
 		return nil, fmt.Errorf("failed to list pods: " + err.Error())
     }
     if len(pods.Items) == 0 {
-		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName)
+		return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow=" + wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
     }
 
     for _, pod := range pods.Items {
 		if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
 			targetPod = pod
@@ -172,7 +173,8 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
 
 		var initialized bool
 		for _, cond := range pod.Status.Conditions {
-			if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
+			// It seems that for remote pods the pod gets the Succeeded status before it has time to show that it is ready to run in .status.conditions, so we added the OR condition
+			if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
 				initialized = true
 				return
 			}
```
```diff
@@ -17,11 +17,13 @@ var (
 	onceLogger 	sync.Once
 	onceWF 		sync.Once
 )
+
 func GetExecution(exec_id string) *workflow_execution.WorkflowExecution {
 	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id)
 	if res.Code != 200 {
 		logger := oclib.GetLogger()
-		logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
+		logger.Error().Msg("Error retrieving execution " + exec_id)
+		logger.Error().Msg(res.Err)
 		return nil
 	}
 	return res.ToWorkflowExecution()
```
```diff
@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"net/http"
+	"oc-monitord/utils"
 	"slices"
 	"time"
 
@@ -21,7 +22,7 @@ type AdmiraltySetter struct {
 
 func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error {
 
-	logger = logs.GetLogger()
+	logger := logs.GetLogger()
 
 	data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID)
 	if data.Code != 200 {
@@ -30,42 +31,42 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st
 	}
 	remotePeer := data.ToPeer()
 
 	data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID)
 	if data.Code != 200 {
 		logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID)
 		return fmt.Errorf(data.Err)
 	}
 	localPeer := data.ToPeer()
 
 	caller := tools.NewHTTPCaller(
 		map[tools.DataType]map[tools.METHOD]string{
-			tools.ADMIRALTY_SOURCE: map[tools.METHOD]string{
+			tools.ADMIRALTY_SOURCE: {
 				tools.POST :"/:id",
 			},
-			tools.ADMIRALTY_KUBECONFIG: map[tools.METHOD]string{
+			tools.ADMIRALTY_KUBECONFIG: {
 				tools.GET:"/:id",
 			},
-			tools.ADMIRALTY_SECRET: map[tools.METHOD]string{
-				tools.POST:"/:id",
+			tools.ADMIRALTY_SECRET: {
+				tools.POST:"/:id/" + remotePeerID,
 			},
-			tools.ADMIRALTY_TARGET: map[tools.METHOD]string{
-				tools.POST:"/:id",
+			tools.ADMIRALTY_TARGET: {
+				tools.POST:"/:id/" + remotePeerID,
 			},
-			tools.ADMIRALTY_NODES: map[tools.METHOD]string{
-				tools.GET:"/:id",
+			tools.ADMIRALTY_NODES: {
+				tools.GET:"/:id/" + remotePeerID,
 			},
 		},
 	)
 
-	logger.Info().Msg(" Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id + "\n\n")
+	logger.Info().Msg("\n\n  Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id)
 	_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
-	logger.Info().Msg(" Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id + "\n\n")
+	logger.Info().Msg("\n\n  Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id)
 	kubeconfig := s.getKubeconfig(remotePeer, caller)
-	logger.Info().Msg(" Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id + "\n\n")
+	logger.Info().Msg("\n\n  Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id)
 	_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true)
-	logger.Info().Msg(" Creating the Admiralty Target on " + localPeerID + " ns-" + s.Id + "\n\n")
+	logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id)
 	_ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true)
-	logger.Info().Msg(" Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id + "\n\n")
+	logger.Info().Msg("\n\n  Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id)
 	s.checkNodeStatus(localPeer,caller)
 
 	return nil
@@ -75,12 +76,14 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle
 	var kubedata map[string]string
 	_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true)
 	if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 {
-		fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig")
+		l := utils.GetLogger()
+		l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig")
 		panic(0)
 	}
 	err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata)
 	if err != nil {
-		fmt.Println("Something went wrong when unmarshalling data from Get call for kubeconfig")
+		l := utils.GetLogger()
+		l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig")
 		panic(0)
 	}
 
@@ -88,18 +91,18 @@
 }
 
 func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution {
+	l := utils.GetLogger()
 	resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller)
 	if err != nil {
-		fmt.Println("Error when executing on peer at", peer.Url)
-		fmt.Println(err)
+		l.Error().Msg("Error when executing on peer at " + peer.Url)
+		l.Error().Msg(err.Error())
 		panic(0)
 	}
 
 	if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) {
-		fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)
+		l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode))
 		if _, ok := caller.LastResults["body"]; ok {
-			logger.Info().Msg(string(caller.LastResults["body"].([]byte)))
+			// fmt.Println(string(caller.LastResults["body"].([]byte)))
+			l.Info().Msg(string(caller.LastResults["body"].([]byte)))
 		}
 		if panicCode {
 			panic(0)
@@ -120,14 +123,15 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){
 		name := metadata.(map[string]interface{})["name"].(string)
 		s.NodeName = name
 	} else {
-		fmt.Println("Could not retrieve data about the recently created node")
+		l := utils.GetLogger()
+		l.Error().Msg("Could not retrieve data about the recently created node")
 		panic(0)
 	}
 }
 
 func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){
 	for i := range(5) {
-		time.Sleep(5 * time.Second) // let some time for kube to generate the node
+		time.Sleep(10 * time.Second) // give kube some time to generate the node
 		_ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false)
 		if caller.LastResults["code"] == 200 {
 			s.storeNodeName(caller)
```
```diff
@@ -26,11 +26,11 @@ import (
 var logger zerolog.Logger
 
 type ArgoBuilder struct {
-	OriginWorkflow 	*w.Workflow
-	Workflow       	Workflow
-	Services       	[]*Service
-	Timeout        	int
-	RemotePeers		[]string
+	OriginWorkflow *w.Workflow
+	Workflow       Workflow
+	Services       []*Service
+	Timeout        int
+	RemotePeers    []string
 }
 
 type Workflow struct {
@@ -59,13 +59,16 @@ type Spec struct {
 	Volumes    			[]VolumeClaimTemplate 	`yaml:"volumeClaimTemplates,omitempty"`
 	Templates  			[]Template            	`yaml:"templates"`
 	Timeout    			int                   	`yaml:"activeDeadlineSeconds,omitempty"`
+	NodeSelector		struct{
+							NodeRole string `yaml:"node-role"`
+						} `yaml:"nodeSelector"`
 }
 
 // TODO: found on a processing instance linked to storage
 // add s3, gcs, azure, etc if needed on a link between processing and storage
 func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
 	logger = logs.GetLogger()
-	fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
+	logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
 	// handle services by checking if there is only one processing with hostname and port
 	firstItems, lastItems, volumes := b.createTemplates(namespace)
 	b.createVolumes(volumes)
@@ -73,6 +76,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
 	if b.Timeout > 0 {
 		b.Workflow.Spec.Timeout = b.Timeout
 	}
+	b.Workflow.Spec.NodeSelector.NodeRole = "worker"
 	b.Workflow.Spec.ServiceAccountName = "sa-"+namespace
 	b.Workflow.Spec.Entrypoint = "dag"
 	b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
@@ -90,10 +94,10 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
 	firstItems := []string{}
 	lastItems := []string{}
 	items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
-	fmt.Println("Creating templates", len(items))
+	logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
 	for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
 		instance := item.Processing.GetSelectedInstance()
-		fmt.Println("Creating template for", item.Processing.GetName(), instance)
+		logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
 		if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
 			logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
 			return firstItems, lastItems, volumes
@@ -176,10 +180,12 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string,
 	lastItems []string) ([]VolumeMount, []string, []string) {
 	_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
 	template := &Template{Name: getArgoName(processing.GetName(), id)}
-	fmt.Println("Creating template for", template.Name)
-	isReparted, peerId := b.isProcessingReparted(*processing,id)
+	logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
+	isReparted, peerId := b.isProcessingReparted(*processing, id)
 	template.CreateContainer(processing, b.Workflow.getDag())
+
 	if isReparted {
+		logger.Debug().Msg("Reparted processing, on " + peerId)
 		b.RemotePeers = append(b.RemotePeers, peerId)
 		template.AddAdmiraltyAnnotations(peerId)
 	}
@@ -325,7 +331,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
 	isDeps := false
 	for _, link := range b.OriginWorkflow.Graph.Links {
 		if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok {
-			fmt.Println("Could not find the source of the link", link.Destination.ID)
+			logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID))
 			continue
 		}
 		source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing
@@ -345,7 +351,7 @@ func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) {
 	for _, link := range b.OriginWorkflow.Graph.Links {
 		if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok {
-			fmt.Println("Could not find the source of the link", link.Source.ID)
+			logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID))
 			continue
 		}
 		source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
@@ -367,20 +373,19 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
 
 // Verify if a processing resource is attached to another Compute than the one hosting
 // the current Open Cloud instance. If true return the peer ID to contact
-func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool,string) {
+func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) {
 	computeAttached := b.retrieveProcessingCompute(graphID)
 	if computeAttached == nil {
-		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID )
+		logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
 		panic(0)
 	}
 
-	// Creates an accessor strictly for Peer Collection
-	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil)
+	// Creates an accessor strictly for Peer Collection
+	req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil)
 	if req == nil {
 		fmt.Println("TODO : handle error when trying to create a request on the Peer Collection")
 		return false, ""
 	}
@@ -388,25 +393,25 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResour
 	res := req.LoadOne(computeAttached.CreatorID)
 	if res.Err != "" {
 		fmt.Print(res.Err)
 		return false, ""
 	}
 
 	peer := *res.ToPeer()
 
-	isNotReparted, _ := peer.IsMySelf()
-	fmt.Println("Result IsMySelf for ", peer.UUID ," : ", isNotReparted)
+	isNotReparted := peer.State == 1
+	logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
 
 	return !isNotReparted, peer.UUID
 }
 
 func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource {
 	for _, link := range b.OriginWorkflow.Graph.Links {
 		// If a link contains the id of the processing
 		var oppositeId string
-		if link.Source.ID == graphID{
+		if link.Source.ID == graphID {
 			oppositeId = link.Destination.ID
-		} else if(link.Destination.ID == graphID){
+		} else if link.Destination.ID == graphID {
 			oppositeId = link.Source.ID
 		}
-		fmt.Println("OppositeId : ", oppositeId)
+
 		if oppositeId != "" {
 			dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
 			if dt == oclib.COMPUTE_RESOURCE {
@@ -417,27 +422,25 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu
 		}
 
 	}
 
 	return nil
 }
 
 // Execute the last actions once the YAML file for the Argo Workflow is created
 func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
-	fmt.Println("DEV :: Completing build")
+	logger.Info().Msg("DEV :: Completing build")
 	setter := AdmiraltySetter{Id: executionsId}
 	// Setup admiralty for each node
 	for _, peer := range b.RemotePeers {
-		fmt.Println("DEV :: Launching Admiralty Setup for ", peer)
+		logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
 		setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
 	}
 
 	// Update the name of the admiralty node to use
 	for _, template := range b.Workflow.Spec.Templates {
 		if len(template.Metadata.Annotations) > 0 {
-			if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
-				fmt.Println(resp)
-				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID
+			if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
+				template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + peerId + "-" + conf.GetConfig().ExecutionID
 			}
 		}
 	}
@@ -463,4 +466,4 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
 	}
 
 	return workflows_dir + file_name, nil
-}
+}
```
| @@ -14,7 +14,7 @@ type WorflowDB struct { | ||||
|  | ||||
| // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter | ||||
| func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error { | ||||
| 	fmt.Println("Loading workflow from " + workflow_id) | ||||
| 	logger.Info().Msg("Loading workflow from " + workflow_id) | ||||
| 	var err error | ||||
| 	if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil { | ||||
| 		return err | ||||
| @@ -27,7 +27,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | ||||
| 	logger := oclib.GetLogger() | ||||
|  | ||||
| 	lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id) | ||||
| 	fmt.Println("ERR", lib_data.Code, lib_data.Err) | ||||
| 	logger.Info().Msg(fmt.Sprint("ERR", lib_data.Code, lib_data.Err)) | ||||
| 	if lib_data.Code != 200 { | ||||
| 		logger.Error().Msg("Error loading the graph") | ||||
| 		return workflow, errors.New(lib_data.Err) | ||||
| @@ -43,7 +43,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo | ||||
|  | ||||
| func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { | ||||
| 	logger := oclib.GetLogger() | ||||
| 	fmt.Println("Exporting to Argo", w.Workflow) | ||||
| 	logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow)) | ||||
| 	if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { | ||||
| 		return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") | ||||
| 	} | ||||
|   | ||||