Compare commits
8 Commits
feature/mi
...
feature/ev
| Author | SHA1 | Date | |
|---|---|---|---|
| 2be337a121 | |||
| 5dea87e410 | |||
| ced35dad67 | |||
| 3889f20250 | |||
|
|
d069baa8ce | ||
|
|
f22e7f4514 | ||
|
|
7eed5d10e9 | ||
| 30ae0a7c5b |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -22,3 +22,4 @@
|
||||
go.work
|
||||
|
||||
argo_workflows/*
|
||||
env.env
|
||||
7
Makefile
7
Makefile
@@ -12,12 +12,11 @@ clean:
|
||||
rm -rf oc-monitord
|
||||
|
||||
docker:
|
||||
DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile .
|
||||
docker tag oc/oc-monitord:0.0.1 oc/oc-monitord:latest
|
||||
docker tag oc/oc-monitord:0.0.1 oc-monitord:latest
|
||||
DOCKER_BUILDKIT=1 docker build -t oc-monitord -f Dockerfile .
|
||||
docker tag oc-monitord oc/oc-monitord:0.0.1
|
||||
|
||||
publish-kind:
|
||||
kind load docker-image oc/oc-monitord:0.0.1 --name opencloud
|
||||
kind load docker-image oc/oc-monitord:0.0.1 --name opencloud | true
|
||||
|
||||
publish-registry:
|
||||
@echo "TODO"
|
||||
|
||||
18
go.mod
18
go.mod
@@ -5,7 +5,7 @@ go 1.23.1
|
||||
toolchain go1.23.3
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250715125819-e735f78e58c6
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d
|
||||
github.com/akamensky/argparse v1.4.0
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/goraz/onion v0.1.3
|
||||
@@ -60,11 +60,11 @@ require (
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/montanaflynn/stats v0.7.1 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/nats-io/nats.go v1.43.0 // indirect
|
||||
github.com/nats-io/nats.go v1.44.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.22.0 // indirect
|
||||
github.com/prometheus/client_golang v1.23.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.65.0 // indirect
|
||||
github.com/prometheus/procfs v0.17.0 // indirect
|
||||
@@ -77,15 +77,15 @@ require (
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
|
||||
go.mongodb.org/mongo-driver v1.17.4 // indirect
|
||||
golang.org/x/crypto v0.40.0 // indirect
|
||||
golang.org/x/net v0.42.0 // indirect
|
||||
golang.org/x/crypto v0.41.0 // indirect
|
||||
golang.org/x/net v0.43.0 // indirect
|
||||
golang.org/x/oauth2 v0.30.0 // indirect
|
||||
golang.org/x/sync v0.16.0 // indirect
|
||||
golang.org/x/sys v0.34.0 // indirect
|
||||
golang.org/x/term v0.33.0 // indirect
|
||||
golang.org/x/text v0.27.0 // indirect
|
||||
golang.org/x/sys v0.35.0 // indirect
|
||||
golang.org/x/term v0.34.0 // indirect
|
||||
golang.org/x/text v0.28.0 // indirect
|
||||
golang.org/x/time v0.7.0 // indirect
|
||||
google.golang.org/protobuf v1.36.6 // indirect
|
||||
google.golang.org/protobuf v1.36.7 // indirect
|
||||
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
k8s.io/api v0.32.1
|
||||
|
||||
26
go.sum
26
go.sum
@@ -8,6 +8,16 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250704084459-443546027b27 h1:iogk6pV3gybzQ
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250704084459-443546027b27/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250715125819-e735f78e58c6 h1:Gnkv59Ntl2RebC5tNauXuxyRXLfZ2XAJ0+ujMyFte5U=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250715125819-e735f78e58c6/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1 h1:53KzZ+1JqRY6J7EVzQpNBmLzNuxb8oHNW3UgqxkYABo=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250805113921-40a61387b9f1/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5 h1:bmEG0M99WXWCHsMNUgfYqQNEM0YyN4dXxYV1LCY5EYg=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de h1:s47eEnWRCjBMOxbec5ROHztuwu0Zo7MuXgqWizgkiXU=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51 h1:jlSEprNaUBe628uP9a9TrJ16Q5Ej6OxHlAKNtrHrN2o=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE=
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
|
||||
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
|
||||
@@ -157,6 +167,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
|
||||
github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
|
||||
github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M=
|
||||
github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
@@ -178,6 +190,8 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI
|
||||
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
|
||||
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
|
||||
github.com/prometheus/client_golang v1.23.0 h1:ust4zpdl9r4trLY/gSjlm07PuiBq2ynaXXlptpfy8Uc=
|
||||
github.com/prometheus/client_golang v1.23.0/go.mod h1:i/o0R9ByOnHX0McrTMTyhYvKE4haaf2mW08I+jGAjEE=
|
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
|
||||
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
|
||||
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
|
||||
@@ -256,6 +270,8 @@ golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
||||
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
||||
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
|
||||
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
|
||||
golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4=
|
||||
golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
@@ -281,6 +297,8 @@ golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
|
||||
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
|
||||
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
|
||||
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
|
||||
golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE=
|
||||
golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
|
||||
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
|
||||
@@ -317,12 +335,16 @@ golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
|
||||
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
|
||||
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
|
||||
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=
|
||||
golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ=
|
||||
golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg=
|
||||
golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0=
|
||||
golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4=
|
||||
golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw=
|
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
|
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
@@ -333,6 +355,8 @@ golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
|
||||
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
|
||||
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
|
||||
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
|
||||
golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng=
|
||||
golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU=
|
||||
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
|
||||
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
@@ -371,6 +395,8 @@ google.golang.org/grpc v1.63.0 h1:WjKe+dnvABXyPJMD7KDNLxtoGk5tgk+YFWN6cBWjZE8=
|
||||
google.golang.org/grpc v1.63.0/go.mod h1:WAX/8DgncnokcFUldAxq7GeB5DXHDbMF+lLvDomNkRA=
|
||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||
google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A=
|
||||
google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
|
||||
22
main.go
22
main.go
@@ -89,7 +89,7 @@ func main() {
|
||||
logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
|
||||
}
|
||||
|
||||
builder, _, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore
|
||||
builder, _, err := new_wf.ExportToArgo(exec, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID)
|
||||
logger.Error().Msg(err.Error())
|
||||
@@ -110,20 +110,20 @@ func main() {
|
||||
// Executed in a k8s environment
|
||||
logger.Info().Msg("Executes inside a k8s")
|
||||
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
|
||||
executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath)
|
||||
executeInside(exec.ExecutionsID, argoFilePath)
|
||||
}
|
||||
}
|
||||
|
||||
// So far we only log the output from
|
||||
func executeInside(execID string, ns string, argo_file_path string) {
|
||||
func executeInside(ns string, argo_file_path string) {
|
||||
t, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create KubernetesTool")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
name, err := t.CreateArgoWorkflow(argo_file_path, ns)
|
||||
// _ = name
|
||||
// _ = name
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create argo workflow : " + err.Error())
|
||||
logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
|
||||
@@ -152,20 +152,20 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
|
||||
var wg sync.WaitGroup
|
||||
var err error
|
||||
|
||||
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID )
|
||||
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID)
|
||||
|
||||
cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID)
|
||||
if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil {
|
||||
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color")
|
||||
|
||||
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color")
|
||||
if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil {
|
||||
wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
var steps []string
|
||||
for _, template := range workflow.Spec.Templates {
|
||||
steps = append(steps, template.Name)
|
||||
@@ -186,7 +186,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
|
||||
logger.Info().Msg("Running argo logs")
|
||||
if err := cmdLogs.Run(); err != nil {
|
||||
wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
|
||||
|
||||
|
||||
wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text())
|
||||
|
||||
}
|
||||
@@ -201,7 +201,6 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
|
||||
func loadConfig(is_k8s bool, parser *argparse.Parser) {
|
||||
var o *onion.Onion
|
||||
o = initOnion(o)
|
||||
@@ -305,7 +304,6 @@ func getContainerName(argo_file string) string {
|
||||
return container_name
|
||||
}
|
||||
|
||||
|
||||
func updateStatus(status string, log string) {
|
||||
exec_id := conf.GetConfig().ExecutionID
|
||||
|
||||
|
||||
@@ -1,10 +1,16 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"oc-monitord/conf"
|
||||
"strings"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/models"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
type Parameter struct {
|
||||
@@ -57,7 +63,7 @@ type Dag struct {
|
||||
}
|
||||
|
||||
type TemplateMetadata struct {
|
||||
Labels map[string]string `yaml:"labels,omitempty"`
|
||||
Labels map[string]string `yaml:"labels,omitempty"`
|
||||
Annotations map[string]string `yaml:"annotations,omitempty"`
|
||||
}
|
||||
|
||||
@@ -96,8 +102,34 @@ type Template struct {
|
||||
Resource ServiceResource `yaml:"resource,omitempty"`
|
||||
}
|
||||
|
||||
func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag) {
|
||||
instance := processing.GetSelectedInstance()
|
||||
func (template *Template) CreateEventContainer(exec *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) {
|
||||
container := Container{Image: "natsio/nats-box"}
|
||||
container.Command = []string{"sh", "-c"} // all is bash
|
||||
|
||||
var event *native_tools.WorkflowEventParams
|
||||
b, err := json.Marshal(nt.Params)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
err = json.Unmarshal(b, event)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
if event != nil {
|
||||
container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey(tools.WORKFLOW_EVENT.String())+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'")
|
||||
container.Args = []string{strings.Join(container.Args, " ")}
|
||||
template.Container = container
|
||||
}
|
||||
}
|
||||
|
||||
func (template *Template) CreateContainer(exec *workflow_execution.WorkflowExecution, processing *resources.ProcessingResource, dag *Dag) {
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := processing.GetSelectedInstance(&index)
|
||||
if instance == nil {
|
||||
return
|
||||
}
|
||||
@@ -143,10 +175,10 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string
|
||||
|
||||
// Add the metadata that allow Admiralty to pick up an Argo Workflow that needs to be reparted
|
||||
// The value of "clustername" is the peerId, which must be replaced by the node name's for this specific execution
|
||||
func (t *Template) AddAdmiraltyAnnotations(peerId string){
|
||||
func (t *Template) AddAdmiraltyAnnotations(peerId string) {
|
||||
if t.Metadata.Annotations == nil {
|
||||
t.Metadata.Annotations = make(map[string]string)
|
||||
}
|
||||
t.Metadata.Annotations["multicluster.admiralty.io/elect"] = ""
|
||||
t.Metadata.Annotations["multicluster.admiralty.io/clustername"] = peerId
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,8 +18,11 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
|
||||
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
"github.com/nwtgck/go-fakelish"
|
||||
"github.com/rs/zerolog"
|
||||
"gopkg.in/yaml.v3"
|
||||
@@ -55,22 +58,22 @@ func (b *Workflow) getDag() *Dag {
|
||||
}
|
||||
|
||||
type Spec struct {
|
||||
ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
|
||||
Entrypoint string `yaml:"entrypoint"`
|
||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||
Templates []Template `yaml:"templates"`
|
||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||
ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
|
||||
Entrypoint string `yaml:"entrypoint"`
|
||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||
Templates []Template `yaml:"templates"`
|
||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||
}
|
||||
|
||||
// TODO: found on a processing instance linked to storage
|
||||
// add s3, gcs, azure, etc if needed on a link between processing and storage
|
||||
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
|
||||
func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, namespace string, write bool) (int, []string, []string, error) {
|
||||
logger = logs.GetLogger()
|
||||
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
|
||||
// handle services by checking if there is only one processing with hostname and port
|
||||
firstItems, lastItems, volumes := b.createTemplates(namespace)
|
||||
b.createVolumes(volumes)
|
||||
firstItems, lastItems, volumes := b.createTemplates(exec, namespace)
|
||||
b.createVolumes(exec, volumes)
|
||||
|
||||
if b.Timeout > 0 {
|
||||
b.Workflow.Spec.Timeout = b.Timeout
|
||||
@@ -82,27 +85,44 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
|
||||
if !write {
|
||||
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
|
||||
|
||||
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
|
||||
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
|
||||
|
||||
func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution, namespace string) ([]string, []string, []VolumeMount) {
|
||||
volumes := []VolumeMount{}
|
||||
firstItems := []string{}
|
||||
lastItems := []string{}
|
||||
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
|
||||
logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
|
||||
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
|
||||
instance := item.Processing.GetSelectedInstance()
|
||||
index := 0
|
||||
_, res := item.GetResource()
|
||||
if d, ok := exec.SelectedInstances[res.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := item.Processing.GetSelectedInstance(&index)
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
|
||||
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
|
||||
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(exec,
|
||||
namespace,
|
||||
item.ID, item.Processing, volumes, firstItems, lastItems)
|
||||
}
|
||||
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) {
|
||||
if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) {
|
||||
continue
|
||||
}
|
||||
index := 0
|
||||
_, res := item.GetResource()
|
||||
if d, ok := exec.SelectedInstances[res.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := item.NativeTool.GetSelectedInstance(&index)
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", item.NativeTool.GetName(), instance))
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(exec,
|
||||
namespace, item.ID, item.NativeTool, volumes, firstItems, lastItems)
|
||||
}
|
||||
firstWfTasks := map[string][]string{}
|
||||
latestWfTasks := map[string][]string{}
|
||||
relatedWfTasks := map[string][]string{}
|
||||
@@ -113,7 +133,7 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
|
||||
continue
|
||||
}
|
||||
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
|
||||
_, fi, li, err := subBuilder.CreateDAG(namespace, false)
|
||||
_, fi, li, err := subBuilder.CreateDAG(exec, namespace, false)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
|
||||
continue
|
||||
@@ -170,37 +190,44 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createArgoTemplates(namespace string,
|
||||
func (b *ArgoBuilder) createArgoTemplates(
|
||||
exec *workflow_execution.WorkflowExecution,
|
||||
namespace string,
|
||||
id string,
|
||||
processing *resources.ProcessingResource,
|
||||
processing resources.ResourceInterface,
|
||||
volumes []VolumeMount,
|
||||
firstItems []string,
|
||||
lastItems []string) ([]VolumeMount, []string, []string) {
|
||||
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
|
||||
_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, processing, firstItems, lastItems)
|
||||
template := &Template{Name: getArgoName(processing.GetName(), id)}
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
|
||||
isReparted, peerId := b.isProcessingReparted(*processing, id)
|
||||
template.CreateContainer(processing, b.Workflow.getDag())
|
||||
|
||||
isReparted, peerId := b.isProcessingReparted(processing, id)
|
||||
if processing.GetType() == tools.PROCESSING_RESOURCE.String() {
|
||||
template.CreateContainer(exec, processing.(*resources.ProcessingResource), b.Workflow.getDag())
|
||||
} else if processing.GetType() == tools.NATIVE_TOOL.String() {
|
||||
template.CreateEventContainer(exec, processing.(*resources.NativeTool), b.Workflow.getDag())
|
||||
}
|
||||
|
||||
if isReparted {
|
||||
logger.Debug().Msg("Reparted processing, on " + peerId)
|
||||
b.RemotePeers = append(b.RemotePeers, peerId)
|
||||
template.AddAdmiraltyAnnotations(peerId)
|
||||
}
|
||||
// get datacenter from the processing
|
||||
if processing.IsService {
|
||||
b.CreateService(id, processing)
|
||||
if processing.GetType() == tools.PROCESSING_RESOURCE.String() && processing.(*resources.ProcessingResource).IsService {
|
||||
b.CreateService(exec, id, processing)
|
||||
template.Metadata.Labels = make(map[string]string)
|
||||
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
|
||||
}
|
||||
|
||||
volumes = b.addStorageAnnotations(id, template, namespace, volumes)
|
||||
volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes)
|
||||
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
|
||||
return volumes, firstItems, lastItems
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
|
||||
func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExecution, id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
|
||||
related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage node linked to the processing for which we create the template
|
||||
|
||||
for _, r := range related {
|
||||
storage := r.Node.(*resources.StorageResource)
|
||||
for _, linkToStorage := range r.Links {
|
||||
@@ -217,19 +244,19 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
|
||||
|
||||
if storage.StorageType == enum.S3 {
|
||||
|
||||
b.addS3annotations(&art, template, rw, linkToStorage, storage, namespace)
|
||||
b.addS3annotations(exec, &art, template, rw, linkToStorage, storage, namespace)
|
||||
}
|
||||
|
||||
if rw.Write {
|
||||
template.Outputs.Artifacts = append(template.Inputs.Artifacts, art)
|
||||
template.Outputs.Artifacts = append(template.Outputs.Artifacts, art)
|
||||
} else {
|
||||
template.Inputs.Artifacts = append(template.Outputs.Artifacts, art)
|
||||
template.Inputs.Artifacts = append(template.Inputs.Artifacts, art)
|
||||
}
|
||||
}
|
||||
}
|
||||
index := 0
|
||||
if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *storage.SelectedInstanceIndex
|
||||
if s, ok := exec.SelectedInstances[storage.GetID()]; ok {
|
||||
index = s
|
||||
}
|
||||
s := storage.Instances[index]
|
||||
if s.Local {
|
||||
@@ -241,10 +268,10 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
|
||||
}
|
||||
}
|
||||
return volumes
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addS3annotations(exec *workflow_execution.WorkflowExecution, art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
|
||||
|
||||
func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
|
||||
|
||||
art.S3 = &Key{
|
||||
// Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
|
||||
Insecure: true, // temporary
|
||||
@@ -252,58 +279,65 @@ func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw gra
|
||||
if rw.Write {
|
||||
art.S3.Key = rw.Destination + "/" + rw.FileName
|
||||
} else {
|
||||
art.S3.Key = rw.Source
|
||||
art.S3.Key = rw.Source
|
||||
}
|
||||
sel := storage.GetSelectedInstance()
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
sel := storage.GetSelectedInstance(&index)
|
||||
// v0.1 : add the storage.Source to the s3 object
|
||||
// v0.2 : test if the storage.Source exists in the configMap and quit if not
|
||||
// v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call
|
||||
if sel != nil {
|
||||
b.addAuthInformation(storage, namespace, art)
|
||||
art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
|
||||
art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
|
||||
b.addAuthInformation(exec, storage, namespace, art)
|
||||
art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
|
||||
art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addAuthInformation(exec *workflow_execution.WorkflowExecution, storage *resources.StorageResource, namespace string, art *Artifact) {
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
|
||||
sel := storage.GetSelectedInstance(&index)
|
||||
|
||||
func (b *ArgoBuilder) addAuthInformation(storage *resources.StorageResource, namespace string, art *Artifact) {
|
||||
|
||||
sel := storage.GetSelectedInstance()
|
||||
|
||||
tool, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil || tool == nil {
|
||||
logger.Fatal().Msg("Could not create the access secret :" + err.Error())
|
||||
}
|
||||
|
||||
secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
|
||||
|
||||
if err == nil {
|
||||
art.S3.AccessKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "access-key",
|
||||
}
|
||||
art.S3.SecretKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "secret-key",
|
||||
}
|
||||
|
||||
secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
|
||||
|
||||
if err == nil {
|
||||
art.S3.AccessKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "access-key",
|
||||
}
|
||||
art.S3.SecretKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "secret-key",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
|
||||
splits := strings.Split(art.S3.EndPoint, "/")
|
||||
if len(splits) > 1 {
|
||||
art.S3.Bucket = splits[0]
|
||||
art.S3.EndPoint = strings.Join(splits[1:], "/")
|
||||
} else {
|
||||
art.S3.Bucket = splits[0]
|
||||
} else {
|
||||
art.S3.Bucket = splits[0]
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) {
|
||||
s := tool.GetS3Secret(storage.UUID, namespace)
|
||||
// var s *v1.Secret
|
||||
accessKey, secretKey := retrieveMinioCredential("peer",namespace)
|
||||
|
||||
accessKey, secretKey := retrieveMinioCredential("peer", namespace)
|
||||
|
||||
if s == nil {
|
||||
id, err := tool.CreateAccessSecret(
|
||||
accessKey,
|
||||
@@ -318,22 +352,24 @@ func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, nam
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// s.Name = "toto"
|
||||
return s.Name, nil
|
||||
|
||||
|
||||
}
|
||||
|
||||
// This method needs to evolve to an API call to the peer passed as a parameter
|
||||
func retrieveMinioCredential(peer string, namespace string) (string,string) {
|
||||
func retrieveMinioCredential(peer string, namespace string) (string, string) {
|
||||
return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW"
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource,
|
||||
func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, dag *Dag, graphItemID string, processing resources.ResourceInterface,
|
||||
firstItems []string, lastItems []string) (*Dag, []string, []string) {
|
||||
unique_name := getArgoName(processing.GetName(), graphItemID)
|
||||
step := Task{Name: unique_name, Template: unique_name}
|
||||
instance := processing.GetSelectedInstance()
|
||||
unique_name := getArgoName(processing.GetName(), graphItemID)
|
||||
step := Task{Name: unique_name, Template: unique_name}
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := processing.GetSelectedInstance(&index)
|
||||
if instance != nil {
|
||||
for _, value := range instance.(*resources.ProcessingInstance).Env {
|
||||
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
|
||||
@@ -372,11 +408,11 @@ func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *re
|
||||
return dag, firstItems, lastItems
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
|
||||
func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { // TODO : one think about remote volume but TG
|
||||
for _, volume := range volumes {
|
||||
index := 0
|
||||
if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *volume.Storage.SelectedInstanceIndex
|
||||
if s, ok := exec.SelectedInstances[volume.Storage.GetID()]; ok {
|
||||
index = s
|
||||
}
|
||||
storage := volume.Storage.Instances[index]
|
||||
new_volume := VolumeClaimTemplate{}
|
||||
@@ -434,10 +470,10 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
|
||||
|
||||
// Verify if a processing resource is attached to another Compute than the one hosting
|
||||
// the current Open Cloud instance. If true return the peer ID to contact
|
||||
func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) {
|
||||
func (b *ArgoBuilder) isProcessingReparted(processing resources.ResourceInterface, graphID string) (bool, string) {
|
||||
computeAttached := b.retrieveProcessingCompute(graphID)
|
||||
if computeAttached == nil {
|
||||
logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
|
||||
logger.Error().Msg("No compute was found attached to processing " + processing.GetName() + " : " + processing.GetID())
|
||||
panic(0)
|
||||
}
|
||||
|
||||
@@ -472,7 +508,7 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu
|
||||
} else if link.Destination.ID == graphID {
|
||||
oppositeId = link.Source.ID
|
||||
}
|
||||
|
||||
|
||||
if oppositeId != "" {
|
||||
dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
|
||||
if dt == oclib.COMPUTE_RESOURCE {
|
||||
@@ -494,10 +530,10 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
|
||||
// Setup admiralty for each node
|
||||
for _, peer := range b.RemotePeers {
|
||||
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
|
||||
setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
|
||||
setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer)
|
||||
}
|
||||
|
||||
// Update the name of the admiralty node to use
|
||||
// Update the name of the admiralty node to use
|
||||
for _, template := range b.Workflow.Spec.Templates {
|
||||
if len(template.Metadata.Annotations) > 0 {
|
||||
if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
|
||||
@@ -513,14 +549,14 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
|
||||
yamlified, err := yaml.Marshal(b.Workflow)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not transform object to yaml file")
|
||||
return "", err
|
||||
return "", err
|
||||
}
|
||||
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
|
||||
current_timestamp := time.Now().Format("02_01_2006_150405")
|
||||
file_name := random_name + "_" + current_timestamp + ".yml"
|
||||
workflows_dir := "./argo_workflows/"
|
||||
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
|
||||
|
||||
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not write the yaml file")
|
||||
return "", err
|
||||
|
||||
@@ -5,10 +5,11 @@ import (
|
||||
"strings"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) {
|
||||
func (b *ArgoBuilder) CreateService(exec *workflow_execution.WorkflowExecution, id string, processing resources.ResourceInterface) {
|
||||
new_service := models.Service{
|
||||
APIVersion: "v1",
|
||||
Kind: "Service",
|
||||
@@ -24,17 +25,21 @@ func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingR
|
||||
if processing == nil {
|
||||
return
|
||||
}
|
||||
b.completeServicePorts(&new_service, id, processing)
|
||||
b.completeServicePorts(exec, &new_service, id, processing)
|
||||
b.Services = append(b.Services, &new_service)
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, processing *resources.ProcessingResource) {
|
||||
instance := processing.GetSelectedInstance()
|
||||
func (b *ArgoBuilder) completeServicePorts(exec *workflow_execution.WorkflowExecution, service *models.Service, id string, processing resources.ResourceInterface) {
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := processing.GetSelectedInstance(&index)
|
||||
if instance != nil && instance.(*resources.ProcessingInstance).Access != nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
|
||||
for _, execute := range instance.(*resources.ProcessingInstance).Access.Container.Exposes {
|
||||
if execute.PAT != 0 {
|
||||
new_port_translation := models.ServicePort{
|
||||
Name: strings.ToLower(processing.Name) + id,
|
||||
Name: strings.ToLower(processing.GetName()) + id,
|
||||
Port: execute.Port,
|
||||
TargetPort: execute.PAT,
|
||||
Protocol: "TCP",
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
workflow "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
)
|
||||
|
||||
type WorflowDB struct {
|
||||
@@ -41,7 +42,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
|
||||
return new_wf, nil
|
||||
}
|
||||
|
||||
func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) {
|
||||
func (w *WorflowDB) ExportToArgo(exec *workflow_execution.WorkflowExecution, timeout int) (*ArgoBuilder, int, error) {
|
||||
logger := oclib.GetLogger()
|
||||
logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow))
|
||||
if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
|
||||
@@ -49,7 +50,7 @@ func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, i
|
||||
}
|
||||
|
||||
argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout}
|
||||
stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true)
|
||||
stepMax, _, _, err := argoBuilder.CreateDAG(exec, exec.ExecutionsID, true)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)
|
||||
return nil, 0, err
|
||||
|
||||
Reference in New Issue
Block a user