9 Commits

16 changed files with 695 additions and 645 deletions

View File

@@ -3,6 +3,8 @@
build: clean
go build .
dev: build
run:
./oc-monitord
@@ -10,7 +12,7 @@ clean:
rm -rf oc-monitord
docker:
DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile . --build-arg=HOST=$(HOST)
DOCKER_BUILDKIT=1 docker build -t oc/oc-monitord:0.0.1 -f Dockerfile .
docker tag oc/oc-monitord:0.0.1 oc/oc-monitord:latest
docker tag oc/oc-monitord:0.0.1 oc-monitord:latest
@@ -20,11 +22,6 @@ publish-kind:
publish-registry:
@echo "TODO"
docker-deploy:
docker compose up -d
run-docker: docker publish-kind publish-registry docker-deploy
all: docker publish-kind publish-registry
.PHONY: build run clean docker publish-kind publish-registry
.PHONY: build run clean docker publish-kind publish-registry

View File

@@ -9,7 +9,6 @@ type Config struct {
NatsURL string
ExecutionID string
PeerID string
Groups []string
Timeout int
WorkflowID string
Logs string
@@ -19,7 +18,7 @@ type Config struct {
KubeCA string
KubeCert string
KubeData string
ArgoHost string // when executed in a container will replace addresses with "localhost" in their url
ArgoHost string // when executed in a container will replace addresses with "localhost" in their url
}
var instance *Config

34
go.mod
View File

@@ -5,7 +5,7 @@ go 1.23.1
toolchain go1.23.3
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06
cloud.o-forge.io/core/oc-lib v0.0.0-20250715125819-e735f78e58c6
github.com/akamensky/argparse v1.4.0
github.com/google/uuid v1.6.0
github.com/goraz/onion v0.1.3
@@ -15,8 +15,8 @@ require (
)
require (
github.com/beego/beego/v2 v2.3.7 // indirect
github.com/go-playground/validator/v10 v10.26.0 // indirect
github.com/beego/beego/v2 v2.3.8 // indirect
github.com/go-playground/validator/v10 v10.27.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
@@ -35,7 +35,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.8 // indirect
github.com/gabriel-vasile/mimetype v1.4.9 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
github.com/go-openapi/jsonreference v0.20.4 // indirect
@@ -60,14 +60,14 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/nats.go v1.41.0 // indirect
github.com/nats-io/nkeys v0.4.10 // indirect
github.com/nats-io/nats.go v1.43.0 // indirect
github.com/nats-io/nkeys v0.4.11 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.22.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.63.0 // indirect
github.com/prometheus/procfs v0.16.0 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.65.0 // indirect
github.com/prometheus/procfs v0.17.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
github.com/smartystreets/goconvey v1.6.4 // indirect
@@ -76,14 +76,14 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.mongodb.org/mongo-driver v1.17.3 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/net v0.39.0 // indirect
golang.org/x/oauth2 v0.25.0 // indirect
golang.org/x/sync v0.13.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
go.mongodb.org/mongo-driver v1.17.4 // indirect
golang.org/x/crypto v0.40.0 // indirect
golang.org/x/net v0.42.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.34.0 // indirect
golang.org/x/term v0.33.0 // indirect
golang.org/x/text v0.27.0 // indirect
golang.org/x/time v0.7.0 // indirect
google.golang.org/protobuf v1.36.6 // indirect
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect

114
go.sum
View File

@@ -1,27 +1,13 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f h1:esLB0EAn8IuOChW35kcBrPaN80z4A4yYyz1mXT45GQo=
cloud.o-forge.io/core/oc-lib v0.0.0-20250217072519-cafadec1469f/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9 h1:mSFFPwil5Ih+RPBvn88MBerQMtsoHnOuyCZQaf91a34=
cloud.o-forge.io/core/oc-lib v0.0.0-20250313155727-88c88cac5bc9/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963 h1:ADDfqwtWF+VQTMSNAWPuhc4mmiKdgpHNmBB+UI2jRPE=
cloud.o-forge.io/core/oc-lib v0.0.0-20250612084738-2a0ab8e54963/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c h1:k2y+ocElqwUK5yzyCf3rWrDUzPWbds4MbtG58+Szos0=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617130633-8f2adb76e41c/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157 h1:853UvpMOM1QuWLrr/V8biDS8IcQcqHvoJsOT4epxDng=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617133502-9e5266326157/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e h1:Z5vLv+Wzzz58abmHRnovoqbkVlKHuC8u8/RLv7FjtZw=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617141444-0b0952b28c7e/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637 h1:YiZbn6KmjgZ62uM+kH95Snd2nQliDKDnGMAxRr/VoUw=
cloud.o-forge.io/core/oc-lib v0.0.0-20250617144221-ec7a7e474637/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624064953-2c8dcbe93d14 h1:iCTrYc2+W2BFLOupRK1sD6sOgsK4NIs6WMC+4LiWCaY=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624064953-2c8dcbe93d14/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624093207-3fdf5c3ebf29 h1:JitS1izRltTyOaWnvXnmYywHj0napsL6y0nBYiWUCNo=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624093207-3fdf5c3ebf29/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624095852-147c7bc3a1d5 h1:0eV0E3kBZkOyoAurRmP9h4eHmFrZajOxSqoBgM3l3dk=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624095852-147c7bc3a1d5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06 h1:+RSv62uIC7wsmibsp1XTanQMNznNeOGgPpfhb6ZHT4c=
cloud.o-forge.io/core/oc-lib v0.0.0-20250624102227-e600fedcab06/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250604083300-387785b40cb0 h1:iEm/Rf9I0OSCcncuFy61YOSZ3jdRlhJ/oLD97Pc2pCQ=
cloud.o-forge.io/core/oc-lib v0.0.0-20250604083300-387785b40cb0/go.mod h1:2roQbUpv3a6mTIr5oU1ux31WbN8YucyyQvCQ0FqwbcE=
cloud.o-forge.io/core/oc-lib v0.0.0-20250620085001-583ca2fbacd5 h1:FEBwueVOOWKYf0tJuE0EKNIbjxmTyCMgkT4qATYsfbo=
cloud.o-forge.io/core/oc-lib v0.0.0-20250620085001-583ca2fbacd5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250704084459-443546027b27 h1:iogk6pV3gybzQDBXMI6Qd/jvSA1h+3oRE+vLl1MRjew=
cloud.o-forge.io/core/oc-lib v0.0.0-20250704084459-443546027b27/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250715125819-e735f78e58c6 h1:Gnkv59Ntl2RebC5tNauXuxyRXLfZ2XAJ0+ujMyFte5U=
cloud.o-forge.io/core/oc-lib v0.0.0-20250715125819-e735f78e58c6/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=
@@ -29,8 +15,8 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd
github.com/argoproj/argo-workflows/v3 v3.6.4 h1:5+Cc1UwaQE5ka3w7R3hxZ1TK3M6VjDEXA5WSQ/IXrxY=
github.com/argoproj/argo-workflows/v3 v3.6.4/go.mod h1:2f5zB8CkbNCCO1od+kd1dWkVokqcuyvu+tc+Jwx1MZg=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beego/beego/v2 v2.3.7 h1:z4btKtjU/rfp5BiYHkGD2QPjK9i1E9GH+I7vfhn6Agk=
github.com/beego/beego/v2 v2.3.7/go.mod h1:5cqHsOHJIxkq44tBpRvtDe59GuVRVv/9/tyVDxd5ce4=
github.com/beego/beego/v2 v2.3.8 h1:wplhB1pF4TxR+2SS4PUej8eDoH4xGfxuHfS7wAk9VBc=
github.com/beego/beego/v2 v2.3.8/go.mod h1:8vl9+RrXqvodrl9C8yivX1e6le6deCK6RWeq8R7gTTg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/biter777/countries v1.7.5 h1:MJ+n3+rSxWQdqVJU8eBy9RqcdH6ePPn4PJHocVWUa+Q=
@@ -63,8 +49,8 @@ github.com/etcd-io/etcd v3.3.17+incompatible/go.mod h1:cdZ77EstHBwVtD6iTgzgvogwc
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E=
github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ=
github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM=
github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8=
github.com/gabriel-vasile/mimetype v1.4.9 h1:5k+WDwEsD9eTLL8Tz3L0VnmVh9QxGjRmjBvAG7U/oYY=
github.com/gabriel-vasile/mimetype v1.4.9/go.mod h1:WnSQhFKJuBlRyLiKohA/2DtIlPFAbguNaG7QCHcyGok=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -82,6 +68,8 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.26.0 h1:SP05Nqhjcvz81uJaRfEV0YBSSSGMc/iMaVtFbr3Sw2k=
github.com/go-playground/validator/v10 v10.26.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-playground/validator/v10 v10.27.0 h1:w8+XrWVMhGkxOaaowyKH35gFydVHOvC0/uWoy2Fzwn4=
github.com/go-playground/validator/v10 v10.27.0/go.mod h1:I5QpIEbmr8On7W0TktmJAumgzX4CA1XNl4ZmDuVHKKo=
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
@@ -167,10 +155,10 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/nats-io/nats.go v1.41.0 h1:PzxEva7fflkd+n87OtQTXqCTyLfIIMFJBpyccHLE2Ko=
github.com/nats-io/nats.go v1.41.0/go.mod h1:wV73x0FSI/orHPSYoyMeJB+KajMDoWyXmFaRrrYaaTo=
github.com/nats-io/nkeys v0.4.10 h1:glmRrpCmYLHByYcePvnTBEAwawwapjCPMjy2huw20wc=
github.com/nats-io/nkeys v0.4.10/go.mod h1:OjRrnIKnWBFl+s4YK5ChQfvHP2fxqZexrKJoVVyWB3U=
github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/nwtgck/go-fakelish v0.1.3 h1:bA8/xa9hQmzppexIhBvdmztcd/PJ4SPuAUTBdMKZ8G4=
@@ -191,12 +179,16 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH
github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q=
github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.63.0 h1:YR/EIY1o3mEFP/kZCD7iDMnLPlGyuU2Gb3HIcXnA98k=
github.com/prometheus/common v0.63.0/go.mod h1:VVFF/fBIoToEnWRVkYoXEkq3R3paCoxG9PXP74SnV18=
github.com/prometheus/procfs v0.16.0 h1:xh6oHhKwnOJKMYiYBDWmkHqQPyiY40sny36Cmx2bbsM=
github.com/prometheus/procfs v0.16.0/go.mod h1:8veyXUu3nGP7oaCxhX6yeaM5u4stL2FeMXnCqhDthZg=
github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk=
github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE=
github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4=
github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/common v0.65.0 h1:QDwzd+G1twt//Kwj/Ww6E9FQq1iVMmODnILtW1t2VzE=
github.com/prometheus/common v0.65.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/prometheus/procfs v0.17.0 h1:FuLQ+05u4ZI+SS/w9+BWEM2TXiHKsUQ9TADiRH7DuK0=
github.com/prometheus/procfs v0.17.0/go.mod h1:oPQLaDAMRbA+u8H5Pbfq+dl3VDAvHxMUOVhe0wYB2zw=
github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ=
github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
@@ -250,14 +242,20 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeHxQ=
go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw=
go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8=
golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw=
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM=
golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
@@ -277,12 +275,16 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw=
golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA=
golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs=
golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70=
golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI=
golang.org/x/oauth2 v0.30.0/go.mod h1:B++QgG3ZKulg6sRPGD/mqlHQs5rB3Ml9erfeDY7xKlU=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -290,8 +292,12 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ=
golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8=
golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw=
golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -307,18 +313,26 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA=
golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=
golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg=
golang.org/x/term v0.32.0/go.mod h1:uZG1FhGx848Sqfsq4/DlJr3xGGsYMu/L5GW4abiaEPQ=
golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg=
golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M=
golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ=
golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

42
main.go
View File

@@ -14,7 +14,6 @@ import (
"oc-monitord/conf"
l "oc-monitord/logger"
"oc-monitord/models"
u "oc-monitord/utils"
"oc-monitord/workflow_builder"
@@ -54,7 +53,7 @@ func main() {
os.Setenv("test_service", "true") // Only for service demo, delete before merging on main
parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database")
setConf(&parser)
loadConfig(false, &parser)
oclib.InitDaemon("oc-monitord")
oclib.SetConfig(
@@ -72,7 +71,6 @@ func main() {
exec := u.GetExecution(conf.GetConfig().ExecutionID)
if exec == nil {
logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
return
}
conf.GetConfig().WorkflowID = exec.WorkflowID
@@ -112,20 +110,20 @@ func main() {
// Executed in a k8s environment
logger.Info().Msg("Executes inside a k8s")
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
executeInside(exec.ExecutionsID, argoFilePath)
executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath)
}
}
// So far we only log the output from
func executeInside(ns string, argo_file_path string) {
func executeInside(execID string, ns string, argo_file_path string) {
t, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil {
logger.Error().Msg("Could not create KubernetesTool")
return
}
name, err := t.CreateArgoWorkflow(argo_file_path, ns)
// _ = name
// _ = name
if err != nil {
logger.Error().Msg("Could not create argo workflow : " + err.Error())
logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
@@ -148,26 +146,26 @@ func executeInside(ns string, argo_file_path string) {
}
func executeOutside(argo_file_path string, workflow *models.Workflow) {
func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
var stdoutSubmit, stderrSubmit io.ReadCloser
var stdoutLogs, stderrLogs io.ReadCloser
var wg sync.WaitGroup
var err error
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID)
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID )
cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID)
if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
return
}
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color")
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color")
if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error())
return
}
var steps []string
for _, template := range workflow.Spec.Templates {
steps = append(steps, template.Name)
@@ -188,7 +186,7 @@ func executeOutside(argo_file_path string, workflow *models.Workflow) {
logger.Info().Msg("Running argo logs")
if err := cmdLogs.Run(); err != nil {
wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text())
}
@@ -203,13 +201,22 @@ func executeOutside(argo_file_path string, workflow *models.Workflow) {
wg.Wait()
}
func setConf(parser *argparse.Parser) {
func loadConfig(is_k8s bool, parser *argparse.Parser) {
var o *onion.Onion
o = initOnion(o)
setConf(is_k8s, o, parser)
// if !IsValidUUID(conf.GetConfig().ExecutionID) {
// logger.Fatal().Msg("Provided ID is not an UUID")
// }
}
func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
url := parser.String("u", "url", &argparse.Options{Required: true, Default: "http://127.0.0.1:3100", Help: "Url to the Loki database logs will be sent to"})
mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"})
execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"})
peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"})
groups := parser.String("g", "groups", &argparse.Options{Required: false, Default: "", Help: "Groups of the peer to request from oc-catalog API"})
mongo := parser.String("m", "mongo", &argparse.Options{Required: true, Default: "mongodb://127.0.0.1:27017", Help: "URL to reach the MongoDB"})
db := parser.String("d", "database", &argparse.Options{Required: true, Default: "DC_myDC", Help: "Name of the database to query in MongoDB"})
timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"})
@@ -236,7 +243,7 @@ func setConf(parser *argparse.Parser) {
conf.GetConfig().Mode = *mode
conf.GetConfig().ExecutionID = *execution
conf.GetConfig().PeerID = *peer
conf.GetConfig().Groups = strings.Split((*groups), ",")
conf.GetConfig().KubeHost = *host
conf.GetConfig().KubePort = *port
@@ -298,6 +305,7 @@ func getContainerName(argo_file string) string {
return container_name
}
func updateStatus(status string, log string) {
exec_id := conf.GetConfig().ExecutionID

66
minio-doc/README.md Normal file
View File

@@ -0,0 +1,66 @@
# Goal
We want to be able to instantiate a service that allows to store file located on a `processing` pod onto it.
We have already tested it with a static `Argo` yaml file, a MinIO running on the same kubernetes node, the minio service is reached because it is the only associated to the `serviceAccount`.
We have established three otpions that need to be available to the user for the feature to be implemented:
- Use a MinIO running constantly on the node that executes the argo workflow
- Use a MinIO
- A MinIO is instanciated when a new workflow is launched
# Requirements
- Helm : `https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3`
- Helm GO client : `$ go get github.com/mittwald/go-helm-client`
- MinIO chart : `https://charts.min.io/`
# Ressources
We need to create several ressources in order for the pods to communicate with the MinIO
## MinIO Auth Secrets
## Bucket ConfigMap
With the name `artifact-repositories` this configMap will be used by default. It contains the URL to the MinIO server and the key to the authentication data held in a `secret` ressource.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
# If you want to use this config map by default, name it "artifact-repositories".
name: artifact-repositories
# annotations:
# # v3.0 and after - if you want to use a specific key, put that key into this annotation.
# workflows.argoproj.io/default-artifact-repository: oc-s3-artifact-repository
data:
oc-s3-artifact-repository: |
s3:
bucket: oc-bucket
endpoint: [ retrieve cluster with kubectl get service argo-artifacts -o jsonpath="{.spec.clusterIP}" ]:9000
insecure: true
accessKeySecret:
name: argo-artifact-secret
key: access-key
secretKeySecret:
name: argo-artifact-secret
key: secret-key
```
# Code modifications
Rajouter un attribut "isDataLink"
- true/false
Rajouter un attribut DataPath ou un truc comme ca
- liste de map[string]string permet de n'avoir qu'une copie par fichier)
- éditable uniquement a travers la méthode addDataPath
- clé : path du fichier / value : nom de la copie dans minio
===> on a besoin du meme attribut pour Processing -> Data et Data -> Processing

View File

@@ -1,7 +1,5 @@
package models
import "gopkg.in/yaml.v3"
type ServiceResource struct {
Action string `yaml:"action,omitempty"`
SuccessCondition string `yaml:"successCondition,omitempty"`
@@ -17,24 +15,6 @@ type Service struct {
Spec ServiceSpec `yaml:"spec"`
}
func (s *Service) BindToArgo(workflow *Workflow) error {
service_manifest, err := yaml.Marshal(s)
if err != nil {
return err
}
service_template := Template{Name: "workflow-service-pod",
Resource: ServiceResource{
Action: "create",
SuccessCondition: "status.succeeded > 0",
FailureCondition: "status.failed > 3",
SetOwnerReference: true,
Manifest: string(service_manifest),
},
}
workflow.Spec.Templates = append(workflow.Spec.Templates, service_template)
return nil
}
type Metadata struct {
Name string `yaml:"name"`
}

View File

@@ -1,15 +1,8 @@
package models
import (
"encoding/json"
"fmt"
"strconv"
"strings"
w "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/resources"
)
@@ -19,60 +12,11 @@ type Parameter struct {
Value string `yaml:"value,omitempty"`
}
type Bounds struct {
CPU string `yaml:"cpu,omitempty"`
Memory string `yaml:"memory,omitempty"`
GPU string `yaml:"nvidia.com/gpu,omitempty"`
}
func NewBounds() *Bounds {
return &Bounds{
CPU: "0",
Memory: "0",
GPU: "0",
}
}
func (b *Bounds) Set(value float64, what string, isMin bool) bool {
i := float64(0)
switch what {
case "cpu":
if newI, err := strconv.ParseFloat(b.CPU, 64); err == nil {
i = newI
}
case "ram":
if newI, err := strconv.ParseFloat(b.Memory, 64); err == nil {
i = newI
}
case "gpu":
if newI, err := strconv.ParseFloat(b.GPU, 64); err == nil {
i = newI
}
}
ok := (value > i && !isMin) || (value < i && isMin)
if ok {
switch what {
case "cpu":
b.CPU = fmt.Sprintf("%f", value)
return true
case "ram":
b.Memory = fmt.Sprintf("%fGi", value)
return true
case "gpu":
b.GPU = fmt.Sprintf("%f", value)
return true
}
}
return false
}
type Container struct {
Image string `yaml:"image"`
Command []string `yaml:"command,omitempty,flow"`
Args []string `yaml:"args,omitempty,flow"`
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
Requests Bounds `yaml:"requests,omitempty"`
Limits Bounds `yaml:"limits,omitempty"`
}
func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMount) []VolumeMount {
@@ -93,87 +37,27 @@ func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMoun
return volumes
}
type VolumeMount struct {
Name string `yaml:"name"`
MountPath string `yaml:"mountPath"`
Storage *resources.StorageResource `yaml:"-"`
}
type Task struct {
Name string `yaml:"name"`
Template string `yaml:"template"`
Dependencies []string `yaml:"dependencies,omitempty"`
NodeSelector map[string]string `yaml:"nodeSelector,omitempty"`
Name string `yaml:"name"`
Template string `yaml:"template"`
Dependencies []string `yaml:"dependencies,omitempty"`
Arguments struct {
Parameters []Parameter `yaml:"parameters,omitempty"`
} `yaml:"arguments,omitempty"`
}
func NewTask(processingName string, graphItemID string) *Task {
unique_name := GetArgoName(processingName, graphItemID)
return &Task{
Name: unique_name,
Template: unique_name,
}
}
func (t *Task) BindToArgo(
dag *Dag,
graphItemID string,
originWf *w.Workflow,
processing *resources.ProcessingResource,
firstItems, lastItems []string,
) (*Dag, []string, []string) {
if instance := processing.GetSelectedInstance(); instance != nil {
t.addParams(instance.(*resources.ProcessingInstance).Env)
t.addParams(instance.(*resources.ProcessingInstance).Inputs)
t.addParams(instance.(*resources.ProcessingInstance).Outputs)
}
t.Dependencies = TransformDepsToArgo(originWf.GetDependencies(graphItemID))
name := ""
if originWf.Graph.Items[graphItemID].Processing != nil {
name = originWf.Graph.Items[graphItemID].Processing.GetName()
}
if originWf.Graph.Items[graphItemID].Workflow != nil {
name = originWf.Graph.Items[graphItemID].Workflow.GetName()
}
if len(t.Dependencies) == 0 && name != "" {
firstItems = append(firstItems, GetArgoName(name, graphItemID))
}
if deps := originWf.IsDependancy(graphItemID); len(deps) == 0 && name != "" {
lastItems = append(lastItems, GetArgoName(name, graphItemID))
}
dag.Tasks = append(dag.Tasks, *t)
return dag, firstItems, lastItems
}
func (t *Task) addParams(params []models.Param) {
for _, value := range params {
t.Arguments.Parameters = append(t.Arguments.Parameters, Parameter{
Name: value.Name,
Value: value.Value,
})
}
}
func (t *Task) GetDeps(name string) (int, string) {
for i, deps := range t.Dependencies {
if strings.Contains(deps, name) {
return i, deps
}
}
return 0, ""
}
type Dag struct {
Tasks []Task `yaml:"tasks,omitempty"`
}
func (d *Dag) GetTask(taskName string) *Task {
for _, task := range d.Tasks {
if strings.Contains(task.Name, taskName) {
return &task
}
}
return nil
}
type TemplateMetadata struct {
Labels map[string]string `yaml:"labels,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
Annotations map[string]string `yaml:"annotations,omitempty"`
}
@@ -182,17 +66,13 @@ type Secret struct {
Key string `yaml:"key"`
}
func NewSecret(name string, key string) *Secret {
return &Secret{Name: name, Key: key + "-key"}
}
type Key struct {
Key string `yaml:"key"`
Bucket string `yaml:"bucket"`
EndPoint string `yaml:"endpoint"`
Insecure bool `yaml:"insecure"`
AccessKeySecret *Secret `yaml accessKeySecret`
SecretKeySecret *Secret `yaml secretKeySecret`
AccessKeySecret *Secret `yaml:"accessKeySecret"`
SecretKeySecret *Secret `yaml:"secretKeySecret"`
}
type Artifact struct {
@@ -201,59 +81,6 @@ type Artifact struct {
S3 *Key `yaml:"s3,omitempty"`
}
func NewArtifact(name string, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) *Artifact {
if rw.Write {
name += "-" + rw.Destination + "-input-write"
} else {
name = "-" + rw.Destination + "-input-read"
}
return &Artifact{
Name: name,
Path: template.ReplacePerEnv(rw.Source, params),
}
}
func (a *Artifact) BindToArgo(storageType enum.StorageType, rw graph.StorageProcessingGraphLink, params []models.Param, template Template) {
if rw.Write {
template.Outputs.Artifacts = append(template.Inputs.Artifacts, *a)
} else {
template.Inputs.Artifacts = append(template.Outputs.Artifacts, *a)
}
}
func (a *Artifact) bindS3(rw graph.StorageProcessingGraphLink, params []models.Param, template Template) {
a.S3 = &Key{
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, params),
Insecure: true, // temporary
}
/* sel := storage.GetSelectedInstance()
if sel != nil {
if sel.(*resources.StorageResourceInstance).Credentials != nil {
tool, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil || tool == nil {
logger.Error().Msg("Could not create the access secret")
} else {
id, err := tool.CreateAccessSecret(namespace,
sel.(*resources.StorageResourceInstance).Credentials.Login,
sel.(*resources.StorageResourceInstance).Credentials.Pass)
if err == nil {
a.S3.AccessKeySecret = NewSecret(id, "access")
a.S3.SecretKeySecret = NewSecret(id, "secret")
}
}
}
source := sel.(*resources.StorageResourceInstance).Source
a.S3.Key = strings.ReplaceAll(strings.ReplaceAll(a.S3.Key, source+"/", ""), source, "")
splits := strings.Split(a.S3.EndPoint, "/")
if len(splits) > 1 {
a.S3.Bucket = splits[0]
a.S3.EndPoint = strings.Join(splits[1:], "/")
} else {
a.S3.Bucket = splits[0]
}
} */
}
type InOut struct {
Parameters []Parameter `yaml:"parameters"`
Artifacts []Artifact `yaml:"artifacts,omitempty"`
@@ -304,7 +131,7 @@ func (template *Template) CreateContainer(processing *resources.ProcessingResour
func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string {
for _, v := range envs {
if strings.Contains(arg, v.Name) {
if v.Name != "" && strings.Contains(arg, v.Name) {
value := "{{ inputs.parameters." + v.Name + " }}"
arg = strings.ReplaceAll(arg, v.Name, value)
arg = strings.ReplaceAll(arg, "$"+v.Name, value)
@@ -316,43 +143,10 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string
// Add the metadata that allow Admiralty to pick up an Argo Workflow that needs to be reparted
// The value of "clustername" is the peerId, which must be replaced by the node name's for this specific execution
func (t *Template) AddAdmiraltyAnnotations(peerID, namespace string) error {
func (t *Template) AddAdmiraltyAnnotations(peerId string){
if t.Metadata.Annotations == nil {
t.Metadata.Annotations = make(map[string]string)
}
const key = "admiralty.io/multi-cluster-scheduler"
var annotation SchedulerAnnotation
// Parse existing annotation if it exists
if val, ok := t.Metadata.Annotations[key]; ok && val != "" {
if err := json.Unmarshal([]byte(val), &annotation); err != nil {
return fmt.Errorf("failed to parse existing scheduler annotation: %w", err)
}
}
// Add new affinity
annotation.Affinities = append(annotation.Affinities, affinity{
Cluster: "target-" + peerID + "-" + namespace,
Namespace: namespace,
})
// Encode back to JSON
bytes, err := json.Marshal(annotation)
if err != nil {
return fmt.Errorf("failed to encode scheduler annotation: %w", err)
}
t.Metadata.Annotations[key] = string(bytes)
return nil
}
type affinity struct {
Cluster string `json:"cluster"`
Namespace string `json:"namespace"`
}
type SchedulerAnnotation struct {
Affinities []affinity `json:"affinities"`
}
t.Metadata.Annotations["multicluster.admiralty.io/elect"] = ""
t.Metadata.Annotations["multicluster.admiralty.io/clustername"] = peerId
}

View File

@@ -1,92 +0,0 @@
package models
import (
"strings"
w "cloud.o-forge.io/core/oc-lib/models/workflow"
)
type WorkflowsDependancies struct {
FirstWfTasks map[string][]string
RelatedWfTasks map[string][]string
LastWfTasks map[string][]string
}
func NewWorkflowDependancies() *WorkflowsDependancies {
return &WorkflowsDependancies{
FirstWfTasks: map[string][]string{},
RelatedWfTasks: map[string][]string{},
LastWfTasks: map[string][]string{},
}
}
func (w *WorkflowsDependancies) BindFirstTasks(depsFunc func(v string) []w.Deps, dag *Dag) {
for wfID, firstTasks := range w.FirstWfTasks {
deps := depsFunc(wfID)
if task := dag.GetTask(wfID); task != nil && len(deps) > 0 {
task.Dependencies = append(task.Dependencies, firstTasks...)
}
}
}
func (w *WorkflowsDependancies) BindRelatedTasks(dag *Dag) {
for wfID, relatedWfTasks := range w.RelatedWfTasks {
for _, dep := range relatedWfTasks {
if task := dag.GetTask(dep); task != nil {
index := -1
if i, deps := task.GetDeps(wfID); deps != "" {
index = i
}
if index != -1 {
task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...)
}
if w.LastWfTasks[wfID] != nil {
task.Dependencies = append(task.Dependencies, w.LastWfTasks[wfID]...)
}
}
}
}
}
type Workflow struct {
ApiVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
} `yaml:"metadata"`
Spec Spec `yaml:"spec,omitempty"`
}
func (b *Workflow) GetDag() *Dag {
for _, t := range b.Spec.Templates {
if t.Name == "dag" {
return t.Dag
}
}
b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}})
return b.Spec.Templates[len(b.Spec.Templates)-1].Dag
}
type Spec struct {
ServiceAccountName string `yaml:"serviceAccountName"`
Entrypoint string `yaml:"entrypoint"`
Arguments []Parameter `yaml:"arguments,omitempty"`
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
Templates []Template `yaml:"templates"`
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
}
func GetArgoName(raw_name string, component_id string) (formatedName string) {
formatedName = strings.ReplaceAll(raw_name, " ", "-")
formatedName += "-" + component_id
formatedName = strings.ToLower(formatedName)
return
}
func TransformDepsToArgo(deps []w.Deps) []string {
argoDeps := []string{}
for _, dep := range deps {
argoDeps = append(argoDeps, GetArgoName(dep.Source, dep.Dest))
}
return argoDeps
}

View File

@@ -1,12 +1,5 @@
package models
import (
"fmt"
"strings"
"cloud.o-forge.io/core/oc-lib/models/resources"
)
type VolumeClaimTemplate struct {
Metadata struct {
Name string `yaml:"name"`
@@ -22,22 +15,3 @@ type VolumeSpec struct {
} `yaml:"requests"`
} `yaml:"resources"`
}
type VolumeMount struct {
Name string `yaml:"name"`
MountPath string `yaml:"mountPath"`
Storage *resources.StorageResource `yaml:"-"`
}
func (v *VolumeMount) BindToArgo(workflow *Workflow) { // TODO : one think about remote volume but TG
index := 0
if v.Storage.SelectedInstanceIndex != nil && (*v.Storage.SelectedInstanceIndex) >= 0 {
index = *v.Storage.SelectedInstanceIndex
}
storage := v.Storage.Instances[index]
new_volume := VolumeClaimTemplate{}
new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(v.Name), " ", "-")
new_volume.Spec.AccessModes = []string{"ReadWriteOnce"}
new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo()
workflow.Spec.Volumes = append(workflow.Spec.Volumes, new_volume)
}

View File

@@ -4,14 +4,16 @@ import (
"errors"
"io"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
type Tool interface {
CreateArgoWorkflow(path string, ns string) (string, error)
CreateAccessSecret(ns string, login string, password string) (string, error)
CreateAccessSecret(user string, password string, storageId string, namespace string) (string, error)
GetArgoWatch(executionId string, wfName string) (watch.Interface, error)
GetPodLogger(ns string, wfName string, podName string) (io.ReadCloser, error)
GetS3Secret(storageId string, namespace string) *v1.Secret
}
var _service = map[string]func() (Tool, error){

View File

@@ -2,7 +2,6 @@ package tools
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
@@ -13,8 +12,8 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/google/uuid"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -31,7 +30,7 @@ type KubernetesTools struct {
func NewKubernetesTool() (Tool, error) {
// Load Kubernetes config (from ~/.kube/config)
config := &rest.Config{
Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
Host: "https://" + conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte(conf.GetConfig().KubeCA),
CertData: []byte(conf.GetConfig().KubeCert),
@@ -88,21 +87,20 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er
return createdWf.Name, nil
}
func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password string) (string, error) {
func (k *KubernetesTools) CreateAccessSecret(access string, password string, storageId string, namespace string) (string, error) {
// Namespace where the secret will be created
namespace := "default"
// Encode the secret data (Kubernetes requires base64-encoded values)
secretData := map[string][]byte{
"access-key": []byte(base64.StdEncoding.EncodeToString([]byte(login))),
"secret-key": []byte(base64.StdEncoding.EncodeToString([]byte(password))),
"access-key": []byte(access),
"secret-key": []byte(password),
}
// Define the Secret object
name := uuid.New().String()
name := storageId+"-secret-s3"
secret := &v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
Namespace: namespace,
},
Type: v1.SecretTypeOpaque,
Data: secretData,
@@ -112,9 +110,28 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s
if err != nil {
return "", errors.New("Error creating secret: " + err.Error())
}
return name, nil
}
func (k *KubernetesTools) GetS3Secret(storageId string, namespace string) *v1.Secret {
secret, err := k.Set.CoreV1().Secrets(namespace).Get(context.TODO(), storageId + "-secret-s3", metav1.GetOptions{})
// Get(context.TODO(),storageId + "-artifact-server", metav1.GetOptions{})
if err != nil && !k8serrors.IsNotFound(err) {
l := utils.GetLogger()
l.Fatal().Msg("An error happened when retrieving secret in " + namespace + " : " + err.Error())
}
if k8serrors.IsNotFound(err) {
return nil
}
return secret
// return secret
}
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}

View File

@@ -14,66 +14,67 @@ import (
tools "cloud.o-forge.io/core/oc-lib/tools"
)
type AdmiraltySetter struct {
Id string // ID to identify the execution, correspond to workflow_executions id
NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"}
Id string // ID to identify the execution, correspond to workflow_executions id
NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"}
}
func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string, remotePeerID string) error {
func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string,remotePeerID string) error {
logger := logs.GetLogger()
data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(remotePeerID)
data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(remotePeerID)
if data.Code != 200 {
logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID)
return fmt.Errorf(data.Err)
}
remotePeer := data.ToPeer()
data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(localPeerID)
if data.Code != 200 {
logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID)
return fmt.Errorf(data.Err)
}
localPeer := data.ToPeer()
data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER),"",localPeerID,nil,nil).LoadOne(localPeerID)
if data.Code != 200 {
logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID)
return fmt.Errorf(data.Err)
}
localPeer := data.ToPeer()
caller := tools.NewHTTPCaller(
map[tools.DataType]map[tools.METHOD]string{
tools.ADMIRALTY_SOURCE: {
tools.POST: "/:id",
caller := tools.NewHTTPCaller(
map[tools.DataType]map[tools.METHOD]string{
tools.ADMIRALTY_SOURCE: {
tools.POST :"/:id",
},
tools.ADMIRALTY_KUBECONFIG: {
tools.GET:"/:id",
},
tools.ADMIRALTY_SECRET: {
tools.POST:"/:id/" + remotePeerID,
},
tools.ADMIRALTY_TARGET: {
tools.POST:"/:id/" + remotePeerID,
},
tools.ADMIRALTY_NODES: {
tools.GET:"/:id/" + remotePeerID,
},
},
tools.ADMIRALTY_KUBECONFIG: {
tools.GET: "/:id",
},
tools.ADMIRALTY_SECRET: {
tools.POST: "/:id/" + remotePeerID,
},
tools.ADMIRALTY_TARGET: {
tools.POST: "/:id/" + remotePeerID,
},
tools.ADMIRALTY_NODES: {
tools.GET: "/:id/" + remotePeerID,
},
},
)
)
logger.Info().Msg("\n\n Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id)
_ = s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict},caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true)
logger.Info().Msg("\n\n Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id)
kubeconfig := s.getKubeconfig(remotePeer, caller)
logger.Info().Msg("\n\n Creating a secret from the kubeconfig " + localPeerID + " ns-" + s.Id)
_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller, s.Id, tools.ADMIRALTY_SECRET, tools.POST, kubeconfig, true)
logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id)
_ = s.callRemoteExecution(localPeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_TARGET, tools.POST, nil, true)
s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller,s.Id, tools.ADMIRALTY_SECRET, tools.POST,kubeconfig, true)
logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id )
s.callRemoteExecution(localPeer,[]int{http.StatusCreated, http.StatusConflict},caller,s.Id,tools.ADMIRALTY_TARGET,tools.POST, nil, true)
logger.Info().Msg("\n\n Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id)
s.checkNodeStatus(localPeer, caller)
s.checkNodeStatus(localPeer,caller)
return nil
}
func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCaller) map[string]string {
var kubedata map[string]string
_ = s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true)
s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true)
if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 {
l := utils.GetLogger()
l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig")
@@ -89,9 +90,9 @@ func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCalle
return kubedata
}
func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) map[string]interface{} {
func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) {
l := utils.GetLogger()
resp, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller)
_, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller)
if err != nil {
l.Error().Msg("Error when executing on peer at" + peer.Url)
l.Error().Msg(err.Error())
@@ -108,10 +109,9 @@ func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int,
}
}
return resp
}
func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) {
func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller){
var data map[string]interface{}
if resp, ok := caller.LastResults["body"]; ok {
json.Unmarshal(resp.([]byte), &data)
@@ -128,10 +128,10 @@ func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) {
}
}
func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller) {
for i := range 5 {
func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller){
for i := range(5) {
time.Sleep(10 * time.Second) // let some time for kube to generate the node
_ = s.callRemoteExecution(localPeer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_NODES, tools.GET, nil, false)
s.callRemoteExecution(localPeer,[]int{http.StatusOK},caller,s.Id,tools.ADMIRALTY_NODES,tools.GET, nil, false)
if caller.LastResults["code"] == 200 {
s.storeNodeName(caller)
return
@@ -142,5 +142,5 @@ func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HT
}
logger.Info().Msg("Could not verify that node is up. Retrying...")
}
}
}

View File

@@ -5,18 +5,21 @@
package workflow_builder
import (
"errors"
"fmt"
"oc-monitord/conf"
"oc-monitord/models"
. "oc-monitord/models"
tools2 "oc-monitord/tools"
"os"
"strings"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
w "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"github.com/nwtgck/go-fakelish"
"github.com/rs/zerolog"
"gopkg.in/yaml.v3"
@@ -26,20 +29,49 @@ var logger zerolog.Logger
type ArgoBuilder struct {
OriginWorkflow *w.Workflow
Workflow *models.Workflow
Workflow Workflow
Services []*Service
Timeout int
RemotePeers []string
}
type Workflow struct {
ApiVersion string `yaml:"apiVersion"`
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
} `yaml:"metadata"`
Spec Spec `yaml:"spec,omitempty"`
}
func (b *Workflow) getDag() *Dag {
for _, t := range b.Spec.Templates {
if t.Name == "dag" {
return t.Dag
}
}
b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}})
return b.Spec.Templates[len(b.Spec.Templates)-1].Dag
}
type Spec struct {
ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
Entrypoint string `yaml:"entrypoint"`
Arguments []Parameter `yaml:"arguments,omitempty"`
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
Templates []Template `yaml:"templates"`
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
}
// TODO: found on a processing instance linked to storage
// add s3, gcs, azure, etc if needed on a link between processing and storage
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []string, error) {
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
logger = logs.GetLogger()
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
// handle services by checking if there is only one processing with hostname and port
firstItems, lastItems, volumes := b.createTemplates(namespace)
b.createVolumes(volumes)
if b.Timeout > 0 {
b.Workflow.Spec.Timeout = b.Timeout
}
@@ -47,16 +79,21 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []
b.Workflow.Spec.Entrypoint = "dag"
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
b.Workflow.Kind = "Workflow"
return len(b.Workflow.GetDag().Tasks), firstItems, lastItems, nil
if !write {
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
}
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
}
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []models.VolumeMount) {
volumes := []models.VolumeMount{}
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
volumes := []VolumeMount{}
firstItems := []string{}
lastItems := []string{}
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
for _, item := range items {
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
instance := item.Processing.GetSelectedInstance()
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
@@ -66,68 +103,89 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []m
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
item.ID, item.Processing, volumes, firstItems, lastItems)
}
wfDeps := models.NewWorkflowDependancies()
for _, workflowID := range b.OriginWorkflow.Workflows {
b.createWorkflowArgoTemplate(workflowID, namespace, wfDeps)
firstWfTasks := map[string][]string{}
latestWfTasks := map[string][]string{}
relatedWfTasks := map[string][]string{}
for _, wf := range b.OriginWorkflow.Workflows {
realWorkflow, code, err := w.NewAccessor(nil).LoadOne(wf)
if code != 200 {
logger.Error().Msg("Error loading the workflow : " + err.Error())
continue
}
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
_, fi, li, err := subBuilder.CreateDAG(namespace, false)
if err != nil {
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
continue
}
firstWfTasks[wf] = fi
if ok, depsOfIds := subBuilder.isArgoDependancy(wf); ok { // IS BEFORE
latestWfTasks[wf] = li
relatedWfTasks[wf] = depsOfIds
}
subDag := subBuilder.Workflow.getDag()
d := b.Workflow.getDag()
d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...)
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...)
b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...)
b.Services = append(b.Services, subBuilder.Services...)
}
for wfID, depsOfIds := range relatedWfTasks {
for _, dep := range depsOfIds {
for _, task := range b.Workflow.getDag().Tasks {
if strings.Contains(task.Name, dep) {
index := -1
for i, depp := range task.Dependencies {
if strings.Contains(depp, wfID) {
index = i
break
}
}
if index != -1 {
task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...)
}
task.Dependencies = append(task.Dependencies, latestWfTasks[wfID]...)
}
}
}
}
for wfID, fi := range firstWfTasks {
deps := b.getArgoDependencies(wfID)
if len(deps) > 0 {
for _, dep := range fi {
for _, task := range b.Workflow.getDag().Tasks {
if strings.Contains(task.Name, dep) {
task.Dependencies = append(task.Dependencies, deps...)
}
}
}
}
}
wfDeps.BindRelatedTasks(b.Workflow.GetDag())
wfDeps.BindFirstTasks(b.OriginWorkflow.GetDependencies, b.Workflow.GetDag())
if b.Services != nil {
dag := b.Workflow.GetDag()
dag := b.Workflow.getDag()
dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"})
b.addServiceToArgo()
}
return firstItems, lastItems, volumes
}
func (b *ArgoBuilder) createWorkflowArgoTemplate(
workflowID string,
namespace string,
wfDeps *models.WorkflowsDependancies,
) {
realWorkflow, code, err := w.NewAccessor(nil).LoadOne(workflowID)
if code != 200 {
logger.Error().Msg("Error loading the workflow : " + err.Error())
return
}
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Workflow: &models.Workflow{}, Timeout: b.Timeout}
_, fi, li, err := subBuilder.CreateDAG(namespace, false)
if err != nil {
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
return
}
wfDeps.FirstWfTasks[workflowID] = fi
if depsOfIds := subBuilder.OriginWorkflow.IsDependancy(workflowID); len(depsOfIds) > 0 { // IS BEFORE
wfDeps.LastWfTasks[workflowID] = li
wfDeps.RelatedWfTasks[workflowID] = models.TransformDepsToArgo(depsOfIds)
}
subDag := subBuilder.Workflow.GetDag()
d := b.Workflow.GetDag()
d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...)
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...)
b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...)
b.Services = append(b.Services, subBuilder.Services...)
}
func (b *ArgoBuilder) createArgoTemplates(
namespace string,
func (b *ArgoBuilder) createArgoTemplates(namespace string,
id string,
processing *resources.ProcessingResource,
volumes []models.VolumeMount,
volumes []VolumeMount,
firstItems []string,
lastItems []string,
) ([]models.VolumeMount, []string, []string) {
_, firstItems, lastItems = NewTask(processing.Name, id).BindToArgo(b.Workflow.GetDag(), id, b.OriginWorkflow, processing, firstItems, lastItems)
template := &Template{Name: models.GetArgoName(processing.GetName(), id)}
lastItems []string) ([]VolumeMount, []string, []string) {
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
template := &Template{Name: getArgoName(processing.GetName(), id)}
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
template.CreateContainer(processing, b.Workflow.GetDag())
if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil {
logger.Error().Msg(fmt.Sprint("problem to sets up repartition expected %v", err.Error()))
return volumes, firstItems, lastItems
isReparted, peerId := b.isProcessingReparted(*processing, id)
template.CreateContainer(processing, b.Workflow.getDag())
if isReparted {
logger.Debug().Msg("Reparted processing, on " + peerId)
b.RemotePeers = append(b.RemotePeers, peerId)
template.AddAdmiraltyAnnotations(peerId)
}
// get datacenter from the processing
if processing.IsService {
@@ -135,97 +193,319 @@ func (b *ArgoBuilder) createArgoTemplates(
template.Metadata.Labels = make(map[string]string)
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
}
volumes = b.addStorageAnnotations(id, template, namespace, volumes)
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
return volumes, firstItems, lastItems
}
func (b *ArgoBuilder) createVolumes(volumes []models.VolumeMount) { // TODO : one think about remote volume but TG
for _, volume := range volumes {
volume.BindToArgo(b.Workflow)
func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage node linked to the processing for which we create the template
for _, r := range related {
storage := r.Node.(*resources.StorageResource)
for _, linkToStorage := range r.Links {
for _, rw := range linkToStorage.StorageLinkInfos {
var art Artifact
artifactBaseName := strings.Join(strings.Split(storage.GetName(), " "), "-") + "-" + strings.Replace(rw.FileName, ".", "-", -1) // Parameter/Artifact name must consist of alpha-numeric characters, '_' or '-'
if rw.Write {
art = Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} // When we are writing to the s3 the Path element is the path to the file in the pod
art.Name = artifactBaseName + "-input-write"
} else {
art = Artifact{Path: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env)} // When we are reading from the s3 the Path element in pod should be the destination of the file
art.Name = artifactBaseName + "-input-read"
}
if storage.StorageType == enum.S3 {
b.addS3annotations(&art, template, rw, linkToStorage, storage, namespace)
}
if rw.Write {
template.Outputs.Artifacts = append(template.Inputs.Artifacts, art)
} else {
template.Inputs.Artifacts = append(template.Outputs.Artifacts, art)
}
}
}
index := 0
if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
index = *storage.SelectedInstanceIndex
}
s := storage.Instances[index]
if s.Local {
volumes = template.Container.AddVolumeMount(VolumeMount{
Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"),
MountPath: s.Source,
Storage: storage,
}, volumes)
}
}
return volumes
}
func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
art.S3 = &Key{
// Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
Insecure: true, // temporary
}
if rw.Write {
art.S3.Key = rw.Destination + "/" + rw.FileName
} else {
art.S3.Key = rw.Source
}
sel := storage.GetSelectedInstance()
// v0.1 : add the storage.Source to the s3 object
// v0.2 : test if the storage.Source exists in the configMap and quit if not
// v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call
if sel != nil {
b.addAuthInformation(storage, namespace, art)
art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
}
}
func (b *ArgoBuilder) addAuthInformation(storage *resources.StorageResource, namespace string, art *Artifact) {
sel := storage.GetSelectedInstance()
tool, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil || tool == nil {
logger.Fatal().Msg("Could not create the access secret :" + err.Error())
}
secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
if err == nil {
art.S3.AccessKeySecret = &Secret{
Name: secretName,
Key: "access-key",
}
art.S3.SecretKeySecret = &Secret{
Name: secretName,
Key: "secret-key",
}
}
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
splits := strings.Split(art.S3.EndPoint, "/")
if len(splits) > 1 {
art.S3.Bucket = splits[0]
art.S3.EndPoint = strings.Join(splits[1:], "/")
} else {
art.S3.Bucket = splits[0]
}
}
func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) {
s := tool.GetS3Secret(storage.UUID, namespace)
// var s *v1.Secret
accessKey, secretKey := retrieveMinioCredential("peer",namespace)
if s == nil {
id, err := tool.CreateAccessSecret(
accessKey,
secretKey,
storage.UUID,
namespace,
)
if err != nil {
l := oclib.GetLogger()
l.Fatal().Msg("Error when creating the secret holding credentials for S3 access in " + namespace + " : " + err.Error())
}
return id, nil
}
// s.Name = "toto"
return s.Name, nil
}
// This method needs to evolve to an API call to the peer passed as a parameter
func retrieveMinioCredential(peer string, namespace string) (string,string) {
return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW"
}
func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource,
firstItems []string, lastItems []string) (*Dag, []string, []string) {
unique_name := getArgoName(processing.GetName(), graphItemID)
step := Task{Name: unique_name, Template: unique_name}
instance := processing.GetSelectedInstance()
if instance != nil {
for _, value := range instance.(*resources.ProcessingInstance).Env {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
Name: value.Name,
Value: value.Value,
})
}
for _, value := range instance.(*resources.ProcessingInstance).Inputs {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
Name: value.Name,
Value: value.Value,
})
}
for _, value := range instance.(*resources.ProcessingInstance).Outputs {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
Name: value.Name,
Value: value.Value,
})
}
}
step.Dependencies = b.getArgoDependencies(graphItemID)
name := ""
if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil {
name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName()
}
if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil {
name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName()
}
if len(step.Dependencies) == 0 && name != "" {
firstItems = append(firstItems, getArgoName(name, graphItemID))
}
if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" {
lastItems = append(lastItems, getArgoName(name, graphItemID))
}
dag.Tasks = append(dag.Tasks, step)
return dag, firstItems, lastItems
}
func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
for _, volume := range volumes {
index := 0
if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
index = *volume.Storage.SelectedInstanceIndex
}
storage := volume.Storage.Instances[index]
new_volume := VolumeClaimTemplate{}
new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-")
new_volume.Spec.AccessModes = []string{"ReadWriteOnce"}
new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo()
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume)
}
}
func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
dependancyOfIDs := []string{}
isDeps := false
for _, link := range b.OriginWorkflow.Graph.Links {
if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok {
logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID))
continue
}
source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing
if id == link.Source.ID && source != nil {
isDeps = true
dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), link.Destination.ID))
}
wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow
if id == link.Source.ID && wourceWF != nil {
isDeps = true
dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), link.Destination.ID))
}
}
return isDeps, dependancyOfIDs
}
func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) {
for _, link := range b.OriginWorkflow.Graph.Links {
if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok {
logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID))
continue
}
source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
if id == link.Destination.ID && source != nil {
dependency_name := getArgoName(source.GetName(), link.Source.ID)
dependencies = append(dependencies, dependency_name)
continue
}
}
return
}
func getArgoName(raw_name string, component_id string) (formatedName string) {
formatedName = strings.ReplaceAll(raw_name, " ", "-")
formatedName += "-" + component_id
formatedName = strings.ToLower(formatedName)
return
}
// Verify if a processing resource is attached to another Compute than the one hosting
// the current Open Cloud instance. If true return the peer ID to contact
func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource, graphID string, template *models.Template, namespace string) error {
computeAttached := b.OriginWorkflow.GetByRelatedProcessing(processing.GetID(), b.OriginWorkflow.Graph.IsCompute)
if len(computeAttached) == 0 {
return errors.New("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) {
computeAttached := b.retrieveProcessingCompute(graphID)
if computeAttached == nil {
logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
panic(0)
}
// Creates an accessor srtictly for Peer Collection
for _, related := range computeAttached {
instance := related.Node.GetSelectedInstance().(*resources.ComputeResourceInstance)
if instance == nil {
continue
}
partner := instance.GetSelectedPartnership(conf.GetConfig().PeerID, conf.GetConfig().Groups)
if partner == nil {
logger.Error().Msg("can't proceed on datacenter because of missing pricing profiles " + related.Node.GetID())
continue
}
garanteed, allowed := b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), models.NewBounds(), models.NewBounds(), "gpu", partner)
garanteed, allowed = b.setResourcesAllowedAndGaranteed(b.Workflow.GetDag(), garanteed, allowed, "cpu", partner)
garanteed.Set(float64(partner.(*resources.ComputeResourcePartnership).MinGaranteedRAMSize), "ram", false)
allowed.Set(float64(partner.(*resources.ComputeResourcePartnership).MaxAllowedRAMSize), "ram", false)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID())
if res.Err != "" {
return errors.New(res.Err)
}
peer := *res.ToPeer()
isNotReparted := peer.State == 1
logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
if !(isNotReparted) {
logger.Debug().Msg("Reparted processing, on " + peer.UUID)
b.RemotePeers = append(b.RemotePeers, peer.UUID)
template.AddAdmiraltyAnnotations(peer.UUID, namespace)
}
req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil)
if req == nil {
fmt.Println("TODO : handle error when trying to create a request on the Peer Collection")
return false, ""
}
res := req.LoadOne(computeAttached.CreatorID)
if res.Err != "" {
fmt.Print("TODO : handle error when requesting PeerID")
fmt.Print(res.Err)
return false, ""
}
peer := *res.ToPeer()
isNotReparted := peer.State == 1
logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
return !isNotReparted, peer.UUID
}
func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource {
for _, link := range b.OriginWorkflow.Graph.Links {
// If a link contains the id of the processing
var oppositeId string
if link.Source.ID == graphID {
oppositeId = link.Destination.ID
} else if link.Destination.ID == graphID {
oppositeId = link.Source.ID
}
if oppositeId != "" {
dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
if dt == oclib.COMPUTE_RESOURCE {
return res.(*resources.ComputeResource)
} else {
continue
}
}
}
return nil
}
func (b *ArgoBuilder) setResourcesAllowedAndGaranteed(dag *Dag, minbound *models.Bounds, maxbound *models.Bounds, typ string, partner resources.ResourcePartnerITF) (*models.Bounds, *models.Bounds) {
selector := ""
values := map[string]float64{}
if typ == "gpu" {
values = partner.(*resources.ComputeResourcePartnership).MinGaranteedGPUsMemoryGB
} else {
values = partner.(*resources.ComputeResourcePartnership).MinGaranteedCPUsCores
}
for name, GPU := range values {
if minbound.Set(float64(GPU), typ, true) {
selector = name
}
}
if selector != "" {
for _, t := range dag.Tasks {
t.NodeSelector[typ+"-type"] = selector
}
}
if typ == "gpu" {
values = partner.(*resources.ComputeResourcePartnership).MaxAllowedGPUsMemoryGB
} else {
values = partner.(*resources.ComputeResourcePartnership).MaxAllowedCPUsCores
}
if max, ok := values[selector]; ok {
maxbound.Set(float64(max), typ, false)
} else {
maxbound.GPU = minbound.GPU
}
return minbound, maxbound
}
// Execute the last actions once the YAML file for the Argo Workflow is created
func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
logger.Info().Msg("DEV :: Completing build")
setter := AdmiraltySetter{Id: namespace}
func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
logger.Info().Msg(fmt.Sprint("DEV :: Completing build"))
setter := AdmiraltySetter{Id: executionsId}
// Setup admiralty for each node
for _, peer := range b.RemotePeers {
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer)
setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
}
// Update the name of the admiralty node to use
for _, template := range b.Workflow.Spec.Templates {
if len(template.Metadata.Annotations) > 0 {
if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + oclib.GetConcatenatedName(peerId, executionsId)
}
}
}
// Generate the YAML file
random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8)
b.Workflow.Metadata.Name = "oc-monitor-" + random_name
@@ -233,14 +513,14 @@ func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
yamlified, err := yaml.Marshal(b.Workflow)
if err != nil {
logger.Error().Msg("Could not transform object to yaml file")
return "", err
return "", err
}
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
current_timestamp := time.Now().Format("02_01_2006_150405")
file_name := random_name + "_" + current_timestamp + ".yml"
workflows_dir := "./argo_workflows/"
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
if err != nil {
logger.Error().Msg("Could not write the yaml file")
return "", err

View File

@@ -5,6 +5,7 @@ import (
"strings"
"cloud.o-forge.io/core/oc-lib/models/resources"
"gopkg.in/yaml.v3"
)
func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) {
@@ -46,9 +47,20 @@ func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, p
func (b *ArgoBuilder) addServiceToArgo() error {
for _, service := range b.Services {
if err := service.BindToArgo(b.Workflow); err != nil {
service_manifest, err := yaml.Marshal(service)
if err != nil {
return err
}
service_template := models.Template{Name: "workflow-service-pod",
Resource: models.ServiceResource{
Action: "create",
SuccessCondition: "status.succeeded > 0",
FailureCondition: "status.failed > 3",
SetOwnerReference: true,
Manifest: string(service_manifest),
},
}
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template)
}
return nil
}

View File

@@ -3,7 +3,6 @@ package workflow_builder
import (
"errors"
"fmt"
"oc-monitord/models"
oclib "cloud.o-forge.io/core/oc-lib"
workflow "cloud.o-forge.io/core/oc-lib/models/workflow"
@@ -49,7 +48,7 @@ func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, i
return nil, 0, fmt.Errorf("can't export a graph that has not been loaded yet")
}
argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Workflow: &models.Workflow{}, Timeout: timeout}
argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout}
stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true)
if err != nil {
logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)