Compare commits
66 Commits
feature/na
...
feature/mu
| Author | SHA1 | Date | |
|---|---|---|---|
| 04ab15cb09 | |||
| 2b002152a4 | |||
| 7fa115c5e1 | |||
| 91f421af1e | |||
|
|
24bbe81638 | ||
|
|
7c913bec0e | ||
|
|
bdbbd7697a | ||
|
|
6917295fbd | ||
|
|
e1b0ad089c | ||
|
|
483f747754 | ||
|
|
03675d09ae | ||
|
|
f3e84a4f43 | ||
|
|
eae5474552 | ||
|
|
bae9cb2011 | ||
|
|
65b8960703 | ||
|
|
90aa19caeb | ||
|
|
dcb3e2b7cc | ||
|
|
c871d68333 | ||
|
|
6cf5da787a | ||
|
|
fa4db92c92 | ||
|
|
ee94c1aa42 | ||
|
|
c40b18f1d6 | ||
|
|
2932fb2710 | ||
|
|
2343a5329e | ||
|
|
86fa41a376 | ||
|
|
6ec7a670bd | ||
| 6323d4eed4 | |||
| 93f3806b86 | |||
|
|
b3524ccfad | ||
|
|
e2d1746396 | ||
|
|
5f70feab59 | ||
|
|
fb8d994be3 | ||
|
|
6f7acee2df | ||
|
|
31580f1905 | ||
|
|
04d6001fec | ||
|
|
e2ceb6e58d | ||
|
|
cd804fbeb5 | ||
|
|
9aefa18ea8 | ||
|
|
27fd603e36 | ||
|
|
c31184e2ec | ||
|
|
5d8143c93e | ||
|
|
77a9b0770e | ||
|
|
9a17623cab | ||
|
|
4963284056 | ||
|
|
df09585cc9 | ||
|
|
aa20edaf25 | ||
|
|
ade18f1042 | ||
| 42ee6abcb6 | |||
| 08ade1af66 | |||
| 83d118fb05 | |||
| f7f0c9c2d2 | |||
| 73e1747c91 | |||
| 32ce70da6e | |||
| aea7cbd41c | |||
| def56e5822 | |||
| 37c561c5fe | |||
| 3f533a1bfb | |||
| 3fa2cd3336 | |||
| da9a7d3a49 | |||
| 788a3174ea | |||
| 47363566b2 | |||
| 697d7a7145 | |||
| e4874697bc | |||
| 80e81820a4 | |||
| 4e06971668 | |||
| 4c51de03bc |
31
Dockerfile
31
Dockerfile
@@ -1,23 +1,28 @@
|
||||
FROM golang:alpine AS builder
|
||||
FROM golang:alpine AS deps
|
||||
|
||||
WORKDIR /app
|
||||
COPY go.mod go.sum ./
|
||||
RUN sed -i '/replace/d' go.mod
|
||||
RUN go mod download -x
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
|
||||
FROM golang:alpine AS builder
|
||||
LABEL maintainer="IRT PFN"
|
||||
ENV DOCKER_ENVIRONMENT=true
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=deps /go/pkg /go/pkg
|
||||
COPY --from=deps /app/go.mod /app/go.sum ./
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN apk add git
|
||||
|
||||
RUN go get github.com/beego/bee/v2 && go install github.com/beego/bee/v2@master
|
||||
|
||||
RUN timeout 15 bee run -gendoc=true -downdoc=true -runmode=dev || :
|
||||
|
||||
RUN sed -i 's/http:\/\/127.0.0.1:8080\/swagger\/swagger.json/swagger.json/g' swagger/index.html
|
||||
|
||||
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-w -s" .
|
||||
|
||||
RUN ls /app
|
||||
RUN go build .
|
||||
|
||||
FROM scratch
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=builder /app/oc-monitord .
|
||||
COPY --from=builder /app/oc-monitord .
|
||||
|
||||
ENTRYPOINT ["./oc-monitord"]
|
||||
30
Makefile
Normal file
30
Makefile
Normal file
@@ -0,0 +1,30 @@
|
||||
.DEFAULT_GOAL := all
|
||||
|
||||
build: clean
|
||||
go build .
|
||||
|
||||
run:
|
||||
./oc-monitord
|
||||
|
||||
clean:
|
||||
rm -rf oc-monitord
|
||||
|
||||
docker:
|
||||
DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile . --build-arg=HOST=$(HOST)
|
||||
docker tag oc/oc-monitord:0.0.1 oc/oc-monitord:latest
|
||||
docker tag oc/oc-monitord:0.0.1 oc-monitord:latest
|
||||
|
||||
publish-kind:
|
||||
kind load docker-image oc/oc-monitord:0.0.1 --name opencloud
|
||||
|
||||
publish-registry:
|
||||
@echo "TODO"
|
||||
|
||||
docker-deploy:
|
||||
docker compose up -d
|
||||
|
||||
run-docker: docker publish-kind publish-registry docker-deploy
|
||||
|
||||
all: docker publish-kind publish-registry
|
||||
|
||||
.PHONY: build run clean docker publish-kind publish-registry
|
||||
75
README.md
75
README.md
@@ -1,64 +1,47 @@
|
||||
# oc-monitor
|
||||
|
||||
## Deploy in k8s (dev)
|
||||
DO :
|
||||
make build
|
||||
|
||||
While a registry with all of the OC docker images has not been set-up we can export this image to k3s ctr
|
||||
## Summary
|
||||
|
||||
> docker save oc-monitord:latest | sudo k3s ctr images import -
|
||||
oc-monitord is a daemon which can be run :
|
||||
- as a binary
|
||||
- as a container
|
||||
|
||||
Then in the pod manifest for oc-monitord use :
|
||||
It is used to perform several actions regarding the execution of an Open Cloud workflow :
|
||||
- generating a YAML file that can be interpreted by **Argo Workflow** to create and execute pods in a kubernetes environment
|
||||
- setting up the different resources needed to execute a workflow over several peers/kubernetes nodes with **Admiralty** : token, secrets, targets and sources
|
||||
- creating the workflow and logging the output from
|
||||
- Argo watch, which gives informations about the workflow in general (phase, number of steps executed, status...)
|
||||
- Pods : which are the logs generated by the pods
|
||||
|
||||
```
|
||||
image: docker.io/library/oc-monitord
|
||||
imagePullPolicy: Never
|
||||
```
|
||||
To execute, the daemon needs several options :
|
||||
- **-u** :
|
||||
- **-m** :
|
||||
- **-d** :
|
||||
- **-e** :
|
||||
|
||||
Not doing so will end up in the pod having a `ErrorImagePull`
|
||||
# Notes features/admiralty-docker
|
||||
|
||||
## Allow argo to create services
|
||||
- When executing monitord as a container we need to change any url with "localhost" to the container's host IP.
|
||||
|
||||
In order for monitord to expose **open cloud services** on the node, we need to give him permission to create **k8s services**.
|
||||
|
||||
For that we can update the RBAC configuration for a role already created by argo :
|
||||
|
||||
### Manually edit the rbac authorization
|
||||
|
||||
> kubectl edit roles.rbac.authorization.k8s.io -n argo argo-role
|
||||
|
||||
In rules add a new entry :
|
||||
|
||||
```
|
||||
- apiGroups:
|
||||
- ""
|
||||
resources:
|
||||
- services
|
||||
verbs:
|
||||
- get
|
||||
- create
|
||||
```
|
||||
|
||||
### Patch the rbac authorization with a one liner
|
||||
|
||||
> kubectl patch role argo-role -n argo --type='json' -p='[{"op": "add", "path": "/rules/-", "value": {"apiGroups": [""], "resources": ["services"], "verbs": ["get","create"]}}]'
|
||||
|
||||
### Check wether the modification is effective
|
||||
|
||||
> kubectl auth can-i create services --as=system:serviceaccount:argo:argo -n argo
|
||||
|
||||
This command **must return "yes"**
|
||||
We can :
|
||||
- declare a new parameter 'HOST_IP'
|
||||
- decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host.
|
||||
|
||||
|
||||
## TODO
|
||||
## TODO
|
||||
|
||||
- [ ] Logs the output of each pods :
|
||||
- logsPods() function already exists
|
||||
- need to implement the logic to create each pod's logger and start the monitoring routing
|
||||
- [ ] Allow the front to known on which IP the service are reachable
|
||||
- [ ] Allow the front to known on which IP the service are reachable
|
||||
- currently doing it by using `kubectl get nodes -o wide`
|
||||
|
||||
- [ ] Implement writing and reading from S3 bucket/MinIO when a data resource is linked to a compute resource.
|
||||
|
||||
### Adding ingress handling to support reverse proxing
|
||||
|
||||
### Adding ingress handling to support reverse proxing
|
||||
|
||||
- Test wether ingress-nginx is running or not
|
||||
- Do something if not found : stop running and send error log OR start installation
|
||||
-
|
||||
-
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ type Config struct {
|
||||
NatsURL string
|
||||
ExecutionID string
|
||||
PeerID string
|
||||
Groups []string
|
||||
Timeout int
|
||||
WorkflowID string
|
||||
Logs string
|
||||
@@ -18,6 +19,7 @@ type Config struct {
|
||||
KubeCA string
|
||||
KubeCert string
|
||||
KubeData string
|
||||
ArgoHost string // when executed in a container will replace addresses with "localhost" in their url
|
||||
}
|
||||
|
||||
var instance *Config
|
||||
|
||||
@@ -1,8 +1,5 @@
|
||||
{
|
||||
"MONGO_URL":"mongodb://mongo:27017/",
|
||||
"NATS_URL":"nats://nats:4222",
|
||||
"MONGO_DATABASE":"DC_myDC",
|
||||
"KUBERNETES_SERVICE_HOST" : "193.50.43.14",
|
||||
"KUBE_CA" : "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
|
||||
"KUBE_TOKEN":"LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
|
||||
"MONGO_DATABASE":"DC_myDC"
|
||||
}
|
||||
BIN
docs/admiralty_naming_multi_peer.jpg
Normal file
BIN
docs/admiralty_naming_multi_peer.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 71 KiB |
BIN
docs/admiralty_setup_schema.jpg
Normal file
BIN
docs/admiralty_setup_schema.jpg
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 91 KiB |
4
env.env
Normal file
4
env.env
Normal file
@@ -0,0 +1,4 @@
|
||||
KUBERNETES_SERVICE_HOST=192.168.1.169
|
||||
KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
|
||||
KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K"
|
||||
KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
|
||||
59
go.mod
59
go.mod
@@ -5,20 +5,22 @@ go 1.23.1
|
||||
toolchain go1.23.3
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250213085018-271cc2caa026
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06
|
||||
github.com/akamensky/argparse v1.4.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/goraz/onion v0.1.3
|
||||
github.com/nwtgck/go-fakelish v0.1.3
|
||||
github.com/rs/zerolog v1.33.0
|
||||
github.com/rs/zerolog v1.34.0
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/beego/beego/v2 v2.3.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.22.0 // indirect
|
||||
github.com/beego/beego/v2 v2.3.7 // indirect
|
||||
github.com/go-playground/validator/v10 v10.26.0 // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
github.com/ugorji/go/codec v1.1.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect
|
||||
@@ -26,15 +28,14 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d
|
||||
github.com/argoproj/argo-workflows/v3 v3.6.4
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/biter777/countries v1.7.5 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.5 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.4 // indirect
|
||||
@@ -42,51 +43,49 @@ require (
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/golang/snappy v1.0.0 // indirect
|
||||
github.com/google/gnostic-models v0.6.8 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/google/go-cmp v0.7.0 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/hashicorp/golang-lru v1.0.2 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.17.10 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/montanaflynn/stats v0.7.1 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/nats-io/nats.go v1.37.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.7 // indirect
|
||||
github.com/nats-io/nats.go v1.41.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.10 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.19.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.0 // indirect
|
||||
github.com/prometheus/common v0.48.0 // indirect
|
||||
github.com/prometheus/procfs v0.12.0 // indirect
|
||||
github.com/prometheus/client_golang v1.22.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.63.0 // indirect
|
||||
github.com/prometheus/procfs v0.16.0 // indirect
|
||||
github.com/robfig/cron v1.2.0 // indirect
|
||||
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
|
||||
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
|
||||
github.com/smartystreets/goconvey v1.6.4 // indirect
|
||||
github.com/ugorji/go/codec v1.1.7 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/scram v1.1.2 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
|
||||
go.mongodb.org/mongo-driver v1.17.1 // indirect
|
||||
golang.org/x/crypto v0.31.0 // indirect
|
||||
golang.org/x/net v0.33.0 // indirect
|
||||
golang.org/x/oauth2 v0.23.0 // indirect
|
||||
golang.org/x/sync v0.10.0 // indirect
|
||||
golang.org/x/sys v0.28.0 // indirect
|
||||
golang.org/x/term v0.27.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
go.mongodb.org/mongo-driver v1.17.3 // indirect
|
||||
golang.org/x/crypto v0.37.0 // indirect
|
||||
golang.org/x/net v0.39.0 // indirect
|
||||
golang.org/x/oauth2 v0.25.0 // indirect
|
||||
golang.org/x/sync v0.13.0 // indirect
|
||||
golang.org/x/sys v0.32.0 // indirect
|
||||
golang.org/x/term v0.31.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/time v0.7.0 // indirect
|
||||
google.golang.org/protobuf v1.35.1 // indirect
|
||||
google.golang.org/protobuf v1.36.6 // indirect
|
||||
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
k8s.io/api v0.32.1
|
||||
|
||||
139
go.sum
139
go.sum
@@ -1,31 +1,43 @@
|
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250211143301-a098b3797a0f h1:irUdoi0U19KJaG8pn6ahJgD3hJAGYpzNpLonAGdqzIc=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250211143301-a098b3797a0f/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250212150815-c7c1535ba91a h1:kfTSMCOxYiVGNJWD4OrV7YYTf6t4geKxWpGz4EucpEA=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250212150815-c7c1535ba91a/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250213072626-4920322d0afb h1:EybP8jPpIiN5RLiBxr3cvvF9KIaC+uWvzM23ga0t1yI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250213072626-4920322d0afb/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250213085018-271cc2caa026 h1:CYwpofGfpAhMDrT6jqvu9NI/tcgxCD8PKJZDKEfTvVI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250213085018-271cc2caa026/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f h1:esLB0EAn8IuOChW35kcBrPaN80z4A4yYyz1mXT45GQo=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c h1:k2y+ocElqwUK5yzyCf3rWrDUzPWbds4MbtG58+Szos0=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157 h1:853UvpMOM1QuWLrr/V8biDS8IcQcqHvoJsOT4epxDng=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e h1:Z5vLv+Wzzz58abmHRnovoqbkVlKHuC8u8/RLv7FjtZw=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637 h1:YiZbn6KmjgZ62uM+kH95Snd2nQliDKDnGMAxRr/VoUw=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624064953-2c8dcbe93d14 h1:iCTrYc2+W2BFLOupRK1sD6sOgsK4NIs6WMC+4LiWCaY=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624064953-2c8dcbe93d14/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624093207-3fdf5c3ebf29 h1:JitS1izRltTyOaWnvXnmYywHj0napsL6y0nBYiWUCNo=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624093207-3fdf5c3ebf29/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624095852-147c7bc3a1d5 h1:0eV0E3kBZkOyoAurRmP9h4eHmFrZajOxSqoBgM3l3dk=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624095852-147c7bc3a1d5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06 h1:+RSv62uIC7wsmibsp1XTanQMNznNeOGgPpfhb6ZHT4c=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
|
||||
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
|
||||
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
|
||||
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
|
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/argoproj/argo-workflows/v3 v3.6.4 h1:5+Cc1UwaQE5ka3w7R3hxZ1TK3M6VjDEXA5WSQ/IXrxY=
|
||||
github.com/argoproj/argo-workflows/v3 v3.6.4/go.mod h1:2f5zB8CkbNCCO1od+kd1dWkVokqcuyvu+tc+Jwx1MZg=
|
||||
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
|
||||
github.com/beego/beego/v2 v2.3.1 h1:7MUKMpJYzOXtCUsTEoXOxsDV/UcHw6CPbaWMlthVNsc=
|
||||
github.com/beego/beego/v2 v2.3.1/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
|
||||
github.com/beego/beego/v2 v2.3.7 h1:z4btKtjU/rfp5BiYHkGD2QPjK9i1E9GH+I7vfhn6Agk=
|
||||
github.com/beego/beego/v2 v2.3.7/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/biter777/countries v1.7.5 h1:MJ+n3+rSxWQdqVJU8eBy9RqcdH6ePPn4PJHocVWUa+Q=
|
||||
github.com/biter777/countries v1.7.5/go.mod h1:1HSpZ526mYqKJcpT5Ti1kcGQ0L0SrXWIaptUWjFfv2E=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
|
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
|
||||
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
|
||||
@@ -51,8 +63,8 @@ github.com/etcd-io/etcd v3.3.17+incompatible/go.mod h1:cdZ77EstHBwVtD6iTgzgvogwc
|
||||
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
|
||||
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
|
||||
github.com/gabriel-vasile/mimetype v1.4.5 h1:J7wGKdGu33ocBOhGy0z653k/lFKLFDPJMG8Gql0kxn4=
|
||||
github.com/gabriel-vasile/mimetype v1.4.5/go.mod h1:ibHel+/kbxn9x2407k1izTA1S81ku1z/DlgOW2QE0M4=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
|
||||
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
|
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
|
||||
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||
@@ -68,8 +80,8 @@ github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/o
|
||||
github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY=
|
||||
github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY=
|
||||
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
|
||||
github.com/go-playground/validator/v10 v10.22.0 h1:k6HsTZ0sTnROkhS//R0O+55JgM8C4Bx7ia+JlgcnOao=
|
||||
github.com/go-playground/validator/v10 v10.22.0/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM=
|
||||
github.com/go-playground/validator/v10 v10.26.0 h1:SP05Nqhjcvz81uJaRfEV0YBSSSGMc/iMaVtFbr3Sw2k=
|
||||
github.com/go-playground/validator/v10 v10.26.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
|
||||
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
@@ -82,15 +94,15 @@ github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
|
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
|
||||
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
|
||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
|
||||
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
|
||||
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
|
||||
github.com/google/gnostic-models v0.6.8 h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I=
|
||||
github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U=
|
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0=
|
||||
github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
@@ -105,8 +117,8 @@ github.com/goraz/onion v0.1.3 h1:KhyvbDA2b70gcz/d5izfwTiOH8SmrvV43AsVzpng3n0=
|
||||
github.com/goraz/onion v0.1.3/go.mod h1:XEmz1XoBz+wxTgWB8NwuvRm4RAu3vKxvrmYtzK+XCuQ=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
|
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
|
||||
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
|
||||
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c=
|
||||
github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
|
||||
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
|
||||
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
|
||||
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
|
||||
@@ -119,20 +131,23 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
|
||||
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
|
||||
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
|
||||
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
|
||||
github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
|
||||
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
|
||||
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
|
||||
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
|
||||
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
|
||||
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
|
||||
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
|
||||
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
|
||||
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
|
||||
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
|
||||
github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
|
||||
@@ -152,10 +167,10 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8
|
||||
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
||||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||
github.com/nats-io/nats.go v1.41.0 h1:PzxEva7fflkd+n87OtQTXqCTyLfIIMFJBpyccHLE2Ko=
|
||||
github.com/nats-io/nats.go v1.41.0/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo=
|
||||
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
|
||||
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/nwtgck/go-fakelish v0.1.3 h1:bA8/xa9hQmzppexIhBvdmztcd/PJ4SPuAUTBdMKZ8G4=
|
||||
@@ -173,26 +188,26 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU=
|
||||
github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=
|
||||
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
|
||||
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.6.0 h1:k1v3CzpSRUTrKMppY35TLwPvxHqBu0bYgxZzqGIgaos=
|
||||
github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8=
|
||||
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
|
||||
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
|
||||
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
|
||||
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
|
||||
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
|
||||
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
|
||||
github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
|
||||
github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
|
||||
github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM=
|
||||
github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg=
|
||||
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
|
||||
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
|
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
|
||||
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
|
||||
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8=
|
||||
github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
|
||||
github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0=
|
||||
github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY=
|
||||
github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ=
|
||||
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
|
||||
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 h1:DAYUYH5869yV94zvCES9F51oYtN5oGlwjxJJz7ZCnik=
|
||||
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18/go.mod h1:nkxAfR/5quYxwPZhyDxgasBMnRtBZd0FCEpawpjMUFg=
|
||||
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 h1:v9ezJDHA1XGxViAUSIoO/Id7Fl63u6d0YmsAm+/p2hs=
|
||||
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02/go.mod h1:RF16/A3L0xSa0oSERcnhd8Pu3IXSDZSK2gmGIMsttFE=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/skarademir/naturalsort v0.0.0-20150715044055-69a5d87bef62/go.mod h1:oIdVclZaltY1Nf7OQUkg1/2jImBJ+ZfKZuDIRSwk3p0=
|
||||
@@ -233,16 +248,16 @@ github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfS
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
|
||||
go.mongodb.org/mongo-driver v1.17.1 h1:Wic5cJIwJgSpBhe3lx3+/RybR5PiYRMpVFgO7cOHyIM=
|
||||
go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4=
|
||||
go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ=
|
||||
go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
|
||||
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
|
||||
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
|
||||
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
|
||||
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
@@ -262,12 +277,12 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
|
||||
golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
|
||||
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
||||
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
||||
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs=
|
||||
golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
|
||||
golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70=
|
||||
golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -275,8 +290,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
|
||||
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
|
||||
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
@@ -292,18 +307,18 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
|
||||
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
|
||||
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
||||
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q=
|
||||
golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM=
|
||||
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
|
||||
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
|
||||
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
|
||||
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
||||
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
||||
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
@@ -340,8 +355,8 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8
|
||||
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
|
||||
google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8=
|
||||
google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
|
||||
google.golang.org/protobuf v1.35.1 h1:m3LfL6/Ca+fqnjnlqQXNpFPABW1UD7mjh8KO2mKFytA=
|
||||
google.golang.org/protobuf v1.35.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
||||
235
logger/argo_logs.go
Normal file
235
logger/argo_logs.go
Normal file
@@ -0,0 +1,235 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"oc-monitord/tools"
|
||||
"oc-monitord/utils"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
|
||||
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
)
|
||||
|
||||
// An object to monitor the logs generated by a specific pod from a workflow execution
|
||||
type ArgoWatch struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Status string
|
||||
Conditions
|
||||
Created string
|
||||
Started string
|
||||
Duration string
|
||||
Progress string
|
||||
Logs []string
|
||||
}
|
||||
|
||||
type Conditions struct {
|
||||
PodRunning bool
|
||||
Completed bool
|
||||
}
|
||||
|
||||
func (a *ArgoWatch) Equals(arg *ArgoWatch) bool {
|
||||
if arg == nil {
|
||||
return false
|
||||
}
|
||||
return a.Status == arg.Status && a.Progress == arg.Progress && a.Conditions.PodRunning == arg.Conditions.PodRunning && a.Conditions.Completed == arg.Conditions.Completed
|
||||
}
|
||||
|
||||
func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs {
|
||||
return &ArgoLogs{
|
||||
Name: "oc-monitor-" + name,
|
||||
Namespace: namespace,
|
||||
CreatedDate: time.Now().Format("2006-01-02 15:04:05"),
|
||||
StepCount: 0,
|
||||
StepMax: stepMax,
|
||||
stop: false,
|
||||
Seen: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
// An object to monitor and log the output of an argo submit
|
||||
type ArgoLogs struct {
|
||||
Name string
|
||||
Namespace string
|
||||
CreatedDate string
|
||||
StepCount int
|
||||
StepMax int
|
||||
stop bool
|
||||
Started time.Time
|
||||
Seen []string
|
||||
Logs []string
|
||||
IsStreaming bool
|
||||
}
|
||||
|
||||
func (a *ArgoLogs) NewWatch() *ArgoWatch {
|
||||
return &ArgoWatch{
|
||||
Name: a.Name,
|
||||
Namespace: a.Namespace,
|
||||
Status: "Pending",
|
||||
Created: a.CreatedDate,
|
||||
Started: a.Started.Format("2006-01-02 15:04:05"),
|
||||
Conditions: Conditions{
|
||||
PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
|
||||
Completed: a.StepCount == a.StepMax,
|
||||
},
|
||||
Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
|
||||
Duration: "0s",
|
||||
Logs: []string{},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) {
|
||||
jsonified, _ := json.Marshal(current_watch)
|
||||
logger.Info().Msg(string(jsonified))
|
||||
a.StepCount += 1
|
||||
a.Started = time.Now()
|
||||
}
|
||||
|
||||
|
||||
type ArgoPodLog struct {
|
||||
PodName string
|
||||
Step string
|
||||
Message string
|
||||
}
|
||||
|
||||
func NewArgoPodLog(name string, step string, msg string) ArgoPodLog {
|
||||
return ArgoPodLog{
|
||||
PodName: name,
|
||||
Step: step,
|
||||
Message: msg,
|
||||
}
|
||||
}
|
||||
|
||||
func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface) {
|
||||
var argoWatcher *ArgoWatch
|
||||
var pods []string
|
||||
var node wfv1.NodeStatus
|
||||
|
||||
wfl := utils.GetWFLogger("")
|
||||
wfl.Debug().Msg("Starting to log " + wfName)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
for event := range (watcher.ResultChan()) {
|
||||
wf, ok := event.Object.(*wfv1.Workflow)
|
||||
if !ok {
|
||||
wfl.Error().Msg("unexpected type")
|
||||
continue
|
||||
}
|
||||
if len(wf.Status.Nodes) == 0 {
|
||||
wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it
|
||||
continue
|
||||
}
|
||||
|
||||
conditions := retrieveCondition(wf)
|
||||
|
||||
// Retrieving the Status for the main node, which is named after the workflow
|
||||
if node, ok = wf.Status.Nodes[wfName]; !ok {
|
||||
bytified, _ := json.MarshalIndent(wf.Status.Nodes,"","\t")
|
||||
wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified))
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
start, _ := time.Parse(time.RFC3339, node.StartedAt.String() )
|
||||
duration := now.Sub(start)
|
||||
|
||||
newWatcher := ArgoWatch{
|
||||
Name: node.Name,
|
||||
Namespace: namespace,
|
||||
Status: string(node.Phase),
|
||||
Created: node.StartedAt.String(),
|
||||
Started: node.StartedAt.String(),
|
||||
Progress: string(node.Progress),
|
||||
Duration: duration.String(),
|
||||
Conditions: conditions,
|
||||
}
|
||||
|
||||
if argoWatcher == nil {
|
||||
argoWatcher = &newWatcher
|
||||
}
|
||||
|
||||
if !newWatcher.Equals(argoWatcher){
|
||||
jsonified, _ := json.Marshal(newWatcher)
|
||||
wfl.Info().Msg(string(jsonified))
|
||||
argoWatcher = &newWatcher
|
||||
}
|
||||
|
||||
// I don't think we need to use WaitGroup here, because the loop itself
|
||||
// acts as blocking process for the main thread, because Argo watch never closes the channel
|
||||
for _, pod := range wf.Status.Nodes{
|
||||
if !slices.Contains(pods,pod.Name){
|
||||
pl := wfl.With().Str("pod", pod.Name).Logger()
|
||||
if wfName == pod.Name { pods = append(pods, pod.Name); continue } // One of the node is the Workflow, the others are the pods so don't try to log on the wf name
|
||||
pl.Info().Msg("Found a new pod to log : " + pod.Name)
|
||||
wg.Add(1)
|
||||
go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg)
|
||||
pods = append(pods, pod.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Stop listening to the chan when the Workflow is completed or something bad happened
|
||||
if node.Phase.Completed() {
|
||||
wfl.Info().Msg(wfName + " worflow completed")
|
||||
wg.Wait()
|
||||
wfl.Info().Msg(wfName + " exiting")
|
||||
break
|
||||
}
|
||||
if node.Phase.FailedOrError() {
|
||||
wfl.Error().Msg(wfName + "has failed, please refer to the logs")
|
||||
wfl.Error().Msg(node.Message)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func retrieveCondition(wf *wfv1.Workflow) (c Conditions) {
|
||||
for _, cond := range wf.Status.Conditions {
|
||||
if cond.Type == "PodRunning" {
|
||||
c.PodRunning = cond.Status == "True"
|
||||
}
|
||||
if cond.Type == "Completed" {
|
||||
c.Completed = cond.Status == "True"
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
|
||||
}
|
||||
|
||||
// Function needed to be executed as a go thread
|
||||
func logKubernetesPods(executionId string, wfName string,podName string, logger zerolog.Logger, wg *sync.WaitGroup){
|
||||
defer wg.Done()
|
||||
|
||||
s := strings.Split(podName, ".")
|
||||
name := s[0] + "-" + s[1]
|
||||
step := s[1]
|
||||
|
||||
k, err := tools.NewKubernetesTool()
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not get Kubernetes tools")
|
||||
return
|
||||
}
|
||||
|
||||
reader, err := k.GetPodLogger(executionId, wfName, podName)
|
||||
if err != nil {
|
||||
logger.Error().Msg(err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
scanner := bufio.NewScanner(reader)
|
||||
for scanner.Scan() {
|
||||
log := scanner.Text()
|
||||
podLog := NewArgoPodLog(name,step,log)
|
||||
jsonified, _ := json.Marshal(podLog)
|
||||
logger.Info().Msg(string(jsonified))
|
||||
}
|
||||
|
||||
}
|
||||
141
logger/local_argo_logs.go
Normal file
141
logger/local_argo_logs.go
Normal file
@@ -0,0 +1,141 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"oc-monitord/conf"
|
||||
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var logger zerolog.Logger
|
||||
var wfLogger zerolog.Logger
|
||||
|
||||
|
||||
// Take the slice of string that make up one round of stderr outputs from the --watch option in argo submit
|
||||
func NewLocalArgoWatch(inputs []string) *ArgoWatch {
|
||||
var workflow ArgoWatch
|
||||
|
||||
for _, input := range inputs {
|
||||
line := strings.TrimSpace(input)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
switch {
|
||||
case strings.HasPrefix(line, "Name:"):
|
||||
workflow.Name = parseValue(line)
|
||||
case strings.HasPrefix(line, "Namespace:"):
|
||||
workflow.Namespace = parseValue(line)
|
||||
case strings.HasPrefix(line, "Status:"):
|
||||
workflow.Status = parseValue(line)
|
||||
case strings.HasPrefix(line, "PodRunning"):
|
||||
workflow.PodRunning = parseBoolValue(line)
|
||||
case strings.HasPrefix(line, "Completed"):
|
||||
workflow.Completed = parseBoolValue(line)
|
||||
case strings.HasPrefix(line, "Created:"):
|
||||
workflow.Created = parseValue(line)
|
||||
case strings.HasPrefix(line, "Started:"):
|
||||
workflow.Started = parseValue(line)
|
||||
case strings.HasPrefix(line, "Duration:"):
|
||||
workflow.Duration = parseValue(line)
|
||||
case strings.HasPrefix(line, "Progress:"):
|
||||
workflow.Progress = parseValue(line)
|
||||
}
|
||||
}
|
||||
|
||||
return &workflow
|
||||
}
|
||||
|
||||
|
||||
|
||||
func parseValue(line string) string {
|
||||
parts := strings.SplitN(line, ":", 2)
|
||||
if len(parts) < 2 {
|
||||
return ""
|
||||
}
|
||||
return strings.TrimSpace(parts[1])
|
||||
}
|
||||
|
||||
func parseBoolValue(line string) bool {
|
||||
value := parseValue(line)
|
||||
return value == "True"
|
||||
}
|
||||
|
||||
func LogLocalWorkflow(wfName string, pipe io.ReadCloser, wg *sync.WaitGroup) {
|
||||
logger = logs.GetLogger()
|
||||
|
||||
logger.Debug().Msg("created wf_logger")
|
||||
wfLogger = logger.With().Str("argo_name", wfName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
|
||||
|
||||
var current_watch, previous_watch ArgoWatch
|
||||
|
||||
watch_output := make([]string, 0)
|
||||
scanner := bufio.NewScanner(pipe)
|
||||
for scanner.Scan() {
|
||||
log := scanner.Text()
|
||||
watch_output = append(watch_output, log)
|
||||
|
||||
// Log the progress of the WF
|
||||
if strings.HasPrefix(log, "Progress:") {
|
||||
|
||||
current_watch = *NewLocalArgoWatch(watch_output)
|
||||
workflowName := current_watch.Name
|
||||
if !current_watch.Equals(&previous_watch) {
|
||||
wg.Add(1)
|
||||
// checkStatus(current_watch.Status, previous_watch.Status)
|
||||
jsonified, err := json.Marshal(current_watch)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create watch log for " + workflowName)
|
||||
}
|
||||
wfLogger.Info().Msg(string(jsonified))
|
||||
previous_watch = current_watch
|
||||
current_watch = ArgoWatch{}
|
||||
wg.Done()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Debug, no logs sent
|
||||
func LogLocalPod(wfName string, pipe io.ReadCloser, steps []string, wg *sync.WaitGroup) {
|
||||
scanner := bufio.NewScanner(pipe)
|
||||
for scanner.Scan() {
|
||||
var podLogger zerolog.Logger
|
||||
wg.Add(1)
|
||||
|
||||
line := scanner.Text()
|
||||
podName := strings.Split(line, ":")[0]
|
||||
podLogger = wfLogger.With().Str("step_name", getStepName(podName, steps)).Logger()
|
||||
log := strings.Split(line,podName+":")[1]
|
||||
podLog := NewArgoPodLog(wfName,podName,log)
|
||||
|
||||
jsonifiedLog, err := json.Marshal(podLog)
|
||||
if err != nil {
|
||||
podLogger.Fatal().Msg(err.Error())
|
||||
}
|
||||
|
||||
podLogger.Info().Msg(string(jsonifiedLog))
|
||||
wg.Done()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func getStepName(podName string, steps []string) string {
|
||||
|
||||
for _, step := range(steps) {
|
||||
if strings.Contains(podName,step){
|
||||
return step
|
||||
}
|
||||
}
|
||||
|
||||
return "error"
|
||||
}
|
||||
|
||||
224
main.go
224
main.go
@@ -3,17 +3,17 @@ package main
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"oc-monitord/conf"
|
||||
l "oc-monitord/logger"
|
||||
"oc-monitord/models"
|
||||
u "oc-monitord/utils"
|
||||
"oc-monitord/workflow_builder"
|
||||
@@ -54,7 +54,7 @@ func main() {
|
||||
|
||||
os.Setenv("test_service", "true") // Only for service demo, delete before merging on main
|
||||
parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database")
|
||||
loadConfig(false, &parser)
|
||||
setConf(&parser)
|
||||
oclib.InitDaemon("oc-monitord")
|
||||
|
||||
oclib.SetConfig(
|
||||
@@ -65,14 +65,18 @@ func main() {
|
||||
conf.GetConfig().Logs,
|
||||
)
|
||||
|
||||
logger = logs.CreateLogger("oc-monitord")
|
||||
logger = u.GetLogger()
|
||||
|
||||
logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
|
||||
logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
|
||||
logger.Info().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
|
||||
exec := u.GetExecution(conf.GetConfig().ExecutionID)
|
||||
if exec == nil {
|
||||
logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
|
||||
return
|
||||
}
|
||||
conf.GetConfig().WorkflowID = exec.WorkflowID
|
||||
|
||||
logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
|
||||
logger.Info().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
|
||||
|
||||
if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) {
|
||||
os.Mkdir("./argo_workflows/", 0755)
|
||||
@@ -87,153 +91,125 @@ func main() {
|
||||
logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
|
||||
}
|
||||
|
||||
argo_file_path, stepMax, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout)
|
||||
builder, _, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID)
|
||||
logger.Error().Msg(err.Error())
|
||||
}
|
||||
logger.Debug().Msg("Created :" + argo_file_path)
|
||||
|
||||
workflowName = getContainerName(argo_file_path)
|
||||
argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Error when completing the build of the workflow: " + err.Error())
|
||||
}
|
||||
|
||||
wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
|
||||
wf_logger.Debug().Msg("Testing argo name")
|
||||
workflowName = getContainerName(argoFilePath)
|
||||
|
||||
if conf.GetConfig().KubeHost == "" {
|
||||
// Not in a k8s environment, get conf from parameters
|
||||
fmt.Println("Executes outside of k8s")
|
||||
executeOutside(argo_file_path, stepMax)
|
||||
logger.Info().Msg("Executes outside of k8s")
|
||||
executeOutside(argoFilePath, builder.Workflow)
|
||||
} else {
|
||||
// Executed in a k8s environment
|
||||
fmt.Println("Executes inside a k8s")
|
||||
executeInside(exec.GetID(), "argo", argo_file_path, stepMax)
|
||||
logger.Info().Msg("Executes inside a k8s")
|
||||
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
|
||||
executeInside(exec.ExecutionsID, argoFilePath)
|
||||
}
|
||||
}
|
||||
|
||||
// So far we only log the output from
|
||||
func executeInside(execID string, ns string, argo_file_path string, stepMax int) {
|
||||
func executeInside(ns string, argo_file_path string) {
|
||||
t, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create KubernetesTool")
|
||||
return
|
||||
}
|
||||
|
||||
name, err := t.CreateArgoWorkflow(argo_file_path, ns)
|
||||
// _ = name
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create argo workflow : " + err.Error())
|
||||
logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
|
||||
logger.Info().Msg(fmt.Sprint("Cert :" + conf.GetConfig().KubeCert))
|
||||
logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData))
|
||||
return
|
||||
} else {
|
||||
split := strings.Split(argo_file_path, "_")
|
||||
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
|
||||
argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger)
|
||||
err := t.LogWorkflow(execID, ns, name, argo_file_path, stepMax, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, logWorkflow)
|
||||
watcher, err := t.GetArgoWatch(ns, workflowName)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not retrieve Watcher : " + err.Error())
|
||||
}
|
||||
|
||||
l.LogKubernetesArgo(name, ns, watcher)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not log workflow : " + err.Error())
|
||||
}
|
||||
|
||||
logger.Info().Msg("Finished, exiting...")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func executeOutside(argo_file_path string, stepMax int) {
|
||||
// var stdout, stderr, stdout_logs, stderr_logs io.ReadCloser
|
||||
var stdout, stderr io.ReadCloser
|
||||
// var stderr io.ReadCloser
|
||||
func executeOutside(argo_file_path string, workflow *models.Workflow) {
|
||||
var stdoutSubmit, stderrSubmit io.ReadCloser
|
||||
var stdoutLogs, stderrLogs io.ReadCloser
|
||||
var wg sync.WaitGroup
|
||||
var err error
|
||||
cmd := exec.Command("argo", "submit", "--log", argo_file_path, "--serviceaccount=argo", "-n", "argo")
|
||||
if stdout, err = cmd.StdoutPipe(); err != nil {
|
||||
|
||||
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID)
|
||||
|
||||
cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID)
|
||||
if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil {
|
||||
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
|
||||
return
|
||||
}
|
||||
if err := cmd.Start(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
split := strings.Split(argo_file_path, "_")
|
||||
argoLogs := models.NewArgoLogs(split[0], "argo", stepMax)
|
||||
argoLogs.StartStepRecording(argoLogs.NewWatch(), wf_logger)
|
||||
argoLogs.IsStreaming = true
|
||||
go logWorkflow(argo_file_path, stepMax, stdout, argoLogs.NewWatch(), argoLogs.NewWatch(), argoLogs, []string{}, &wg)
|
||||
|
||||
if err := cmd.Wait(); err != nil {
|
||||
wf_logger.Error().Msg("Could not execute argo submit")
|
||||
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderr).Text())
|
||||
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color")
|
||||
if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil {
|
||||
wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
var steps []string
|
||||
for _, template := range workflow.Spec.Templates {
|
||||
steps = append(steps, template.Name)
|
||||
}
|
||||
|
||||
go l.LogLocalWorkflow(workflowName, stdoutSubmit, &wg)
|
||||
go l.LogLocalPod(workflowName, stdoutLogs, steps, &wg)
|
||||
|
||||
logger.Info().Msg("Starting argo submit")
|
||||
if err := cmdSubmit.Start(); err != nil {
|
||||
wf_logger.Error().Msg("Could not start argo submit")
|
||||
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
|
||||
updateStatus("fatal", "")
|
||||
}
|
||||
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
logger.Info().Msg("Running argo logs")
|
||||
if err := cmdLogs.Run(); err != nil {
|
||||
wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
|
||||
|
||||
wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text())
|
||||
|
||||
}
|
||||
|
||||
logger.Info().Msg("Waiting argo submit")
|
||||
if err := cmdSubmit.Wait(); err != nil {
|
||||
wf_logger.Error().Msg("Could not execute argo submit")
|
||||
wf_logger.Error().Msg(err.Error() + bufio.NewScanner(stderrSubmit).Text())
|
||||
updateStatus("fatal", "")
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// We could improve this function by creating an object with the same attribute as the output
|
||||
// and only send a new log if the current object has different values than the previous
|
||||
func logWorkflow(argo_file_path string, stepMax int, pipe io.ReadCloser,
|
||||
current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch,
|
||||
argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup) {
|
||||
scanner := bufio.NewScanner(pipe)
|
||||
count := 0
|
||||
see := ""
|
||||
seeit := 0
|
||||
for scanner.Scan() {
|
||||
log := scanner.Text()
|
||||
if strings.Contains(log, "capturing logs") && count == 0 {
|
||||
if !argoLogs.IsStreaming {
|
||||
wg.Add(1)
|
||||
}
|
||||
seeit++
|
||||
} else if count == 0 {
|
||||
if argoLogs.IsStreaming {
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if count == 1 {
|
||||
see = log
|
||||
if slices.Contains(argoLogs.Seen, see) && !argoLogs.IsStreaming {
|
||||
wg.Done()
|
||||
seeit--
|
||||
break
|
||||
}
|
||||
}
|
||||
if !slices.Contains(current_watch.Logs, log) {
|
||||
current_watch.Logs = append(current_watch.Logs, log)
|
||||
}
|
||||
count++
|
||||
if strings.Contains(log, "sub-process exited") {
|
||||
current_watch = argoLogs.StopStepRecording(current_watch)
|
||||
argoLogs.Seen = append(argoLogs.Seen, see)
|
||||
if checkStatus(current_watch, previous_watch, argoLogs) {
|
||||
count = 0
|
||||
if !argoLogs.IsStreaming {
|
||||
wg.Done()
|
||||
}
|
||||
seeit--
|
||||
}
|
||||
jsonified, err := json.Marshal(current_watch)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create watch log")
|
||||
}
|
||||
if current_watch.Status == "Failed" {
|
||||
wf_logger.Error().Msg(string(jsonified))
|
||||
} else {
|
||||
wf_logger.Info().Msg(string(jsonified))
|
||||
}
|
||||
previous_watch = current_watch
|
||||
current_watch = &models.ArgoWatch{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func loadConfig(is_k8s bool, parser *argparse.Parser) {
|
||||
var o *onion.Onion
|
||||
o = initOnion(o)
|
||||
setConf(is_k8s, o, parser)
|
||||
|
||||
if !IsValidUUID(conf.GetConfig().ExecutionID) {
|
||||
logger.Fatal().Msg("Provided ID is not an UUID")
|
||||
}
|
||||
}
|
||||
|
||||
func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
|
||||
func setConf(parser *argparse.Parser) {
|
||||
url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"})
|
||||
mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
|
||||
execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})
|
||||
peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
|
||||
groups := parser.String("g", "groups", &argparse.Options{Required: false, Default: "", Help: "Groups of the peer to request from oc-catalog API"})
|
||||
|
||||
mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"})
|
||||
db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"})
|
||||
timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"})
|
||||
@@ -245,9 +221,11 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
|
||||
host := parser.String("H", "host", &argparse.Options{Required: false, Default: "", Help: "Host for the Kubernetes cluster"})
|
||||
port := parser.String("P", "port", &argparse.Options{Required: false, Default: "6443", Help: "Port for the Kubernetes cluster"})
|
||||
|
||||
// argoHost := parser.String("h", "argoHost", &argparse.Options{Required: false, Default: "", Help: "Host where Argo is running from"}) // can't use -h because its reserved to help
|
||||
|
||||
err := parser.Parse(os.Args)
|
||||
if err != nil {
|
||||
fmt.Println(parser.Usage(err))
|
||||
logger.Info().Msg(parser.Usage(err))
|
||||
os.Exit(1)
|
||||
}
|
||||
conf.GetConfig().Logs = "debug"
|
||||
@@ -258,10 +236,12 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
|
||||
conf.GetConfig().Mode = *mode
|
||||
conf.GetConfig().ExecutionID = *execution
|
||||
conf.GetConfig().PeerID = *peer
|
||||
|
||||
conf.GetConfig().Groups = strings.Split((*groups), ",")
|
||||
conf.GetConfig().KubeHost = *host
|
||||
conf.GetConfig().KubePort = *port
|
||||
|
||||
// conf.GetConfig().ArgoHost = *argoHost
|
||||
|
||||
decoded, err := base64.StdEncoding.DecodeString(*ca)
|
||||
if err == nil {
|
||||
conf.GetConfig().KubeCA = string(decoded)
|
||||
@@ -314,28 +294,8 @@ func getContainerName(argo_file string) string {
|
||||
re := regexp.MustCompile(regex)
|
||||
|
||||
container_name := re.FindString(argo_file)
|
||||
return container_name
|
||||
}
|
||||
|
||||
// Uses the ArgoWatch object to update status of the workflow execution object
|
||||
func checkStatus(current *models.ArgoWatch, previous *models.ArgoWatch, argoLogs *models.ArgoLogs) bool {
|
||||
if previous == nil || current.Status != previous.Status || argoLogs.IsStreaming {
|
||||
argoLogs.StepCount += 1
|
||||
if len(current.Logs) > 0 {
|
||||
newLogs := []string{}
|
||||
for _, log := range current.Logs {
|
||||
if !slices.Contains(argoLogs.Logs, log) {
|
||||
newLogs = append(newLogs, log)
|
||||
}
|
||||
}
|
||||
updateStatus(current.Status, strings.Join(newLogs, "\n"))
|
||||
current.Logs = newLogs
|
||||
argoLogs.Logs = append(argoLogs.Logs, newLogs...)
|
||||
} else {
|
||||
updateStatus(current.Status, "")
|
||||
}
|
||||
}
|
||||
return previous == nil || current.Status != previous.Status || argoLogs.IsStreaming
|
||||
return container_name
|
||||
}
|
||||
|
||||
func updateStatus(status string, log string) {
|
||||
|
||||
@@ -1,145 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/acarl005/stripansi"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type ArgoWatch struct {
|
||||
Name string
|
||||
Namespace string
|
||||
Status string
|
||||
Conditions
|
||||
Created string
|
||||
Started string
|
||||
Duration string
|
||||
Progress string
|
||||
Logs []string
|
||||
}
|
||||
|
||||
type Conditions struct {
|
||||
PodRunning bool
|
||||
Completed bool
|
||||
}
|
||||
|
||||
func (a *ArgoWatch) Equals(arg *ArgoWatch) bool {
|
||||
if arg == nil {
|
||||
return false
|
||||
}
|
||||
return a.Status == arg.Status && a.Progress == arg.Progress && a.Conditions.PodRunning == arg.Conditions.PodRunning && a.Conditions.Completed == arg.Conditions.Completed
|
||||
}
|
||||
|
||||
func NewArgoLogs(name string, namespace string, stepMax int) *ArgoLogs {
|
||||
return &ArgoLogs{
|
||||
Name: "oc-monitor-" + name,
|
||||
Namespace: namespace,
|
||||
CreatedDate: time.Now().Format("2006-01-02 15:04:05"),
|
||||
StepCount: 0,
|
||||
StepMax: stepMax,
|
||||
stop: false,
|
||||
Seen: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
type ArgoLogs struct {
|
||||
Name string
|
||||
Namespace string
|
||||
CreatedDate string
|
||||
StepCount int
|
||||
StepMax int
|
||||
stop bool
|
||||
Started time.Time
|
||||
Seen []string
|
||||
Logs []string
|
||||
IsStreaming bool
|
||||
}
|
||||
|
||||
func (a *ArgoLogs) NewWatch() *ArgoWatch {
|
||||
return &ArgoWatch{
|
||||
Name: a.Name,
|
||||
Namespace: a.Namespace,
|
||||
Status: "Pending",
|
||||
Created: a.CreatedDate,
|
||||
Started: a.Started.Format("2006-01-02 15:04:05"),
|
||||
Conditions: Conditions{
|
||||
PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
|
||||
Completed: a.StepCount == a.StepMax,
|
||||
},
|
||||
Progress: fmt.Sprintf("%v/%v", a.StepCount, a.StepMax),
|
||||
Duration: "0s",
|
||||
Logs: []string{},
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (a *ArgoLogs) StartStepRecording(current_watch *ArgoWatch, logger zerolog.Logger) {
|
||||
jsonified, _ := json.Marshal(current_watch)
|
||||
logger.Info().Msg(string(jsonified))
|
||||
a.StepCount += 1
|
||||
a.Started = time.Now()
|
||||
}
|
||||
|
||||
func (a *ArgoLogs) StopStepRecording(current *ArgoWatch) *ArgoWatch {
|
||||
fn := strings.Split(a.Name, "_")
|
||||
logs := []string{}
|
||||
err := false
|
||||
end := ""
|
||||
for _, input := range current.Logs {
|
||||
line := strings.TrimSpace(input)
|
||||
if line == "" || !strings.Contains(line, fn[0]) || !strings.Contains(line, ":") {
|
||||
continue
|
||||
}
|
||||
step := strings.Split(line, ":")
|
||||
if strings.Contains(line, "sub-process exited") {
|
||||
b := strings.Split(line, "time=\"")
|
||||
if len(b) > 1 {
|
||||
end = b[1][:19]
|
||||
}
|
||||
}
|
||||
if len(step) < 2 || strings.Contains(line, "time=") || strings.TrimSpace(strings.Join(step[1:], " : ")) == "" || strings.TrimSpace(strings.Join(step[1:], " : ")) == a.Name {
|
||||
continue
|
||||
}
|
||||
log := stripansi.Strip(strings.TrimSpace(strings.Join(step[1:], " : ")))
|
||||
t, e := strconv.Unquote(log)
|
||||
if e == nil {
|
||||
logs = append(logs, t)
|
||||
} else {
|
||||
logs = append(logs, strings.ReplaceAll(log, "\"", "`"))
|
||||
}
|
||||
|
||||
if strings.Contains(logs[len(logs)-1], "Error") {
|
||||
err = true
|
||||
}
|
||||
}
|
||||
status := "Pending"
|
||||
if a.StepCount > 0 {
|
||||
status = "Running"
|
||||
}
|
||||
if a.StepCount == a.StepMax {
|
||||
if err {
|
||||
status = "Failed"
|
||||
} else {
|
||||
status = "Succeeded"
|
||||
}
|
||||
}
|
||||
duration := float64(0)
|
||||
if end != "" {
|
||||
timeE, _ := time.Parse("2006-01-02T15:04:05", end)
|
||||
duration = timeE.Sub(a.Started).Seconds()
|
||||
}
|
||||
current.Conditions = Conditions{
|
||||
PodRunning: a.StepCount > 0 && a.StepCount < a.StepMax,
|
||||
Completed: a.StepCount == a.StepMax,
|
||||
}
|
||||
current.Progress = fmt.Sprintf("%v/%v", a.StepCount, a.StepMax)
|
||||
current.Duration = fmt.Sprintf("%v", fmt.Sprintf("%.2f", duration)+"s")
|
||||
|
||||
current.Status = status
|
||||
return current
|
||||
}
|
||||
@@ -1,5 +1,7 @@
|
||||
package models
|
||||
|
||||
import "gopkg.in/yaml.v3"
|
||||
|
||||
type ServiceResource struct {
|
||||
Action string `yaml:"action,omitempty"`
|
||||
SuccessCondition string `yaml:"successCondition,omitempty"`
|
||||
@@ -15,6 +17,24 @@ type Service struct {
|
||||
Spec ServiceSpec `yaml:"spec"`
|
||||
}
|
||||
|
||||
func (s *Service) BindToArgo(workflow *Workflow) error {
|
||||
service_manifest, err := yaml.Marshal(s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
service_template := Template{Name: "workflow-service-pod",
|
||||
Resource: ServiceResource{
|
||||
Action: "create",
|
||||
SuccessCondition: "status.succeeded > 0",
|
||||
FailureCondition: "status.failed > 3",
|
||||
SetOwnerReference: true,
|
||||
Manifest: string(service_manifest),
|
||||
},
|
||||
}
|
||||
workflow.Spec.Templates = append(workflow.Spec.Templates, service_template)
|
||||
return nil
|
||||
}
|
||||
|
||||
type Metadata struct {
|
||||
Name string `yaml:"name"`
|
||||
}
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/models"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
)
|
||||
@@ -12,11 +19,60 @@ type Parameter struct {
|
||||
Value string `yaml:"value,omitempty"`
|
||||
}
|
||||
|
||||
type Bounds struct {
|
||||
CPU string `yaml:"cpu,omitempty"`
|
||||
Memory string `yaml:"memory,omitempty"`
|
||||
GPU string `yaml:"nvidia.com/gpu,omitempty"`
|
||||
}
|
||||
|
||||
func NewBounds() *Bounds {
|
||||
return &Bounds{
|
||||
CPU: "0",
|
||||
Memory: "0",
|
||||
GPU: "0",
|
||||
}
|
||||
}
|
||||
|
||||
func (b *Bounds) Set(value float64, what string, isMin bool) bool {
|
||||
i := float64(0)
|
||||
switch what {
|
||||
case "cpu":
|
||||
if newI, err := strconv.ParseFloat(b.CPU, 64); err == nil {
|
||||
i = newI
|
||||
}
|
||||
case "ram":
|
||||
if newI, err := strconv.ParseFloat(b.Memory, 64); err == nil {
|
||||
i = newI
|
||||
}
|
||||
case "gpu":
|
||||
if newI, err := strconv.ParseFloat(b.GPU, 64); err == nil {
|
||||
i = newI
|
||||
}
|
||||
}
|
||||
ok := (value > i && !isMin) || (value < i && isMin)
|
||||
if ok {
|
||||
switch what {
|
||||
case "cpu":
|
||||
b.CPU = fmt.Sprintf("%f", value)
|
||||
return true
|
||||
case "ram":
|
||||
b.Memory = fmt.Sprintf("%fGi", value)
|
||||
return true
|
||||
case "gpu":
|
||||
b.GPU = fmt.Sprintf("%f", value)
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type Container struct {
|
||||
Image string `yaml:"image"`
|
||||
Command []string `yaml:"command,omitempty,flow"`
|
||||
Args []string `yaml:"args,omitempty,flow"`
|
||||
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
|
||||
Requests Bounds `yaml:"requests,omitempty"`
|
||||
Limits Bounds `yaml:"limits,omitempty"`
|
||||
}
|
||||
|
||||
func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount {
|
||||
@@ -37,27 +93,88 @@ func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMoun
|
||||
return volumes
|
||||
}
|
||||
|
||||
type VolumeMount struct {
|
||||
Name string `yaml:"name"`
|
||||
MountPath string `yaml:"mountPath"`
|
||||
Storage *resources.StorageResource `yaml:"-"`
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Name string `yaml:"name"`
|
||||
Template string `yaml:"template"`
|
||||
Dependencies []string `yaml:"dependencies,omitempty"`
|
||||
Name string `yaml:"name"`
|
||||
Template string `yaml:"template"`
|
||||
Dependencies []string `yaml:"dependencies,omitempty"`
|
||||
NodeSelector map[string]string `yaml:"nodeSelector,omitempty"`
|
||||
Arguments struct {
|
||||
Parameters []Parameter `yaml:"parameters,omitempty"`
|
||||
} `yaml:"arguments,omitempty"`
|
||||
}
|
||||
|
||||
func NewTask(processingName string, graphItemID string) *Task {
|
||||
unique_name := GetArgoName(processingName, graphItemID)
|
||||
return &Task{
|
||||
Name: unique_name,
|
||||
Template: unique_name,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Task) BindToArgo(
|
||||
dag *Dag,
|
||||
graphItemID string,
|
||||
originWf *w.Workflow,
|
||||
processing *resources.ProcessingResource,
|
||||
firstItems, lastItems []string,
|
||||
) (*Dag, []string, []string) {
|
||||
if instance := processing.GetSelectedInstance(); instance != nil {
|
||||
t.addParams(instance.(*resources.ProcessingInstance).Env)
|
||||
t.addParams(instance.(*resources.ProcessingInstance).Inputs)
|
||||
t.addParams(instance.(*resources.ProcessingInstance).Outputs)
|
||||
}
|
||||
t.Dependencies = TransformDepsToArgo(originWf.GetDependencies(graphItemID))
|
||||
name := ""
|
||||
if originWf.Graph.Items[graphItemID].Processing != nil {
|
||||
name = originWf.Graph.Items[graphItemID].Processing.GetName()
|
||||
}
|
||||
if originWf.Graph.Items[graphItemID].Workflow != nil {
|
||||
name = originWf.Graph.Items[graphItemID].Workflow.GetName()
|
||||
}
|
||||
if len(t.Dependencies) == 0 && name != "" {
|
||||
firstItems = append(firstItems, GetArgoName(name, graphItemID))
|
||||
}
|
||||
if deps := originWf.IsDependancy(graphItemID); len(deps) == 0 && name != "" {
|
||||
lastItems = append(lastItems, GetArgoName(name, graphItemID))
|
||||
}
|
||||
dag.Tasks = append(dag.Tasks, *t)
|
||||
return dag, firstItems, lastItems
|
||||
}
|
||||
|
||||
func (t *Task) addParams(params []models.Param) {
|
||||
for _, value := range params {
|
||||
t.Arguments.Parameters = append(t.Arguments.Parameters, Parameter{
|
||||
Name: value.Name,
|
||||
Value: value.Value,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (t *Task) GetDeps(name string) (int, string) {
|
||||
for i, deps := range t.Dependencies {
|
||||
if strings.Contains(deps, name) {
|
||||
return i, deps
|
||||
}
|
||||
}
|
||||
return 0, ""
|
||||
}
|
||||
|
||||
type Dag struct {
|
||||
Tasks []Task `yaml:"tasks,omitempty"`
|
||||
}
|
||||
|
||||
func (d *Dag) GetTask(taskName string) *Task {
|
||||
for _, task := range d.Tasks {
|
||||
if strings.Contains(task.Name, taskName) {
|
||||
return &task
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type TemplateMetadata struct {
|
||||
Labels map[string]string `yaml:"labels,omitempty"`
|
||||
Labels map[string]string `yaml:"labels,omitempty"`
|
||||
Annotations map[string]string `yaml:"annotations,omitempty"`
|
||||
}
|
||||
|
||||
type Secret struct {
|
||||
@@ -65,6 +182,10 @@ type Secret struct {
|
||||
Key string `yaml:"key"`
|
||||
}
|
||||
|
||||
func NewSecret(name string, key string) *Secret {
|
||||
return &Secret{Name: name, Key: key + "-key"}
|
||||
}
|
||||
|
||||
type Key struct {
|
||||
Key string `yaml:"key"`
|
||||
Bucket string `yaml:"bucket"`
|
||||
@@ -80,6 +201,59 @@ type Artifact struct {
|
||||
S3 *Key `yaml:"s3,omitempty"`
|
||||
}
|
||||
|
||||
func NewArtifact(name string, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) *Artifact {
|
||||
if rw.Write {
|
||||
name += "-" + rw.Destination + "-input-write"
|
||||
} else {
|
||||
name = "-" + rw.Destination + "-input-read"
|
||||
}
|
||||
return &Artifact{
|
||||
Name: name,
|
||||
Path: template.ReplacePerEnv(rw.Source, params),
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Artifact) BindToArgo(storageType enum.StorageType, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) {
|
||||
if rw.Write {
|
||||
template.Outputs.Artifacts = append(template.Inputs.Artifacts, *a)
|
||||
} else {
|
||||
template.Inputs.Artifacts = append(template.Outputs.Artifacts, *a)
|
||||
}
|
||||
}
|
||||
|
||||
func (a *Artifact) bindS3(rw graph.StorageProcessingGraphLink, params []models.Param, template Template) {
|
||||
a.S3 = &Key{
|
||||
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, params),
|
||||
Insecure: true, // temporary
|
||||
}
|
||||
/* sel := storage.GetSelectedInstance()
|
||||
if sel != nil {
|
||||
if sel.(*resources.StorageResourceInstance).Credentials != nil {
|
||||
tool, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil || tool == nil {
|
||||
logger.Error().Msg("Could not create the access secret")
|
||||
} else {
|
||||
id, err := tool.CreateAccessSecret(namespace,
|
||||
sel.(*resources.StorageResourceInstance).Credentials.Login,
|
||||
sel.(*resources.StorageResourceInstance).Credentials.Pass)
|
||||
if err == nil {
|
||||
a.S3.AccessKeySecret = NewSecret(id, "access")
|
||||
a.S3.SecretKeySecret = NewSecret(id, "secret")
|
||||
}
|
||||
}
|
||||
}
|
||||
source := sel.(*resources.StorageResourceInstance).Source
|
||||
a.S3.Key = strings.ReplaceAll(strings.ReplaceAll(a.S3.Key, source+"/", ""), source, "")
|
||||
splits := strings.Split(a.S3.EndPoint, "/")
|
||||
if len(splits) > 1 {
|
||||
a.S3.Bucket = splits[0]
|
||||
a.S3.EndPoint = strings.Join(splits[1:], "/")
|
||||
} else {
|
||||
a.S3.Bucket = splits[0]
|
||||
}
|
||||
} */
|
||||
}
|
||||
|
||||
type InOut struct {
|
||||
Parameters []Parameter `yaml:"parameters"`
|
||||
Artifacts []Artifact `yaml:"artifacts,omitempty"`
|
||||
@@ -95,7 +269,7 @@ type Template struct {
|
||||
Resource ServiceResource `yaml:"resource,omitempty"`
|
||||
}
|
||||
|
||||
func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag, templateName string) {
|
||||
func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag) {
|
||||
instance := processing.GetSelectedInstance()
|
||||
if instance == nil {
|
||||
return
|
||||
@@ -116,7 +290,7 @@ func (template *Template) CreateContainer(processing *resources.ProcessingResour
|
||||
template.Outputs.Parameters = append(template.Inputs.Parameters, Parameter{Name: v.Name})
|
||||
}
|
||||
cmd := strings.ReplaceAll(inst.Access.Container.Command, container.Image, "")
|
||||
container.Args = append(container.Args, "echo "+templateName+" && ") // a casual echo to know where we are for logs purpose
|
||||
|
||||
for _, a := range strings.Split(cmd, " ") {
|
||||
container.Args = append(container.Args, template.ReplacePerEnv(a, inst.Env))
|
||||
}
|
||||
@@ -124,6 +298,7 @@ func (template *Template) CreateContainer(processing *resources.ProcessingResour
|
||||
container.Args = append(container.Args, template.ReplacePerEnv(a, inst.Env))
|
||||
}
|
||||
container.Args = []string{strings.Join(container.Args, " ")}
|
||||
|
||||
template.Container = container
|
||||
}
|
||||
|
||||
@@ -138,3 +313,46 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string
|
||||
}
|
||||
return arg
|
||||
}
|
||||
|
||||
// Add the metadata that allow Admiralty to pick up an Argo Workflow that needs to be reparted
|
||||
// The value of "clustername" is the peerId, which must be replaced by the node name's for this specific execution
|
||||
func (t *Template) AddAdmiraltyAnnotations(peerID, namespace string) error {
|
||||
if t.Metadata.Annotations == nil {
|
||||
t.Metadata.Annotations = make(map[string]string)
|
||||
}
|
||||
|
||||
const key = "admiralty.io/multi-cluster-scheduler"
|
||||
|
||||
var annotation SchedulerAnnotation
|
||||
|
||||
// Parse existing annotation if it exists
|
||||
if val, ok := t.Metadata.Annotations[key]; ok && val != "" {
|
||||
if err := json.Unmarshal([]byte(val), &annotation); err != nil {
|
||||
return fmt.Errorf("failed to parse existing scheduler annotation: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add new affinity
|
||||
annotation.Affinities = append(annotation.Affinities, affinity{
|
||||
Cluster: "target-" + peerID + "-" + namespace,
|
||||
Namespace: namespace,
|
||||
})
|
||||
|
||||
// Encode back to JSON
|
||||
bytes, err := json.Marshal(annotation)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode scheduler annotation: %w", err)
|
||||
}
|
||||
|
||||
t.Metadata.Annotations[key] = string(bytes)
|
||||
return nil
|
||||
}
|
||||
|
||||
type affinity struct {
|
||||
Cluster string `json:"cluster"`
|
||||
Namespace string `json:"namespace"`
|
||||
}
|
||||
|
||||
type SchedulerAnnotation struct {
|
||||
Affinities []affinity `json:"affinities"`
|
||||
}
|
||||
|
||||
92
models/utils.go
Normal file
92
models/utils.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
)
|
||||
|
||||
type WorkflowsDependancies struct {
|
||||
FirstWfTasks map[string][]string
|
||||
RelatedWfTasks map[string][]string
|
||||
LastWfTasks map[string][]string
|
||||
}
|
||||
|
||||
func NewWorkflowDependancies() *WorkflowsDependancies {
|
||||
return &WorkflowsDependancies{
|
||||
FirstWfTasks: map[string][]string{},
|
||||
RelatedWfTasks: map[string][]string{},
|
||||
LastWfTasks: map[string][]string{},
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WorkflowsDependancies) BindFirstTasks(depsFunc func(v string) []w.Deps, dag *Dag) {
|
||||
for wfID, firstTasks := range w.FirstWfTasks {
|
||||
deps := depsFunc(wfID)
|
||||
if task := dag.GetTask(wfID); task != nil && len(deps) > 0 {
|
||||
task.Dependencies = append(task.Dependencies, firstTasks...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WorkflowsDependancies) BindRelatedTasks(dag *Dag) {
|
||||
for wfID, relatedWfTasks := range w.RelatedWfTasks {
|
||||
for _, dep := range relatedWfTasks {
|
||||
if task := dag.GetTask(dep); task != nil {
|
||||
index := -1
|
||||
if i, deps := task.GetDeps(wfID); deps != "" {
|
||||
index = i
|
||||
}
|
||||
if index != -1 {
|
||||
task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...)
|
||||
}
|
||||
if w.LastWfTasks[wfID] != nil {
|
||||
task.Dependencies = append(task.Dependencies, w.LastWfTasks[wfID]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type Workflow struct {
|
||||
ApiVersion string `yaml:"apiVersion"`
|
||||
Kind string `yaml:"kind"`
|
||||
Metadata struct {
|
||||
Name string `yaml:"name"`
|
||||
} `yaml:"metadata"`
|
||||
Spec Spec `yaml:"spec,omitempty"`
|
||||
}
|
||||
|
||||
func (b *Workflow) GetDag() *Dag {
|
||||
for _, t := range b.Spec.Templates {
|
||||
if t.Name == "dag" {
|
||||
return t.Dag
|
||||
}
|
||||
}
|
||||
b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}})
|
||||
return b.Spec.Templates[len(b.Spec.Templates)-1].Dag
|
||||
}
|
||||
|
||||
type Spec struct {
|
||||
ServiceAccountName string `yaml:"serviceAccountName"`
|
||||
Entrypoint string `yaml:"entrypoint"`
|
||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||
Templates []Template `yaml:"templates"`
|
||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||
}
|
||||
|
||||
func GetArgoName(raw_name string, component_id string) (formatedName string) {
|
||||
formatedName = strings.ReplaceAll(raw_name, " ", "-")
|
||||
formatedName += "-" + component_id
|
||||
formatedName = strings.ToLower(formatedName)
|
||||
return
|
||||
}
|
||||
|
||||
func TransformDepsToArgo(deps []w.Deps) []string {
|
||||
argoDeps := []string{}
|
||||
for _, dep := range deps {
|
||||
argoDeps = append(argoDeps, GetArgoName(dep.Source, dep.Dest))
|
||||
}
|
||||
return argoDeps
|
||||
}
|
||||
@@ -1,5 +1,12 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
)
|
||||
|
||||
type VolumeClaimTemplate struct {
|
||||
Metadata struct {
|
||||
Name string `yaml:"name"`
|
||||
@@ -15,3 +22,22 @@ type VolumeSpec struct {
|
||||
} `yaml:"requests"`
|
||||
} `yaml:"resources"`
|
||||
}
|
||||
|
||||
type VolumeMount struct {
|
||||
Name string `yaml:"name"`
|
||||
MountPath string `yaml:"mountPath"`
|
||||
Storage *resources.StorageResource `yaml:"-"`
|
||||
}
|
||||
|
||||
func (v *VolumeMount) BindToArgo(workflow *Workflow) { // TODO : one think about remote volume but TG
|
||||
index := 0
|
||||
if v.Storage.SelectedInstanceIndex != nil && (*v.Storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *v.Storage.SelectedInstanceIndex
|
||||
}
|
||||
storage := v.Storage.Instances[index]
|
||||
new_volume := VolumeClaimTemplate{}
|
||||
new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(v.Name), " ", "-")
|
||||
new_volume.Spec.AccessModes = []string{"ReadWriteOnce"}
|
||||
new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo()
|
||||
workflow.Spec.Volumes = append(workflow.Spec.Volumes, new_volume)
|
||||
}
|
||||
|
||||
BIN
oc-monitord
BIN
oc-monitord
Binary file not shown.
Binary file not shown.
|
Before Width: | Height: | Size: 665 B |
Binary file not shown.
|
Before Width: | Height: | Size: 628 B |
@@ -1,60 +0,0 @@
|
||||
<!-- HTML for static distribution bundle build -->
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Swagger UI</title>
|
||||
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
|
||||
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
|
||||
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
|
||||
<style>
|
||||
html
|
||||
{
|
||||
box-sizing: border-box;
|
||||
overflow: -moz-scrollbars-vertical;
|
||||
overflow-y: scroll;
|
||||
}
|
||||
|
||||
*,
|
||||
*:before,
|
||||
*:after
|
||||
{
|
||||
box-sizing: inherit;
|
||||
}
|
||||
|
||||
body
|
||||
{
|
||||
margin:0;
|
||||
background: #fafafa;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
|
||||
<body>
|
||||
<div id="swagger-ui"></div>
|
||||
|
||||
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
|
||||
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
|
||||
<script>
|
||||
window.onload = function() {
|
||||
// Begin Swagger UI call region
|
||||
const ui = SwaggerUIBundle({
|
||||
url: "https://petstore.swagger.io/v2/swagger.json",
|
||||
dom_id: '#swagger-ui',
|
||||
deepLinking: true,
|
||||
presets: [
|
||||
SwaggerUIBundle.presets.apis,
|
||||
SwaggerUIStandalonePreset
|
||||
],
|
||||
plugins: [
|
||||
SwaggerUIBundle.plugins.DownloadUrl
|
||||
],
|
||||
layout: "StandaloneLayout"
|
||||
});
|
||||
// End Swagger UI call region
|
||||
|
||||
window.ui = ui;
|
||||
};
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
@@ -1,79 +0,0 @@
|
||||
<!doctype html>
|
||||
<html lang="en-US">
|
||||
<head>
|
||||
<title>Swagger UI: OAuth2 Redirect</title>
|
||||
</head>
|
||||
<body>
|
||||
<script>
|
||||
'use strict';
|
||||
function run () {
|
||||
var oauth2 = window.opener.swaggerUIRedirectOauth2;
|
||||
var sentState = oauth2.state;
|
||||
var redirectUrl = oauth2.redirectUrl;
|
||||
var isValid, qp, arr;
|
||||
|
||||
if (/code|token|error/.test(window.location.hash)) {
|
||||
qp = window.location.hash.substring(1);
|
||||
} else {
|
||||
qp = location.search.substring(1);
|
||||
}
|
||||
|
||||
arr = qp.split("&");
|
||||
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
|
||||
qp = qp ? JSON.parse('{' + arr.join() + '}',
|
||||
function (key, value) {
|
||||
return key === "" ? value : decodeURIComponent(value);
|
||||
}
|
||||
) : {};
|
||||
|
||||
isValid = qp.state === sentState;
|
||||
|
||||
if ((
|
||||
oauth2.auth.schema.get("flow") === "accessCode" ||
|
||||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
|
||||
oauth2.auth.schema.get("flow") === "authorization_code"
|
||||
) && !oauth2.auth.code) {
|
||||
if (!isValid) {
|
||||
oauth2.errCb({
|
||||
authId: oauth2.auth.name,
|
||||
source: "auth",
|
||||
level: "warning",
|
||||
message: "Authorization may be unsafe, passed state was changed in server Passed state wasn't returned from auth server"
|
||||
});
|
||||
}
|
||||
|
||||
if (qp.code) {
|
||||
delete oauth2.state;
|
||||
oauth2.auth.code = qp.code;
|
||||
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
|
||||
} else {
|
||||
let oauthErrorMsg;
|
||||
if (qp.error) {
|
||||
oauthErrorMsg = "["+qp.error+"]: " +
|
||||
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
|
||||
(qp.error_uri ? "More info: "+qp.error_uri : "");
|
||||
}
|
||||
|
||||
oauth2.errCb({
|
||||
authId: oauth2.auth.name,
|
||||
source: "auth",
|
||||
level: "error",
|
||||
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server"
|
||||
});
|
||||
}
|
||||
} else {
|
||||
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
|
||||
}
|
||||
window.close();
|
||||
}
|
||||
|
||||
if (document.readyState !== 'loading') {
|
||||
run();
|
||||
} else {
|
||||
document.addEventListener('DOMContentLoaded', function () {
|
||||
run();
|
||||
});
|
||||
}
|
||||
</script>
|
||||
</body>
|
||||
</html>
|
||||
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -3,17 +3,15 @@ package tools
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"oc-monitord/models"
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
)
|
||||
|
||||
type Tool interface {
|
||||
CreateArgoWorkflow(path string, ns string) (string, error)
|
||||
CreateAccessSecret(ns string, login string, password string) (string, error)
|
||||
LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch,
|
||||
argoLogs *models.ArgoLogs, seen []string,
|
||||
logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch,
|
||||
argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error
|
||||
GetArgoWatch(executionId string, wfName string) (watch.Interface, error)
|
||||
GetPodLogger(ns string, wfName string, podName string) (io.ReadCloser, error)
|
||||
}
|
||||
|
||||
var _service = map[string]func() (Tool, error){
|
||||
|
||||
@@ -7,21 +7,18 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"oc-monitord/conf"
|
||||
"oc-monitord/models"
|
||||
"oc-monitord/utils"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
|
||||
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
|
||||
"github.com/google/uuid"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
@@ -52,76 +49,13 @@ func NewKubernetesTool() (Tool, error) {
|
||||
if err != nil {
|
||||
return nil, errors.New("Error creating Kubernetes versionned client: " + err.Error())
|
||||
}
|
||||
|
||||
return &KubernetesTools{
|
||||
Set: clientset,
|
||||
VersionedSet: clientset2,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (k *KubernetesTools) LogWorkflow(execID string, namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs,
|
||||
seen []string, logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error {
|
||||
exec := utils.GetExecution(execID)
|
||||
if exec == nil {
|
||||
return errors.New("Could not retrieve workflow ID from execution ID " + execID)
|
||||
}
|
||||
if exec.State != enum.SCHEDULED {
|
||||
return nil
|
||||
}
|
||||
k.logWorkflow(namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc)
|
||||
return k.LogWorkflow(execID, namespace, workflowName, argoFilePath, stepMax, current_watch, previous_watch, argoLogs, seen, logFunc)
|
||||
}
|
||||
|
||||
func (k *KubernetesTools) logWorkflow(namespace string, workflowName string, argoFilePath string, stepMax int, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs,
|
||||
seen []string,
|
||||
logFunc func(argoFilePath string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) error {
|
||||
// List pods related to the Argo workflow
|
||||
labelSelector := fmt.Sprintf("workflows.argoproj.io/workflow=%s", workflowName)
|
||||
for retries := 0; retries < 10; retries++ { // Retry for up to ~20 seconds
|
||||
// List workflow pods
|
||||
wfPods, err := k.Set.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{
|
||||
LabelSelector: labelSelector,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If we found pods, stream logs
|
||||
if len(wfPods.Items) > 0 {
|
||||
var wg sync.WaitGroup
|
||||
// Stream logs from all matching pods
|
||||
for _, pod := range wfPods.Items {
|
||||
for _, container := range pod.Spec.Containers {
|
||||
wg.Add(1)
|
||||
go k.streamLogs(namespace, pod.Name, container.Name, argoFilePath, stepMax, &wg, current_watch, previous_watch, argoLogs, seen, logFunc)
|
||||
}
|
||||
}
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
time.Sleep(2 * time.Second) // Wait before retrying
|
||||
}
|
||||
return errors.New("no pods found for the workflow")
|
||||
}
|
||||
|
||||
// Function to stream logs
|
||||
func (k *KubernetesTools) streamLogs(namespace string, podName string, containerName string,
|
||||
argoFilePath string, stepMax int, wg *sync.WaitGroup, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string,
|
||||
logFunc func(argo_file_path string, stepMax int, pipe io.ReadCloser, current_watch *models.ArgoWatch, previous_watch *models.ArgoWatch, argoLogs *models.ArgoLogs, seen []string, wg *sync.WaitGroup)) {
|
||||
req := k.Set.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
|
||||
Container: containerName, // Main container
|
||||
Follow: true, // Equivalent to -f flag in kubectl logs
|
||||
})
|
||||
defer wg.Done()
|
||||
// Open stream
|
||||
stream, err := req.Stream(context.Background())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer stream.Close()
|
||||
var internalWg sync.WaitGroup
|
||||
logFunc(argoFilePath, stepMax, stream, current_watch, previous_watch, argoLogs, seen, &internalWg)
|
||||
internalWg.Wait()
|
||||
}
|
||||
|
||||
func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, error) {
|
||||
// Read workflow YAML file
|
||||
workflowYAML, err := os.ReadFile(path)
|
||||
@@ -145,11 +79,12 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er
|
||||
}
|
||||
|
||||
// Create the workflow in the "argo" namespace
|
||||
createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.Background(), workflow, metav1.CreateOptions{})
|
||||
createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.TODO(), workflow, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return "", errors.New("failed to create workflow: " + err.Error())
|
||||
}
|
||||
fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, "argo")
|
||||
l := utils.GetLogger()
|
||||
l.Info().Msg(fmt.Sprintf("workflow %s created in namespace %s\n", createdWf.Name, ns))
|
||||
return createdWf.Name, nil
|
||||
}
|
||||
|
||||
@@ -173,9 +108,80 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s
|
||||
Data: secretData,
|
||||
}
|
||||
// Create the Secret in Kubernetes
|
||||
_, err := k.Set.CoreV1().Secrets(namespace).Create(context.Background(), secret, metav1.CreateOptions{})
|
||||
_, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
return "", errors.New("Error creating secret: " + err.Error())
|
||||
}
|
||||
return name, nil
|
||||
}
|
||||
|
||||
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
|
||||
options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
|
||||
|
||||
watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options)
|
||||
if err != nil {
|
||||
return nil, errors.New("Error executing 'argo watch " + wfName + " -n " + executionId + " with ArgoprojV1alpha1 client")
|
||||
}
|
||||
|
||||
return watcher, nil
|
||||
|
||||
}
|
||||
|
||||
func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) {
|
||||
var targetPod v1.Pod
|
||||
|
||||
|
||||
pods, err := k.Set.CoreV1().Pods(ns).List(context.Background(), metav1.ListOptions{
|
||||
LabelSelector: "workflows.argoproj.io/workflow="+wfName,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to list pods: " + err.Error())
|
||||
}
|
||||
if len(pods.Items) == 0 {
|
||||
|
||||
return nil, fmt.Errorf("no pods found with label workflows.argoproj.io/workflow="+ wfName + " no pods found with label workflows.argoproj.io/node-name=" + nodeName + " in namespace " + ns)
|
||||
}
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
if pod.Annotations["workflows.argoproj.io/node-name"] == nodeName {
|
||||
targetPod = pod
|
||||
}
|
||||
}
|
||||
|
||||
// k8s API throws an error if we try getting logs while the container are not initialized, so we repeat status check there
|
||||
k.testPodReady(targetPod, ns)
|
||||
|
||||
// When using kubec logs for a pod we see it contacts /api/v1/namespaces/NAMESPACE/pods/oc-monitor-PODNAME/log?container=main so we add this container: main to the call
|
||||
req, err := k.Set.CoreV1().Pods(ns).GetLogs(targetPod.Name, &v1.PodLogOptions{Follow: true, Container: "main"}). Stream(context.Background())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf(" Error when trying to get logs for " + targetPod.Name + " : " + err.Error())
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
func (k *KubernetesTools) testPodReady(pod v1.Pod, ns string) {
|
||||
for {
|
||||
pod, err := k.Set.CoreV1().Pods(ns).Get(context.Background(), pod.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
wfl := utils.GetWFLogger("")
|
||||
wfl.Error().Msg("Error fetching pod: " + err.Error() + "\n")
|
||||
break
|
||||
}
|
||||
|
||||
var initialized bool
|
||||
for _, cond := range pod.Status.Conditions {
|
||||
// It seems that for remote pods the pod gets the Succeeded status before it has time to display the it is ready to run in .status.conditions,so we added the OR condition
|
||||
if (cond.Type == v1.PodReady && cond.Status == v1.ConditionTrue) || pod.Status.Phase == v1.PodSucceeded {
|
||||
initialized = true
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if initialized {
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(2 * time.Second) // avoid hammering the API
|
||||
}
|
||||
}
|
||||
@@ -2,17 +2,46 @@ package utils
|
||||
|
||||
import (
|
||||
"oc-monitord/conf"
|
||||
"sync"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var (
|
||||
logger zerolog.Logger
|
||||
wf_logger zerolog.Logger
|
||||
pods_logger zerolog.Logger
|
||||
onceLogger sync.Once
|
||||
onceWF sync.Once
|
||||
)
|
||||
|
||||
func GetExecution(exec_id string) *workflow_execution.WorkflowExecution {
|
||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", conf.GetConfig().PeerID, []string{}, nil).LoadOne(exec_id)
|
||||
if res.Code != 200 {
|
||||
logger := oclib.GetLogger()
|
||||
logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
|
||||
logger.Error().Msg("Error retrieving execution " + exec_id)
|
||||
logger.Error().Msg(res.Err)
|
||||
return nil
|
||||
}
|
||||
return res.ToWorkflowExecution()
|
||||
}
|
||||
|
||||
func GetLogger() zerolog.Logger {
|
||||
onceLogger.Do(func(){
|
||||
logger = logs.CreateLogger("oc-monitord")
|
||||
})
|
||||
return logger
|
||||
}
|
||||
|
||||
func GetWFLogger(workflowName string) zerolog.Logger {
|
||||
onceWF.Do(func(){
|
||||
wf_logger = logger.With().
|
||||
Str("argo_name", workflowName).
|
||||
Str("workflow_id", conf.GetConfig().
|
||||
WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
|
||||
})
|
||||
return wf_logger
|
||||
}
|
||||
146
workflow_builder/admiralty_setter.go
Normal file
146
workflow_builder/admiralty_setter.go
Normal file
@@ -0,0 +1,146 @@
|
||||
package workflow_builder
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"oc-monitord/utils"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/peer"
|
||||
tools "cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
type AdmiraltySetter struct {
|
||||
Id string // ID to identify the execution, correspond to workflow_executions id
|
||||
NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"}
|
||||
}
|
||||
|
||||
func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string, remotePeerID string) error {
|
||||
|
||||
logger := logs.GetLogger()
|
||||
|
||||
data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(remotePeerID)
|
||||
if data.Code != 200 {
|
||||
logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID)
|
||||
return fmt.Errorf(data.Err)
|
||||
}
|
||||
remotePeer := data.ToPeer()
|
||||
|
||||
data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(localPeerID)
|
||||
if data.Code != 200 {
|
||||
logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID)
|
||||
return fmt.Errorf(data.Err)
|
||||
}
|
||||
localPeer := data.ToPeer()
|
||||
|
||||
caller := tools.NewHTTPCaller(
|
||||
map[tools.DataType]map[tools.METHOD]string{
|
||||
tools.ADMIRALTY_SOURCE: {
|
||||
tools.POST: "/:id",
|
||||
},
|
||||
tools.ADMIRALTY_KUBECONFIG: {
|
||||
tools.GET: "/:id",
|
||||
},
|
||||
tools.ADMIRALTY_SECRET: {
|
||||
tools.POST: "/:id/" + remotePeerID,
|
||||
},
|
||||
tools.ADMIRALTY_TARGET: {
|
||||
tools.POST: "/:id/" + remotePeerID,
|
||||
},
|
||||
tools.ADMIRALTY_NODES: {
|
||||
tools.GET: "/:id/" + remotePeerID,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
logger.Info().Msg("\n\n Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id)
|
||||
_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
|
||||
logger.Info().Msg("\n\n Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id)
|
||||
kubeconfig := s.getKubeconfig(remotePeer, caller)
|
||||
logger.Info().Msg("\n\n Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id)
|
||||
_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller, s.Id, tools.ADMIRALTY_SECRET, tools.POST, kubeconfig, true)
|
||||
logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id)
|
||||
_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_TARGET, tools.POST, nil, true)
|
||||
logger.Info().Msg("\n\n Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id)
|
||||
s.checkNodeStatus(localPeer, caller)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCaller) map[string]string {
|
||||
var kubedata map[string]string
|
||||
_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true)
|
||||
if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 {
|
||||
l := utils.GetLogger()
|
||||
l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig")
|
||||
panic(0)
|
||||
}
|
||||
err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata)
|
||||
if err != nil {
|
||||
l := utils.GetLogger()
|
||||
l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig")
|
||||
panic(0)
|
||||
}
|
||||
|
||||
return kubedata
|
||||
}
|
||||
|
||||
func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) map[string]interface{} {
|
||||
l := utils.GetLogger()
|
||||
resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller)
|
||||
if err != nil {
|
||||
l.Error().Msg("Error when executing on peer at" + peer.Url)
|
||||
l.Error().Msg(err.Error())
|
||||
panic(0)
|
||||
}
|
||||
|
||||
if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) {
|
||||
l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode))
|
||||
if _, ok := caller.LastResults["body"]; ok {
|
||||
l.Info().Msg(string(caller.LastResults["body"].([]byte)))
|
||||
}
|
||||
if panicCode {
|
||||
panic(0)
|
||||
}
|
||||
}
|
||||
|
||||
return resp
|
||||
}
|
||||
|
||||
func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) {
|
||||
var data map[string]interface{}
|
||||
if resp, ok := caller.LastResults["body"]; ok {
|
||||
json.Unmarshal(resp.([]byte), &data)
|
||||
}
|
||||
|
||||
if node, ok := data["node"]; ok {
|
||||
metadata := node.(map[string]interface{})["metadata"]
|
||||
name := metadata.(map[string]interface{})["name"].(string)
|
||||
s.NodeName = name
|
||||
} else {
|
||||
l := utils.GetLogger()
|
||||
l.Error().Msg("Could not retrieve data about the recently created node")
|
||||
panic(0)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller) {
|
||||
for i := range 5 {
|
||||
time.Sleep(10 * time.Second) // let some time for kube to generate the node
|
||||
_ = s.callRemoteExecution(localPeer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_NODES, tools.GET, nil, false)
|
||||
if caller.LastResults["code"] == 200 {
|
||||
s.storeNodeName(caller)
|
||||
return
|
||||
}
|
||||
if i == 5 {
|
||||
logger.Error().Msg("Node on " + localPeer.Name + " was never found, panicking !")
|
||||
panic(0)
|
||||
}
|
||||
logger.Info().Msg("Could not verify that node is up. Retrying...")
|
||||
}
|
||||
|
||||
}
|
||||
@@ -5,16 +5,16 @@
|
||||
package workflow_builder
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"oc-monitord/conf"
|
||||
"oc-monitord/models"
|
||||
. "oc-monitord/models"
|
||||
tools2 "oc-monitord/tools"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"github.com/nwtgck/go-fakelish"
|
||||
@@ -26,84 +26,39 @@ var logger zerolog.Logger
|
||||
|
||||
type ArgoBuilder struct {
|
||||
OriginWorkflow *w.Workflow
|
||||
Workflow Workflow
|
||||
Workflow *models.Workflow
|
||||
Services []*Service
|
||||
Timeout int
|
||||
}
|
||||
|
||||
type Workflow struct {
|
||||
ApiVersion string `yaml:"apiVersion"`
|
||||
Kind string `yaml:"kind"`
|
||||
Metadata struct {
|
||||
Name string `yaml:"name"`
|
||||
} `yaml:"metadata"`
|
||||
Spec Spec `yaml:"spec,omitempty"`
|
||||
}
|
||||
|
||||
func (b *Workflow) getDag() *Dag {
|
||||
for _, t := range b.Spec.Templates {
|
||||
if t.Name == "dag" {
|
||||
return t.Dag
|
||||
}
|
||||
}
|
||||
b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}})
|
||||
return b.Spec.Templates[len(b.Spec.Templates)-1].Dag
|
||||
}
|
||||
|
||||
type Spec struct {
|
||||
Entrypoint string `yaml:"entrypoint"`
|
||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||
Templates []Template `yaml:"templates"`
|
||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||
RemotePeers []string
|
||||
}
|
||||
|
||||
// TODO: found on a processing instance linked to storage
|
||||
// add s3, gcs, azure, etc if needed on a link between processing and storage
|
||||
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []string, []string, error) {
|
||||
fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
|
||||
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []string, error) {
|
||||
logger = logs.GetLogger()
|
||||
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
|
||||
// handle services by checking if there is only one processing with hostname and port
|
||||
firstItems, lastItems, volumes := b.createTemplates(namespace)
|
||||
b.createVolumes(volumes)
|
||||
|
||||
if b.Timeout > 0 {
|
||||
b.Workflow.Spec.Timeout = b.Timeout
|
||||
}
|
||||
b.Workflow.Spec.ServiceAccountName = "sa-" + namespace
|
||||
b.Workflow.Spec.Entrypoint = "dag"
|
||||
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
|
||||
b.Workflow.Kind = "Workflow"
|
||||
if !write {
|
||||
return "", len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8)
|
||||
b.Workflow.Metadata.Name = "oc-monitor-" + random_name
|
||||
logger = oclib.GetLogger()
|
||||
yamlified, err := yaml.Marshal(b.Workflow)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not transform object to yaml file")
|
||||
return "", 0, firstItems, lastItems, err
|
||||
}
|
||||
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
|
||||
current_timestamp := time.Now().Format("02_01_2006_150405")
|
||||
file_name := random_name + "_" + current_timestamp + ".yml"
|
||||
workflows_dir := "./argo_workflows/"
|
||||
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not write the yaml file")
|
||||
return "", 0, firstItems, lastItems, err
|
||||
}
|
||||
return workflows_dir + file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
return len(b.Workflow.GetDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
|
||||
volumes := []VolumeMount{}
|
||||
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []models.VolumeMount) {
|
||||
volumes := []models.VolumeMount{}
|
||||
firstItems := []string{}
|
||||
lastItems := []string{}
|
||||
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
|
||||
fmt.Println("Creating templates", len(items))
|
||||
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
|
||||
logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
|
||||
for _, item := range items {
|
||||
instance := item.Processing.GetSelectedInstance()
|
||||
fmt.Println("Creating template for", item.Processing.GetName(), instance)
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
|
||||
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
|
||||
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
|
||||
return firstItems, lastItems, volumes
|
||||
@@ -111,260 +66,185 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
|
||||
item.ID, item.Processing, volumes, firstItems, lastItems)
|
||||
}
|
||||
firstWfTasks := map[string][]string{}
|
||||
latestWfTasks := map[string][]string{}
|
||||
relatedWfTasks := map[string][]string{}
|
||||
for _, wf := range b.OriginWorkflow.Workflows {
|
||||
realWorkflow, code, err := w.NewAccessor(nil).LoadOne(wf)
|
||||
if code != 200 {
|
||||
logger.Error().Msg("Error loading the workflow : " + err.Error())
|
||||
continue
|
||||
}
|
||||
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
|
||||
_, _, fi, li, err := subBuilder.CreateDAG(namespace, false)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
|
||||
continue
|
||||
}
|
||||
firstWfTasks[wf] = fi
|
||||
if ok, depsOfIds := subBuilder.isArgoDependancy(wf); ok { // IS BEFORE
|
||||
latestWfTasks[wf] = li
|
||||
relatedWfTasks[wf] = depsOfIds
|
||||
}
|
||||
subDag := subBuilder.Workflow.getDag()
|
||||
d := b.Workflow.getDag()
|
||||
d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow
|
||||
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...)
|
||||
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...)
|
||||
b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...)
|
||||
b.Services = append(b.Services, subBuilder.Services...)
|
||||
}
|
||||
for wfID, depsOfIds := range relatedWfTasks {
|
||||
for _, dep := range depsOfIds {
|
||||
for _, task := range b.Workflow.getDag().Tasks {
|
||||
if strings.Contains(task.Name, dep) {
|
||||
index := -1
|
||||
for i, depp := range task.Dependencies {
|
||||
if strings.Contains(depp, wfID) {
|
||||
index = i
|
||||
break
|
||||
}
|
||||
}
|
||||
if index != -1 {
|
||||
task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...)
|
||||
}
|
||||
task.Dependencies = append(task.Dependencies, latestWfTasks[wfID]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for wfID, fi := range firstWfTasks {
|
||||
deps := b.getArgoDependencies(wfID)
|
||||
if len(deps) > 0 {
|
||||
for _, dep := range fi {
|
||||
for _, task := range b.Workflow.getDag().Tasks {
|
||||
if strings.Contains(task.Name, dep) {
|
||||
task.Dependencies = append(task.Dependencies, deps...)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
wfDeps := models.NewWorkflowDependancies()
|
||||
for _, workflowID := range b.OriginWorkflow.Workflows {
|
||||
b.createWorkflowArgoTemplate(workflowID, namespace, wfDeps)
|
||||
}
|
||||
wfDeps.BindRelatedTasks(b.Workflow.GetDag())
|
||||
wfDeps.BindFirstTasks(b.OriginWorkflow.GetDependencies, b.Workflow.GetDag())
|
||||
|
||||
if b.Services != nil {
|
||||
dag := b.Workflow.getDag()
|
||||
dag := b.Workflow.GetDag()
|
||||
dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"})
|
||||
b.addServiceToArgo()
|
||||
}
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createArgoTemplates(namespace string,
|
||||
func (b *ArgoBuilder) createWorkflowArgoTemplate(
|
||||
workflowID string,
|
||||
namespace string,
|
||||
wfDeps *models.WorkflowsDependancies,
|
||||
) {
|
||||
realWorkflow, code, err := w.NewAccessor(nil).LoadOne(workflowID)
|
||||
if code != 200 {
|
||||
logger.Error().Msg("Error loading the workflow : " + err.Error())
|
||||
return
|
||||
}
|
||||
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Workflow: &models.Workflow{}, Timeout: b.Timeout}
|
||||
_, fi, li, err := subBuilder.CreateDAG(namespace, false)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
|
||||
return
|
||||
}
|
||||
wfDeps.FirstWfTasks[workflowID] = fi
|
||||
if depsOfIds := subBuilder.OriginWorkflow.IsDependancy(workflowID); len(depsOfIds) > 0 { // IS BEFORE
|
||||
wfDeps.LastWfTasks[workflowID] = li
|
||||
wfDeps.RelatedWfTasks[workflowID] = models.TransformDepsToArgo(depsOfIds)
|
||||
}
|
||||
subDag := subBuilder.Workflow.GetDag()
|
||||
d := b.Workflow.GetDag()
|
||||
d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow
|
||||
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...)
|
||||
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...)
|
||||
b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...)
|
||||
b.Services = append(b.Services, subBuilder.Services...)
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createArgoTemplates(
|
||||
namespace string,
|
||||
id string,
|
||||
processing *resources.ProcessingResource,
|
||||
volumes []VolumeMount,
|
||||
volumes []models.VolumeMount,
|
||||
firstItems []string,
|
||||
lastItems []string) ([]VolumeMount, []string, []string) {
|
||||
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
|
||||
template := &Template{Name: getArgoName(processing.GetName(), id)}
|
||||
fmt.Println("Creating template for", template.Name)
|
||||
template.CreateContainer(processing, b.Workflow.getDag(), template.Name)
|
||||
lastItems []string,
|
||||
) ([]models.VolumeMount, []string, []string) {
|
||||
_, firstItems, lastItems = NewTask(processing.Name, id).BindToArgo(b.Workflow.GetDag(), id, b.OriginWorkflow, processing, firstItems, lastItems)
|
||||
template := &Template{Name: models.GetArgoName(processing.GetName(), id)}
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
|
||||
|
||||
template.CreateContainer(processing, b.Workflow.GetDag())
|
||||
if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil {
|
||||
logger.Error().Msg(fmt.Sprint("problem to sets up repartition expected %v", err.Error()))
|
||||
return volumes, firstItems, lastItems
|
||||
}
|
||||
// get datacenter from the processing
|
||||
if processing.IsService {
|
||||
b.CreateService(id, processing)
|
||||
template.Metadata.Labels = make(map[string]string)
|
||||
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
|
||||
}
|
||||
related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage)
|
||||
for _, r := range related {
|
||||
storage := r.Node.(*resources.StorageResource)
|
||||
for _, linkToStorage := range r.Links {
|
||||
for _, rw := range linkToStorage.StorageLinkInfos {
|
||||
art := Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)}
|
||||
if rw.Write {
|
||||
art.Name = storage.GetName() + "-" + rw.Destination + "-input-write"
|
||||
} else {
|
||||
art.Name = storage.GetName() + "-" + rw.Destination + "-input-read"
|
||||
}
|
||||
if storage.StorageType == enum.S3 {
|
||||
art.S3 = &Key{
|
||||
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
|
||||
Insecure: true, // temporary
|
||||
}
|
||||
sel := storage.GetSelectedInstance()
|
||||
if sel != nil {
|
||||
if sel.(*resources.StorageResourceInstance).Credentials != nil {
|
||||
tool, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil || tool == nil {
|
||||
logger.Error().Msg("Could not create the access secret")
|
||||
} else {
|
||||
id, err := tool.CreateAccessSecret(namespace,
|
||||
sel.(*resources.StorageResourceInstance).Credentials.Login,
|
||||
sel.(*resources.StorageResourceInstance).Credentials.Pass)
|
||||
if err == nil {
|
||||
art.S3.AccessKeySecret = &Secret{
|
||||
Name: id,
|
||||
Key: "access-key",
|
||||
}
|
||||
art.S3.SecretKeySecret = &Secret{
|
||||
Name: id,
|
||||
Key: "secret-key",
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
|
||||
splits := strings.Split(art.S3.EndPoint, "/")
|
||||
if len(splits) > 1 {
|
||||
art.S3.Bucket = splits[0]
|
||||
art.S3.EndPoint = strings.Join(splits[1:], "/")
|
||||
} else {
|
||||
art.S3.Bucket = splits[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
if rw.Write {
|
||||
template.Outputs.Artifacts = append(template.Inputs.Artifacts, art)
|
||||
} else {
|
||||
template.Inputs.Artifacts = append(template.Outputs.Artifacts, art)
|
||||
}
|
||||
}
|
||||
}
|
||||
index := 0
|
||||
if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *storage.SelectedInstanceIndex
|
||||
}
|
||||
s := storage.Instances[index]
|
||||
if s.Local {
|
||||
volumes = template.Container.AddVolumeMount(VolumeMount{
|
||||
Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"),
|
||||
MountPath: s.Source,
|
||||
Storage: storage,
|
||||
}, volumes)
|
||||
}
|
||||
}
|
||||
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
|
||||
return volumes, firstItems, lastItems
|
||||
}
|
||||
func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource,
|
||||
firstItems []string, lastItems []string) (*Dag, []string, []string) {
|
||||
unique_name := getArgoName(processing.GetName(), graphItemID)
|
||||
step := Task{Name: unique_name, Template: unique_name}
|
||||
instance := processing.GetSelectedInstance()
|
||||
if instance != nil {
|
||||
for _, value := range instance.(*resources.ProcessingInstance).Env {
|
||||
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
|
||||
Name: value.Name,
|
||||
Value: value.Value,
|
||||
})
|
||||
}
|
||||
for _, value := range instance.(*resources.ProcessingInstance).Inputs {
|
||||
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
|
||||
Name: value.Name,
|
||||
Value: value.Value,
|
||||
})
|
||||
}
|
||||
for _, value := range instance.(*resources.ProcessingInstance).Outputs {
|
||||
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
|
||||
Name: value.Name,
|
||||
Value: value.Value,
|
||||
})
|
||||
}
|
||||
}
|
||||
step.Dependencies = b.getArgoDependencies(graphItemID)
|
||||
name := ""
|
||||
if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil {
|
||||
name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName()
|
||||
}
|
||||
if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil {
|
||||
name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName()
|
||||
}
|
||||
if len(step.Dependencies) == 0 && name != "" {
|
||||
firstItems = append(firstItems, getArgoName(name, graphItemID))
|
||||
}
|
||||
if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" {
|
||||
lastItems = append(lastItems, getArgoName(name, graphItemID))
|
||||
}
|
||||
dag.Tasks = append(dag.Tasks, step)
|
||||
return dag, firstItems, lastItems
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
|
||||
func (b *ArgoBuilder) createVolumes(volumes []models.VolumeMount) { // TODO : one think about remote volume but TG
|
||||
for _, volume := range volumes {
|
||||
index := 0
|
||||
if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *volume.Storage.SelectedInstanceIndex
|
||||
}
|
||||
storage := volume.Storage.Instances[index]
|
||||
new_volume := VolumeClaimTemplate{}
|
||||
new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-")
|
||||
new_volume.Spec.AccessModes = []string{"ReadWriteOnce"}
|
||||
new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo()
|
||||
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume)
|
||||
volume.BindToArgo(b.Workflow)
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
|
||||
dependancyOfIDs := []string{}
|
||||
isDeps := false
|
||||
for _, link := range b.OriginWorkflow.Graph.Links {
|
||||
if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok {
|
||||
fmt.Println("Could not find the source of the link", link.Destination.ID)
|
||||
// Verify if a processing resource is attached to another Compute than the one hosting
|
||||
// the current Open Cloud instance. If true return the peer ID to contact
|
||||
func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource, graphID string, template *models.Template, namespace string) error {
|
||||
computeAttached := b.OriginWorkflow.GetByRelatedProcessing(processing.GetID(), b.OriginWorkflow.Graph.IsCompute)
|
||||
if len(computeAttached) == 0 {
|
||||
return errors.New("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
|
||||
}
|
||||
// Creates an accessor srtictly for Peer Collection
|
||||
for _, related := range computeAttached {
|
||||
instance := related.Node.GetSelectedInstance().(*resources.ComputeResourceInstance)
|
||||
if instance == nil {
|
||||
continue
|
||||
}
|
||||
source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing
|
||||
if id == link.Source.ID && source != nil {
|
||||
isDeps = true
|
||||
dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), link.Destination.ID))
|
||||
partner := instance.GetSelectedPartnership(conf.GetConfig().PeerID, conf.GetConfig().Groups)
|
||||
if partner == nil {
|
||||
logger.Error().Msg("can't proceed on datacenter because of missing pricing profiles " + related.Node.GetID())
|
||||
continue
|
||||
}
|
||||
wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow
|
||||
if id == link.Source.ID && wourceWF != nil {
|
||||
isDeps = true
|
||||
dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), link.Destination.ID))
|
||||
garanteed, allowed := b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), models.NewBounds(), models.NewBounds(), "gpu", partner)
|
||||
garanteed, allowed = b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), garanteed, allowed, "cpu", partner)
|
||||
garanteed.Set(float64(partner.(*resources.ComputeResourcePartnership).MinGaranteedRAMSize), "ram", false)
|
||||
allowed.Set(float64(partner.(*resources.ComputeResourcePartnership).MaxAllowedRAMSize), "ram", false)
|
||||
|
||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID())
|
||||
if res.Err != "" {
|
||||
return errors.New(res.Err)
|
||||
}
|
||||
|
||||
peer := *res.ToPeer()
|
||||
|
||||
isNotReparted := peer.State == 1
|
||||
logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
|
||||
if !(isNotReparted) {
|
||||
logger.Debug().Msg("Reparted processing, on " + peer.UUID)
|
||||
b.RemotePeers = append(b.RemotePeers, peer.UUID)
|
||||
template.AddAdmiraltyAnnotations(peer.UUID, namespace)
|
||||
}
|
||||
}
|
||||
return isDeps, dependancyOfIDs
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) {
|
||||
for _, link := range b.OriginWorkflow.Graph.Links {
|
||||
if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok {
|
||||
fmt.Println("Could not find the source of the link", link.Source.ID)
|
||||
continue
|
||||
}
|
||||
source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
|
||||
if id == link.Destination.ID && source != nil {
|
||||
dependency_name := getArgoName(source.GetName(), link.Source.ID)
|
||||
dependencies = append(dependencies, dependency_name)
|
||||
continue
|
||||
func (b *ArgoBuilder) setResourcesAllowedAndGaranteed(dag *Dag, minbound *models.Bounds, maxbound *models.Bounds, typ string, partner resources.ResourcePartnerITF) (*models.Bounds, *models.Bounds) {
|
||||
selector := ""
|
||||
values := map[string]float64{}
|
||||
if typ == "gpu" {
|
||||
values = partner.(*resources.ComputeResourcePartnership).MinGaranteedGPUsMemoryGB
|
||||
} else {
|
||||
values = partner.(*resources.ComputeResourcePartnership).MinGaranteedCPUsCores
|
||||
}
|
||||
for name, GPU := range values {
|
||||
if minbound.Set(float64(GPU), typ, true) {
|
||||
selector = name
|
||||
}
|
||||
}
|
||||
return
|
||||
if selector != "" {
|
||||
for _, t := range dag.Tasks {
|
||||
t.NodeSelector[typ+"-type"] = selector
|
||||
}
|
||||
}
|
||||
if typ == "gpu" {
|
||||
values = partner.(*resources.ComputeResourcePartnership).MaxAllowedGPUsMemoryGB
|
||||
} else {
|
||||
values = partner.(*resources.ComputeResourcePartnership).MaxAllowedCPUsCores
|
||||
}
|
||||
if max, ok := values[selector]; ok {
|
||||
maxbound.Set(float64(max), typ, false)
|
||||
} else {
|
||||
maxbound.GPU = minbound.GPU
|
||||
}
|
||||
return minbound, maxbound
|
||||
}
|
||||
|
||||
func getArgoName(raw_name string, component_id string) (formatedName string) {
|
||||
formatedName = strings.ReplaceAll(raw_name, " ", "-")
|
||||
formatedName += "-" + component_id
|
||||
formatedName = strings.ToLower(formatedName)
|
||||
return
|
||||
// Execute the last actions once the YAML file for the Argo Workflow is created
|
||||
func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
|
||||
logger.Info().Msg("DEV :: Completing build")
|
||||
setter := AdmiraltySetter{Id: namespace}
|
||||
// Setup admiralty for each node
|
||||
for _, peer := range b.RemotePeers {
|
||||
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
|
||||
setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer)
|
||||
}
|
||||
// Generate the YAML file
|
||||
random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8)
|
||||
b.Workflow.Metadata.Name = "oc-monitor-" + random_name
|
||||
logger = oclib.GetLogger()
|
||||
yamlified, err := yaml.Marshal(b.Workflow)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not transform object to yaml file")
|
||||
return "", err
|
||||
}
|
||||
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
|
||||
current_timestamp := time.Now().Format("02_01_2006_150405")
|
||||
file_name := random_name + "_" + current_timestamp + ".yml"
|
||||
workflows_dir := "./argo_workflows/"
|
||||
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
|
||||
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not write the yaml file")
|
||||
return "", err
|
||||
}
|
||||
|
||||
return workflows_dir + file_name, nil
|
||||
}
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"strings"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) {
|
||||
@@ -47,20 +46,9 @@ func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, p
|
||||
|
||||
func (b *ArgoBuilder) addServiceToArgo() error {
|
||||
for _, service := range b.Services {
|
||||
service_manifest, err := yaml.Marshal(service)
|
||||
if err != nil {
|
||||
if err := service.BindToArgo(b.Workflow); err != nil {
|
||||
return err
|
||||
}
|
||||
service_template := models.Template{Name: "workflow-service-pod",
|
||||
Resource: models.ServiceResource{
|
||||
Action: "create",
|
||||
SuccessCondition: "status.succeeded > 0",
|
||||
FailureCondition: "status.failed > 3",
|
||||
SetOwnerReference: true,
|
||||
Manifest: string(service_manifest),
|
||||
},
|
||||
}
|
||||
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package workflow_builder
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"oc-monitord/models"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
workflow "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
@@ -14,7 +15,7 @@ type WorflowDB struct {
|
||||
|
||||
// Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter
|
||||
func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error {
|
||||
fmt.Println("Loading workflow from " + workflow_id)
|
||||
logger.Info().Msg("Loading workflow from " + workflow_id)
|
||||
var err error
|
||||
if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil {
|
||||
return err
|
||||
@@ -27,7 +28,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
|
||||
logger := oclib.GetLogger()
|
||||
|
||||
lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id)
|
||||
fmt.Println("ERR", lib_data.Code, lib_data.Err)
|
||||
logger.Info().Msg(fmt.Sprint("ERR", lib_data.Code, lib_data.Err))
|
||||
if lib_data.Code != 200 {
|
||||
logger.Error().Msg("Error loading the graph")
|
||||
return workflow, errors.New(lib_data.Err)
|
||||
@@ -41,20 +42,20 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
|
||||
return new_wf, nil
|
||||
}
|
||||
|
||||
func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (string, int, error) {
|
||||
func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) {
|
||||
logger := oclib.GetLogger()
|
||||
fmt.Println("Exporting to Argo", w.Workflow)
|
||||
logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow))
|
||||
if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
|
||||
return "", 0, fmt.Errorf("can't export a graph that has not been loaded yet")
|
||||
return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet")
|
||||
}
|
||||
|
||||
argo_builder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout}
|
||||
filename, stepMax, _, _, err := argo_builder.CreateDAG(namespace, true)
|
||||
argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Workflow: &models.Workflow{}, Timeout: timeout}
|
||||
stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)
|
||||
return "", 0, err
|
||||
return nil, 0, err
|
||||
}
|
||||
return filename, stepMax, nil
|
||||
return &argoBuilder, stepMax, nil
|
||||
}
|
||||
|
||||
// TODO implement this function
|
||||
|
||||
Reference in New Issue
Block a user