Compare commits

..

No commits in common. "main" and "feature/admiralty-docker" have entirely different histories.

16 changed files with 173 additions and 172 deletions

View File

@ -3,8 +3,6 @@
build: clean build: clean
go build . go build .
dev: build
run: run:
./oc-monitord ./oc-monitord
@ -24,4 +22,4 @@ publish-registry:
all: docker publish-kind publish-registry all: docker publish-kind publish-registry
.PHONY: build run clean docker publish-kind publish-registry .PHONY: build run clean docker publish-kind publish-registry

View File

@ -1,26 +1,52 @@
# oc-monitor # oc-monitor
DO : ## Deploy in k8s (dev)
make build
## Summary While a registry with all of the OC docker images has not been set-up we can export this image to k3s ctr
oc-monitord is a daemon which can be run : > docker save oc-monitord:latest | sudo k3s ctr images import -
- as a binary
- as a container
It is used to perform several actions regarding the execution of an Open Cloud workflow : Then in the pod manifest for oc-monitord use :
- generating a YAML file that can be interpreted by **Argo Workflow** to create and execute pods in a kubernetes environment
- setting up the different resources needed to execute a workflow over several peers/kubernetes nodes with **Admiralty** : token, secrets, targets and sources ```
- creating the workflow and logging the output from image: docker.io/library/oc-monitord
- Argo watch, which gives informations about the workflow in general (phase, number of steps executed, status...) imagePullPolicy: Never
- Pods : which are the logs generated by the pods ```
Not doing so will end up in the pod having a `ErrorImagePull`
## Allow argo to create services
In order for monitord to expose **open cloud services** on the node, we need to give him permission to create **k8s services**.
For that we can update the RBAC configuration for a role already created by argo :
### Manually edit the rbac authorization
> kubectl edit roles.rbac.authorization.k8s.io -n argo argo-role
In rules add a new entry :
```
- apiGroups:
- ""
resources:
- services
verbs:
- get
- create
```
### Patch the rbac authorization with a one liner
> kubectl patch role argo-role -n argo --type='json' -p='[{"op": "add", "path": "/rules/-", "value": {"apiGroups": [""], "resources": ["services"], "verbs": ["get","create"]}}]'
### Check wether the modification is effective
> kubectl auth can-i create services --as=system:serviceaccount:argo:argo -n argo
This command **must return "yes"**
To execute, the daemon needs several options :
- **-u** :
- **-m** :
- **-d** :
- **-e** :
# Notes features/admiralty-docker # Notes features/admiralty-docker
@ -31,17 +57,14 @@ To execute, the daemon needs several options :
- decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host. - decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host.
## TODO ## TODO
- [ ] Allow the front to known on which IP the service are reachable - [ ] Allow the front to known on which IP the service are reachable
- currently doing it by using `kubectl get nodes -o wide` - currently doing it by using `kubectl get nodes -o wide`
- [ ] Implement writing and reading from S3 bucket/MinIO when a data resource is linked to a compute resource.
### Adding ingress handling to support reverse proxing
### Adding ingress handling to support reverse proxing
- Test wether ingress-nginx is running or not - Test wether ingress-nginx is running or not
- Do something if not found : stop running and send error log OR start installation - Do something if not found : stop running and send error log OR start installation
- -

Binary file not shown.

Before

Width:  |  Height:  |  Size: 71 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 91 KiB

View File

@ -1,4 +0,0 @@
KUBERNETES_SERVICE_HOST=192.168.1.169
KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="

3
go.mod
View File

@ -20,7 +20,6 @@ require (
github.com/golang/protobuf v1.5.4 // indirect github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect github.com/sirupsen/logrus v1.9.3 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
@ -28,6 +27,7 @@ require (
) )
require ( require (
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
github.com/argoproj/argo-workflows/v3 v3.6.4 github.com/argoproj/argo-workflows/v3 v3.6.4
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/biter777/countries v1.7.5 // indirect github.com/biter777/countries v1.7.5 // indirect
@ -71,6 +71,7 @@ require (
github.com/robfig/cron v1.2.0 // indirect github.com/robfig/cron v1.2.0 // indirect
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/x448/float16 v0.8.4 // indirect github.com/x448/float16 v0.8.4 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/scram v1.1.2 // indirect

2
go.sum
View File

@ -3,6 +3,8 @@ cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34= cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34=
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE= cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=

View File

@ -7,8 +7,6 @@ import (
"oc-monitord/tools" "oc-monitord/tools"
"oc-monitord/utils" "oc-monitord/utils"
"slices" "slices"
"strings"
"sync"
"time" "time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -108,16 +106,13 @@ func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
} }
} }
func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) { func LogKubernetesArgo(wfName string, executionID string, watcher watch.Interface) {
var argoWatcher *ArgoWatch var argoWatcher *ArgoWatch
var pods []string var pods []string
var node wfv1.NodeStatus var node wfv1.NodeStatus
wfl := utils.GetWFLogger("") wfl := utils.GetWFLogger("")
wfl.Debug().Msg("Starting to log " + wfName)
var wg sync.WaitGroup
for event := range (watcher.ResultChan()) { for event := range (watcher.ResultChan()) {
wf, ok := event.Object.(*wfv1.Workflow) wf, ok := event.Object.(*wfv1.Workflow)
if !ok { if !ok {
@ -125,7 +120,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
continue continue
} }
if len(wf.Status.Nodes) == 0 { if len(wf.Status.Nodes) == 0 {
wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it wfl.Debug().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it
continue continue
} }
@ -143,7 +138,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
newWatcher := ArgoWatch{ newWatcher := ArgoWatch{
Name: node.Name, Name: node.Name,
Namespace: namespace, Namespace: executionID,
Status: string(node.Phase), Status: string(node.Phase),
Created: node.StartedAt.String(), Created: node.StartedAt.String(),
Started: node.StartedAt.String(), Started: node.StartedAt.String(),
@ -168,9 +163,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
if !slices.Contains(pods,pod.Name){ if !slices.Contains(pods,pod.Name){
pl := wfl.With().Str("pod", pod.Name).Logger() pl := wfl.With().Str("pod", pod.Name).Logger()
if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name
pl.Info().Msg("Found a new pod to log : " + pod.Name) go logKubernetesPods(executionID, wfName, pod.Name, pl)
wg.Add(1)
go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
pods = append(pods, pod.Name) pods = append(pods, pod.Name)
} }
} }
@ -178,8 +171,6 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
// Stop listening to the chan when the Workflow is completed or something bad happened // Stop listening to the chan when the Workflow is completed or something bad happened
if node.Phase.Completed() { if node.Phase.Completed() {
wfl.Info().Msg(wfName + " worflow completed") wfl.Info().Msg(wfName + " worflow completed")
wg.Wait()
wfl.Info().Msg(wfName + " exiting")
break break
} }
if node.Phase.FailedOrError() { if node.Phase.FailedOrError() {
@ -205,31 +196,24 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
} }
// Function needed to be executed as a go thread // Function needed to be executed as a go thread
func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){ func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger){
defer wg.Done()
s := strings.Split(podName, ".")
name := s[0] + "-" + s[1]
step := s[1]
k, err := tools.NewKubernetesTool() k, err := tools.NewKubernetesTool()
if err != nil { if err != nil {
logger.Error().Msg("Could not get Kubernetes tools") logger.Error().Msg("Could not get Kubernetes tools")
return return
} }
reader, err := k.GetPodLogger(executionId, wfName, podName) reader, err := k.GetPodLogger(executionId, wfName, podName)
if err != nil { if err != nil {
logger.Error().Msg(err.Error()) logger.Error().Msg(err.Error())
return return
} }
scanner := bufio.NewScanner(reader) scanner := bufio.NewScanner(reader)
for scanner.Scan() { for scanner.Scan() {
log := scanner.Text() log := scanner.Text()
podLog := NewArgoPodLog(name,step,log) podLog := NewArgoPodLog(wfName,podName,log)
jsonified, _ := json.Marshal(podLog) jsonified, _ := json.Marshal(podLog)
logger.Info().Msg(string(jsonified)) logger.Info().Msg(string(jsonified))
} }
} }

View File

@ -3,6 +3,7 @@ package logger
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"oc-monitord/conf" "oc-monitord/conf"
@ -70,6 +71,7 @@ func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) {
logger = logs.GetLogger() logger = logs.GetLogger()
logger.Debug().Msg("created wf_logger") logger.Debug().Msg("created wf_logger")
fmt.Println("created wf_logger")
wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger() wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
var current_watch, previous_watch ArgoWatch var current_watch, previous_watch ArgoWatch
@ -109,6 +111,7 @@ func LogLocalPod(wfName string, pipe io.ReadCloser, steps []string, wg *sync.Wai
scanner := bufio.NewScanner(pipe) scanner := bufio.NewScanner(pipe)
for scanner.Scan() { for scanner.Scan() {
var podLogger zerolog.Logger var podLogger zerolog.Logger
fmt.Println("new line")
wg.Add(1) wg.Add(1)
line := scanner.Text() line := scanner.Text()

41
main.go
View File

@ -67,14 +67,11 @@ func main() {
logger = u.GetLogger() logger = u.GetLogger()
logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL) logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
logger.Info().Msg("Workflow executed : " + conf.GetConfig().ExecutionID) logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
exec := u.GetExecution(conf.GetConfig().ExecutionID) exec := u.GetExecution(conf.GetConfig().ExecutionID)
if exec == nil {
logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
}
conf.GetConfig().WorkflowID = exec.WorkflowID conf.GetConfig().WorkflowID = exec.WorkflowID
logger.Info().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID) logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) { if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) {
os.Mkdir("./argo_workflows/", 0755) os.Mkdir("./argo_workflows/", 0755)
@ -86,6 +83,7 @@ func main() {
err := new_wf.LoadFrom(conf.GetConfig().WorkflowID, conf.GetConfig().PeerID) err := new_wf.LoadFrom(conf.GetConfig().WorkflowID, conf.GetConfig().PeerID)
if err != nil { if err != nil {
logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
} }
@ -97,20 +95,23 @@ func main() {
argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID)
if err != nil { if err != nil {
logger.Error().Msg("Error when completing the build of the workflow: " + err.Error()) logger.Error().Msg(err.Error())
} }
workflowName = getContainerName(argoFilePath) workflowName = getContainerName(argoFilePath)
wf_logger := u.GetWFLogger(workflowName)
wf_logger.Debug().Msg("Testing argo name")
if conf.GetConfig().KubeHost == "" { if conf.GetConfig().KubeHost == "" {
// Not in a k8s environment, get conf from parameters // Not in a k8s environment, get conf from parameters
logger.Info().Msg("Executes outside of k8s") fmt.Println("Executes outside of k8s")
executeOutside(argoFilePath, builder.Workflow) executeOutside(argoFilePath, builder.Workflow)
} else { } else {
// Executed in a k8s environment // Executed in a k8s environment
logger.Info().Msg("Executes inside a k8s") fmt.Println("Executes inside a k8s")
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath) executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath)
} }
} }
@ -123,25 +124,23 @@ func executeInside(execID string, ns string, argo_file_path string) {
} }
name, err := t.CreateArgoWorkflow(argo_file_path, ns) name, err := t.CreateArgoWorkflow(argo_file_path, ns)
// _ = name _ = name
if err != nil { if err != nil {
logger.Error().Msg("Could not create argo workflow : " + err.Error()) logger.Error().Msg("Could not create argo workflow : " + err.Error())
logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA)) fmt.Println("CA :" + conf.GetConfig().KubeCA)
logger.Info().Msg(fmt.Sprint("Cert :" + conf.GetConfig().KubeCert)) fmt.Println("Cert :" + conf.GetConfig().KubeCert)
logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData)) fmt.Println("Data :" + conf.GetConfig().KubeData)
return return
} else { } else {
watcher, err := t.GetArgoWatch(ns, workflowName) watcher, err := t.GetArgoWatch(execID, workflowName)
if err != nil { if err != nil {
logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) logger.Error().Msg("Could not retrieve Watcher : " + err.Error())
} }
l.LogKubernetesArgo(name, ns, watcher) l.LogKubernetesArgo(name, execID, watcher)
if err != nil { if err != nil {
logger.Error().Msg("Could not log workflow : " + err.Error()) logger.Error().Msg("Could not log workflow : " + err.Error())
} }
logger.Info().Msg("Finished, exiting...")
} }
} }
@ -174,7 +173,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg) go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg)
go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg) go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg)
logger.Info().Msg("Starting argo submit") fmt.Println("Starting argo submit")
if err := cmdSubmit.Start(); err != nil { if err := cmdSubmit.Start(); err != nil {
wf_logger.Error().Msg("Could not start argo submit") wf_logger.Error().Msg("Could not start argo submit")
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
@ -183,7 +182,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
logger.Info().Msg("Running argo logs") fmt.Println("Running argo logs")
if err := cmdLogs.Run(); err != nil { if err := cmdLogs.Run(); err != nil {
wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
@ -191,7 +190,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
} }
logger.Info().Msg("Waiting argo submit") fmt.Println("Waiting argo submit")
if err := cmdSubmit.Wait(); err != nil { if err := cmdSubmit.Wait(); err != nil {
wf_logger.Error().Msg("Could not execute argo submit") wf_logger.Error().Msg("Could not execute argo submit")
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text()) wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
@ -232,7 +231,7 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
err := parser.Parse(os.Args) err := parser.Parse(os.Args)
if err != nil { if err != nil {
logger.Info().Msg(parser.Usage(err)) fmt.Println(parser.Usage(err))
os.Exit(1) os.Exit(1)
} }
conf.GetConfig().Logs = "debug" conf.GetConfig().Logs = "debug"

Binary file not shown.

View File

@ -83,8 +83,7 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er
if err != nil { if err != nil {
return "", errors.New("failed to create workflow: " + err.Error()) return "", errors.New("failed to create workflow: " + err.Error())
} }
l := utils.GetLogger() fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns)
l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s\n", createdWf.Name, ns))
return createdWf.Name, nil return createdWf.Name, nil
} }
@ -116,12 +115,16 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s
} }
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
wfl := utils.GetWFLogger("")
wfl.Debug().Msg("Starting argo watch with argo lib")
fmt.Println("metadata.name=oc-monitor-"+wfName + " in namespace : " + executionId)
options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName} options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
fmt.Println(options)
watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options) watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.TODO(), options)
if err != nil { if err != nil {
return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client") return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client")
} }
return watcher, nil return watcher, nil
@ -130,18 +133,16 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch
func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
var targetPod v1.Pod var targetPod v1.Pod
pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{ pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
LabelSelector: "workflows.argoproj.io/workflow="+wfName, LabelSelector: "workflows.argoproj.io/workflow="+wfName,
}) })
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list pods: " + err.Error()) return nil, fmt.Errorf("failed to list pods: " + err.Error())
} }
if len(pods.Items) == 0 { if len(pods.Items) == 0 {
return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/node-name=" + nodeName)
return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
} }
for _, pod := range pods.Items { for _, pod := range pods.Items {
if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName { if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
targetPod = pod targetPod = pod
@ -171,8 +172,7 @@ func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
var initialized bool var initialized bool
for _, cond := range pod.Status.Conditions { for _, cond := range pod.Status.Conditions {
// It seems that for remote pods the pod gets the Succeeded status before it has time to display the it is ready to run in .status.conditions,so we added the OR condition if cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue {
if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
initialized = true initialized = true
return return
} }

View File

@ -17,13 +17,11 @@ var (
onceLogger sync.Once onceLogger sync.Once
onceWF sync.Once onceWF sync.Once
) )
func GetExecution(exec_id string) *workflow_execution.WorkflowExecution { func GetExecution(exec_id string) *workflow_execution.WorkflowExecution {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id) res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id)
if res.Code != 200 { if res.Code != 200 {
logger := oclib.GetLogger() logger := oclib.GetLogger()
logger.Error().Msg("Error retrieving execution " + exec_id) logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
logger.Error().Msg(res.Err)
return nil return nil
} }
return res.ToWorkflowExecution() return res.ToWorkflowExecution()

View File

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"oc-monitord/utils"
"slices" "slices"
"time" "time"
@ -22,7 +21,7 @@ type AdmiraltySetter struct {
func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error { func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error {
logger := logs.GetLogger() logger = logs.GetLogger()
data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID) data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID)
if data.Code != 200 { if data.Code != 200 {
@ -31,42 +30,42 @@ func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID st
} }
remotePeer := data.ToPeer() remotePeer := data.ToPeer()
data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID) data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID)
if data.Code != 200 { if data.Code != 200 {
logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID)
return fmt.Errorf(data.Err) return fmt.Errorf(data.Err)
} }
localPeer := data.ToPeer() localPeer := data.ToPeer()
caller := tools.NewHTTPCaller( caller := tools.NewHTTPCaller(
map[tools.DataType]map[tools.METHOD]string{ map[tools.DataType]map[tools.METHOD]string{
tools.ADMIRALTY_SOURCE: { tools.ADMIRALTY_SOURCE: map[tools.METHOD]string{
tools.POST :"/:id", tools.POST :"/:id",
},
tools.ADMIRALTY_KUBECONFIG: {
tools.GET:"/:id",
},
tools.ADMIRALTY_SECRET: {
tools.POST:"/:id/" + remotePeerID,
},
tools.ADMIRALTY_TARGET: {
tools.POST:"/:id/" + remotePeerID,
},
tools.ADMIRALTY_NODES: {
tools.GET:"/:id/" + remotePeerID,
},
}, },
) tools.ADMIRALTY_KUBECONFIG: map[tools.METHOD]string{
tools.GET:"/:id",
logger.Info().Msg("\n\n Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id) },
tools.ADMIRALTY_SECRET: map[tools.METHOD]string{
tools.POST:"/:id",
},
tools.ADMIRALTY_TARGET: map[tools.METHOD]string{
tools.POST:"/:id",
},
tools.ADMIRALTY_NODES: map[tools.METHOD]string{
tools.GET:"/:id",
},
},
)
logger.Info().Msg(" Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id + "\n\n")
_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) _ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
logger.Info().Msg("\n\n Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id) logger.Info().Msg(" Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id + "\n\n")
kubeconfig := s.getKubeconfig(remotePeer, caller) kubeconfig := s.getKubeconfig(remotePeer, caller)
logger.Info().Msg("\n\n Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id) logger.Info().Msg(" Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id + "\n\n")
_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true) _ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true)
logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id ) logger.Info().Msg(" Creating the Admiralty Target on " + localPeerID + " ns-" + s.Id + "\n\n")
_ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true) _ = s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true)
logger.Info().Msg("\n\n Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id) logger.Info().Msg(" Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id + "\n\n")
s.checkNodeStatus(localPeer,caller) s.checkNodeStatus(localPeer,caller)
return nil return nil
@ -76,14 +75,12 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle
var kubedata map[string]string var kubedata map[string]string
_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) _ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true)
if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 {
l := utils.GetLogger() fmt.Println("Something went wrong when retrieving data from Get call for kubeconfig")
l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig")
panic(0) panic(0)
} }
err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata) err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata)
if err != nil { if err != nil {
l := utils.GetLogger() fmt.Println("Something went wrong when unmarshalling data from Get call for kubeconfig")
l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig")
panic(0) panic(0)
} }
@ -91,18 +88,18 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle
} }
func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution { func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) *peer.PeerExecution {
l := utils.GetLogger()
resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller)
if err != nil { if err != nil {
l.Error().Msg("Error when executing on peer at" + peer.Url) fmt.Println("Error when executing on peer at", peer.Url)
l.Error().Msg(err.Error()) fmt.Println(err)
panic(0) panic(0)
} }
if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) { if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) {
l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)) fmt.Println("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)
if _, ok := caller.LastResults["body"]; ok { if _, ok := caller.LastResults["body"]; ok {
l.Info().Msg(string(caller.LastResults["body"].([]byte))) logger.Info().Msg(string(caller.LastResults["body"].([]byte)))
// fmt.Println(string(caller.LastResults["body"].([]byte)))
} }
if panicCode { if panicCode {
panic(0) panic(0)
@ -123,15 +120,14 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){
name := metadata.(map[string]interface{})["name"].(string) name := metadata.(map[string]interface{})["name"].(string)
s.NodeName = name s.NodeName = name
} else { } else {
l := utils.GetLogger() fmt.Println("Could not retrieve data about the recently created node")
l.Error().Msg("Could not retrieve data about the recently created node")
panic(0) panic(0)
} }
} }
func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){ func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){
for i := range(5) { for i := range(5) {
time.Sleep(10 * time.Second) // let some time for kube to generate the node time.Sleep(5 * time.Second) // let some time for kube to generate the node
_ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false) _ = s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false)
if caller.LastResults["code"] == 200 { if caller.LastResults["code"] == 200 {
s.storeNodeName(caller) s.storeNodeName(caller)

View File

@ -26,11 +26,11 @@ import (
var logger zerolog.Logger var logger zerolog.Logger
type ArgoBuilder struct { type ArgoBuilder struct {
OriginWorkflow *w.Workflow OriginWorkflow *w.Workflow
Workflow Workflow Workflow Workflow
Services []*Service Services []*Service
Timeout int Timeout int
RemotePeers []string RemotePeers []string
} }
type Workflow struct { type Workflow struct {
@ -65,7 +65,7 @@ type Spec struct {
// add s3, gcs, azure, etc if needed on a link between processing and storage // add s3, gcs, azure, etc if needed on a link between processing and storage
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) { func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
logger = logs.GetLogger() logger = logs.GetLogger()
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
// handle services by checking if there is only one processing with hostname and port // handle services by checking if there is only one processing with hostname and port
firstItems, lastItems, volumes := b.createTemplates(namespace) firstItems, lastItems, volumes := b.createTemplates(namespace)
b.createVolumes(volumes) b.createVolumes(volumes)
@ -90,10 +90,10 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
firstItems := []string{} firstItems := []string{}
lastItems := []string{} lastItems := []string{}
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
logger.Info().Msg(fmt.Sprint("Creating templates", len(items))) fmt.Println("Creating templates", len(items))
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
instance := item.Processing.GetSelectedInstance() instance := item.Processing.GetSelectedInstance()
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) fmt.Println("Creating template for", item.Processing.GetName(), instance)
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
return firstItems, lastItems, volumes return firstItems, lastItems, volumes
@ -176,12 +176,10 @@ func (b *ArgoBuilder) createArgoTemplates(namespace string,
lastItems []string) ([]VolumeMount, []string, []string) { lastItems []string) ([]VolumeMount, []string, []string) {
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems) _, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
template := &Template{Name: getArgoName(processing.GetName(), id)} template := &Template{Name: getArgoName(processing.GetName(), id)}
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) fmt.Println("Creating template for", template.Name)
isReparted, peerId := b.isProcessingReparted(*processing, id) isReparted, peerId := b.isProcessingReparted(*processing,id)
template.CreateContainer(processing, b.Workflow.getDag()) template.CreateContainer(processing, b.Workflow.getDag())
if isReparted { if isReparted {
logger.Debug().Msg("Reparted processing, on " + peerId)
b.RemotePeers = append(b.RemotePeers, peerId) b.RemotePeers = append(b.RemotePeers, peerId)
template.AddAdmiraltyAnnotations(peerId) template.AddAdmiraltyAnnotations(peerId)
} }
@ -327,7 +325,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
isDeps := false isDeps := false
for _, link := range b.OriginWorkflow.Graph.Links { for _, link := range b.OriginWorkflow.Graph.Links {
if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok { if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok {
logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID)) fmt.Println("Could not find the source of the link", link.Destination.ID)
continue continue
} }
source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing
@ -347,7 +345,7 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) {
for _, link := range b.OriginWorkflow.Graph.Links { for _, link := range b.OriginWorkflow.Graph.Links {
if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok {
logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID)) fmt.Println("Could not find the source of the link", link.Source.ID)
continue continue
} }
source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
@ -369,19 +367,20 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
// Verify if a processing resource is attached to another Compute than the one hosting // Verify if a processing resource is attached to another Compute than the one hosting
// the current Open Cloud instance. If true return the peer ID to contact // the current Open Cloud instance. If true return the peer ID to contact
func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) { func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool,string) {
computeAttached := b.retrieveProcessingCompute(graphID) computeAttached := b.retrieveProcessingCompute(graphID)
if computeAttached == nil { if computeAttached == nil {
logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID) logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID )
panic(0) panic(0)
} }
// Creates an accessor srtictly for Peer Collection
req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil) // Creates an accessor srtictly for Peer Collection
req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"","",nil,nil)
if req == nil { if req == nil {
fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") fmt.Println("TODO : handle error when trying to create a request on the Peer Collection")
return false, "" return false, ""
} }
res := req.LoadOne(computeAttached.CreatorID) res := req.LoadOne(computeAttached.CreatorID)
if res.Err != "" { if res.Err != "" {
@ -389,25 +388,25 @@ func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResour
fmt.Print(res.Err) fmt.Print(res.Err)
return false, "" return false, ""
} }
peer := *res.ToPeer() peer := *res.ToPeer()
isNotReparted := peer.State == 1 isNotReparted, _ := peer.IsMySelf()
logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) fmt.Println("Result IsMySelf for ", peer.UUID ," : ", isNotReparted)
return !isNotReparted, peer.UUID return !isNotReparted, peer.UUID
} }
func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource {
for _, link := range b.OriginWorkflow.Graph.Links { for _, link := range b.OriginWorkflow.Graph.Links {
// If a link contains the id of the processing // If a link contains the id of the processing
var oppositeId string var oppositeId string
if link.Source.ID == graphID { if link.Source.ID == graphID{
oppositeId = link.Destination.ID oppositeId = link.Destination.ID
} else if link.Destination.ID == graphID { } else if(link.Destination.ID == graphID){
oppositeId = link.Source.ID oppositeId = link.Source.ID
} }
fmt.Println("OppositeId : ", oppositeId)
if oppositeId != "" { if oppositeId != "" {
dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
if dt == oclib.COMPUTE_RESOURCE { if dt == oclib.COMPUTE_RESOURCE {
@ -418,25 +417,27 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu
} }
} }
return nil return nil
} }
// Execute the last actions once the YAML file for the Argo Workflow is created // Execute the last actions once the YAML file for the Argo Workflow is created
func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
logger.Info().Msg(fmt.Sprint("DEV :: Completing build")) fmt.Println("DEV :: Completing build")
setter := AdmiraltySetter{Id: executionsId} setter := AdmiraltySetter{Id: executionsId}
// Setup admiralty for each node // Setup admiralty for each node
for _, peer := range b.RemotePeers { for _, peer := range b.RemotePeers {
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) fmt.Println("DEV :: Launching Admiralty Setup for ", peer)
setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer) setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
} }
// Update the name of the admiralty node to use // Update the name of the admiralty node to use
for _, template := range b.Workflow.Spec.Templates { for _, template := range b.Workflow.Spec.Templates {
if len(template.Metadata.Annotations) > 0 { if len(template.Metadata.Annotations) > 0 {
if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { if resp, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + peerId + "-" + conf.GetConfig().ExecutionID fmt.Println(resp)
template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + conf.GetConfig().ExecutionID
} }
} }
} }
@ -462,4 +463,4 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
} }
return workflows_dir + file_name, nil return workflows_dir + file_name, nil
} }

View File

@ -14,7 +14,7 @@ type WorflowDB struct {
// Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter // Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter
func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error { func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error {
logger.Info().Msg("Loading workflow from " + workflow_id) fmt.Println("Loading workflow from " + workflow_id)
var err error var err error
if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil { if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil {
return err return err
@ -27,7 +27,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
logger := oclib.GetLogger() logger := oclib.GetLogger()
lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id) lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id)
logger.Info().Msg(fmt.Sprint("ERR", lib_data.Code, lib_data.Err)) fmt.Println("ERR", lib_data.Code, lib_data.Err)
if lib_data.Code != 200 { if lib_data.Code != 200 {
logger.Error().Msg("Error loading the graph") logger.Error().Msg("Error loading the graph")
return workflow, errors.New(lib_data.Err) return workflow, errors.New(lib_data.Err)
@ -43,7 +43,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) { func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) {
logger := oclib.GetLogger() logger := oclib.GetLogger()
logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow)) fmt.Println("Exporting to Argo", w.Workflow)
if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet") return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet")
} }