diff --git a/go.mod b/go.mod index 9596192..1e88349 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,9 @@ module oc-monitord -go 1.23.1 - -toolchain go1.23.3 +go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995 + cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 github.com/akamensky/argparse v1.4.0 github.com/google/uuid v1.6.0 github.com/goraz/onion v0.1.3 @@ -16,15 +14,21 @@ require ( require ( github.com/beego/beego/v2 v2.3.8 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect github.com/go-playground/validator/v10 v10.27.0 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/libp2p/go-libp2p/core v0.43.0-rc2 // indirect github.com/sirupsen/logrus v1.9.3 // indirect github.com/ugorji/go/codec v1.1.7 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240227224415-6ceb2ff114de // indirect google.golang.org/grpc v1.63.0 // indirect + sigs.k8s.io/randfill v1.0.0 // indirect + sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect ) require ( @@ -33,10 +37,10 @@ require ( github.com/biter777/countries v1.7.5 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/emicklei/go-restful/v3 v3.11.0 // indirect - github.com/fxamacker/cbor/v2 v2.7.0 // indirect + github.com/emicklei/go-restful/v3 v3.12.2 // indirect + github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/gabriel-vasile/mimetype v1.4.9 // indirect - github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect 
github.com/go-openapi/jsonreference v0.20.4 // indirect github.com/go-openapi/swag v0.23.0 // indirect @@ -44,7 +48,7 @@ require ( github.com/go-playground/universal-translator v0.18.1 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/snappy v1.0.0 // indirect - github.com/google/gnostic-models v0.6.8 // indirect + github.com/google/gnostic-models v0.7.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/gofuzz v1.2.0 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect @@ -57,7 +61,7 @@ require ( github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect github.com/montanaflynn/stats v0.7.1 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nats-io/nats.go v1.44.0 // indirect @@ -77,24 +81,24 @@ require ( github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect go.mongodb.org/mongo-driver v1.17.4 // indirect - golang.org/x/crypto v0.41.0 // indirect - golang.org/x/net v0.43.0 // indirect + golang.org/x/crypto v0.44.0 // indirect + golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect - golang.org/x/sync v0.16.0 // indirect - golang.org/x/sys v0.35.0 // indirect - golang.org/x/term v0.34.0 // indirect - golang.org/x/text v0.28.0 // indirect - golang.org/x/time v0.7.0 // indirect - google.golang.org/protobuf v1.36.7 // indirect - gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect + golang.org/x/sync v0.18.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/term v0.37.0 // indirect + golang.org/x/text v0.31.0 // indirect + golang.org/x/time v0.9.0 // indirect + google.golang.org/protobuf v1.36.8 // indirect + 
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect - k8s.io/api v0.32.1 - k8s.io/apimachinery v0.32.1 - k8s.io/client-go v0.32.1 + k8s.io/api v0.35.1 + k8s.io/apimachinery v0.35.1 + k8s.io/client-go v0.35.1 k8s.io/klog/v2 v2.130.1 // indirect - k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect - k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 // indirect - sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect + k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect + k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect + sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect - sigs.k8s.io/yaml v1.4.0 // indirect + sigs.k8s.io/yaml v1.6.0 // indirect ) diff --git a/go.sum b/go.sum index 10a0d2b..0809ab5 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,16 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260203083753-4f28b9b589d6 h1:N+0xkioACl3PN cloud.o-forge.io/core/oc-lib v0.0.0-20260203083753-4f28b9b589d6/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995 h1:ZDRvnzTTNHgMm5hYmseHdEPqQ6rn/4v+P9f/JIxPaNw= cloud.o-forge.io/core/oc-lib v0.0.0-20260203150531-ef916fe2d995/go.mod h1:T0UCxRd8w+qCVVC0NEyDiWIGC5ADwEbQ7hFcvftd4Ks= +cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13 h1:DNIPQ7C+7wjbj5RUx29wLxuIe/wiSOcuUMlLRIv6Fvs= +cloud.o-forge.io/core/oc-lib v0.0.0-20260212123952-403913d8cf13/go.mod h1:jmyBwmsac/4V7XPL347qawF60JsBCDmNAMfn/ySXKYo= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224093610-a9ebad78f3a8 h1:xoC5PAz1469QxrNm8rrsq5+BtwshEt+L2Nhf90MrqrM= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224093610-a9ebad78f3a8/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224120019-0f6aa1fe7881 h1:1JUGErc+3Runda7iapS5sieH+yFqWrGp+ljv7Kly+hc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224120019-0f6aa1fe7881/go.mod 
h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224122900-d18b031a293a h1:gdr886O31Ai5pEFgJC/mrJMJdhplnQg+UJdZF9mV1n4= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224122900-d18b031a293a/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7 h1:p9uJjMY+QkE4neA+xRmIRtAm9us94EKZqgajDdLOd0Y= +cloud.o-forge.io/core/oc-lib v0.0.0-20260224130821-ce8ef70516f7/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= @@ -55,10 +65,14 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvwDRwnI3hwNaAHRnc= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40= github.com/elazarl/go-bindata-assetfs v1.0.1 h1:m0kkaHRKEu7tUIUFVwhGGGYClXvyl4RE03qmvRTNfbw= github.com/elazarl/go-bindata-assetfs v1.0.1/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/emicklei/go-restful/v3 v3.11.0 h1:rAQeMHw1c7zTmncogyy8VvRZwtkmkZ4FxERmMY4rD+g= github.com/emicklei/go-restful/v3 v3.11.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= +github.com/emicklei/go-restful/v3 v3.12.2 h1:DhwDP0vY3k8ZzE0RunuJy8GhNpPL6zqLkDf9B/a0/xU= +github.com/emicklei/go-restful/v3 v3.12.2/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= 
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -67,11 +81,15 @@ github.com/etcd-io/etcd v3.3.17+incompatible/go.mod h1:cdZ77EstHBwVtD6iTgzgvogwc github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/fxamacker/cbor/v2 v2.9.0 h1:NpKPmjDBgUfBms6tr6JZkTHtfFGcMKsw3eGcmD/sapM= +github.com/fxamacker/cbor/v2 v2.9.0/go.mod h1:vM4b+DJCtHn+zz7h3FFp/hDAI9WNWCsZj23V5ytsSxQ= github.com/gabriel-vasile/mimetype v1.4.9 h1:5k+WDwEsD9eTLL8Tz3L0VnmVh9QxGjRmjBvAG7U/oYY= github.com/gabriel-vasile/mimetype v1.4.9/go.mod h1:WnSQhFKJuBlRyLiKohA/2DtIlPFAbguNaG7QCHcyGok= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-openapi/jsonpointer v0.21.0 h1:YgdVicSA9vH5RiHs9TZW5oyafXZFc6+2Vc1rr/O9oNQ= github.com/go-openapi/jsonpointer v0.21.0/go.mod h1:IUyH9l/+uyhIYQ/PXVA41Rexl+kOkAPDdXEYns6fzUY= github.com/go-openapi/jsonreference v0.20.4 h1:bKlDxQxQJgwpUSgOENiMPzCTBVuc7vTdXSSgNeAhojU= @@ -104,6 +122,8 @@ github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/gnostic-models v0.6.8 
h1:yo/ABAfM5IMRsS1VnXjTBvUb61tFIHozhlYvRgGre9I= github.com/google/gnostic-models v0.6.8/go.mod h1:5n7qKqH0f5wFt+aWF8CW6pZLLNOfYuF5OpfBSENuI8U= +github.com/google/gnostic-models v0.7.0 h1:qwTtogB15McXDaNqTZdzPJRHvaVJlAl+HVQnLmJEJxo= +github.com/google/gnostic-models v0.7.0/go.mod h1:whL5G0m6dmc5cPxKc5bdKdEN3UjI7OUGxBlw57miDrQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -147,6 +167,8 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/libp2p/go-libp2p/core v0.43.0-rc2 h1:1X1aDJNWhMfodJ/ynbaGLkgnC8f+hfBIqQDrzxFZOqI= +github.com/libp2p/go-libp2p/core v0.43.0-rc2/go.mod h1:NYeJ9lvyBv9nbDk2IuGb8gFKEOkIv/W5YRIy1pAJB2Q= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -169,6 +191,8 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee h1:W5t00kpgFdJifH4BDsTlE89Zl93FEloxaWZfGcifgq8= +github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee/go.mod 
h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= @@ -266,6 +290,10 @@ go.mongodb.org/mongo-driver v1.17.3 h1:TQyXhnsWfWtgAhMtOgtYHMTkZIfBTpMTsMnd9ZBeH go.mongodb.org/mongo-driver v1.17.3/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= go.mongodb.org/mongo-driver v1.17.4 h1:jUorfmVzljjr0FLzYQsGP8cgN/qzzxlY9Vh0C9KFXVw= go.mongodb.org/mongo-driver v1.17.4/go.mod h1:Hy04i7O2kC4RS06ZrhPRqj/u4DTYkFDAAccj+rVKqgQ= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -280,6 +308,8 @@ golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= +golang.org/x/crypto v0.44.0 h1:A97SsFvM3AIwEEmTBiaxPPTYpDC47w720rdiiUvgoAU= +golang.org/x/crypto v0.44.0/go.mod h1:013i+Nw79BMiQiMsOPcVCB5ZIJbYkerPrGnOa00tvmc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint 
v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -307,6 +337,8 @@ golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.30.0 h1:dnDm7JmhM45NNpd8FDDeLhK6FwqbOf4MLCM9zb1BOHI= @@ -324,6 +356,8 @@ golang.org/x/sync v0.15.0 h1:KWH3jNZsfyT6xfAfKiz6MRNmd46ByHDYaZ7KSkCtdW8= golang.org/x/sync v0.15.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -345,6 +379,8 @@ golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.35.0 
h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.32.0 h1:DR4lr0TjUs3epypdhTOkMmuF5CDFJ/8pOnbzMZPQ7bg= @@ -353,6 +389,8 @@ golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= +golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= @@ -365,8 +403,12 @@ golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +golang.org/x/time v0.9.0 
h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= +golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -405,11 +447,15 @@ google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9x google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/evanphx/json-patch.v4 v4.12.0 h1:n6jtcsulIzXPJaxegRbvFNNrZDjbij7ny3gmSPG+6V4= gopkg.in/evanphx/json-patch.v4 v4.12.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= +gopkg.in/evanphx/json-patch.v4 v4.13.0 h1:czT3CmqEaQ1aanPc5SdlgQrrEIb8w/wwCvWWnfEbYzo= +gopkg.in/evanphx/json-patch.v4 v4.13.0/go.mod h1:p8EYWUEYMpynmqDbY58zCKCFZw8pRWMG4EsWvDvM72M= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -423,19 +469,37 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod 
h1:rf3lG4BRIbNafJWh honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= k8s.io/api v0.32.1 h1:f562zw9cy+GvXzXf0CKlVQ7yHJVYzLfL6JAS4kOAaOc= k8s.io/api v0.32.1/go.mod h1:/Yi/BqkuueW1BgpoePYBRdDYfjPF5sgTr5+YqDZra5k= +k8s.io/api v0.35.1 h1:0PO/1FhlK/EQNVK5+txc4FuhQibV25VLSdLMmGpDE/Q= +k8s.io/api v0.35.1/go.mod h1:28uR9xlXWml9eT0uaGo6y71xK86JBELShLy4wR1XtxM= k8s.io/apimachinery v0.32.1 h1:683ENpaCBjma4CYqsmZyhEzrGz6cjn1MY/X2jB2hkZs= k8s.io/apimachinery v0.32.1/go.mod h1:GpHVgxoKlTxClKcteaeuF1Ul/lDVb74KpZcxcmLDElE= +k8s.io/apimachinery v0.35.1 h1:yxO6gV555P1YV0SANtnTjXYfiivaTPvCTKX6w6qdDsU= +k8s.io/apimachinery v0.35.1/go.mod h1:jQCgFZFR1F4Ik7hvr2g84RTJSZegBc8yHgFWKn//hns= k8s.io/client-go v0.32.1 h1:otM0AxdhdBIaQh7l1Q0jQpmo7WOFIk5FFa4bg6YMdUU= k8s.io/client-go v0.32.1/go.mod h1:aTTKZY7MdxUaJ/KiUs8D+GssR9zJZi77ZqtzcGXIiDg= +k8s.io/client-go v0.35.1 h1:+eSfZHwuo/I19PaSxqumjqZ9l5XiTEKbIaJ+j1wLcLM= +k8s.io/client-go v0.35.1/go.mod h1:1p1KxDt3a0ruRfc/pG4qT/3oHmUj1AhSHEcxNSGg+OA= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f h1:GA7//TjRY9yWGy1poLzYYJJ4JRdzg3+O6e8I+e+8T5Y= k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f/go.mod h1:R/HEjbvWI0qdfb8viZUeVZm0X6IZnxAydC7YU42CMw4= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 h1:Y3gxNAuB0OBLImH611+UDZcmKS3g6CthxToOb37KgwE= +k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912/go.mod h1:kdmbQkyfwUagLfXIad1y2TdrjPFWp2Q89B3qkRwf/pQ= k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738 h1:M3sRQVHv7vB20Xc2ybTt7ODCeFj6JSWYFzOFnYeS6Ro= k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 h1:SjGebBtkBqHFOli+05xYbK8YF1Dzkbzn+gDM4X9T4Ck= +k8s.io/utils v0.0.0-20251002143259-bc988d571ff4/go.mod 
h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 h1:/Rv+M11QRah1itp8VhT6HoVx1Ray9eB4DBr+K+/sCJ8= sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3/go.mod h1:18nIHnGi6636UCz6m8i4DhaJ65T6EruyzmoQqI2BVDo= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 h1:IpInykpT6ceI+QxKBbEflcR5EXP7sU1kvOlxwZh5txg= +sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730/go.mod h1:mdzfpAEoE6DHQEN0uh9ZbOCuHbLK5wOm7dK4ctXE9Tg= +sigs.k8s.io/randfill v1.0.0 h1:JfjMILfT8A6RbawdsK2JXGBR5AQVfd+9TbzrlneTyrU= +sigs.k8s.io/randfill v1.0.0/go.mod h1:XeLlZ/jmk4i1HRopwe7/aU3H5n1zNUcX6TM94b3QxOY= sigs.k8s.io/structured-merge-diff/v4 v4.4.2 h1:MdmvkGuXi/8io6ixD5wud3vOLwc1rj0aNqRlpuvjmwA= sigs.k8s.io/structured-merge-diff/v4 v4.4.2/go.mod h1:N8f93tFZh9U6vpxwRArLiikrE5/2tiu1w1AGfACIGE4= +sigs.k8s.io/structured-merge-diff/v6 v6.3.0 h1:jTijUJbW353oVOd9oTlifJqOGEkUw2jB/fXCbTiQEco= +sigs.k8s.io/structured-merge-diff/v6 v6.3.0/go.mod h1:M3W8sfWvn2HhQDIbGWj3S099YozAsymCo/wrT5ohRUE= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= sigs.k8s.io/yaml v1.4.0/go.mod h1:Ejl7/uTz7PSA4eKMyQCUTnhZYNmLIl+5c2lQPGR2BPY= +sigs.k8s.io/yaml v1.6.0 h1:G8fkbMSAFqgEFgh4b1wmtzDnioxFCUgTZhlbj5P9QYs= +sigs.k8s.io/yaml v1.6.0/go.mod h1:796bPqUfzR/0jLAl6XjHl3Ck7MiyVv8dbTdyT3/pMf4= diff --git a/models/template.go b/models/template.go index eeac2ea..c2268e1 100644 --- a/models/template.go +++ b/models/template.go @@ -84,7 +84,11 @@ type Key struct { type Artifact struct { Name string `yaml:"name"` Path string `yaml:"path"` - S3 *Key `yaml:"s3,omitempty"` +} + +type ArtifactRepositoryRef struct { + ConfigMap string `yaml:"configMap"` + Key string `yaml:"key"` } type InOut struct { diff --git a/workflow_builder/admiralty_setter.go b/workflow_builder/admiralty_setter.go deleted file mode 100644 index 4862df0..0000000 --- a/workflow_builder/admiralty_setter.go +++ /dev/null @@ -1,145 +0,0 @@ -package workflow_builder - -import ( - "encoding/json" - "fmt" - 
"net/http" - "oc-monitord/utils" - "slices" - "time" - - oclib "cloud.o-forge.io/core/oc-lib" - "cloud.o-forge.io/core/oc-lib/logs" - "cloud.o-forge.io/core/oc-lib/models/peer" - tools "cloud.o-forge.io/core/oc-lib/tools" -) - -type AdmiraltySetter struct { - Id string // ID to identify the execution, correspond to workflow_executions id - NodeName string // Allows to retrieve the name of the node used for this execution on each peer {"peerId": "nodeName"} -} - -func (s *AdmiraltySetter) InitializeAdmiralty(localPeerID string, remotePeerID string) error { - - logger := logs.GetLogger() - - data := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(remotePeerID) - if data.Code != 200 { - logger.Error().Msg("Error while trying to instantiate remote peer " + remotePeerID) - return fmt.Errorf(data.Err) - } - remotePeer := data.ToPeer() - - data = oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", localPeerID, nil, nil).LoadOne(localPeerID) - if data.Code != 200 { - logger.Error().Msg("Error while trying to instantiate local peer " + remotePeerID) - return fmt.Errorf(data.Err) - } - localPeer := data.ToPeer() - - caller := tools.NewHTTPCaller( - map[tools.DataType]map[tools.METHOD]string{ - tools.ADMIRALTY_SOURCE: { - tools.POST: "/:id", - }, - tools.ADMIRALTY_KUBECONFIG: { - tools.GET: "/:id", - }, - tools.ADMIRALTY_SECRET: { - tools.POST: "/:id/" + remotePeerID, - }, - tools.ADMIRALTY_TARGET: { - tools.POST: "/:id/" + remotePeerID, - }, - tools.ADMIRALTY_NODES: { - tools.GET: "/:id/" + remotePeerID, - }, - }, - ) - - logger.Info().Msg("\n\n Creating the Admiralty Source on " + remotePeerID + " ns-" + s.Id) - s.callRemoteExecution(remotePeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_SOURCE, tools.POST, nil, true) - logger.Info().Msg("\n\n Retrieving kubeconfig with the secret on " + remotePeerID + " ns-" + s.Id) - kubeconfig := s.getKubeconfig(remotePeer, caller) - logger.Info().Msg("\n\n Creating a 
secret from the kubeconfig " + localPeerID + " ns-" + s.Id) - s.callRemoteExecution(localPeer, []int{http.StatusCreated}, caller, s.Id, tools.ADMIRALTY_SECRET, tools.POST, kubeconfig, true) - logger.Info().Msg("\n\n Creating the Admiralty Target on " + localPeerID + " in namespace " + s.Id) - s.callRemoteExecution(localPeer, []int{http.StatusCreated, http.StatusConflict}, caller, s.Id, tools.ADMIRALTY_TARGET, tools.POST, nil, true) - logger.Info().Msg("\n\n Checking for the creation of the admiralty node on " + localPeerID + " ns-" + s.Id) - s.checkNodeStatus(localPeer, caller) - - return nil -} - -func (s *AdmiraltySetter) getKubeconfig(peer *peer.Peer, caller *tools.HTTPCaller) map[string]string { - var kubedata map[string]string - s.callRemoteExecution(peer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_KUBECONFIG, tools.GET, nil, true) - if caller.LastResults["body"] == nil || len(caller.LastResults["body"].([]byte)) == 0 { - l := utils.GetLogger() - l.Error().Msg("Something went wrong when retrieving data from Get call for kubeconfig") - panic(0) - } - err := json.Unmarshal(caller.LastResults["body"].([]byte), &kubedata) - if err != nil { - l := utils.GetLogger() - l.Error().Msg("Something went wrong when unmarshalling data from Get call for kubeconfig") - panic(0) - } - - return kubedata -} - -func (*AdmiraltySetter) callRemoteExecution(peer *peer.Peer, expectedCode []int, caller *tools.HTTPCaller, dataID string, dt tools.DataType, method tools.METHOD, body interface{}, panicCode bool) { - l := utils.GetLogger() - _, err := peer.LaunchPeerExecution(peer.UUID, dataID, dt, method, body, caller) - if err != nil { - l.Error().Msg("Error when executing on peer at" + peer.APIUrl) - l.Error().Msg(err.Error()) - panic(0) - } - - if !slices.Contains(expectedCode, caller.LastResults["code"].(int)) { - l.Error().Msg(fmt.Sprint("Didn't receive the expected code :", caller.LastResults["code"], "when expecting", expectedCode)) - if _, ok := 
caller.LastResults["body"]; ok { - l.Info().Msg(string(caller.LastResults["body"].([]byte))) - } - if panicCode { - panic(0) - } - } - -} - -func (s *AdmiraltySetter) storeNodeName(caller *tools.HTTPCaller) { - var data map[string]interface{} - if resp, ok := caller.LastResults["body"]; ok { - json.Unmarshal(resp.([]byte), &data) - } - - if node, ok := data["node"]; ok { - metadata := node.(map[string]interface{})["metadata"] - name := metadata.(map[string]interface{})["name"].(string) - s.NodeName = name - } else { - l := utils.GetLogger() - l.Error().Msg("Could not retrieve data about the recently created node") - panic(0) - } -} - -func (s *AdmiraltySetter) checkNodeStatus(localPeer *peer.Peer, caller *tools.HTTPCaller) { - for i := range 5 { - time.Sleep(10 * time.Second) // let some time for kube to generate the node - s.callRemoteExecution(localPeer, []int{http.StatusOK}, caller, s.Id, tools.ADMIRALTY_NODES, tools.GET, nil, false) - if caller.LastResults["code"] == 200 { - s.storeNodeName(caller) - return - } - if i == 5 { - logger.Error().Msg("Node on " + localPeer.Name + " was never found, panicking !") - panic(0) - } - logger.Info().Msg("Could not verify that node is up. Retrying...") - } - -} diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 1725254..044173f 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -1,20 +1,20 @@ -// A class that translates the informations held in the graph object -// via its lists of components into an argo file, using the a list of -// link ID to build the dag - +// Package workflow_builder traduit les informations du graphe d'un Workflow +// (ses composants, ses liens) en un fichier YAML Argo Workflow prêt à être +// soumis à un cluster Kubernetes. Le point d'entrée principal est ArgoBuilder. package workflow_builder import ( + "encoding/json" "fmt" "oc-monitord/conf" . 
"oc-monitord/models" - tools2 "oc-monitord/tools" "os" "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" + oclib_config "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/peer" @@ -24,21 +24,34 @@ import ( "cloud.o-forge.io/core/oc-lib/models/workflow/graph" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" + "github.com/nats-io/nats.go" "github.com/nwtgck/go-fakelish" "github.com/rs/zerolog" "gopkg.in/yaml.v3" ) +// logger est le logger zerolog partagé au sein du package, initialisé à +// chaque appel de CreateDAG pour récupérer la configuration courante. var logger zerolog.Logger +// ArgoBuilder est le constructeur principal du fichier Argo Workflow. +// Il porte l'état de la construction (workflow source, templates générés, +// services k8s à créer, timeout global, liste des peers distants impliqués). type ArgoBuilder struct { + // OriginWorkflow est le workflow métier Open Cloud dont on construit la représentation Argo. OriginWorkflow *w.Workflow - Workflow Workflow - Services []*Service - Timeout int - RemotePeers []string + // Workflow est la structure YAML Argo en cours de construction. + Workflow Workflow + // Services liste les services Kubernetes à exposer pour les processings "IsService". + Services []*Service + // Timeout est la durée maximale d'exécution en secondes (activeDeadlineSeconds). + Timeout int + // RemotePeers contient les IDs des peers distants détectés via Admiralty. + RemotePeers []string } +// Workflow est la structure racine du fichier YAML Argo Workflow. +// Elle correspond exactement au format attendu par le contrôleur Argo. type Workflow struct { ApiVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` @@ -48,6 +61,8 @@ type Workflow struct { Spec Spec `yaml:"spec,omitempty"` } +// getDag retourne le pointeur sur le template "dag" du workflow. 
+// S'il n'existe pas encore, il est créé et ajouté à la liste des templates. func (b *Workflow) getDag() *Dag { for _, t := range b.Spec.Templates { if t.Name == "dag" { @@ -58,7 +73,10 @@ func (b *Workflow) getDag() *Dag { return b.Spec.Templates[len(b.Spec.Templates)-1].Dag } +// Spec contient la spécification complète du workflow Argo : +// compte de service, point d'entrée, volumes, templates et timeout. type Spec struct { + ArtifactRepositoryRef ServiceAccountName string `yaml:"serviceAccountName,omitempty"` Entrypoint string `yaml:"entrypoint"` Arguments []Parameter `yaml:"arguments,omitempty"` @@ -67,12 +85,21 @@ type Spec struct { Timeout int `yaml:"activeDeadlineSeconds,omitempty"` } -// TODO: found on a processing instance linked to storage -// add s3, gcs, azure, etc if needed on a link between processing and storage +// CreateDAG est le point d'entrée de la construction du DAG Argo. +// Il crée tous les templates (un par processing / native tool / sous-workflow), +// configure les volumes persistants, positionne les métadonnées globales du +// workflow et retourne : +// - le nombre de tâches dans le DAG, +// - les noms des premières tâches (sans dépendances), +// - les noms des dernières tâches (dont personne ne dépend), +// - une éventuelle erreur. +// +// Le paramètre write est conservé pour usage futur (écriture effective du YAML). +// TODO: gérer S3, GCS, Azure selon le type de stockage lié au processing. func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, namespace string, write bool) (int, []string, []string, error) { logger = logs.GetLogger() logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) - // handle services by checking if there is only one processing with hostname and port + // Crée un template Argo pour chaque nœud du graphe et collecte les volumes. 
firstItems, lastItems, volumes := b.createTemplates(exec, namespace) b.createVolumes(exec, volumes) @@ -90,10 +117,17 @@ func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, name return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } +// createTemplates parcourt tous les nœuds du graphe (processings, native tools, +// sous-workflows) et génère les templates Argo correspondants. +// Elle gère également le recâblage des dépendances DAG entre sous-workflows +// imbriqués, et l'ajout du pod de service si nécessaire. +// Retourne les premières tâches, les dernières tâches et les volumes à créer. func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution, namespace string) ([]string, []string, []VolumeMount) { volumes := []VolumeMount{} firstItems := []string{} lastItems := []string{} + + // --- Processings --- for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { index := 0 _, res := item.GetResource() @@ -110,6 +144,8 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution namespace, item.ID, item.Processing, volumes, firstItems, lastItems) } + + // --- Native Tools de type WORKFLOW_EVENT uniquement --- for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) { if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) { continue @@ -124,6 +160,8 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution volumes, firstItems, lastItems = b.createArgoTemplates(exec, namespace, item.ID, item.NativeTool, volumes, firstItems, lastItems) } + + // --- Sous-workflows : chargement, construction récursive et fusion du DAG --- firstWfTasks := map[string][]string{} latestWfTasks := map[string][]string{} relatedWfTasks := map[string][]string{} @@ -140,18 +178,22 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution continue } firstWfTasks[wf] = fi - if ok, depsOfIds := 
subBuilder.isArgoDependancy(wf); ok { // IS BEFORE + if ok, depsOfIds := subBuilder.isArgoDependancy(wf); ok { // le sous-workflow est une dépendance d'autre chose latestWfTasks[wf] = li relatedWfTasks[wf] = depsOfIds } + // Fusion des tâches, templates, volumes et arguments du sous-workflow dans le DAG principal. subDag := subBuilder.Workflow.getDag() d := b.Workflow.getDag() - d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow + d.Tasks = append(d.Tasks, subDag.Tasks...) b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) b.Services = append(b.Services, subBuilder.Services...) } + + // Recâblage : les tâches qui dépendaient du sous-workflow dépendent désormais + // de sa dernière tâche réelle (latestWfTasks). for wfID, depsOfIds := range relatedWfTasks { for _, dep := range depsOfIds { for _, task := range b.Workflow.getDag().Tasks { @@ -171,6 +213,9 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution } } } + + // Les premières tâches du sous-workflow héritent des dépendances + // que le sous-workflow avait vis-à-vis du DAG principal. for wfID, fi := range firstWfTasks { deps := b.getArgoDependencies(wfID) if len(deps) > 0 { @@ -183,6 +228,8 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution } } } + + // Si des services Kubernetes sont nécessaires, on ajoute le pod dédié. 
if b.Services != nil { dag := b.Workflow.getDag() dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) @@ -191,6 +238,13 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution return firstItems, lastItems, volumes } +// createArgoTemplates crée le template Argo pour un nœud du graphe (processing +// ou native tool). Il : +// 1. Ajoute la tâche au DAG avec ses dépendances. +// 2. Crée le template de container (ou d'événement pour les native tools). +// 3. Ajoute les annotations Admiralty si le processing est hébergé sur un peer distant. +// 4. Crée un service Kubernetes si le processing est déclaré IsService. +// 5. Configure les annotations de stockage (S3, volumes locaux). func (b *ArgoBuilder) createArgoTemplates( exec *workflow_execution.WorkflowExecution, namespace string, @@ -199,9 +253,12 @@ func (b *ArgoBuilder) createArgoTemplates( volumes []VolumeMount, firstItems []string, lastItems []string) ([]VolumeMount, []string, []string) { + _, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, obj, firstItems, lastItems) template := &Template{Name: getArgoName(obj.GetName(), id)} logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) + + // Vérifie si le processing est sur un peer distant (Admiralty). isReparted, peer := b.isReparted(obj, id) if obj.GetType() == tools.PROCESSING_RESOURCE.String() { template.CreateContainer(exec, obj.(*resources.ProcessingResource), b.Workflow.getDag()) @@ -214,11 +271,13 @@ func (b *ArgoBuilder) createArgoTemplates( b.RemotePeers = append(b.RemotePeers, peer.GetID()) template.AddAdmiraltyAnnotations(peer.GetID()) } - // get datacenter from the processing + + // Si le processing expose un service Kubernetes, on l'enregistre et on + // applique le label "app" pour que le Service puisse le sélectionner. 
if obj.GetType() == tools.PROCESSING_RESOURCE.String() && obj.(*resources.ProcessingResource).IsService { b.CreateService(exec, id, obj) template.Metadata.Labels = make(map[string]string) - template.Metadata.Labels["app"] = "oc-service-" + obj.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing + template.Metadata.Labels["app"] = "oc-service-" + obj.GetName() } volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes) @@ -226,26 +285,48 @@ func (b *ArgoBuilder) createArgoTemplates( return volumes, firstItems, lastItems } +// addStorageAnnotations parcourt tous les nœuds de stockage liés au processing +// identifié par id. Pour chaque lien de stockage : +// - Construit le nom de l'artefact Argo (lecture ou écriture). +// - Pour les stockages S3 : appelle waitForConsiders (STORAGE_RESOURCE) pour +// attendre la validation PB_CONSIDERS avant de configurer les annotations S3. +// - Pour les volumes locaux : ajoute un VolumeMount dans le container. func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExecution, id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount { - related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage node linked to the processing for which we create the template + // Récupère tous les nœuds de stockage connectés au processing courant. 
+ related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) for _, r := range related { storage := r.Node.(*resources.StorageResource) for _, linkToStorage := range r.Links { for _, rw := range linkToStorage.StorageLinkInfos { var art Artifact - artifactBaseName := strings.Join(strings.Split(storage.GetName(), " "), "-") + "-" + strings.Replace(rw.FileName, ".", "-", -1) // Parameter/Artifact name must consist of alpha-numeric characters, '_' or '-' + // Le nom de l'artefact doit être alphanumérique + '-' ou '_'. + artifactBaseName := strings.Join(strings.Split(storage.GetName(), " "), "-") + "-" + strings.Replace(rw.FileName, ".", "-", -1) if rw.Write { - art = Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} // When we are writing to the s3 the Path element is the path to the file in the pod + // Écriture vers S3 : Path = chemin du fichier dans le pod. + art = Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} art.Name = artifactBaseName + "-input-write" } else { - art = Artifact{Path: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env)} // When we are reading from the s3 the Path element in pod should be the destination of the file + // Lecture depuis S3 : Path = destination dans le pod. + art = Artifact{Path: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env)} art.Name = artifactBaseName + "-input-read" } if storage.StorageType == enum.S3 { - - b.addS3annotations(exec, &art, template, rw, linkToStorage, storage, namespace) + // Pour chaque ressource de compute liée à ce stockage S3, + // on notifie via NATS et on attend la validation PB_CONSIDERS + // avec DataType = STORAGE_RESOURCE avant de continuer. 
+ for _, r := range b.getStorageRelatedProcessing(storage.GetID()) { + waitForConsiders(exec.ExecutionsID, tools.STORAGE_RESOURCE, ArgoKubeEvent{ + ExecutionsID: exec.ExecutionsID, + DestPeerID: r.GetID(), + Type: tools.STORAGE_RESOURCE, + SourcePeerID: storage.GetCreatorID(), + OriginID: conf.GetConfig().PeerID, + }) + } + // Configure la référence au dépôt d'artefacts S3 dans le Spec. + b.addS3annotations(storage, namespace) } if rw.Write { @@ -255,6 +336,8 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe } } } + + // Si l'instance de stockage est locale, on monte un volume persistant. index := 0 if s, ok := exec.SelectedInstances[storage.GetID()]; ok { index = s @@ -271,107 +354,75 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe return volumes } -func (b *ArgoBuilder) addS3annotations(exec *workflow_execution.WorkflowExecution, art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) { - - art.S3 = &Key{ - // Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env), - Insecure: true, // temporary - } - if rw.Write { - art.S3.Key = rw.Destination + "/" + rw.FileName - } else { - art.S3.Key = rw.Source - } - index := 0 - if d, ok := exec.SelectedInstances[storage.GetID()]; ok { - index = d - } - sel := storage.GetSelectedInstance(&index) - // v0.1 : add the storage.Source to the s3 object - // v0.2 : test if the storage.Source exists in the configMap and quit if not - // v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call - if sel != nil { - b.addAuthInformation(exec, storage, namespace, art) - art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique - art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source - } -} - -func (b *ArgoBuilder) addAuthInformation(exec *workflow_execution.WorkflowExecution, 
storage *resources.StorageResource, namespace string, art *Artifact) { - index := 0 - if d, ok := exec.SelectedInstances[storage.GetID()]; ok { - index = d - } - - sel := storage.GetSelectedInstance(&index) - - tool, err := tools2.NewService(conf.GetConfig().Mode) - if err != nil || tool == nil { - logger.Fatal().Msg("Could not create the access secret :" + err.Error()) - } - - secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials - - if err == nil { - art.S3.AccessKeySecret = &Secret{ - Name: secretName, - Key: "access-key", - } - art.S3.SecretKeySecret = &Secret{ - Name: secretName, - Key: "secret-key", +// getStorageRelatedProcessing retourne la liste des ressources de compute +// connectées (via un processing intermédiaire) au stockage identifié par storageId. +// Ces ressources sont utilisées pour construire les ArgoKubeEvent destinés +// à la validation NATS. +func (b *ArgoBuilder) getStorageRelatedProcessing(storageId string) (res []resources.ResourceInterface) { + var storageLinks []graph.GraphLink + // On ne conserve que les liens impliquant ce stockage. 
+ for _, link := range b.OriginWorkflow.Graph.Links { + if link.Destination.ID == storageId || link.Source.ID == storageId { + storageLinks = append(storageLinks, link) } } - art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "") - art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "") - splits := strings.Split(art.S3.EndPoint, "/") - if len(splits) > 1 { - art.S3.Bucket = splits[0] - art.S3.EndPoint = strings.Join(splits[1:], "/") - } else { - art.S3.Bucket = splits[0] - } -} - -func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) { - s := tool.GetS3Secret(storage.UUID, namespace) - // var s *v1.Secret - accessKey, secretKey := retrieveMinioCredential("peer", namespace) - - if s == nil { - id, err := tool.CreateAccessSecret( - accessKey, - secretKey, - storage.UUID, - namespace, - ) - if err != nil { - l := oclib.GetLogger() - l.Fatal().Msg("Error when creating the secret holding credentials for S3 access in " + namespace + " : " + err.Error()) + for _, link := range storageLinks { + var resourceId string + // L'opposé du lien est soit la source soit la destination selon la direction. + if link.Source.ID != storageId { + resourceId = link.Source.ID + } else { + resourceId = link.Destination.ID + } + // Si l'opposé est un processing, on récupère ses ressources de compute. + if b.OriginWorkflow.Graph.IsProcessing(b.OriginWorkflow.Graph.Items[resourceId]) { + res = append(res, b.getComputeProcessing(resourceId)...) 
} - - return id, nil } - return s.Name, nil + return } -// This method needs to evolve to an API call to the peer passed as a parameter -func retrieveMinioCredential(peer string, namespace string) (string, string) { - return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW" +// getComputeProcessing retourne toutes les ressources de compute attachées +// au processing identifié par processingId dans le graphe du workflow. +func (b *ArgoBuilder) getComputeProcessing(processingId string) (res []resources.ResourceInterface) { + arr := []resources.ResourceInterface{} + computeRel := b.OriginWorkflow.GetByRelatedProcessing(processingId, b.OriginWorkflow.Graph.IsCompute) + for _, rel := range computeRel { + arr = append(arr, rel.Node) + } + return arr } +// addS3annotations configure la référence au dépôt d'artefacts S3 dans le Spec +// du workflow Argo. La ConfigMap et la clé sont dérivées de l'ID du stockage. +// Le namespace est conservé en signature pour une évolution future. +func (b *ArgoBuilder) addS3annotations(storage *resources.StorageResource, namespace string) { + b.Workflow.Spec.ArtifactRepositoryRef = ArtifactRepositoryRef{ + ConfigMap: storage.GetID() + "-artifact-repository", + Key: storage.GetID() + "-s3-local", + } +} + +// addTaskToArgo ajoute une tâche au DAG Argo pour le nœud graphItemID. +// Elle résout les dépendances DAG, propage les paramètres d'environnement, +// d'entrée et de sortie de l'instance sélectionnée, et met à jour les listes +// firstItems / lastItems utilisées pour le recâblage des sous-workflows. 
func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, dag *Dag, graphItemID string, processing resources.ResourceInterface, firstItems []string, lastItems []string) (*Dag, []string, []string) { + unique_name := getArgoName(processing.GetName(), graphItemID) step := Task{Name: unique_name, Template: unique_name} + index := 0 if d, ok := exec.SelectedInstances[processing.GetID()]; ok { index = d } instance := processing.GetSelectedInstance(&index) if instance != nil { + // Propagation des variables d'environnement, entrées et sorties + // de l'instance vers les paramètres de la tâche Argo. for _, value := range instance.(*resources.ProcessingInstance).Env { step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ Name: value.Name, @@ -391,7 +442,10 @@ func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, }) } } + step.Dependencies = b.getArgoDependencies(graphItemID) + + // Détermine si ce nœud est une première ou une dernière tâche du DAG. name := "" if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil { name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName() @@ -405,11 +459,15 @@ func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" { lastItems = append(lastItems, getArgoName(name, graphItemID)) } + dag.Tasks = append(dag.Tasks, step) return dag, firstItems, lastItems } -func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { // TODO : one think about remote volume but TG +// createVolumes crée les PersistentVolumeClaims Argo (volumeClaimTemplates) +// pour chaque volume local référencé dans les templates de processing. +// TODO: gérer les volumes distants. 
+func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { for _, volume := range volumes { index := 0 if s, ok := exec.SelectedInstances[volume.Storage.GetID()]; ok { @@ -424,6 +482,10 @@ func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, } } +// isArgoDependancy vérifie si le nœud identifié par id est une dépendance +// d'au moins un autre nœud du DAG (i.e. s'il existe un lien sortant vers +// un processing ou un workflow). +// Retourne true + la liste des noms Argo des nœuds qui en dépendent. func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { dependancyOfIDs := []string{} isDeps := false @@ -446,6 +508,8 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { return isDeps, dependancyOfIDs } +// getArgoDependencies retourne la liste des noms de tâches Argo dont dépend +// le nœud identifié par id (liens entrants depuis des processings). func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { for _, link := range b.OriginWorkflow.Graph.Links { if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { @@ -462,6 +526,9 @@ func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { return } +// getArgoName construit le nom unique d'une tâche / template Argo à partir +// du nom humain de la ressource et de son ID dans le graphe. +// Les espaces sont remplacés par des tirets et tout est mis en minuscules. func getArgoName(raw_name string, component_id string) (formatedName string) { formatedName = strings.ReplaceAll(raw_name, " ", "-") formatedName += "-" + component_id @@ -469,8 +536,10 @@ func getArgoName(raw_name string, component_id string) (formatedName string) { return } -// Verify if a processing resource is attached to another Compute than the one hosting -// the current Open Cloud instance. 
If true return the peer ID to contact +// isReparted vérifie si le processing est hébergé sur un Compute appartenant +// à un peer distant (Relation != 1, i.e. pas le peer local). +// Si c'est le cas, elle retourne true et le Peer concerné pour qu'Admiralty +// puisse router les pods vers le bon cluster. func (b *ArgoBuilder) isReparted(processing resources.ResourceInterface, graphID string) (bool, *peer.Peer) { computeAttached := b.retrieveProcessingCompute(graphID) if computeAttached == nil { @@ -478,7 +547,7 @@ func (b *ArgoBuilder) isReparted(processing resources.ResourceInterface, graphID panic(0) } - // Creates an accessor srtictly for Peer Collection + // Résolution du Peer propriétaire du Compute via l'API oc-lib. req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil) if req == nil { fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") @@ -494,15 +563,18 @@ func (b *ArgoBuilder) isReparted(processing resources.ResourceInterface, graphID peer := res.ToPeer() - isNotReparted := peer.State == 1 + // Relation == 1 signifie "moi-même" : le processing est local. + isNotReparted := peer.Relation == 1 logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) return !isNotReparted, peer } +// retrieveProcessingCompute parcourt les liens du graphe pour retrouver +// la ressource de Compute directement connectée au nœud graphID. +// Retourne nil si aucun Compute n'est trouvé. 
func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { for _, link := range b.OriginWorkflow.Graph.Links { - // If a link contains the id of the processing var oppositeId string if link.Source.ID == graphID { oppositeId = link.Destination.ID @@ -518,32 +590,142 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu continue } } - } - return nil } -// Execute the last actions once the YAML file for the Argo Workflow is created +// waitForConsiders publie un ArgoKubeEvent sur le canal NATS ARGO_KUBE_EVENT +// puis se bloque jusqu'à réception d'un PropalgationMessage vérifiant : +// - Action == PB_CONSIDERS +// - DataType == dataType (COMPUTE_RESOURCE ou STORAGE_RESOURCE) +// - Payload décodé en JSON contenant "executions_id" == executionsId +// +// Cela garantit que l'infrastructure distante (Admiralty ou Minio) a bien +// pris en compte la demande avant que la construction du workflow continue. +// Un timeout de 5 minutes est appliqué pour éviter un blocage indéfini. +func waitForConsiders(executionsId string, dataType tools.DataType, event ArgoKubeEvent) { + // Sérialise l'événement et le publie sur ARGO_KUBE_EVENT. + b, err := json.Marshal(event) + if err != nil { + logger.Error().Msg("Cannot marshal ArgoKubeEvent: " + err.Error()) + return + } + tools.NewNATSCaller().SetNATSPub(tools.ARGO_KUBE_EVENT, tools.NATSResponse{ + FromApp: "oc-monitord", + Datatype: dataType, + User: "root", + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) + + // Connexion NATS pour écouter la réponse PB_CONSIDERS. + natsURL := oclib_config.GetConfig().NATSUrl + if natsURL == "" { + logger.Error().Msg("NATS_SERVER not set, skipping PB_CONSIDERS wait") + return + } + nc, err := nats.Connect(natsURL) + if err != nil { + logger.Error().Msg("NATS connect error waiting for PB_CONSIDERS: " + err.Error()) + return + } + defer nc.Close() + + // Souscription au canal PROPALGATION_EVENT avec un buffer de 64 messages. 
+ ch := make(chan *nats.Msg, 64) + sub, err := nc.ChanSubscribe(tools.PROPALGATION_EVENT.GenerateKey(), ch) + if err != nil { + logger.Error().Msg("NATS subscribe error waiting for PB_CONSIDERS: " + err.Error()) + return + } + defer sub.Unsubscribe() + + timeout := time.After(5 * time.Minute) + for { + select { + case msg := <-ch: + // Désérialise le message en PropalgationMessage. + var pm tools.PropalgationMessage + if err := json.Unmarshal(msg.Data, &pm); err != nil { + continue + } + // Filtre : action, type de données. + if pm.Action != tools.PB_CONSIDERS || pm.DataType != int(dataType) { + continue + } + // Filtre : executions_id dans le Payload du PropalgationMessage. + var body struct { + ExecutionsID string `json:"executions_id"` + } + if err := json.Unmarshal(pm.Payload, &body); err != nil { + continue + } + if body.ExecutionsID != executionsId { + continue + } + logger.Info().Msg(fmt.Sprintf("PB_CONSIDERS received for executions_id=%s datatype=%s", executionsId, dataType.String())) + return + case <-timeout: + logger.Warn().Msg(fmt.Sprintf("Timeout waiting for PB_CONSIDERS executions_id=%s datatype=%s", executionsId, dataType.String())) + return + } + } +} + +// ArgoKubeEvent est la structure publiée sur NATS lors de la demande de +// provisionnement d'une ressource distante (Admiralty ou stockage S3). +// Le champ OriginID identifie le peer initiateur : c'est vers lui que la +// réponse PB_CONSIDERS sera routée par le système de propagation. +type ArgoKubeEvent struct { + // ExecutionsID est l'identifiant de l'exécution de workflow en cours. + ExecutionsID string `json:"executions_id"` + // DestPeerID est le peer de destination (compute ou peer S3 cible). + DestPeerID string `json:"dest_peer_id"` + // Type indique la nature de la ressource : COMPUTE_RESOURCE ou STORAGE_RESOURCE. + Type tools.DataType `json:"data_type"` + // SourcePeerID est le peer source de la ressource demandée. 
+	SourcePeerID string `json:"source_peer_id"` +	// OriginID est le peer qui a initié la demande de provisionnement ; +	// la réponse PB_CONSIDERS lui sera renvoyée. +	OriginID string `json:"origin_id"` +} + +// CompleteBuild finalise la construction du workflow Argo après la génération +// du DAG. Elle effectue dans l'ordre : +// 1. Pour chaque peer distant (Admiralty) : publie un ArgoKubeEvent de type +// COMPUTE_RESOURCE et attend la validation PB_CONSIDERS via waitForConsiders. +// 2. Met à jour les annotations Admiralty des templates avec le nom de cluster +// construit à partir du peerId et de l'executionsId. +// 3. Sérialise le workflow en YAML et l'écrit dans ./argo_workflows/. +// +// Retourne le chemin du fichier YAML généré. func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { - logger.Info().Msg(fmt.Sprint("DEV :: Completing build")) - setter := AdmiraltySetter{Id: executionsId} - // Setup admiralty for each node + logger.Info().Msg("DEV :: Completing build") + + // --- Étape 1 : validation Admiralty pour chaque peer distant --- for _, peer := range b.RemotePeers { logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) - setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer) + // Publie l'événement COMPUTE_RESOURCE et attend PB_CONSIDERS (bloquant). + waitForConsiders(executionsId, tools.COMPUTE_RESOURCE, ArgoKubeEvent{ + ExecutionsID: executionsId, + Type: tools.COMPUTE_RESOURCE, + DestPeerID: conf.GetConfig().PeerID, + SourcePeerID: peer, + OriginID: conf.GetConfig().PeerID, + }) } - // Update the name of the admiralty node to use + // --- Étape 2 : mise à jour du nom de cluster Admiralty --- + // Le nom final du cluster cible est "target-<peerId>-<executionsId>".
for _, template := range b.Workflow.Spec.Templates { if len(template.Metadata.Annotations) > 0 { if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { - template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + oclib.GetConcatenatedName(peerId, executionsId) + template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + tools.GetConcatenatedName(peerId, executionsId) } } } - // Generate the YAML file + // --- Étape 3 : génération et écriture du fichier YAML --- random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) b.Workflow.Metadata.Name = "oc-monitor-" + random_name logger = oclib.GetLogger() @@ -552,12 +734,11 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { logger.Error().Msg("Could not transform object to yaml file") return "", err } - // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss + // Nom de fichier horodaté au format DD_MM_YYYY_hhmmss. current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" workflows_dir := "./argo_workflows/" err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) - if err != nil { logger.Error().Msg("Could not write the yaml file") return "", err diff --git a/workflow_builder/argo_builder.md b/workflow_builder/argo_builder.md new file mode 100644 index 0000000..4a867e2 --- /dev/null +++ b/workflow_builder/argo_builder.md @@ -0,0 +1,128 @@ +# argo_builder.go — Résumé + +## Rôle général + +`argo_builder.go` traduit un **Workflow Open Cloud** (graphe de nœuds : processings, +stockages, computes, sous-workflows) en un **fichier YAML Argo Workflow** prêt à +être soumis à un cluster Kubernetes. + +--- + +## Structures principales + +| Struct | Rôle | +|---|---| +| `ArgoBuilder` | Constructeur principal. 
Porte le workflow source, la structure YAML en cours de build, les services k8s, le timeout et la liste des peers distants (Admiralty). | +| `Workflow` | Racine du YAML Argo (`apiVersion`, `kind`, `metadata`, `spec`). | +| `Spec` | Spécification du workflow : compte de service, entrypoint, templates, volumes, timeout, référence au dépôt d'artefacts S3. | +| `ArgoKubeEvent` | Événement publié sur NATS lors de la demande de provisionnement d'une ressource distante (compute ou stockage S3). Contient `executions_id`, `dest_peer_id`, `source_peer_id`, `data_type`, `origin_id`. | + +--- + +## Flux d'exécution principal + +``` +CreateDAG() + └─ createTemplates() + ├─ [pour chaque processing] createArgoTemplates() + │ ├─ addTaskToArgo() → ajoute la tâche au DAG + dépendances + │ ├─ CreateContainer() → template container Argo + │ ├─ AddAdmiraltyAnnotations() → si peer distant détecté + │ └─ addStorageAnnotations() → S3 + volumes locaux + ├─ [pour chaque native tool WORKFLOW_EVENT] createArgoTemplates() + └─ [pour chaque sous-workflow] + ├─ CreateDAG() récursif + └─ fusion DAG + recâblage des dépendances + └─ createVolumes() → PersistentVolumeClaims + +CompleteBuild() + ├─ waitForConsiders() × N peers → validation Admiralty (COMPUTE_RESOURCE) + ├─ mise à jour annotations Admiralty (clustername) + └─ écriture du YAML dans ./argo_workflows/ +``` + +--- + +## Fonctions clés + +### `CreateDAG(exec, namespace, write) → (nbTâches, firstItems, lastItems, err)` +Point d'entrée. Initialise le logger, déclenche la création des templates et des +volumes, configure les métadonnées globales du workflow Argo. + +### `createTemplates(exec, namespace) → (firstItems, lastItems, volumes)` +Itère sur tous les nœuds du graphe. +- Processings → template container. +- Native tools `WORKFLOW_EVENT` → template événement. +- Sous-workflows → build récursif + fusion DAG + recâblage des dépendances entrantes/sortantes. 
+ +### `createArgoTemplates(exec, namespace, id, obj, …)` +Crée le template Argo pour un nœud donné. +Détecte si le processing est **réparti** (peer distant via `isReparted`) → ajoute les +annotations Admiralty et enregistre le peer dans `RemotePeers`. +Délègue la configuration du stockage à `addStorageAnnotations`. + +### `addStorageAnnotations(exec, id, template, namespace, volumes)` +Pour chaque stockage lié au processing : +- **S3** : appelle `waitForConsiders(STORAGE_RESOURCE)` pour chaque compute associé, +  puis configure la référence au dépôt d'artefacts via `addS3annotations`. +- **Local** : monte un `VolumeMount` dans le container. + +### `waitForConsiders(executionsId, dataType, event)` +**Fonction bloquante.** +1. Publie l'`ArgoKubeEvent` sur le canal NATS `ARGO_KUBE_EVENT`. +2. S'abonne à `PROPALGATION_EVENT`. +3. Attend un `PropalgationMessage` vérifiant : +   - `Action == PB_CONSIDERS` +   - `DataType == dataType` +   - `Payload.executions_id == executionsId` +4. Timeout : **5 minutes**. + +| Appelant | DataType attendu | Signification | +|---|---|---| +| `addStorageAnnotations` (S3) | `STORAGE_RESOURCE` | Le stockage S3 distant est prêt | +| `CompleteBuild` (Admiralty) | `COMPUTE_RESOURCE` | Le cluster cible Admiralty est configuré | + +### `CompleteBuild(executionsId) → (cheminYAML, err)` +Finalise le build : +1. Pour chaque peer dans `RemotePeers` → `waitForConsiders(COMPUTE_RESOURCE)` (bloquant, séquentiel). +2. Met à jour les annotations `multicluster.admiralty.io/clustername` avec `target-<peerId>-<executionsId>`. +3. Sérialise le workflow en YAML et l'écrit dans `./argo_workflows/<nom>_<horodatage>.yml`. + +### `isReparted(processing, graphID) → (bool, *peer.Peer)` +Retrouve le Compute attaché au processing, charge le Peer propriétaire via l'API +oc-lib, et vérifie si `Relation != 1` (pas le peer local). + +### `addTaskToArgo(exec, dag, graphItemID, processing, …)` +Crée une `Task` Argo (nom unique, template, dépendances DAG, paramètres env/inputs/outputs) +et la rattache au DAG.
Met à jour `firstItems` / `lastItems`. + +### `isArgoDependancy(id) → (bool, []string)` +Vérifie si un nœud est utilisé comme source d'un lien sortant vers un autre +processing ou workflow (il est donc une dépendance pour quelqu'un). + +### `getArgoDependencies(id) → []string` +Retourne les noms des tâches Argo dont ce nœud dépend (liens entrants). + +--- + +## Protocole NATS utilisé + +``` +Publication → canal : ARGO_KUBE_EVENT +              payload : NATSResponse{Method: PROPALGATION_EVENT, Payload: ArgoKubeEvent} + +Attente     ← canal : PROPALGATION_EVENT +              filtre : PropalgationMessage{ +                Action = PB_CONSIDERS, +                DataType = COMPUTE_RESOURCE | STORAGE_RESOURCE, +                Payload = {"executions_id": "<executionsId>"} +              } +``` + +--- + +## Fichier YAML produit + +- Nom : `<mot1>-<mot2>_<DD_MM_YYYY_hhmmss>.yml` (le workflow lui-même est nommé `oc-monitor-<mot1>-<mot2>`) +- Dossier : `./argo_workflows/` +- Permissions : `0660`