diff --git a/.gitignore b/.gitignore index 5de9851..29796df 100644 --- a/.gitignore +++ b/.gitignore @@ -22,4 +22,5 @@ go.work argo_workflows/* -env.env \ No newline at end of file +env.env +oc-monitord \ No newline at end of file diff --git a/conf/conf.go b/conf/conf.go index 7968a9c..abddccd 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -1,6 +1,11 @@ package conf -import "sync" +import ( + "sync" + "time" + + "cloud.o-forge.io/core/oc-lib/config" +) type Config struct { ExecutionID string @@ -15,6 +20,29 @@ type Config struct { KubeCert string KubeData string ArgoHost string // when executed in a container will replace addresses with "localhost" in their url + // OCNamespace est le namespace Kubernetes où tournent les composants OpenCloud (NATS, etc.). + // Utilisé pour construire le FQDN NATS accessible depuis n'importe quel namespace. + // Valeur par défaut : "opencloud". + NatsUrl string + OCNamespace string + // ScheduledTime is the wall-clock time at which the Argo workflow must be submitted. + // oc-monitord completes pre-pull + infra setup first, then waits until this time. + // Zero value means "submit immediately after prep". + ScheduledTime time.Time +} + +// NATSPodURL retourne l'URL NATS utilisable depuis un pod dans n'importe quel namespace. +// Les pods Argo tournent dans le namespace executions_id, pas dans OCNamespace, +// donc le FQDN complet est nécessaire pour atteindre le service NATS. +func (c *Config) NATSPodURL() string { + if config.GetConfig().NATSUrl == "" { + ns := c.OCNamespace + if ns == "" { + ns = "opencloud" + } + return "nats." + ns + ".svc.cluster.local:4222" + } + return config.GetConfig().NATSUrl } var instance *Config diff --git a/env.env b/env.env index 4a00a82..d60dd97 100644 --- a/env.env +++ b/env.env @@ -1,4 +1,4 @@ KUBERNETES_SERVICE_HOST=192.168.1.169 -KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" -KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" -KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" \ No newline at end of file +KUBE_CA="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTnpReU56STVNVEF3SGhjTk1qWXdNekl6TVRNek5URXdXaGNOTXpZd016SXdNVE16TlRFdwpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTnpReU56STVNVEF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFSSGpYRDVpbnRIYWZWSk5VaDFlRnIxcXBKdFlkUmc5NStKVENEa0tadTIKYjUxRXlKaG1zanRIY3BDUndGL1VGMzlvdzY4TFBUcjBxaUorUHlhQTBLZUtvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTdWQkNzZVN3ajJ2cmczMFE5UG8vCnV6ZzAvMjR3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUlEOVY2aFlUSS83ZW1hRzU0dDdDWVU3TXFSdDdESUkKNlgvSUwrQ0RLbzlNQWlCdlFEMGJmT0tVWDc4UmRGdUplcEhEdWFUMUExaGkxcWdIUGduM1dZdDBxUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" +KUBE_CERT="LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJUU5KbFNJQUJPMDR3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOemMwTWpjeU9URXdNQjRYRFRJMk1ETXlNekV6TXpVeE1Gb1hEVEkzTURNeQpNekV6TXpVeE1Gb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJMY3Uwb2pUbVg4RFhTQkYKSHZwZDZNVEoyTHdXc1lRTmdZVURXRDhTVERIUWlCczlMZ0x5ZTdOMEFvZk85RkNZVW1HamhiaVd3WFVHR3dGTgpUdlRMU2lXalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCUlJhRW9wQzc5NGJyTHlnR0g5SVhvbDZTSmlFREFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQWhaRUlrSWV3Y1loL1NmTFVCVjE5MW1CYTNRK0J5S2J5eTVlQmpwL3kzeWtDSUIxWTJicTVOZTNLUUU4RAprNnNzeFJrbjJmN0VoWWVRQU1pUlJ2MjIweDNLCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTnpReU56STVNVEF3SGhjTk1qWXdNekl6TVRNek5URXdXaGNOTXpZd016SXdNVE16TlRFdwpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTnpReU56STVNVEF3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTcTdVTC85MEc1ZmVTaE95NjI3eGFZWlM5dHhFdWFoWFQ3Vk5wZkpQSnMKaEdXd2UxOXdtbXZzdlp6dlNPUWFRSzJaMmttN0hSb1IrNlA1YjIyamczbHVvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVVXaEtLUXUvZUc2eThvQmgvU0Y2Ckpla2lZaEF3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUk3cGxHczFtV20ySDErbjRobDBNTk13RmZzd0o5ZXIKTzRGVkM0QzhwRG44QWlCN3NZMVFwd2M5VkRUeGNZaGxuZzZNUzRXai85K0lHWjJxcy94UStrMjdTQT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K" +KUBE_DATA="LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUROZDRnWXd6aVRhK1hwNnFtNVc3SHFzc1JJNkREaUJTbUV2ZHoxZzk3VGxvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFdHk3U2lOT1pmd05kSUVVZStsM294TW5ZdkJheGhBMkJoUU5ZUHhKTU1kQ0lHejB1QXZKNwpzM1FDaDg3MFVKaFNZYU9GdUpiQmRRWWJBVTFPOU10S0pRPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" diff --git a/go.mod b/go.mod index c1c8bc3..1eda15e 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module oc-monitord go 1.25.0 require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260320151407-88d2e526283b + cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b github.com/akamensky/argparse v1.4.0 github.com/google/uuid v1.6.0 github.com/nwtgck/go-fakelish v0.1.3 diff --git a/go.sum b/go.sum index 464bac1..319b8f9 100644 --- a/go.sum +++ b/go.sum @@ -10,6 +10,24 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260320103359-c34b8c67038b h1:VdLBRXb0wSsR9 cloud.o-forge.io/core/oc-lib v0.0.0-20260320103359-c34b8c67038b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260320151407-88d2e526283b h1:QEdy0FxwWcXYHVLcC06tRmhFl6T/pr2M7l2Auni/sSU= cloud.o-forge.io/core/oc-lib v0.0.0-20260320151407-88d2e526283b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0 h1:pQf9k+GSzNGEmrUa00jn9Zcqfp9X4N1Z5ie7InvUf3g= +cloud.o-forge.io/core/oc-lib v0.0.0-20260326110203-87cf2cb12af0/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260402123119-a2f6f3c252ac h1:nCr9cWzPNdEuwjG/KDOYslKw4kHE8hJXzGI81jDNf/A= +cloud.o-forge.io/core/oc-lib v0.0.0-20260402123119-a2f6f3c252ac/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260402124551-4f0714cb1182 h1:1SQm0TfFIpn+3fJpFgxibx0V8uAqaf4DpjDL28+bkqs= +cloud.o-forge.io/core/oc-lib v0.0.0-20260402124551-4f0714cb1182/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260402125506-54985bbc4543 h1:gP+DrjkHZJ4I1xUkR/4DbfW1mVdHoAwmmkte9TEiPwM= +cloud.o-forge.io/core/oc-lib v0.0.0-20260402125506-54985bbc4543/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260410075751-d7b2ef6ae120 h1:CMOOpmpgkD63Gq7ukmXG6r+WlJxvpSgDRmalpWPhaIg= +cloud.o-forge.io/core/oc-lib v0.0.0-20260410075751-d7b2ef6ae120/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260414104622-dc0041999d22 h1:lum7G12vCKYKQWXTOYtl2Qh9hLRlzrcOPO3pozUBL40= +cloud.o-forge.io/core/oc-lib v0.0.0-20260414104622-dc0041999d22/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260423072402-9c2663601a2b h1:h1Rwra0Ljp8bhj7L5t9NEtP51lbg7RFySY1XMTprEXE= +cloud.o-forge.io/core/oc-lib v0.0.0-20260423072402-9c2663601a2b/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260427091650-f048b420d74d h1:jzgwgbZDASalQJSYbPF/L2L2RSP2OAbqhMB4YUXK27M= +cloud.o-forge.io/core/oc-lib v0.0.0-20260427091650-f048b420d74d/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b h1:TWhmHeurbBmdyevREh4+mHWOBehO2AK587RCIjCfvOc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/logger/argo_logs.go b/logger/argo_logs.go index 9664481..b0e49e7 100644 --- a/logger/argo_logs.go +++ b/logger/argo_logs.go @@ -14,7 +14,6 @@ import ( "cloud.o-forge.io/core/oc-lib/models/common/enum" octools "cloud.o-forge.io/core/oc-lib/tools" "github.com/rs/zerolog" - "k8s.io/apimachinery/pkg/watch" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" ) @@ -114,15 +113,23 @@ func NewArgoPodLog(name string, step string, msg string) ArgoPodLog { // delegated to oc-scheduler (WorkflowExecution) and oc-datacenter (Bookings) // via the dedicated NATS channels. // -// - wfName : Argo workflow name (also the name of the root DAG node) -// - execID : WorkflowExecution UUID (for oc-scheduler to update state) -// - executionsID: run-group ID shared by all bookings of this run -// - namespace : Kubernetes namespace -// - watcher : Argo watch stream -func LogKubernetesArgo(wfName string, execID string, executionsID string, namespace string, watcher watch.Interface) { +// When the Kubernetes watch channel closes before the workflow reaches a +// terminal state (API server timeout, network blip, etc.), the function +// fetches the actual workflow state directly. If the workflow is still +// running it reconnects the watcher and continues. Only a genuine terminal +// failure emits WORKFLOW_DONE_EVENT with FAILURE. +// +// - wfName : Argo workflow name as returned by CreateArgoWorkflow (used +// as the root DAG node key in wf.Status.Nodes) +// - execID : WorkflowExecution UUID +// - executionsID: run-group ID / Kubernetes namespace of the workflow +// - namespace : Kubernetes namespace for pod log streaming +// - tool : Kubernetes/Argo client (used to reconnect + fetch state) +// - rawWfName : workflow name without the "oc-monitor-" prefix (passed to +// GetArgoWatch which prepends it) +func LogKubernetesArgo(wfName string, execID string, executionsID string, namespace string, tool tools.Tool, rawWfName string) { var argoWatcher *ArgoWatch var pods []string - var node wfv1.NodeStatus wfl := utils.GetWFLogger("") wfl.Debug().Msg("Starting to log " + wfName) @@ -131,181 +138,363 @@ func LogKubernetesArgo(wfName string, execID string, executionsID string, namesp // nodePhases tracks the last known phase of each step node so we can detect // phase transitions and emit WORKFLOW_STEP_DONE_EVENT exactly once per step. + // Kept across watcher reconnections so we never double-emit. nodePhases := map[string]wfv1.NodePhase{} // stepResults captures the final NodeStatus of every completed step so the - // WORKFLOW_DONE_EVENT can include a full recap (Steps slice) for oc-scheduler - // and oc-catalog to catch up if they missed individual STEP_DONE events. + // WORKFLOW_DONE_EVENT can include a full recap (Steps slice). stepResults := map[string]wfv1.NodeStatus{} workflowStartedEmitted := false - for event := range watcher.ResultChan() { - wf, ok := event.Object.(*wfv1.Workflow) - if !ok { - wfl.Error().Msg("unexpected type") - continue - } - if len(wf.Status.Nodes) == 0 { - wfl.Info().Msg("No node status yet") + for { + watcher, err := tool.GetArgoWatch(executionsID, rawWfName) + if err != nil { + wfl.Error().Msg("Could not create watcher: " + err.Error()) + if resolveAndEmitTerminal(wfName, execID, executionsID, tool, nodePhases, stepResults, &wg, wfl) { + return + } + time.Sleep(5 * time.Second) continue } - // ── Emit WORKFLOW_STARTED_EVENT once ──────────────────────────────── - if !workflowStartedEmitted { - realStart := wf.Status.StartedAt.Time - emitLifecycleEvent(octools.WORKFLOW_STARTED_EVENT, octools.WorkflowLifecycleEvent{ - ExecutionID: execID, - ExecutionsID: executionsID, - State: enum.STARTED.EnumIndex(), - RealStart: &realStart, - }) - workflowStartedEmitted = true - } + reachedTerminalState := false + var node wfv1.NodeStatus - conditions := retrieveCondition(wf) - - // Retrieving the Status for the main node, which is named after the workflow - if node, ok = wf.Status.Nodes[wfName]; !ok { - bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t") - wfl.Fatal().Msg("Could not find the " + wfName + " node in \n" + string(bytified)) - } - - now := time.Now().UTC() - start, _ := time.Parse(time.RFC3339, node.StartedAt.String()) - duration := now.Sub(start.UTC()) - - newWatcher := ArgoWatch{ - Name: node.Name, - Namespace: namespace, - Status: string(node.Phase), - Created: node.StartedAt.String(), - Started: node.StartedAt.String(), - Progress: string(node.Progress), - Duration: duration.String(), - Conditions: conditions, - } - - if argoWatcher == nil { - argoWatcher = &newWatcher - } - - if !newWatcher.Equals(argoWatcher) { - jsonified, _ := json.Marshal(newWatcher) - wfl.Info().Msg(string(jsonified)) - argoWatcher = &newWatcher - } - - // ── Per-step completion detection ──────────────────────────────────── - for _, stepNode := range wf.Status.Nodes { - if stepNode.Name == wfName { - continue // skip the main DAG node + for event := range watcher.ResultChan() { + wf, ok := event.Object.(*wfv1.Workflow) + if !ok { + wfl.Error().Msg("unexpected type") + continue } - prev := nodePhases[stepNode.Name] - nodePhases[stepNode.Name] = stepNode.Phase - - if prev == stepNode.Phase { - continue // no change - } - if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() { - continue // not terminal yet - } - if prev.Completed() || prev.FailedOrError() { - continue // already processed - } - - bookingID := extractBookingID(stepNode.Name) - if bookingID == "" { + if len(wf.Status.Nodes) == 0 { + wfl.Info().Msg("No node status yet") continue } - stepState := enum.SUCCESS - if stepNode.Phase.FailedOrError() { + // ── Emit WORKFLOW_STARTED_EVENT once ──────────────────────────────── + if !workflowStartedEmitted { + realStart := wf.Status.StartedAt.Time + if realStart.IsZero() { + realStart = time.Now().UTC() + } + emitLifecycleEvent(octools.WORKFLOW_STARTED_EVENT, octools.WorkflowLifecycleEvent{ + ExecutionID: execID, + ExecutionsID: executionsID, + State: enum.STARTED.EnumIndex(), + RealStart: &realStart, + }) + workflowStartedEmitted = true + } + + conditions := retrieveCondition(wf) + + // Retrieving the Status for the main node, which is named after the workflow + if node, ok = wf.Status.Nodes[wfName]; !ok { + bytified, _ := json.MarshalIndent(wf.Status.Nodes, "", "\t") + wfl.Error().Msg("Could not find the " + wfName + " node in \n" + string(bytified)) + continue + } + + now := time.Now().UTC() + start := node.StartedAt.Time.UTC() + var duration time.Duration + if !start.IsZero() { + duration = now.Sub(start) + } + + newWatcher := ArgoWatch{ + Name: node.Name, + Namespace: namespace, + Status: string(node.Phase), + Created: node.StartedAt.String(), + Started: node.StartedAt.String(), + Progress: string(node.Progress), + Duration: duration.String(), + Conditions: conditions, + } + + if argoWatcher == nil { + argoWatcher = &newWatcher + } + + if !newWatcher.Equals(argoWatcher) { + jsonified, _ := json.Marshal(newWatcher) + wfl.Info().Msg(string(jsonified)) + argoWatcher = &newWatcher + } + + // ── Per-step completion detection ──────────────────────────────────── + for _, stepNode := range wf.Status.Nodes { + if stepNode.Name == wfName { + continue // skip the main DAG node + } + prev := nodePhases[stepNode.Name] + nodePhases[stepNode.Name] = stepNode.Phase + + if prev == stepNode.Phase { + continue // no change + } + if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() { + continue // not terminal yet + } + if prev.Completed() || prev.FailedOrError() { + continue // already processed + } + + bookingID := extractBookingID(stepNode.Name) + if bookingID == "" { + continue + } + + stepState := enum.SUCCESS + if stepNode.Phase.FailedOrError() { + if !(strings.Contains(stepNode.Message, "context cancel") || strings.Contains(stepNode.Message, "exit")) { + fmt.Println("1 baraka", stepNode.Message) + stepState = enum.FAILURE + } + } + realStart := stepNode.StartedAt.Time + realEnd := stepNode.FinishedAt.Time + if realEnd.IsZero() { + realEnd = time.Now().UTC() + } + if realStart.IsZero() { + realStart = realEnd + } + emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{ + ExecutionID: execID, + ExecutionsID: executionsID, + BookingID: bookingID, + State: stepState.EnumIndex(), + RealStart: &realStart, + RealEnd: &realEnd, + }) + // Store for the final recap emitted with WORKFLOW_DONE_EVENT. + stepResults[bookingID] = stepNode + } + + // ── Pod log streaming ──────────────────────────────────────────────── + for _, pod := range wf.Status.Nodes { + if pod.Type != wfv1.NodeTypePod { + continue + } + if !slices.Contains(pods, pod.Name) { + pl := wfl.With().Str("pod", pod.Name).Logger() + pl.Info().Msg("Found a new pod to log : " + pod.Name) + wg.Add(1) + go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) + pods = append(pods, pod.Name) + } + } + + // ── Workflow terminal phase ────────────────────────────────────────── + if node.Phase.Completed() || node.Phase.FailedOrError() { + if node.Phase.Completed() { + wfl.Info().Msg(wfName + " workflow completed") + } else { + wfl.Error().Msg(wfName + " has failed, please refer to the logs") + wfl.Error().Msg(node.Message) + } + wg.Wait() + wfl.Info().Msg(wfName + " exiting") + + finalState := enum.SUCCESS + if node.Phase.FailedOrError() { + if !(strings.Contains(node.Message, "context cancel") || strings.Contains(node.Message, "exit")) { + fmt.Println("2 baraka", node.Message) + finalState = enum.FAILURE + } + } + realStart := node.StartedAt.Time + realEnd := node.FinishedAt.Time + if realEnd.IsZero() { + realEnd = time.Now().UTC() + } + if realStart.IsZero() { + realStart = realEnd + } + + // Build recap from all observed step results. + steps := make([]octools.StepMetric, 0, len(stepResults)) + for bookingID, s := range stepResults { + stepState := enum.SUCCESS + if s.Phase.FailedOrError() { + if !(strings.Contains(s.Message, "context cancel") || strings.Contains(s.Message, "exit")) { + fmt.Println("3 baraka", s.Message) + stepState = enum.FAILURE + } + } + start := s.StartedAt.Time + end := s.FinishedAt.Time + if end.IsZero() { + end = realEnd + } + steps = append(steps, octools.StepMetric{ + BookingID: bookingID, + State: stepState.EnumIndex(), + RealStart: &start, + RealEnd: &end, + }) + } + + emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{ + ExecutionID: execID, + ExecutionsID: executionsID, + State: finalState.EnumIndex(), + RealStart: &realStart, + RealEnd: &realEnd, + Steps: steps, + }) + reachedTerminalState = true + break + } + } + + watcher.Stop() + + if reachedTerminalState { + return + } + + // ── Watcher closed before terminal state ───────────────────────────── + // The Kubernetes watch API closes the channel on server-side timeouts, + // API server restarts, or network blips. Do NOT assume failure: fetch + // the actual workflow state and reconnect if still running. + wfl.Warn().Msg(wfName + " watcher closed before workflow reached terminal state — checking actual state") + + if resolveAndEmitTerminal(wfName, execID, executionsID, tool, nodePhases, stepResults, &wg, wfl) { + return + } + + wfl.Info().Msg(wfName + " workflow still running, reconnecting watcher") + time.Sleep(3 * time.Second) + } +} + +// resolveAndEmitTerminal fetches the live workflow state via the API. If the +// workflow has reached a terminal phase it emits any missing STEP_DONE events +// followed by WORKFLOW_DONE_EVENT and returns true. Returns false when the +// workflow is still running or the state cannot be determined (caller should +// reconnect the watcher). +func resolveAndEmitTerminal( + wfName string, execID string, executionsID string, + tool tools.Tool, + nodePhases map[string]wfv1.NodePhase, + stepResults map[string]wfv1.NodeStatus, + wg *sync.WaitGroup, + wfl zerolog.Logger, +) bool { + wf, err := tool.GetArgoWorkflow(executionsID, wfName) + if err != nil { + wfl.Warn().Msg("Could not fetch workflow state: " + err.Error()) + return false + } + + rootNode, ok := wf.Status.Nodes[wfName] + if !ok { + wfl.Warn().Msg("Root node " + wfName + " not found in live workflow status") + return false + } + + if !rootNode.Phase.Completed() && !rootNode.Phase.FailedOrError() { + wfl.Info().Msgf("%s still running (phase=%s), will reconnect watcher", wfName, rootNode.Phase) + return false + } + + // Emit any STEP_DONE events that were missed while the watcher was down. + for _, stepNode := range wf.Status.Nodes { + if stepNode.Name == wfName { + continue + } + prev := nodePhases[stepNode.Name] + if prev.Completed() || prev.FailedOrError() { + continue // already emitted + } + if !stepNode.Phase.Completed() && !stepNode.Phase.FailedOrError() { + continue + } + bookingID := extractBookingID(stepNode.Name) + if bookingID == "" { + continue + } + stepState := enum.SUCCESS + if stepNode.Phase.FailedOrError() { + if !(strings.Contains(stepNode.Message, "context cancel") || strings.Contains(stepNode.Message, "exit")) { + fmt.Println("4 baraka", stepNode.Message) stepState = enum.FAILURE } - realStart := stepNode.StartedAt.Time - realEnd := stepNode.FinishedAt.Time - if realEnd.IsZero() { - realEnd = time.Now().UTC() - } - emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{ - ExecutionID: execID, - ExecutionsID: executionsID, - BookingID: bookingID, - State: stepState.EnumIndex(), - RealStart: &realStart, - RealEnd: &realEnd, - }) - // Store for the final recap emitted with WORKFLOW_DONE_EVENT. - stepResults[bookingID] = stepNode } - - // ── Pod log streaming ──────────────────────────────────────────────── - for _, pod := range wf.Status.Nodes { - if pod.Type != wfv1.NodeTypePod { - continue - } - if !slices.Contains(pods, pod.Name) { - pl := wfl.With().Str("pod", pod.Name).Logger() - pl.Info().Msg("Found a new pod to log : " + pod.Name) - wg.Add(1) - go logKubernetesPods(namespace, wfName, pod.Name, pl, &wg) - pods = append(pods, pod.Name) - } + realStart := stepNode.StartedAt.Time + realEnd := stepNode.FinishedAt.Time + if realEnd.IsZero() { + realEnd = time.Now().UTC() } + if realStart.IsZero() { + realStart = realEnd + } + fmt.Println("STEP DONE !!!!! ", stepNode.Name, stepState) + emitLifecycleEvent(octools.WORKFLOW_STEP_DONE_EVENT, octools.WorkflowLifecycleEvent{ + ExecutionID: execID, + ExecutionsID: executionsID, + BookingID: bookingID, + State: stepState.EnumIndex(), + RealStart: &realStart, + RealEnd: &realEnd, + }) + stepResults[bookingID] = stepNode + nodePhases[stepNode.Name] = stepNode.Phase + } - // ── Workflow terminal phase ────────────────────────────────────────── - if node.Phase.Completed() || node.Phase.FailedOrError() { - if node.Phase.Completed() { - wfl.Info().Msg(wfName + " workflow completed") - } else { - wfl.Error().Msg(wfName + " has failed, please refer to the logs") - wfl.Error().Msg(node.Message) - } - wg.Wait() - wfl.Info().Msg(wfName + " exiting") + wg.Wait() - finalState := enum.SUCCESS - if node.Phase.FailedOrError() { - finalState = enum.FAILURE - } - realStart := node.StartedAt.Time - realEnd := node.FinishedAt.Time - if realEnd.IsZero() { - realEnd = time.Now().UTC() - } - - // Build recap from all observed step results. - steps := make([]octools.StepMetric, 0, len(stepResults)) - for bookingID, s := range stepResults { - stepState := enum.SUCCESS - if s.Phase.FailedOrError() { - stepState = enum.FAILURE - } - start := s.StartedAt.Time - end := s.FinishedAt.Time - if end.IsZero() { - end = realEnd - } - steps = append(steps, octools.StepMetric{ - BookingID: bookingID, - State: stepState.EnumIndex(), - RealStart: &start, - RealEnd: &end, - }) - } - - emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{ - ExecutionID: execID, - ExecutionsID: executionsID, - State: finalState.EnumIndex(), - RealStart: &realStart, - RealEnd: &realEnd, - Steps: steps, - }) - break + finalState := enum.SUCCESS + if rootNode.Phase.FailedOrError() { + if !(strings.Contains(rootNode.Message, "context cancel") || strings.Contains(rootNode.Message, "exit")) { + fmt.Println("5 baraka", rootNode.Message) + finalState = enum.FAILURE } } + realStart := rootNode.StartedAt.Time + realEnd := rootNode.FinishedAt.Time + if realEnd.IsZero() { + realEnd = time.Now().UTC() + } + if realStart.IsZero() { + realStart = realEnd + } + + steps := make([]octools.StepMetric, 0, len(stepResults)) + for bookingID, s := range stepResults { + stepState := enum.SUCCESS + if s.Phase.FailedOrError() { + if !(strings.Contains(s.Message, "context cancel") || strings.Contains(s.Message, "exit")) { + fmt.Println("6 baraka", s.Message) + stepState = enum.FAILURE + } + } + start := s.StartedAt.Time + end := s.FinishedAt.Time + if end.IsZero() { + end = realEnd + } + steps = append(steps, octools.StepMetric{ + BookingID: bookingID, + State: stepState.EnumIndex(), + RealStart: &start, + RealEnd: &end, + }) + } + + emitLifecycleEvent(octools.WORKFLOW_DONE_EVENT, octools.WorkflowLifecycleEvent{ + ExecutionID: execID, + ExecutionsID: executionsID, + State: finalState.EnumIndex(), + RealStart: &realStart, + RealEnd: &realEnd, + Steps: steps, + }) + return true } // emitLifecycleEvent publishes a WorkflowLifecycleEvent on the given NATS channel. @@ -357,7 +546,11 @@ func retrieveCondition(wf *wfv1.Workflow) (c Conditions) { func logKubernetesPods(executionId string, wfName string, podName string, logger zerolog.Logger, wg *sync.WaitGroup) { defer wg.Done() - s := strings.Split(podName, ".") + s := strings.SplitN(podName, ".", 2) + if len(s) < 2 { + logger.Error().Str("pod", podName).Msg("Unexpected pod name format, expected wfName.stepName") + return + } name := s[0] + "-" + s[1] step := s[1] @@ -374,10 +567,27 @@ func logKubernetesPods(executionId string, wfName string, podName string, logger } scanner := bufio.NewScanner(reader) + scanner.Buffer(make([]byte, 1024*1024), 1024*1024) for scanner.Scan() { - log := scanner.Text() - podLog := NewArgoPodLog(name, step, log) - jsonified, _ := json.Marshal(podLog) - logger.Info().Msg(string(jsonified)) + line := scanner.Text() + podLog := NewArgoPodLog(name, step, line) + jsonified, err := json.Marshal(podLog) + if err != nil { + logger.Error().Err(err).Msg("Failed to marshal pod log") + continue + } + // Propagate the log level from the container output so errors + // are visible above Debug-only sinks. + switch { + case strings.Contains(line, "level=error") || strings.Contains(line, " ERR "): + logger.Error().Msg(string(jsonified)) + case strings.Contains(line, "level=warning") || strings.Contains(line, " WRN "): + logger.Warn().Msg(string(jsonified)) + default: + logger.Info().Msg(string(jsonified)) + } + } + if err := scanner.Err(); err != nil { + logger.Error().Err(err).Str("pod", podName).Msg("Pod log scanner error") } } diff --git a/main.go b/main.go index 4f85e45..12fbd75 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "os" "regexp" "strings" + "time" "oc-monitord/conf" l "oc-monitord/logger" @@ -42,19 +43,14 @@ var parser argparse.Parser var workflowName string func main() { - o := config.GetConfLoader("oc-monitord") - parser = *argparse.NewParser("oc-monitord", "Launch the execution of a workflow given as a parameter and sends the produced logs to a loki database") loadConfig(&parser) - fmt.Println("sqdqs", o.GetStringDefault("MONGO_URL", "mongodb://mongo:27017")) oclib.InitDaemon("oc-monitord") // Lance l'abonné NATS centralisé pour les confirmations PB_CONSIDERS. workflow_builder.StartConsidersListener() - fmt.Println(conf.GetConfig()) - logger = u.GetLogger() logger.Debug().Msg("Loki URL : " + config.GetConfig().LokiUrl) @@ -62,9 +58,7 @@ func main() { exec := u.GetExecution(conf.GetConfig().ExecutionID) if exec == nil { logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID) - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.FAILURE.EnumIndex(), - }, conf.GetConfig().ExecutionID) + u.EmitExecStateUpdate(conf.GetConfig().ExecutionID, enum.FAILURE) return } conf.GetConfig().WorkflowID = exec.WorkflowID @@ -83,35 +77,42 @@ func main() { if err != nil { logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") } - + fmt.Println("ExportToArgo") builder, _, err := new_wf.ExportToArgo(exec, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore + fmt.Println("ExportToArgo", err) if err != nil { logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg(err.Error()) - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.FAILURE.EnumIndex(), - }, exec.GetID()) + u.EmitExecStateUpdate(exec.GetID(), enum.FAILURE) return } - + fmt.Println("CompleteBuild") argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID) + fmt.Println("CompleteBuild", err) if err != nil { logger.Error().Msg("Error when completing the build of the workflow: " + err.Error()) - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.FAILURE.EnumIndex(), - }, exec.GetID()) + u.EmitExecStateUpdate(exec.GetID(), enum.FAILURE) return } workflowName = getContainerName(argoFilePath) - + fmt.Println("getContainerName", workflowName, conf.GetConfig().KubeHost) if conf.GetConfig().KubeHost == "" { // Not in a k8s environment, get conf from parameters panic("can't exec with no kube for argo deployment") } else { // Executed in a k8s environment logger.Info().Msg("Executes inside a k8s") - // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() + // Wait until the scheduled start time if prep finished early. + if st := conf.GetConfig().ScheduledTime; !st.IsZero() && time.Now().Before(st) { + wait := time.Until(st) + logger.Info().Msgf("Prep done early, waiting %s until scheduled start %s", wait.Round(time.Second), st.Format(time.RFC3339)) + u.EmitExecStateUpdate(exec.GetID(), enum.IN_PREPARATION) + time.Sleep(wait) + } else if st := conf.GetConfig().ScheduledTime; !st.IsZero() && time.Now().After(st) { + logger.Warn().Msgf("Prep finished %s late vs scheduled start %s", time.Since(st).Round(time.Second), st.Format(time.RFC3339)) + } + fmt.Println("EXEC") executeInside(exec.ExecutionsID, exec.GetID(), argoFilePath) } } @@ -133,15 +134,7 @@ func executeInside(ns string, execID string, argo_file_path string) { logger.Info().Msg(fmt.Sprint("Data :" + conf.GetConfig().KubeData)) return } else { - watcher, err := t.GetArgoWatch(ns, workflowName) - if err != nil { - logger.Error().Msg("Could not retrieve Watcher : " + err.Error()) - oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{ - "state": enum.FAILURE.EnumIndex(), - }, execID) - } - - l.LogKubernetesArgo(name, execID, ns, ns, watcher) + l.LogKubernetesArgo(name, execID, ns, ns, t, workflowName) logger.Info().Msg("Finished, exiting...") } @@ -149,9 +142,11 @@ func executeInside(ns string, execID string, argo_file_path string) { func loadConfig(parser *argparse.Parser) { mode := parser.String("M", "mode", &argparse.Options{Required: false, Default: "", Help: "Mode of the execution"}) + ocNamespace := parser.String("n", "namespace", &argparse.Options{Required: false, Default: "opencloud", Help: "Kubernetes namespace where OpenCloud components (NATS) run"}) execution := parser.String("e", "execution", &argparse.Options{Required: true, Help: "Execution ID of the workflow to request from oc-catalog API"}) peer := parser.String("p", "peer", &argparse.Options{Required: false, Default: "", Help: "Peer ID of the workflow to request from oc-catalog API"}) timeout := parser.Int("t", "timeout", &argparse.Options{Required: false, Default: -1, Help: "Timeout for the execution of the workflow"}) + scheduledUnix := parser.Int("s", "scheduled-time", &argparse.Options{Required: false, Default: 0, Help: "Unix timestamp of the scheduled start; oc-monitord will wait until this time before submitting the Argo workflow"}) ca := parser.String("c", "ca", &argparse.Options{Required: false, Default: "", Help: "CA file for the Kubernetes cluster"}) cert := parser.String("C", "cert", &argparse.Options{Required: false, Default: "", Help: "Cert file for the Kubernetes cluster"}) @@ -160,6 +155,7 @@ func loadConfig(parser *argparse.Parser) { host := parser.String("H", "host", &argparse.Options{Required: false, Default: "", Help: "Host for the Kubernetes cluster"}) port := parser.String("P", "port", &argparse.Options{Required: false, Default: "6443", Help: "Port for the Kubernetes cluster"}) + natsUrl := parser.String("N", "nats", &argparse.Options{Required: false, Default: "", Help: "Nats URL"}) // argoHost := parser.String("h", "argoHost", &argparse.Options{Required: false, Default: "", Help: "Host where Argo is running from"}) // can't use -h because its reserved to help err := parser.Parse(os.Args) @@ -172,7 +168,11 @@ func loadConfig(parser *argparse.Parser) { conf.GetConfig().Mode = *mode conf.GetConfig().ExecutionID = *execution conf.GetConfig().PeerID = *peer - + conf.GetConfig().OCNamespace = *ocNamespace + if *scheduledUnix > 0 { + conf.GetConfig().ScheduledTime = time.Unix(int64(*scheduledUnix), 0) + } + conf.GetConfig().NatsUrl = *natsUrl conf.GetConfig().KubeHost = *host conf.GetConfig().KubePort = *port diff --git a/models/template.go b/models/template.go index 687b49b..0c376c9 100644 --- a/models/template.go +++ b/models/template.go @@ -4,12 +4,12 @@ import ( "encoding/json" "fmt" "os/exec" + "sort" "strings" - "cloud.o-forge.io/core/oc-lib/config" - "cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources/native_tools" + "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" ) @@ -48,6 +48,7 @@ func (c *Container) AddVolumeMount(volumeMount VolumeMount, volumes []VolumeMoun type VolumeMount struct { Name string `yaml:"name"` MountPath string `yaml:"mountPath"` + ReadOnly bool `yaml:"readOnly,omitempty"` Storage *resources.StorageResource `yaml:"-"` IsReparted bool `yaml:"-"` } @@ -90,8 +91,8 @@ type Artifact struct { } type ArtifactRepositoryRef struct { - ConfigMap string `yaml:"configMap"` - Key string `yaml:"key"` + ConfigMap string `yaml:"configMap,omitempty"` + Key string `yaml:"key,omitempty"` } type InOut struct { @@ -110,7 +111,7 @@ type Template struct { NodeSelector map[string]string `yaml:"nodeSelector,omitempty"` } -func (template *Template) CreateEventContainer(execution *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) { +func (template *Template) CreateEventContainer(execution *workflow_execution.WorkflowExecution, id string, wf *workflow.Workflow, nt *resources.NativeTool, dag *Dag, natsURL string) { container := Container{Image: "natsio/nats-box", ImagePullPolicy: "IfNotPresent"} container.Command = []string{"sh", "-c"} // all is bash @@ -139,12 +140,21 @@ func (template *Template) CreateEventContainer(execution *workflow_execution.Wor cmd := exec.Command( "nats", "pub", - "--server", config.GetConfig().NATSUrl+":4222", + "--server", natsURL, tools.WORKFLOW_EVENT.GenerateKey(), string(payload), ) - for _, args := range cmd.Args { - container.Args = append(container.Args, args) + if len(wf.Args[id]) > 0 { + for _, args := range wf.Args[id] { + container.Args = append(container.Args, args) + } + } else { + for _, args := range cmd.Args { + container.Args = append(container.Args, args) + } + } + container.Args = []string{ + strings.Join(container.Args, " "), } template.Container = container } @@ -152,7 +162,7 @@ func (template *Template) CreateEventContainer(execution *workflow_execution.Wor } } -func (template *Template) CreateContainer(exec *workflow_execution.WorkflowExecution, processing *resources.ProcessingResource, dag *Dag) { +func (template *Template) CreateContainer(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow, itemID string, processing *resources.ProcessingResource, dag *Dag) { index := 0 if d, ok := exec.SelectedInstances[processing.GetID()]; ok { index = d @@ -162,40 +172,147 @@ func (template *Template) CreateContainer(exec *workflow_execution.WorkflowExecu return } inst := instance.(*resources.ProcessingInstance) - container := Container{Image: inst.Access.Container.Image, ImagePullPolicy: "IfNotPresent"} + container := Container{ + Image: inst.Access.Container.Image, + ImagePullPolicy: "IfNotPresent", + } if container.Image == "" { return } container.Command = []string{"sh", "-c"} // all is bash - for _, v := range inst.Env { - template.Inputs.Parameters = append(template.Inputs.Parameters, Parameter{Name: v.Name}) + for _, v := range processing.Env { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) } - for _, v := range inst.Inputs { - template.Inputs.Parameters = append(template.Inputs.Parameters, Parameter{Name: v.Name}) + for _, v := range wf.Env[itemID] { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) } - for _, v := range inst.Inputs { - template.Outputs.Parameters = append(template.Inputs.Parameters, Parameter{Name: v.Name}) + for _, v := range processing.Inputs { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range wf.Inputs[itemID] { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range processing.Outputs { + template.Outputs.Parameters = AppendParamIfAbsent(template.Outputs.Parameters, Parameter{ + Name: v.Name, + Value: v.Value, + }) + } + for _, v := range wf.Outputs[itemID] { + template.Outputs.Parameters = AppendParamIfAbsent(template.Outputs.Parameters, Parameter{ + Name: v.Name, + Value: v.Value, + }) } cmd := strings.ReplaceAll(inst.Access.Container.Command, container.Image, "") for _, a := range strings.Split(cmd, " ") { - container.Args = append(container.Args, template.ReplacePerEnv(a, inst.Env)) + container.Args = append(container.Args, template.ReplacePerEnv(a, template.Inputs.Parameters)) } - for _, a := range strings.Split(inst.Access.Container.Args, " ") { - container.Args = append(container.Args, template.ReplacePerEnv(a, inst.Env)) + if len(wf.Args[itemID]) > 0 { + for _, a := range wf.Args[itemID] { + container.Args = append(container.Args, template.ReplacePerEnv(a, template.Inputs.Parameters)) + } + } else { + for _, a := range strings.Split(inst.Access.Container.Args, " ") { + container.Args = append(container.Args, template.ReplacePerEnv(a, template.Inputs.Parameters)) + } } + container.Args = []string{strings.Join(container.Args, " ")} template.Container = container } -func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string { - for _, v := range envs { - if v.Name != "" && strings.Contains(arg, v.Name) { +// CreateServiceContainer crée le container Argo pour un ServiceResource. +// Pour HOSTED, le container appelle le service distant (endpoint connu) ; +// pour DEPLOYMENT, le container EST le service à déployer. +// La logique de paramètres est identique à CreateContainer. +func (template *Template) CreateServiceContainer(exec *workflow_execution.WorkflowExecution, wf *workflow.Workflow, itemID string, service *resources.ServiceResource, dag *Dag) { + index := 0 + if d, ok := exec.SelectedInstances[service.GetID()]; ok { + index = d + } + instance := service.GetSelectedInstance(&index) + if instance == nil { + return + } + inst := instance.(*resources.ServiceInstance) + if inst.Access == nil || inst.Access.Container == nil || inst.Access.Container.Image == "" { + return + } + container := Container{ + Image: inst.Access.Container.Image, + ImagePullPolicy: "IfNotPresent", + } + container.Command = []string{"sh", "-c"} + for _, v := range service.Env { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range wf.Env[itemID] { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range service.Inputs { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range wf.Inputs[itemID] { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range service.Outputs { + template.Outputs.Parameters = AppendParamIfAbsent(template.Outputs.Parameters, Parameter{ + Name: v.Name, + Value: v.Value, + }) + } + for _, v := range wf.Outputs[itemID] { + template.Outputs.Parameters = AppendParamIfAbsent(template.Outputs.Parameters, Parameter{ + Name: v.Name, + Value: v.Value, + }) + } + cmd := strings.ReplaceAll(inst.Access.Container.Command, container.Image, "") + for _, a := range strings.Split(cmd, " ") { + container.Args = append(container.Args, template.ReplacePerEnv(a, template.Inputs.Parameters)) + } + if len(wf.Args[itemID]) > 0 { + for _, a := range wf.Args[itemID] { + container.Args = append(container.Args, template.ReplacePerEnv(a, template.Inputs.Parameters)) + } + } else { + for _, a := range strings.Split(inst.Access.Container.Args, " ") { + container.Args = append(container.Args, template.ReplacePerEnv(a, template.Inputs.Parameters)) + } + } + container.Args = []string{strings.Join(container.Args, " ")} + template.Container = container +} + +// AppendParamIfAbsent ajoute p à params uniquement si aucun paramètre +// portant le même nom n'est déjà présent. Évite les doublons quand les +// variables sont définies à la fois sur le ProcessingResource et sur le +// Workflow (overrides). +func AppendParamIfAbsent(params []Parameter, p Parameter) []Parameter { + for _, existing := range params { + if existing.Name == p.Name { + return params + } + } + return append(params, p) +} + +func (template *Template) ReplacePerEnv(arg string, envs []Parameter) string { + // Tri par longueur décroissante : les noms longs sont remplacés en premier + // pour éviter que $FOO matche à l'intérieur de $FOOBAR. + sorted := make([]Parameter, len(envs)) + copy(sorted, envs) + sort.Slice(sorted, func(i, j int) bool { + return len(sorted[i].Name) > len(sorted[j].Name) + }) + for _, v := range sorted { + needle := "$" + v.Name + if v.Name != "" && strings.Contains(arg, needle) { value := "{{ inputs.parameters." + v.Name + " }}" - arg = strings.ReplaceAll(arg, v.Name, value) - arg = strings.ReplaceAll(arg, "$"+v.Name, value) - arg = strings.ReplaceAll(arg, "$", "") + arg = strings.ReplaceAll(arg, needle, value) } } return arg @@ -204,10 +321,10 @@ func (template *Template) ReplacePerEnv(arg string, envs []models.Param) string // AddAdmiraltyAnnotations marque le template pour qu'Admiralty route le pod // vers le cluster virtuel correspondant au peerId. // -// - elect: "" déclenche le webhook Admiralty sur le pod créé par Argo. -// - nodeSelector cible le virtual node Admiralty dont le label -// multicluster.admiralty.io/cluster-name vaut peerId, -// ce qui contraint le scheduling au cluster distant. +// - elect: "" déclenche le webhook Admiralty sur le pod créé par Argo. +// - nodeSelector cible le virtual node Admiralty dont le label +// multicluster.admiralty.io/cluster-name vaut peerId, +// ce qui contraint le scheduling au cluster distant. func (t *Template) AddAdmiraltyAnnotations(peerId string) { if t.Metadata.Annotations == nil { t.Metadata.Annotations = make(map[string]string) diff --git a/models/volume.go b/models/volume.go index 8aca52d..44e2b14 100644 --- a/models/volume.go +++ b/models/volume.go @@ -17,11 +17,26 @@ type VolumeSpec struct { } `yaml:"resources"` } -// ExistingVolume references a pre-provisioned PVC (created by oc-datacenter). -// Used in Workflow.Spec.ExistingVolumes (yaml: "volumes") instead of volumeClaimTemplates. -type ExistingVolume struct { - Name string `yaml:"name"` - PersistentVolumeClaim struct { - ClaimName string `yaml:"claimName"` - } `yaml:"persistentVolumeClaim"` +// PVCRef references a pre-provisioned PersistentVolumeClaim by name. +type PVCRef struct { + ClaimName string `yaml:"claimName"` +} + +// SecretRef references a K8s Secret to mount as a volume. +type SecretRef struct { + SecretName string `yaml:"secretName"` +} + +// EmptyDirRef declares an emptyDir volume. Set Medium to "Memory" for /dev/shm-style RAM backing. +type EmptyDirRef struct { + Medium string `yaml:"medium,omitempty"` +} + +// ExistingVolume represents any volume mounted into an Argo workflow spec. +// Exactly one of PersistentVolumeClaim, Secret, or EmptyDir should be non-nil. +type ExistingVolume struct { + Name string `yaml:"name"` + PersistentVolumeClaim *PVCRef `yaml:"persistentVolumeClaim,omitempty"` + Secret *SecretRef `yaml:"secret,omitempty"` + EmptyDir *EmptyDirRef `yaml:"emptyDir,omitempty"` } diff --git a/oc-monitord b/oc-monitord index bd9d448..5fdbab8 100755 Binary files a/oc-monitord and b/oc-monitord differ diff --git a/source_minio.md b/source_minio.md new file mode 100644 index 0000000..3a233eb --- /dev/null +++ b/source_minio.md @@ -0,0 +1,542 @@ +# Source tierce dans ProcessingAccess + +## Contexte + +`ProcessingResourceAccess` (oc-lib) expose trois champs : + +```go +type ProcessingResourceAccess struct { + Source string // URL ou identifiant de la source tierce + IsReachable bool // true = accessible publiquement, false = chez un peer privé + Container *models.Container // nil si on passe par Source +} +``` + +Quand `Container` est nil et `Source` non vide, le workflow builder doit gérer l'accès à la source tierce selon la valeur de `IsReachable`. + +--- + +## Cas 1 — `isReachable = true` (source publique) + +La source est accessible via une URL publique (HTTP/S3 public, etc.). + +**Comportement dans le builder :** + +- Insérer une step Argo **avant** la step de processing courante +- Cette step effectue un `curl` de la source vers le storage lié au processing +- Si aucun storage lié → erreur immédiate +- La commande du processing devient `/` + +``` +[step précédente] → [step curl download] → [step processing] + ↓ + storage lié (S3 ou local) +``` + +--- + +## Cas 2 — `isReachable = false` (source privée chez le peer dépositaire) + +La source (`cmd.bin`) se trouve sur le réseau privé de PeerA, inaccessible de l'extérieur. +L'exécution a lieu **sur B**, avec des protections pour limiter l'extraction du binaire. + +### Principe général + +PeerA expose temporairement le binaire via un **Minio partagé à usage unique**, +avec des credentials éphémères. B exécute le binaire en mémoire sans le persister sur disque. + +--- + +### Protocole étape par étape + +#### 1. Demande d'accès via NATS (à la construction du template) + +`oc-monitord` (B) envoie un message NATS à `oc-discovery` : + +``` +Subject : PB_SOURCE_REQUEST +Payload : { + executions_id : string, + source_key : string, // clé opaque — B ne connaît pas le path réel + peer_id_src : string, // PeerA + peer_id_dst : string, // PeerB + ttl : duration, // durée du booking + coupling : { // résumé du couplage pour vérification AE côté A + compute_peer : string, + storage_peers : []string, + data_peers : []string + } +} +``` + +`oc-discovery` transmet la demande à A et attend sa réponse. + +#### 2. A vérifie les AE et génère une pre-signed URL à usage unique + +A reçoit la demande, vérifie : +1. Que `source_key` correspond bien à une source qu'il détient +2. Que le couplage décrit respecte ses **Autorisations d'Exploitation** (voir section dédiée) + +Si les deux conditions sont remplies, A monte le fichier réel dans son Minio interne +et génère une pre-signed URL Minio avec : + +- `max-download-count = 1` → révoquée après le premier GET +- TTL = durée du booking + +Le path réel n'est jamais transmis à B — seule l'URL pre-signed est retournée. +Si une AE est violée → refus + événement `AE_VIOLATION` (voir section AE). + +#### 3. `oc-monitord` crée un Kubernetes Secret éphémère + +La pre-signed URL revient à B via la réponse NATS. +`oc-monitord` crée un Secret Kubernetes dans le namespace Argo **juste avant** la soumission +du workflow, avec une `ownerReference` sur le workflow (suppression automatique à la fin) : + +```yaml +apiVersion: v1 +kind: Secret +metadata: + name: source- + ownerReferences: + - kind: Workflow + name: +data: + url: +``` + +L'URL n'apparaît **jamais** dans le spec du workflow (pas de trace dans `kubectl get workflow -o yaml`). + +#### 4. Génération de la step Argo + +Le builder injecte un wrapper script comme commande de la step processing : + +```sh +curl -s "$PRESIGNED_URL" -o /dev/shm/.exec +chmod +x /dev/shm/.exec +/dev/shm/.exec "$@" & +PID=$! +rm -f /dev/shm/.exec +wait $PID +``` + +La variable `PRESIGNED_URL` est injectée via `env.valueFrom.secretKeyRef` → jamais en clair dans le spec. + +Le volume éphémère est déclaré en mémoire : + +```yaml +volumes: + - name: ephemeral-bin + emptyDir: + medium: Memory # tmpfs — rien sur disque, détruit avec le pod +``` + +--- + +### Protections et limites + +| Vecteur d'attaque | Protection | Efficacité | +|---|---|---| +| Re-téléchargement depuis l'URL | Usage unique — URL révoquée après le 1er GET | Forte | +| Copie du fichier depuis le pod (`kubectl exec cp`) | `rm` immédiat après exec — fichier absent du FS | Forte (non-root) | +| Extraction depuis le disque du node | `medium: Memory` — rien écrit sur disque | Forte | +| Lecture de l'URL dans le spec Argo | Kubernetes Secret + `valueFrom.secretKeyRef` | Forte | +| Identité / path réel de la source | Clé opaque — B ne connaît que la clé, pas le path | Forte | +| `/proc/PID/exe` (root sur le node B) | **Aucune** — voir ci-dessous | Nulle | +| `/proc/PID/mem` + `ptrace` / `gdb` (root) | **Aucune** | Nulle | +| `criu dump` (snapshot mémoire container) | **Aucune** | Nulle | + +### Angle mort : `/proc/PID/exe` + +Le `rm` sur `/dev/shm/.exec` supprime l'entrée dans le répertoire mais **pas l'inode** — le kernel +maintient une référence ouverte tant que le process tourne. Un admin root sur le node de B peut +récupérer le binaire intact en une commande pendant l'exécution : + +```sh +cp /proc/$(pgrep .exec)/exe /tmp/recovered_binary +``` + +De même, `/proc/PID/mem` combiné à `/proc/PID/maps` permet de dumper les segments text/data, +et `ptrace` / `gdb --pid` d'attacher un debugger. Ces vecteurs sont **incontournables** par +des moyens purement logiciels si B est root sur son propre cluster. + +Les protections en place couvrent donc : +- Les utilisateurs non-root / autres pods +- La persistence accidentelle (disque, logs, artefacts) +- Le re-téléchargement après exécution + +Elles **ne protègent pas** contre un opérateur de B activement malveillant avec accès root. + +--- + +### Options si le niveau de confiance en B est faible + +#### ~~Option 1 — Exécution chez A~~ *(écarté)* + +Dispatcher la step Argo sur l'infrastructure de A via Admiralty / virtual kubelet. +**Écarté** : le service couple un datacenter — le couplage physique d'infrastructure est +hors scope dans l'architecture actuelle. + +#### Option 2 — Confidential Computing (Intel TDX / SGX) + +La mémoire du pod est chiffrée au niveau hardware. Inaccessible au kernel et à root. +Nécessite du hardware compatible côté B — non déployable universellement. + +#### Option 3 — Binaire chiffré + clé en mémoire à usage unique + +A transmet un binaire chiffré (AES-GCM). Un sidecar sécurisé injecte la clé de déchiffrement +en mémoire uniquement, via un channel éphémère (ex. socket Unix dans le pod). Le loader déchiffre +en RAM et exécute sans jamais écrire le binaire en clair. + +- `/proc/PID/exe` → blob chiffré inutilisable ✅ +- `/proc/PID/mem` → expose le binaire déchiffré à l'exécution ❌ (angle mort résiduel) + +Complexe à implémenter, ralentit le démarrage, mais ferme le vecteur `exe`. + +#### Option 4 — OCI Image Encryption (ocicrypt) pour les images de conteneur + +Complémentaire de l'Option 3 pour les images Docker (pas les binaires bruts). +Les layers de l'image sont chiffrés dans le registry. La clé est délivrée par un KMS +uniquement au moment du pull, uniquement à un kubelet autorisé. +Ce que root voit sur le disque du node = des blobs chiffrés inutilisables. +Supporté par containerd avec plugin. + +> **Pourquoi c'est nécessaire :** quand containerd pull une image, les layers sont stockés +> dans `/var/lib/containerd/` sur le node. Un root peut les exporter intégralement : +> `ctr images export image.tar registry/image:tag`. Les credentials éphémères limitent +> le re-pull mais pas la copie de ce qui est déjà sur le node. + +--- + +### Synthèse — choix selon le niveau de confiance en B + +| Niveau de confiance en B | Solution recommandée | +|---|---| +| Peer de confiance (contrat, audit) | Protections best-effort actuelles + AE | +| Peer partiellement de confiance | Option 3 (binaire chiffré) + AE | +| Peer non de confiance | Option 2 (Confidential Computing) + AE | + +### Architecture de protection par couches + +``` +Couche 1 — Protections techniques → limitent l'extraction physique (tmpfs, URL unique, Secret K8s) +Couche 2 — Chiffrement (Options 3/4) → limitent l'utilité d'une extraction réussie +Couche 3 — Autorisations d'Exploitation (AE) → limitent l'usage contextuel (couplage, peers autorisés, quota) +Couche 4 — Licences / Consentements → cadre légal et traçabilité opposable +``` + +--- + +## Autorisations d'Exploitation (AE) + +### Concept + +Au-delà de protéger l'accès à une ressource, A contrôle **dans quel contexte** sa ressource +peut être couplée dans un workflow. L'AE répond à la question : + +> "Mon Processing P peut être utilisé chez B, **seulement** avec le Storage S de C +> et le Compute K de B. Toute autre combinaison est refusée." + +On ne peut pas coupler dans un workflow une Data + un Storage d'un pair donné, +ou un Processing avec un Compute d'un pair donné, sans que le détenteur de ces ressources +ait explicitement accordé cette association. + +### Modèle de données (oc-lib) + +```go +type ExploitationAuthorization struct { + ID string + GrantorPeer string // A — celui qui autorise + GranteePeer string // B — celui qui reçoit le droit + Resource ResourceRef // la ressource concernée (Processing, Data, Storage...) + + Conditions ExploitAuthConditions + License *ResourceLicense // lien avec le modèle licence +} + +type ExploitAuthConditions struct { + AllowedComputePeers []string // nil = tout peer autorisé + AllowedStoragePeers []string + AllowedDataPeers []string + AllowedProcessingPeers []string + + MaxExecutions int // -1 = illimité + ExpiresAt *time.Time + ConsentRequired bool +} +``` + +### Stockage des AE — oc-catalog (copie distribuée) + +Les AE sont publiées dans **oc-catalog**, dont chaque peer détient une **copie locale synchronisée**. + +**Rationale :** même si un peer B tente de tricher en construisant un workflow non autorisé, +A dispose de sa propre copie des règles et peut analyser le couplage reçu dans le payload +`PB_SOURCE_REQUEST` au moment du booking, puis **refuser** si une AE est violée. + +Ce mécanisme est auto-défensif : A ne dépend pas de la bonne foi de B pour appliquer ses règles. + +### Violation d'AE — acte critique et score de réputation + +Tenter de contourner une AE est un **acte critique** qui est : + +1. Détecté par A au moment du booking (vérification indépendante côté A) +2. Enregistré dans oc-catalog via un événement `AE_VIOLATION` +3. Sanctionné par une **dégradation sérieuse du score de réputation** de B + +Un peer dont le score s'effondre perd progressivement la possibilité de booker +des ressources chez d'autres peers — c'est la dissuasion réseau. + +``` +Subject : AE_VIOLATION +Payload : { + violator_peer_id : string, + grantor_peer_id : string, + resource_id : string, + workflow_id : string, + timestamp : time, + attempted_coupling : { // couplage tenté vs. AE en vigueur + compute_peer : string, + storage_peers : []string, + data_peers : []string + } +} +``` + +### Vérification dans le workflow builder (côté B) + +Au moment de la construction du template, avant soumission Argo, B vérifie +en local (sur sa copie des AE) la légitimité du couplage : + +```go +for _, res := range workflow.Resources { + if res.PeerID != localPeerID { + ae, err := catalog.GetExploitationAuthorization(res.PeerID, localPeerID, res.ID) + if err != nil || !ae.AllowsCoupling(workflow.ComputePeer, workflow.StoragePeers, workflow.DataPeers) { + return nil, ErrUnauthorizedCoupling{Resource: res.ID, Peer: res.PeerID} + } + } +} +``` + +La vérification côté B est une première barrière ; la vérification côté A au booking +est la barrière souveraine (celle que B ne peut pas contourner). + +### Ce que les AE résolvent vs. les protections techniques + +| Vecteur | Protection technique | AE | +|---|---|---| +| Copie du binaire | Partielle (loader chiffré, tmpfs) | ✗ hors scope | +| Ré-exécution dans un workflow non autorisé | ✗ aucune | ✅ couplage rejeté + score dégradé | +| Exfiltration vers un storage non autorisé | ✗ aucune | ✅ storage peer non listé → rejet | +| Usage du Processing avec une Data non autorisée | ✗ aucune | ✅ data peer non listé → rejet | +| Traçabilité légale | Partielle (logs) | ✅ violation enregistrée dans oc-catalog | +| Dissuasion | Faible | ✅ dégradation de score = conséquence réseau réelle | + +--- + +## Licences et Consentements (oc-lib `Resource`) + +Couche légale complémentaire aux AE. Chaque ressource porte ses conditions d'usage. + +```go +type ResourceLicense struct { + SPDX string // ex. "Apache-2.0", "Proprietary" + ConsentURL string // lien vers les CGU / texte de licence complet + ConsentRequired bool // si true → workflow bloqué jusqu'à ACK explicite + MaxExecCount int // -1 = illimité, sinon quota d'exécutions + ExpiresAt *time.Time +} +``` + +Le workflow builder bloque la soumission si `ConsentRequired && !consent_recorded`. +Le consentement est enregistré dans oc-catalog (horodatage + identité du peer) +pour constituer une trace opposable. + +Le `MaxExecCount` croise avec la pre-signed URL à usage unique (Cas 2) — les deux +limiteurs se renforcent mutuellement. + +--- + +## Changements dans le workflow builder + +``` +if access.Container == nil && access.Source != "" { + if access.IsReachable { + // Ajoute une step curl avant la step courante + // Commande = / + } else { + // 1. Vérifier les AE locales pour le couplage du workflow + // 2. waitForConsiders(PROCESSING_RESOURCE, peerA) via NATS + // → payload inclut le résumé de couplage (coupling{}) pour vérification AE côté A + // → reçoit presigned URL en réponse (NATS reply), ou erreur AE_VIOLATION + // 3. Crée K8s Secret éphémère avec ownerRef sur le workflow + // 4. Injecte volume emptyDir medium:Memory + // 5. Injecte wrapper script + env secretKeyRef dans la step + } +} +``` + +Le hook naturel est le mécanisme `waitForConsiders` / `PB_CONSIDERS` déjà en place +pour `STORAGE_RESOURCE` — à étendre avec un `PROCESSING_RESOURCE` source. +Le payload doit être enrichi du **résumé de couplage** (peers impliqués, ressources) +pour permettre la vérification AE souveraine côté A. + +--- + +## Data — même système que Processing, même lacune à combler + +### Constat actuel + +Aujourd'hui, une **Data n'est jamais reliée à un Storage dans un workflow** — c'est un manque. +Elle devrait l'être, exactement comme un Processing est relié à un Compute. + +### Ce que ça implique dans le builder + +Une Data avec une `source` doit déclencher le même mécanisme que Processing, +à ceci près que la destination n'est pas `/dev/shm` (exécution en mémoire) mais +le **Storage lié à la Data dans le workflow**. + +Avant toute step de processing qui consomme ce Storage, le builder doit vérifier +si la Data source a déjà été copiée dedans. Sinon, il injecte une step de transfert. + +``` +Cas isReachable = true (source publique) : + +[step curl Data→Storage] → [step processing qui lit depuis Storage] + ↓ + Storage lié à la Data + +Cas isReachable = false (source privée) : + +[step wrapper NATS/Minio → Storage] → [step processing qui lit depuis Storage] + ↓ + même protocole que Processing : + PB_SOURCE_REQUEST → pre-signed URL → télécharge dans le Storage (pas en mémoire) +``` + +La différence clé avec Processing : +- **Processing** : la source est un exécutable → téléchargé en `/dev/shm`, exécuté, supprimé +- **Data** : la source est une donnée → téléchargée dans le Storage lié, persistée pour le processing + +### Changements requis + +**oc-lib** : `DataResourceAccess` (ou généraliser `ResourceAccess`) doit exposer les mêmes champs : + +```go +type DataResourceAccess struct { + Source string // URL ou clé opaque de la source + IsReachable bool + Container *models.Container // nil si on passe par Source + // + lien vers le Storage cible dans le workflow (à définir) +} +``` + +**workflow builder** : même logique que Processing — + +``` +if data.access.Container == nil && data.access.Source != "" { + if data.access.IsReachable { + // Injecte step curl source → Storage lié + // avant toute step processing consommant ce Storage + } else { + // Même protocole NATS/Minio que Processing + // mais curl destination = Storage lié (S3 mount), pas /dev/shm + // pas de rm après (la donnée doit persister) + // pas de wrapper exec — juste le téléchargement + } +} +``` + +**Workflow** : le lien `Data → Storage` doit être modélisé explicitement +(aujourd'hui absent — c'est le prérequis de tout le reste). + +Les AE s'appliquent de la même façon : A peut restreindre l'usage de sa Data +à certains peers de storage ou de compute. + +--- + +## Validation d'intégrité du workflow — double barrière + +### Principe + +La validation de l'intégrité d'un workflow ne doit **jamais** reposer uniquement sur oc-front. +Le front peut être bypassé (appel API direct, client custom, bug). La règle est : + +> **oc-front valide pour l'UX. oc-scheduler valide pour la sécurité.** + +C'est le même principe que la double vérification des AE (B vérifie en local, A vérifie au booking). + +### Liens obligatoires à valider + +| Lien | Manquant → | +|---|---| +| `Processing → Compute` | Le processing ne peut pas s'exécuter | +| `Data → Storage` | La donnée n'a nulle part où atterrir | +| `Data.source → Storage lié` | La step de téléchargement ne peut pas être générée | +| `Processing.source → Storage lié` (si isReachable) | Idem | + +### oc-front — enforcement UX + +- Bloquer la soumission d'un workflow si un Processing n'a pas de Compute lié +- Bloquer si une Data avec source n'a pas de Storage lié +- Afficher les erreurs inline sur le graphe du workflow (arête manquante = erreur visuelle) +- Ces contrôles sont de la **prévention UX** — ils aident l'utilisateur, ils ne garantissent rien + +### oc-scheduler — validation souveraine + +Avant d'accepter un workflow pour scheduling, oc-scheduler effectue une **validation d'intégrité structurelle** : + +``` +1. Vérifier que tout Processing a un Compute lié +2. Vérifier que toute Data avec source a un Storage lié +3. Vérifier que les liens source → storage sont cohérents avec les accès déclarés +4. Vérifier les AE pour chaque ressource externe (copie locale oc-catalog) +5. Vérifier les consentements de licence requis +``` + +Si une règle est violée → **rejet immédiat** du workflow avec code d'erreur explicite. +Pas de tentative de correction silencieuse — le workflow est invalide tel quel. + +Cette validation se fait **indépendamment de la source de la soumission** (front, API, CLI, autre service). + +### Analogie avec la vérification AE + +``` +oc-front vérifie les liens ←→ B vérifie les AE en local +oc-scheduler valide en entrée ←→ A vérifie les AE au booking + +Dans les deux cas : la barrière arrière est souveraine et ne fait pas confiance à la barrière avant. +``` + +--- + +## Évolutions oc-front + +Dans la page/workflow, les Détails pour **Processing** et **Data** doivent afficher (readonly) : +- Si `Container` non nil → afficher le conteneur +- Sinon → afficher la source et son mode d'accès (`isReachable` ou privé) +- La clé source est rendue via un générateur de clé opaque : humainement lisible, + ressemblant à un path mais ne correspondant pas au path réel — dissociation intentionnelle, + à expliquer dans l'UI +- Les AE et licences associées à la ressource (résumé lisible) +- **Erreurs inline** sur le graphe si un lien obligatoire est manquant (Processing sans Compute, Data sans Storage) + +--- + +## Intégration oc-catalog + +- Lors du `POST` d'une ressource avec `source != ""` → création automatique de la clé opaque + et enregistrement dans la table privée de A (`source_key → real_path`) +- Publication des AE dans oc-catalog à la création/modification d'une ressource ; + chaque peer maintient une copie locale synchronisée +- Enregistrement des violations AE (`AE_VIOLATION`) et mise à jour du score de réputation +- Enregistrement des consentements de licence (horodatage + identité du peer) + +--- + +## Évolution future — table privée de clés opaques + +Plutôt que de transmettre l'URL pre-signed via NATS (canal réseau), +A maintient une table interne `source_key → real_path` et résout lui-même la clé +au moment de monter le fichier dans son Minio. B ne reçoit que la pre-signed URL, +sans aucune information sur ce qu'elle contient. diff --git a/tools/interface.go b/tools/interface.go index 083a498..27bfc00 100644 --- a/tools/interface.go +++ b/tools/interface.go @@ -4,6 +4,7 @@ import ( "errors" "io" + wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/watch" ) @@ -11,7 +12,12 @@ import ( type Tool interface { CreateArgoWorkflow(path string, ns string) (string, error) CreateAccessSecret(user string, password string, storageId string, namespace string) (string, error) + // CreateSourceSecret creates an ephemeral K8s Secret holding a pre-signed URL + // for a private source resource. The secret is labelled with the execution ID + // so it can be bulk-cleaned up after workflow completion. + CreateSourceSecret(secretName, presignedURL, executionID, namespace string) error GetArgoWatch(executionId string, wfName string) (watch.Interface, error) + GetArgoWorkflow(ns string, wfName string) (*wfv1.Workflow, error) GetPodLogger(ns string, wfName string, podName string) (io.ReadCloser, error) GetS3Secret(storageId string, namespace string) *v1.Secret } diff --git a/tools/kubernetes.go b/tools/kubernetes.go index f93692c..e63213e 100644 --- a/tools/kubernetes.go +++ b/tools/kubernetes.go @@ -75,7 +75,6 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er if !ok { return "", errors.New("decoded object is not a Workflow") } - fmt.Println("NAMESPACE", ns) // Create the workflow in the "argo" namespace createdWf, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Create(context.TODO(), workflow, metav1.CreateOptions{}) if err != nil { @@ -113,6 +112,32 @@ func (k *KubernetesTools) CreateAccessSecret(access string, password string, sto return name, nil } +// CreateSourceSecret creates an ephemeral Opaque Secret containing a pre-signed URL +// for a private source resource. The secret is labelled with the execution ID so +// it can be bulk-cleaned up after workflow completion. +func (k *KubernetesTools) CreateSourceSecret(secretName, presignedURL, executionID, namespace string) error { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: secretName, + Namespace: namespace, + Labels: map[string]string{ + "oc-execution-id": executionID, + "oc-managed-by": "oc-monitord", + "oc-secret-type": "source-presigned", + }, + }, + Type: v1.SecretTypeOpaque, + Data: map[string][]byte{ + "presigned-url": []byte(presignedURL), + }, + } + _, err := k.Set.CoreV1().Secrets(namespace).Create(context.TODO(), secret, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating source secret %s: %w", secretName, err) + } + return nil +} + func (k *KubernetesTools) GetS3Secret(storageId string, namespace string) *v1.Secret { secret, err := k.Set.CoreV1().Secrets(namespace).Get(context.TODO(), storageId+"-secret-s3", metav1.GetOptions{}) @@ -139,7 +164,10 @@ func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch } return watcher, nil +} +func (k *KubernetesTools) GetArgoWorkflow(ns string, wfName string) (*wfv1.Workflow, error) { + return k.VersionedSet.ArgoprojV1alpha1().Workflows(ns).Get(context.TODO(), wfName, metav1.GetOptions{}) } func (k *KubernetesTools) GetPodLogger(ns string, wfName string, nodeName string) (io.ReadCloser, error) { diff --git a/utils/utils.go b/utils/utils.go index 2faa38b..4d12e43 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1,12 +1,15 @@ package utils import ( + "encoding/json" "oc-monitord/conf" "sync" oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/logs" + "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" "github.com/rs/zerolog" ) @@ -36,6 +39,31 @@ func GetLogger() zerolog.Logger { return logger } +// EmitExecStateUpdate loads the execution, sets its state and emits a +// CREATE_RESOURCE NATS event so oc-scheduler applies the change and fires +// NotifyChange for the WebSocket streams. +// Direct UpdateOne calls are replaced by this function so oc-scheduler remains +// the single writer for WorkflowExecution. +func EmitExecStateUpdate(execID string, state enum.BookingStatus) { + adminReq := &tools.APIRequest{Admin: true} + res, _, err := workflow_execution.NewAccessor(adminReq).LoadOne(execID) + if err != nil || res == nil { + return + } + exec := res.(*workflow_execution.WorkflowExecution) + exec.State = state + payload, marshalErr := json.Marshal(exec) + if marshalErr != nil { + return + } + tools.NewNATSCaller().SetNATSPub(tools.CREATE_RESOURCE, tools.NATSResponse{ + FromApp: "oc-monitord", + Datatype: tools.WORKFLOW_EXECUTION, + Method: int(tools.CREATE_RESOURCE), + Payload: payload, + }) +} + func GetWFLogger(workflowName string) zerolog.Logger { onceWF.Do(func(){ wf_logger = logger.With(). diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 8df264d..65c2612 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -8,6 +8,7 @@ import ( "fmt" "oc-monitord/conf" . "oc-monitord/models" + "sort" "sync" "os" @@ -80,23 +81,23 @@ func (b *Workflow) getDag() *Dag { // PodSecurityContext mirrors the subset of k8s PodSecurityContext used by Argo. type PodSecurityContext struct { - RunAsUser *int64 `yaml:"runAsUser,omitempty"` + RunAsUser *int64 `yaml:"runAsUser,omitempty"` RunAsGroup *int64 `yaml:"runAsGroup,omitempty"` - FSGroup *int64 `yaml:"fsGroup,omitempty"` + FSGroup *int64 `yaml:"fsGroup,omitempty"` } // Spec contient la spécification complète du workflow Argo : // compte de service, point d'entrée, volumes, templates et timeout. type Spec struct { - ArtifactRepositoryRef - ServiceAccountName string `yaml:"serviceAccountName,omitempty"` - Entrypoint string `yaml:"entrypoint"` - Arguments []Parameter `yaml:"arguments,omitempty"` - Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` - ExistingVolumes []ExistingVolume `yaml:"volumes,omitempty"` - Templates []Template `yaml:"templates"` - Timeout int `yaml:"activeDeadlineSeconds,omitempty"` - SecurityContext *PodSecurityContext `yaml:"securityContext,omitempty"` + ArtifactRepositoryRef *ArtifactRepositoryRef `yaml:"artifactRepositoryRef,omitempty"` + ServiceAccountName string `yaml:"serviceAccountName,omitempty"` + Entrypoint string `yaml:"entrypoint"` + Arguments []Parameter `yaml:"arguments,omitempty"` + Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` + ExistingVolumes []ExistingVolume `yaml:"volumes,omitempty"` + Templates []Template `yaml:"templates"` + Timeout int `yaml:"activeDeadlineSeconds,omitempty"` + SecurityContext *PodSecurityContext `yaml:"securityContext,omitempty"` } // CreateDAG est le point d'entrée de la construction du DAG Argo. @@ -151,10 +152,11 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution if d, ok := exec.SelectedInstances[res.GetID()]; ok { index = d } - instance := item.Processing.GetSelectedInstance(&index) - logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) - if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { - logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) + instance := item.ItemResource.Processing.GetSelectedInstance(&index) + logger.Info().Msg(fmt.Sprint("Creating template for", item.ItemResource.Processing.GetName(), instance)) + procInst, _ := instance.(*resources.ProcessingInstance) + if instance == nil || procInst == nil || procInst.Access == nil || (procInst.Access.Container == nil && !procInst.Access.HasSource()) { + logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.ItemResource.Processing.GetName()) return firstItems, lastItems, volumes, nil } // Un même processing peut être bookié sur plusieurs peers : on crée @@ -162,16 +164,55 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution for _, pb := range getAllPeersForItem(exec, item.ID) { var err error volumes, firstItems, lastItems, err = b.createArgoTemplates(exec, - namespace, item.ID, pb.PeerID, pb.BookingID, item.Processing, volumes, firstItems, lastItems) + namespace, item.ID, pb.PeerID, pb.BookingID, item.ItemResource.Processing, volumes, firstItems, lastItems) if err != nil { return firstItems, lastItems, volumes, err } } } + // --- Service Resources --- + // HOSTED : le creator_id identifie le peer propriétaire du compute à contacter ; + // pas de lien avec un compute unit nécessaire. + // DEPLOYMENT : le service doit être déployé sur un compute booké (comme un processing). + for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsService) { + index := 0 + _, res := item.GetResource() + if d, ok := exec.SelectedInstances[res.GetID()]; ok { + index = d + } + instance := item.ItemResource.Service.GetSelectedInstance(&index) + logger.Info().Msg(fmt.Sprint("Creating template for service", item.ItemResource.Service.GetName(), instance)) + if instance == nil { + logger.Error().Msg("Not enough configuration setup, service template can't be created : " + item.ItemResource.Service.GetName()) + continue + } + svcInst := instance.(*resources.ServiceInstance) + if svcInst.Mode == resources.HOSTED { + // HOSTED : le creator_id suffit à identifier le peer cible. + peerID := item.ItemResource.Service.GetCreatorID() + var err error + volumes, firstItems, lastItems, err = b.createArgoTemplates(exec, + namespace, item.ID, peerID, item.ID, item.ItemResource.Service, volumes, firstItems, lastItems) + if err != nil { + return firstItems, lastItems, volumes, err + } + } else { + // DEPLOYMENT : un template par peer booké, comme les processings. + for _, pb := range getAllPeersForItem(exec, item.ID) { + var err error + volumes, firstItems, lastItems, err = b.createArgoTemplates(exec, + namespace, item.ID, pb.PeerID, pb.BookingID, item.ItemResource.Service, volumes, firstItems, lastItems) + if err != nil { + return firstItems, lastItems, volumes, err + } + } + } + } + // --- Native Tools de type WORKFLOW_EVENT uniquement --- for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) { - if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) { + if item.ItemResource.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) { continue } index := 0 @@ -179,11 +220,14 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution if d, ok := exec.SelectedInstances[res.GetID()]; ok { index = d } - instance := item.NativeTool.GetSelectedInstance(&index) - logger.Info().Msg(fmt.Sprint("Creating template for", item.NativeTool.GetName(), instance)) + instance := item.ItemResource.NativeTool.GetSelectedInstance(&index) + logger.Info().Msg(fmt.Sprint("Creating template for", item.ItemResource.NativeTool.GetName(), instance)) + // Résolution du peer cible : distant si un compute directement connecté + // est distant, local sinon (aucun compute ou compute local). + peerID := b.getNativeToolPeer(item.ID) var err error volumes, firstItems, lastItems, err = b.createArgoTemplates(exec, - namespace, item.ID, "", item.ID, item.NativeTool, volumes, firstItems, lastItems) + namespace, item.ID, peerID, item.ID, item.ItemResource.NativeTool, volumes, firstItems, lastItems) if err != nil { return firstItems, lastItems, volumes, err } @@ -193,7 +237,7 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution firstWfTasks := map[string][]string{} latestWfTasks := map[string][]string{} relatedWfTasks := map[string][]string{} - for _, wf := range b.OriginWorkflow.Workflows { + for _, wf := range b.OriginWorkflow.ResourceSet.Workflows { realWorkflow, code, err := w.NewAccessor(nil).LoadOne(wf) if code != 200 { logger.Error().Msg("Error loading the workflow : " + err.Error()) @@ -257,6 +301,12 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution } } + // --- Sources Data (isReachable = true ET false) --- + // Injecte les steps curl/wrapper pour les Data avec source, APRÈS que toutes les + // steps processing ont été ajoutées au DAG (dépendances câblées). + // Phase 3 (public) et Phase 4 (privé) sont gérées dans HandleDataSources. + b.HandleDataSources(exec, namespace) + // Si des services Kubernetes sont nécessaires, on ajoute le pod dédié. if b.Services != nil { dag := b.Workflow.getDag() @@ -289,17 +339,61 @@ func (b *ArgoBuilder) createArgoTemplates( template := &Template{Name: getArgoName(obj.GetName(), bookingID)} logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) + // Résoudre le peer en amont pour que le NativeTool puisse choisir l'URL NATS cible. + isReparted, remotePeer := b.isPeerReparted(peerID) + if obj.GetType() == tools.PROCESSING_RESOURCE.String() { - template.CreateContainer(exec, obj.(*resources.ProcessingResource), b.Workflow.getDag()) + proc := obj.(*resources.ProcessingResource) + index := 0 + if d, ok := exec.SelectedInstances[proc.GetID()]; ok { + index = d + } + if procInst, ok := proc.GetSelectedInstance(&index).(*resources.ProcessingInstance); ok && procInst.Access.HasSource() { + argoStepName := getArgoName(proc.GetName(), bookingID) + if procInst.Access.Source.IsReachable { + // Phase 3 — source publique : injecter une step curl directe. + if err := b.handleProcessingSource(exec, graphID, proc, procInst, argoStepName, template); err != nil { + logger.Error().Msg("[source-fetch] " + err.Error()) + return volumes, firstItems, lastItems, err + } + } else { + // Phase 4 — source privée : NATS + URL pré-signée + Secret K8s. + if err := b.handlePrivateProcessingSource(exec, graphID, proc, procInst, argoStepName, namespace); err != nil { + logger.Error().Msg("[source-private] " + err.Error()) + return volumes, firstItems, lastItems, err + } + } + } else { + template.CreateContainer(exec, b.OriginWorkflow, graphID, proc, b.Workflow.getDag()) + } } else if obj.GetType() == tools.NATIVE_TOOL.String() { - template.CreateEventContainer(exec, obj.(*resources.NativeTool), b.Workflow.getDag()) + // Pour le cas local, on utilise le FQDN cross-namespace car le pod tourne + // dans le namespace executions_id, pas dans OCNamespace (opencloud). + natsURL := conf.GetConfig().NATSPodURL() + if isReparted && remotePeer != nil && remotePeer.NATSAddress != "" { + natsURL = remotePeer.NATSAddress + } + template.CreateEventContainer(exec, graphID, b.OriginWorkflow, obj.(*resources.NativeTool), b.Workflow.getDag(), natsURL) + } else if obj.GetType() == tools.SERVICE_RESOURCE.String() { + svc := obj.(*resources.ServiceResource) + template.CreateServiceContainer(exec, b.OriginWorkflow, graphID, svc, b.Workflow.getDag()) + // Le k8s Service (NodePort/LoadBalancer) et le label "app" ne sont nécessaires + // que pour DEPLOYMENT : le service est déployé et doit être exposé. + // Pour HOSTED, le service tourne déjà chez son créateur, aucune exposition locale. + svcIndex := 0 + if d, ok := exec.SelectedInstances[svc.GetID()]; ok { + svcIndex = d + } + if inst, ok := svc.GetSelectedInstance(&svcIndex).(*resources.ServiceInstance); ok && inst.Mode == resources.DEPLOYMENT { + b.CreateService(exec, graphID, obj) + template.Metadata.Labels = make(map[string]string) + template.Metadata.Labels["app"] = "oc-service-" + obj.GetName() + } } // Enregistre l'image pour le pre-pull sur le peer cible. // peerID == "" désigne le peer local (clé "" dans PeerImages). b.addPeerImage(peerID, template.Container.Image) - // Vérifie si le peer est distant (Admiralty). - isReparted, remotePeer := b.isPeerReparted(peerID) if isReparted { logger.Debug().Msg("Reparted processing, on " + remotePeer.GetID()) b.RemotePeers = append(b.RemotePeers, remotePeer.GetID()) @@ -309,14 +403,6 @@ func (b *ArgoBuilder) createArgoTemplates( b.HasLocalCompute = true } - // Si le processing expose un service Kubernetes, on l'enregistre et on - // applique le label "app" pour que le Service puisse le sélectionner. - if obj.GetType() == tools.PROCESSING_RESOURCE.String() && obj.(*resources.ProcessingResource).IsService { - b.CreateService(exec, graphID, obj) - template.Metadata.Labels = make(map[string]string) - template.Metadata.Labels["app"] = "oc-service-" + obj.GetName() - } - var err error volumes, err = b.addStorageAnnotations(exec, graphID, template, namespace, volumes, isReparted) if err != nil { @@ -340,19 +426,25 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) for _, r := range related { - storage := r.Node.(*resources.StorageResource) - for _, linkToStorage := range r.Links { + n := r.Node + storage := n.(*resources.StorageResource) + for _, linkToStorage := range r.Links { //nolint:govet for _, rw := range linkToStorage.StorageLinkInfos { var art Artifact // Le nom de l'artefact doit être alphanumérique + '-' ou '_'. artifactBaseName := strings.Join(strings.Split(storage.GetName(), " "), "-") + "-" + strings.Replace(rw.FileName, ".", "-", -1) + envs := []Parameter{} + for _, p := range linkToStorage.Env { + envs = append(envs, Parameter{Name: p.Name}) + } if rw.Write { // Écriture vers S3 : Path = chemin du fichier dans le pod. - art = Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} + art = Artifact{Path: template.ReplacePerEnv(rw.Source, envs)} art.Name = artifactBaseName + "-input-write" } else { + // Lecture depuis S3 : Path = destination dans le pod. - art = Artifact{Path: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env)} + art = Artifact{Path: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, envs)} art.Name = artifactBaseName + "-input-read" } @@ -430,6 +522,97 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe }, volumes) } } + // Embedded storages: scan links for compute nodes connected to this processing. + // Key in SelectedEmbeddedStorages is the graph item ID (not resource ID), so we + // iterate links directly to preserve the graph position identity. + for _, link := range b.OriginWorkflow.Graph.Links { + var computeGraphID string + if link.Source.ID == id && b.OriginWorkflow.Graph.IsCompute(b.OriginWorkflow.Graph.Items[link.Destination.ID]) { + computeGraphID = link.Destination.ID + } else if link.Destination.ID == id && b.OriginWorkflow.Graph.IsCompute(b.OriginWorkflow.Graph.Items[link.Source.ID]) { + computeGraphID = link.Source.ID + } + if computeGraphID == "" { + continue + } + sel, ok := exec.SelectedEmbeddedStorages[computeGraphID] + if !ok || sel == nil { + continue + } + c := b.OriginWorkflow.Graph.Items[computeGraphID] + _, computeRes := (&c).GetResource() + computeResource := computeRes.(*resources.ComputeResource) + computeIdx := 0 + if d, ok := exec.SelectedInstances[computeResource.GetID()]; ok { + computeIdx = d + } + if computeIdx >= len(computeResource.Instances) { + continue + } + computeInst := computeResource.Instances[computeIdx] + if sel.StorageIndex >= len(computeInst.AvailableStorages) { + continue + } + storage := computeInst.AvailableStorages[sel.StorageIndex] + + if storage.StorageType == enum.S3 { + relatedProcessing := b.getStorageRelatedProcessing(storage.GetID()) + var wg sync.WaitGroup + errCh := make(chan error, len(relatedProcessing)) + for _, rp := range relatedProcessing { + wg.Add(1) + go waitForConsiders(exec.ExecutionsID, tools.STORAGE_RESOURCE, ArgoKubeEvent{ + ExecutionsID: exec.ExecutionsID, + DestPeerID: rp.GetID(), + Type: tools.STORAGE_RESOURCE, + SourcePeerID: storage.GetCreatorID(), + OriginID: conf.GetConfig().PeerID, + }, &wg, errCh) + } + wg.Wait() + close(errCh) + for err := range errCh { + if err != nil { + return volumes, err + } + } + b.addS3annotations(storage, namespace) + } else { + // Local volume / Minio: provision PVC via oc-datacenter then mount it. + var pvcWg sync.WaitGroup + pvcErrCh := make(chan error, 1) + pvcWg.Add(1) + go waitForConsiders(exec.ExecutionsID, tools.STORAGE_RESOURCE, ArgoKubeEvent{ + ExecutionsID: exec.ExecutionsID, + Type: tools.STORAGE_RESOURCE, + SourcePeerID: conf.GetConfig().PeerID, + DestPeerID: conf.GetConfig().PeerID, + OriginID: conf.GetConfig().PeerID, + MinioID: storage.GetID(), + Local: true, + StorageName: storage.GetName(), + }, &pvcWg, pvcErrCh) + pvcWg.Wait() + close(pvcErrCh) + for err := range pvcErrCh { + if err != nil { + return volumes, err + } + } + // Use the first instance's source as mount path if available. + mountPath := "" + if len(storage.Instances) > 0 { + mountPath = storage.Instances[0].Source + } + volumes = template.Container.AddVolumeMount(VolumeMount{ + Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"), + MountPath: mountPath, + Storage: storage, + IsReparted: isReparted, + }, volumes) + } + } + return volumes, nil } @@ -478,12 +661,26 @@ func (b *ArgoBuilder) getComputeProcessing(processingId string) (res []resources // du workflow Argo. La ConfigMap et la clé sont dérivées de l'ID du stockage. // Le namespace est conservé en signature pour une évolution future. func (b *ArgoBuilder) addS3annotations(storage *resources.StorageResource, namespace string) { - b.Workflow.Spec.ArtifactRepositoryRef = ArtifactRepositoryRef{ + b.Workflow.Spec.ArtifactRepositoryRef = &ArtifactRepositoryRef{ ConfigMap: storage.GetID() + "-artifact-repository", Key: storage.GetID() + "-s3-local", } } +func (b *ArgoBuilder) getRealVar(exec *workflow_execution.WorkflowExecution, val string, processing resources.ResourceInterface) string { + if strings.Contains(val, "[resource]instance.") { + attr := strings.ReplaceAll(val, "[resource]instance.", "") + index := 0 + if d, ok := exec.SelectedInstances[processing.GetID()]; ok { + index = d + } + instance := processing.GetSelectedInstance(&index) + ser := instance.Serialize(instance) + return fmt.Sprintf("%v", ser[attr]) + } + return val +} + // addTaskToArgo ajoute une tâche au DAG Argo pour le nœud graphItemID. // Elle résout les dépendances DAG, propage les paramètres d'environnement, // d'entrée et de sortie de l'instance sélectionnée, et met à jour les listes @@ -494,32 +691,77 @@ func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, unique_name := getArgoName(processing.GetName(), bookingID) step := Task{Name: unique_name, Template: unique_name} - - index := 0 - if d, ok := exec.SelectedInstances[processing.GetID()]; ok { - index = d + // Propagation des variables d'environnement, entrées et sorties + // de l'instance vers les paramètres de la tâche Argo. + // AppendParamIfAbsent évite les doublons quand une variable est définie + // à la fois sur le ProcessingResource et sur le Workflow (override). + for _, value := range processing.GetEnv() { + step.Arguments.Parameters = AppendParamIfAbsent(step.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: b.getRealVar(exec, value.Value, processing), + }) } - instance := processing.GetSelectedInstance(&index) - if instance != nil { - // Propagation des variables d'environnement, entrées et sorties - // de l'instance vers les paramètres de la tâche Argo. - for _, value := range instance.(*resources.ProcessingInstance).Env { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ - Name: value.Name, - Value: value.Value, - }) + for _, value := range b.OriginWorkflow.Env[graphItemID] { + step.Arguments.Parameters = AppendParamIfAbsent(step.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: b.getRealVar(exec, value.Value, processing), + }) + } + for _, value := range processing.GetInputs() { + step.Arguments.Parameters = AppendParamIfAbsent(step.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: b.getRealVar(exec, value.Value, processing), + }) + } + for _, value := range b.OriginWorkflow.Inputs[graphItemID] { + step.Arguments.Parameters = AppendParamIfAbsent(step.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: b.getRealVar(exec, value.Value, processing), + }) + } + for _, value := range processing.GetOutputs() { + step.Arguments.Parameters = AppendParamIfAbsent(step.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: b.getRealVar(exec, value.Value, processing), + }) + } + for _, value := range b.OriginWorkflow.Outputs[graphItemID] { + step.Arguments.Parameters = AppendParamIfAbsent(step.Arguments.Parameters, Parameter{ + Name: value.Name, + Value: b.getRealVar(exec, value.Value, processing), + }) + } + + // Résolution récursive des références $VAR_NAME entre paramètres. + // Les needles sont triées par longueur décroissante pour éviter que $FOO + // matche à l'intérieur de $FOOBAR (le plus long est substitué en premier). + // On itère jusqu'au point fixe pour gérer les dépendances transitives + // (A=$B, B=$C → après deux passes A=valeur de C). + sortedParams := make([]Parameter, len(step.Arguments.Parameters)) + copy(sortedParams, step.Arguments.Parameters) + sort.Slice(sortedParams, func(i, j int) bool { + return len(sortedParams[i].Name) > len(sortedParams[j].Name) + }) + for { + changed := false + for i := range step.Arguments.Parameters { + for _, needle_param := range sortedParams { + if step.Arguments.Parameters[i].Name == needle_param.Name { + continue + } + needle := "$" + needle_param.Name + if strings.Contains(step.Arguments.Parameters[i].Value, needle) { + step.Arguments.Parameters[i].Value = strings.ReplaceAll( + step.Arguments.Parameters[i].Value, + needle, + needle_param.Value, + ) + changed = true + } + } } - for _, value := range instance.(*resources.ProcessingInstance).Inputs { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ - Name: value.Name, - Value: value.Value, - }) - } - for _, value := range instance.(*resources.ProcessingInstance).Outputs { - step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ - Name: value.Name, - Value: value.Value, - }) + if !changed { + break } } @@ -527,11 +769,14 @@ func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, // Détermine si ce nœud est une première ou une dernière tâche du DAG. name := "" - if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil { - name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName() + if b.OriginWorkflow.Graph.Items[graphItemID].ItemResource.Processing != nil { + name = b.OriginWorkflow.Graph.Items[graphItemID].ItemResource.Processing.GetName() } - if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil { - name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName() + if b.OriginWorkflow.Graph.Items[graphItemID].ItemResource.Workflow != nil { + name = b.OriginWorkflow.Graph.Items[graphItemID].ItemResource.Workflow.GetName() + } + if b.OriginWorkflow.Graph.Items[graphItemID].ItemResource.Service != nil { + name = b.OriginWorkflow.Graph.Items[graphItemID].ItemResource.Service.GetName() } if len(step.Dependencies) == 0 && name != "" { firstItems = append(firstItems, getArgoName(name, bookingID)) @@ -557,9 +802,10 @@ func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, } seen[name] = struct{}{} claimName := name + "-" + exec.ExecutionsID - ev := ExistingVolume{} - ev.Name = name - ev.PersistentVolumeClaim.ClaimName = claimName + ev := ExistingVolume{ + Name: name, + PersistentVolumeClaim: &PVCRef{ClaimName: claimName}, + } b.Workflow.Spec.ExistingVolumes = append(b.Workflow.Spec.ExistingVolumes, ev) } // hostPath PVs are created as root:root 0755. Ensure pods can read/write @@ -586,20 +832,27 @@ func (b *ArgoBuilder) isArgoDependancy(exec *workflow_execution.WorkflowExecutio logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID)) continue } - source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing + source := b.OriginWorkflow.Graph.Items[link.Destination.ID].ItemResource.Processing if id == link.Source.ID && source != nil { isDeps = true for _, pb := range getAllPeersForItem(exec, link.Destination.ID) { dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), pb.BookingID)) } } - wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow + wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].ItemResource.Workflow if id == link.Source.ID && wourceWF != nil { isDeps = true for _, pb := range getAllPeersForItem(exec, link.Destination.ID) { dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), pb.BookingID)) } } + sourceSvc := b.OriginWorkflow.Graph.Items[link.Destination.ID].ItemResource.Service + if id == link.Source.ID && sourceSvc != nil { + isDeps = true + for _, pb := range getAllPeersForItem(exec, link.Destination.ID) { + dependancyOfIDs = append(dependancyOfIDs, getArgoName(sourceSvc.GetName(), pb.BookingID)) + } + } } return isDeps, dependancyOfIDs } @@ -614,12 +867,18 @@ func (b *ArgoBuilder) getArgoDependencies(exec *workflow_execution.WorkflowExecu logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID)) continue } - source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing + source := b.OriginWorkflow.Graph.Items[link.Source.ID].ItemResource.Processing if id == link.Destination.ID && source != nil { for _, pb := range getAllPeersForItem(exec, link.Source.ID) { dependencies = append(dependencies, getArgoName(source.GetName(), pb.BookingID)) } } + sourceSvc := b.OriginWorkflow.Graph.Items[link.Source.ID].ItemResource.Service + if id == link.Destination.ID && sourceSvc != nil { + for _, pb := range getAllPeersForItem(exec, link.Source.ID) { + dependencies = append(dependencies, getArgoName(sourceSvc.GetName(), pb.BookingID)) + } + } } return } @@ -657,6 +916,24 @@ func getAllPeersForItem(exec *workflow_execution.WorkflowExecution, graphItemID return result } +// getNativeToolPeer résout le peer cible d'un NativeTool WORKFLOW_EVENT. +// Règle : si un compute est directement connecté au NativeTool dans le graphe +// et que ce compute appartient à un peer distant, on retourne ce peerID. +// Dans tous les autres cas (aucun compute connecté, ou compute local), on retourne "". +func (b *ArgoBuilder) getNativeToolPeer(graphItemID string) string { + computeRel := b.OriginWorkflow.GetByRelatedProcessing(graphItemID, b.OriginWorkflow.Graph.IsCompute) + for _, rel := range computeRel { + peerID := rel.Node.GetCreatorID() + if peerID == "" { + continue + } + if isReparted, _ := b.isPeerReparted(peerID); isReparted { + return peerID + } + } + return "" +} + // isPeerReparted vérifie si le peerID désigne un peer distant (Relation != 1). // Un peerID vide signifie exécution locale : retourne false sans appel réseau. func (b *ArgoBuilder) isPeerReparted(peerID string) (bool, *peer.Peer) { @@ -725,7 +1002,7 @@ func waitForConsiders(executionsId string, dataType tools.DataType, event ArgoKu } // ArgoKubeEvent est la structure publiée sur NATS lors de la demande de -// provisionnement d'une ressource distante (Admiralty ou stockage S3). +// provisionnement d'une ressource distante (Admiralty, stockage S3, ou source privée). // Le champ OriginID identifie le peer initiateur : c'est vers lui que la // réponse PB_CONSIDERS sera routée par le système de propagation. type ArgoKubeEvent struct { @@ -733,7 +1010,8 @@ type ArgoKubeEvent struct { ExecutionsID string `json:"executions_id"` // DestPeerID est le peer de destination (compute ou peer S3 cible). DestPeerID string `json:"dest_peer_id"` - // Type indique la nature de la ressource : COMPUTE_RESOURCE ou STORAGE_RESOURCE. + // Type indique la nature de la ressource : COMPUTE_RESOURCE, STORAGE_RESOURCE + // ou PROCESSING_RESOURCE (source privée Phase 4). Type tools.DataType `json:"data_type"` // SourcePeerID est le peer source de la ressource demandée. SourcePeerID string `json:"source_peer_id"` @@ -747,8 +1025,11 @@ type ArgoKubeEvent struct { // StorageName est le nom normalisé du storage, utilisé pour calculer le claimName. StorageName string `json:"storage_name,omitempty"` // Images est la liste des images de conteneurs à pre-pull sur le peer cible - // avant le démarrage du workflow. Vide pour les events STORAGE_RESOURCE. + // avant le démarrage du workflow. Vide pour les events STORAGE_RESOURCE / PROCESSING_RESOURCE. Images []string `json:"images,omitempty"` + // SourceResourceID est l'ID de la ressource Processing/Data dont on demande + // une URL pré-signée (Phase 4, isReachable=false uniquement). + SourceResourceID string `json:"source_resource_id,omitempty"` } // addPeerImage enregistre une image à pre-pull pour un peer donné. @@ -768,7 +1049,6 @@ func (b *ArgoBuilder) addPeerImage(peerID, image string) { b.PeerImages[peerID] = append(b.PeerImages[peerID], image) } - // CompleteBuild finalise la construction du workflow Argo après la génération // du DAG. Elle effectue dans l'ordre : // 1. Pour chaque peer distant (Admiralty) : publie un ArgoKubeEvent de type diff --git a/workflow_builder/argo_services.go b/workflow_builder/argo_services.go index 0961686..25a20da 100644 --- a/workflow_builder/argo_services.go +++ b/workflow_builder/argo_services.go @@ -30,24 +30,18 @@ func (b *ArgoBuilder) CreateService(exec *workflow_execution.WorkflowExecution, } func (b *ArgoBuilder) completeServicePorts(exec *workflow_execution.WorkflowExecution, service *models.Service, id string, processing resources.ResourceInterface) { - index := 0 - if d, ok := exec.SelectedInstances[processing.GetID()]; ok { - index = d - } - instance := processing.GetSelectedInstance(&index) - if instance != nil && instance.(*resources.ProcessingInstance).Access != nil && instance.(*resources.ProcessingInstance).Access.Container != nil { - for _, execute := range instance.(*resources.ProcessingInstance).Access.Container.Exposes { - if execute.PAT != 0 { - new_port_translation := models.ServicePort{ - Name: strings.ToLower(processing.GetName()) + id, - Port: execute.Port, - TargetPort: execute.PAT, - Protocol: "TCP", - } - service.Spec.Ports = append(service.Spec.Ports, new_port_translation) + for _, execute := range b.OriginWorkflow.Exposes[processing.GetID()] { + if execute.PAT != 0 { + new_port_translation := models.ServicePort{ + Name: strings.ToLower(processing.GetName()) + id, + Port: execute.Port, + TargetPort: execute.PAT, + Protocol: "TCP", } + service.Spec.Ports = append(service.Spec.Ports, new_port_translation) } } + } func (b *ArgoBuilder) addServiceToArgo() error { diff --git a/workflow_builder/considers_cache.go b/workflow_builder/considers_cache.go index af8d7ae..4a12fa1 100644 --- a/workflow_builder/considers_cache.go +++ b/workflow_builder/considers_cache.go @@ -10,6 +10,8 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +// ── considersCache (signal-only) ───────────────────────────────────────────── + // considersCache stocke les canaux en attente d'un PB_CONSIDERS, // indexés par "executionsId:dataType". Un même message NATS réveille // tous les waiters enregistrés sous la même clé (broadcast). @@ -75,9 +77,78 @@ func (c *considersCache) confirm(key string) { } } +// ── sourcePresignedCache (value-bearing) ───────────────────────────────────── + +// sourcePresignedCache stocke les canaux en attente d'une URL pré-signée pour +// une source privée (isReachable=false), indexés par la clé sourceConsidersKey. +// La valeur transportée est l'URL pré-signée elle-même. +type sourcePresignedCache struct { + mu sync.Mutex + pending map[string][]chan string +} + +var globalSourceCache = &sourcePresignedCache{ + pending: make(map[string][]chan string), +} + +// sourceConsidersKey construit une clé unique pour une demande de source privée. +// La clé encode l'executionsID, le peerID du propriétaire et le resourceID +// pour permettre des requêtes parallèles distinctes. +func sourceConsidersKey(executionsID, peerID, resourceID string) string { + return executionsID + ":src:" + peerID + ":" + resourceID +} + +// register inscrit un nouveau canal d'attente pour la clé donnée. +// Retourne le canal à lire et une fonction de désinscription à appeler en defer. +func (c *sourcePresignedCache) register(key string) (<-chan string, func()) { + ch := make(chan string, 1) + c.mu.Lock() + c.pending[key] = append(c.pending[key], ch) + c.mu.Unlock() + + unregister := func() { + c.mu.Lock() + defer c.mu.Unlock() + list := c.pending[key] + for i, existing := range list { + if existing == ch { + c.pending[key] = append(list[:i], list[i+1:]...) + break + } + } + if len(c.pending[key]) == 0 { + delete(c.pending, key) + } + } + return ch, unregister +} + +// confirm réveille tous les waiters enregistrés sous la clé donnée +// en leur transmettant l'URL pré-signée, puis les supprime du cache. +func (c *sourcePresignedCache) confirm(key, url string) { + c.mu.Lock() + list := c.pending[key] + delete(c.pending, key) + c.mu.Unlock() + + for _, ch := range list { + select { + case ch <- url: + default: + } + } +} + +// ── StartConsidersListener ──────────────────────────────────────────────────── + // StartConsidersListener démarre un abonné NATS global via ListenNats (oclib) -// qui reçoit les messages CONSIDERS_EVENT et réveille les goroutines en attente -// via globalConsidersCache. Doit être appelé une seule fois au démarrage. +// qui reçoit les messages CONSIDERS_EVENT et réveille les goroutines en attente. +// +// Deux chemins de dispatch : +// - Si presigned_url est présent dans le payload → globalSourceCache (Phase 4). +// - Sinon → globalConsidersCache (Phases COMPUTE / STORAGE, signal sans valeur). +// +// Doit être appelé une seule fois au démarrage. func StartConsidersListener() { log := logs.GetLogger() log.Info().Msg("Considers NATS listener starting on " + tools.CONSIDERS_EVENT.GenerateKey()) @@ -87,14 +158,27 @@ func StartConsidersListener() { var body struct { ExecutionsID string `json:"executions_id"` PeerID string `json:"peer_id,omitempty"` + // PresignedURL est non-vide uniquement pour les réponses de source privée (Phase 4). + PresignedURL string `json:"presigned_url,omitempty"` + // ResourceID identifie la ressource Processing/Data pour la Phase 4. + ResourceID string `json:"resource_id,omitempty"` } if err := json.Unmarshal(resp.Payload, &body); err != nil { log.Error().Msg("CONSIDERS_EVENT: cannot unmarshal payload: " + err.Error()) return } - key := considersKey(body.ExecutionsID, resp.Datatype, body.PeerID) - log.Info().Msg(fmt.Sprintf("CONSIDERS_EVENT dispatched for key=%s", key)) - globalConsidersCache.confirm(key) + + if body.PresignedURL != "" { + // Phase 4 — source privée : transmettre l'URL pré-signée. + key := sourceConsidersKey(body.ExecutionsID, body.PeerID, body.ResourceID) + log.Info().Msg(fmt.Sprintf("CONSIDERS_EVENT (presigned) dispatched for key=%s", key)) + globalSourceCache.confirm(key, body.PresignedURL) + } else { + // Phases COMPUTE / STORAGE — simple signal. + key := considersKey(body.ExecutionsID, resp.Datatype, body.PeerID) + log.Info().Msg(fmt.Sprintf("CONSIDERS_EVENT dispatched for key=%s", key)) + globalConsidersCache.confirm(key) + } }, }) } diff --git a/workflow_builder/source_fetch.go b/workflow_builder/source_fetch.go new file mode 100644 index 0000000..a8ef68c --- /dev/null +++ b/workflow_builder/source_fetch.go @@ -0,0 +1,354 @@ +package workflow_builder + +// source_fetch.go — Phase 3 : gestion des sources tierces (isReachable = true) +// +// Pour chaque ressource (Processing ou Data) dont l'instance expose une source +// publique (access.Container == nil, access.Source != "", access.IsReachable), +// le builder injecte une step Argo de téléchargement (curl) AVANT la step qui +// consomme la ressource. +// +// Garde critique : si la step aval (processing) contient déjà un curl ciblant +// la même URL dans sa commande de container, on n'injecte PAS de step +// supplémentaire — ce serait un double téléchargement. + +import ( + "fmt" + "net/url" + "path" + "strings" + + . "oc-monitord/models" + + "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" +) + +// curlImage est l'image utilisée pour la step de téléchargement. +// alpine dispose de wget ; on installe curl à la volée, ou on utilise +// directement wget. Utiliser curlimages/curl évite l'installation. +const curlImage = "curlimages/curl:latest" + +// ── Garde ──────────────────────────────────────────────────────────────────── + +// sourceAlreadyFetchedByStep retourne true si le container du processing +// identifié par processingItemID contient déjà un appel curl/wget ciblant +// sourceURL dans sa commande ou ses arguments. +// +// Si c'est le cas on NE doit PAS injecter une step curl supplémentaire : +// le processing gère lui-même le téléchargement et injecter une step +// amont serait un double téléchargement. +func (b *ArgoBuilder) sourceAlreadyFetchedByStep( + exec *workflow_execution.WorkflowExecution, + processingItemID string, + sourceURL string, +) bool { + item, ok := b.OriginWorkflow.Graph.Items[processingItemID] + if !ok || item.ItemResource.Processing == nil { + return false + } + index := 0 + if d, ok := exec.SelectedInstances[item.ItemResource.Processing.GetID()]; ok { + index = d + } + inst := item.ItemResource.Processing.GetSelectedInstance(&index) + if inst == nil { + return false + } + procInst, ok := inst.(*resources.ProcessingInstance) + if !ok || procInst.Access == nil || procInst.Access.Container == nil { + // Pas de container → le step sera lui-même construit depuis la source, + // pas de double téléchargement possible. + return false + } + fullCmd := procInst.Access.Container.Command + " " + procInst.Access.Container.Args + hasFetch := strings.Contains(fullCmd, "curl") || strings.Contains(fullCmd, "wget") + hasURL := strings.Contains(fullCmd, sourceURL) + return hasFetch && hasURL +} + +// ── Injection de la step curl ───────────────────────────────────────────────── +func (b *ArgoBuilder) injectSourceFetchStep( + stepBaseName string, + sourceURL string, + destPath string, + isExecutable bool, + dependsOn []string, +) string { + curlStepName := stepBaseName + "-src-fetch" + + filename := sourceFilename(sourceURL) + fullDest := destPath + "/" + filename + + var script string + if isExecutable { + script = fmt.Sprintf( + "curl -fsSL '%s' -o '%s' && chmod +x '%s'", + sourceURL, fullDest, fullDest, + ) + } else { + script = fmt.Sprintf("curl -fsSL '%s' -o '%s'", sourceURL, fullDest) + } + + // Tâche dans le DAG. + fetchTask := Task{ + Name: curlStepName, + Template: curlStepName, + Dependencies: dependsOn, + } + b.Workflow.getDag().Tasks = append(b.Workflow.getDag().Tasks, fetchTask) + + // Template Argo correspondant. + fetchTemplate := Template{ + Name: curlStepName, + Container: Container{ + Image: curlImage, + ImagePullPolicy: "IfNotPresent", + Command: []string{"sh", "-c"}, + Args: []string{script}, + }, + } + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, fetchTemplate) + + logger.Info().Msg(fmt.Sprintf( + "[source-fetch] injected curl step '%s' → %s → %s", + curlStepName, sourceURL, fullDest, + )) + return curlStepName +} + +// ── Traitement Processing source (isReachable = true) ──────────────────────── + +// handleProcessingSource gère le cas où un ProcessingInstance a une source +// publique (access.HasSource() && access.IsReachable) sans container associé. +// +// Elle injecte une step curl avant la step processing dans le DAG, puis +// modifie le template du processing pour exécuter le binaire téléchargé +// depuis le storage lié. +// +// Retourne une erreur si aucun storage n'est lié (prérequis obligatoire). +func (b *ArgoBuilder) handleProcessingSource( + exec *workflow_execution.WorkflowExecution, + graphID string, + procResource *resources.ProcessingResource, + procInst *resources.ProcessingInstance, + argoStepName string, + template *Template, +) error { + access := procInst.Access + if !access.HasSource() || !access.Source.IsReachable { + return nil + } + + // Récupérer le storage lié à ce processing. + related := b.OriginWorkflow.GetByRelatedProcessing(graphID, b.OriginWorkflow.Graph.IsStorage) + if len(related) == 0 { + return fmt.Errorf( + "processing '%s' has source '%s' but no storage linked — cannot inject fetch step", + procResource.GetName(), access.Source, + ) + } + + // On utilise le premier storage lié (cas nominal). + var mountPath string + for _, r := range related { + n := r.Node + storage := n.(*resources.StorageResource) + if len(storage.Instances) > 0 && storage.Instances[0].Source != "" { + mountPath = storage.Instances[0].Source + break + } + } + if mountPath == "" { + return fmt.Errorf( + "processing '%s': linked storage has no mount path configured", + procResource.GetName(), + ) + } + + // Dépendances courantes de la step processing (pour les câbler sur la step curl). + existingDeps := b.getArgoDependencies(exec, graphID) + + // Injection de la step curl. + fetchStepName := b.injectSourceFetchStep( + argoStepName, + access.Source.Source, + mountPath, + true, // binaire exécutable + existingDeps, + ) + + // La step processing dépend maintenant de la step curl. + // On met à jour la tâche DAG existante. + dag := b.Workflow.getDag() + for i, task := range dag.Tasks { + if task.Name == argoStepName { + dag.Tasks[i].Dependencies = []string{fetchStepName} + break + } + } + + // Le template processing doit exécuter le binaire téléchargé. + filename := sourceFilename(access.Source.Source) + binaryPath := mountPath + "/" + filename + template.Container = Container{ + Image: "alpine:latest", + ImagePullPolicy: "IfNotPresent", + Command: []string{"sh", "-c"}, + Args: []string{binaryPath}, + } + // Propagation des paramètres d'entrée/sortie du workflow. + for _, v := range procResource.GetEnv() { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + for _, v := range b.OriginWorkflow.Env[graphID] { + template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name}) + } + + return nil +} + +// ── Traitement Data source (isReachable = true) ─────────────────────────────── + +// HandleDataSources parcourt tous les items Data du graphe dont une instance +// expose une source publique (access.HasSource() && access.IsReachable) et +// injecte pour chacun une step curl de téléchargement dans le storage lié. +// +// Les sources privées (isReachable=false) sont gérées par HandlePrivateDataSources +// (Phase 4), appelée en fin de cette fonction. +// +// Garde : si la step processing aval contient déjà un curl ciblant la même URL, +// on saute l'injection pour ce processing. +// +// Cette fonction est appelée depuis createTemplates() après la boucle principale. +func (b *ArgoBuilder) HandleDataSources(exec *workflow_execution.WorkflowExecution, namespace string) { + for itemID, item := range b.OriginWorkflow.Graph.Items { + if !b.OriginWorkflow.Graph.IsData(item) || item.ItemResource.Data == nil { + continue + } + + // Chercher une instance avec source PUBLIQUE (isReachable=true). + // Les sources privées sont traitées par HandlePrivateDataSources. + var sourceURL string + var mountPath string + + for _, inst := range item.ItemResource.Data.Instances { + if inst == nil || !inst.Access.HasSource() || !inst.Access.Source.IsReachable { + continue + } + sourceURL = inst.Access.Source.Source + break + } + if sourceURL == "" { + continue + } + + // Storage lié à cette Data (ValidateIntegrity garantit qu'il en existe un). + linkedStorageIDs := b.OriginWorkflow.Graph.GetLinkedStorageForData(itemID) + if len(linkedStorageIDs) == 0 { + logger.Error().Msg(fmt.Sprintf( + "[source-fetch] data '%s' has source but no storage linked — skipping", + item.ItemResource.Data.GetName(), + )) + continue + } + + storageItemID := linkedStorageIDs[0] + storageItem, ok := b.OriginWorkflow.Graph.Items[storageItemID] + if !ok || storageItem.ItemResource.Storage == nil || len(storageItem.ItemResource.Storage.Instances) == 0 { + continue + } + mountPath = storageItem.ItemResource.Storage.Instances[0].Source + if mountPath == "" { + logger.Error().Msg(fmt.Sprintf( + "[source-fetch] storage linked to data '%s' has no mount path — skipping", + item.ItemResource.Data.GetName(), + )) + continue + } + + // Trouver tous les processings qui lisent depuis ce storage. + downstreamProcIDs := b.processingsThatReadStorage(storageItemID) + + // Pour chaque processing aval, appliquer la garde puis injecter si nécessaire. + for _, procItemID := range downstreamProcIDs { + if b.sourceAlreadyFetchedByStep(exec, procItemID, sourceURL) { + logger.Info().Msg(fmt.Sprintf( + "[source-fetch] data '%s': downstream processing '%s' already curls source — skipping injection", + item.ItemResource.Data.GetName(), procItemID, + )) + continue + } + + procItem := b.OriginWorkflow.Graph.Items[procItemID] + if procItem.ItemResource.Processing == nil { + continue + } + + // Dépendances courantes de la step processing aval. + existingDeps := b.getArgoDependencies(exec, procItemID) + + // Nom de la step curl : basé sur le nom de la Data + storage. + fetchBaseName := strings.ToLower(strings.ReplaceAll(item.ItemResource.Data.GetName(), " ", "-")) + + "-" + strings.ToLower(strings.ReplaceAll(storageItem.ItemResource.Storage.GetName(), " ", "-")) + + fetchStepName := b.injectSourceFetchStep( + fetchBaseName, + sourceURL, + mountPath, + false, // donnée, pas un binaire + existingDeps, + ) + + // Ajouter la step curl comme dépendance de CHAQUE instance (peer) du processing aval. + dag := b.Workflow.getDag() + for _, pb := range getAllPeersForItem(exec, procItemID) { + procArgoName := getArgoName(procItem.ItemResource.Processing.GetName(), pb.BookingID) + for i, task := range dag.Tasks { + if task.Name == procArgoName { + // Remplacer les dépendances existantes par [fetchStepName]. + // Les anciennes dépendances sont déjà portées par la step curl. + dag.Tasks[i].Dependencies = []string{fetchStepName} + break + } + } + } + } + } + + // Phase 4 — sources privées (isReachable=false). + b.HandlePrivateDataSources(exec, namespace) +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +// sourceFilename extrait le nom de fichier depuis une URL source. +// Fallback : "source-binary" si l'URL n'a pas de chemin exploitable. +func sourceFilename(sourceURL string) string { + u, err := url.Parse(sourceURL) + if err == nil && u.Path != "" { + if base := path.Base(u.Path); base != "." && base != "/" { + return base + } + } + return "source-binary" +} + +// processingsThatReadStorage retourne les IDs des items Processing +// connectés (via un lien quelconque) au storage identifié par storageItemID. +func (b *ArgoBuilder) processingsThatReadStorage(storageItemID string) []string { + var result []string + for _, link := range b.OriginWorkflow.Graph.Links { + var otherID string + if link.Source.ID == storageItemID { + otherID = link.Destination.ID + } else if link.Destination.ID == storageItemID { + otherID = link.Source.ID + } else { + continue + } + if other, ok := b.OriginWorkflow.Graph.Items[otherID]; ok && b.OriginWorkflow.Graph.IsProcessing(other) { + result = append(result, otherID) + } + } + return result +} diff --git a/workflow_builder/source_private.go b/workflow_builder/source_private.go new file mode 100644 index 0000000..61dbf62 --- /dev/null +++ b/workflow_builder/source_private.go @@ -0,0 +1,459 @@ +package workflow_builder + +// source_private.go — Phase 4 : sources privées (isReachable = false) +// +// Pour les ressources (Processing ou Data) dont la source n'est pas +// directement accessible (access.IsReachable == false), le protocole est : +// +// 1. oc-monitord publie un ARGO_KUBE_EVENT(PROCESSING_RESOURCE) sur NATS +// avec les informations de couplage (vérification AE côté peer distant). +// +// 2. Le peer propriétaire valide l'AE, génère une URL pré-signée Minio +// éphémère et répond via CONSIDERS_EVENT avec presigned_url + resource_id. +// +// 3. oc-monitord crée un Secret Kubernetes éphémère contenant l'URL, +// labelisé oc-execution-id pour nettoyage post-exécution. +// +// 4. La step Argo injecte un wrapper sh qui : +// • Processing binary : lit l'URL → /dev/shm/.exec → chmod+x → fork → rm → wait +// • Data : lit l'URL → destPath/filename (pas de chmod/rm/exec) + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "oc-monitord/conf" + . "oc-monitord/models" + "oc-monitord/tools" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + octools "cloud.o-forge.io/core/oc-lib/tools" +) + +// ── Demande NATS d'URL pré-signée ──────────────────────────────────────────── + +// waitForPresignedURL publie un ARGO_KUBE_EVENT(PROCESSING_RESOURCE) sur NATS +// pour demander une URL pré-signée au peer propriétaire de la source privée, +// puis attend la réponse via globalSourceCache. +// +// Résultats envoyés dans urlCh / errCh ; wg.Done() est toujours appelé. +func waitForPresignedURL( + executionsID string, + event ArgoKubeEvent, + wg *sync.WaitGroup, + urlCh chan<- string, + errCh chan<- error, +) { + defer wg.Done() + + b, err := json.Marshal(event) + if err != nil { + logger.Error().Msg("[source-private] cannot marshal ArgoKubeEvent: " + err.Error()) + urlCh <- "" + errCh <- err + return + } + octools.NewNATSCaller().SetNATSPub(octools.ARGO_KUBE_EVENT, octools.NATSResponse{ + FromApp: "oc-monitord", + Datatype: octools.PROCESSING_RESOURCE, + User: "root", + Method: int(octools.ARGO_KUBE_EVENT), + Payload: b, + }) + + key := sourceConsidersKey(executionsID, event.SourcePeerID, event.SourceResourceID) + ch, unregister := globalSourceCache.register(key) + defer unregister() + + select { + case url := <-ch: + logger.Info().Msg(fmt.Sprintf( + "[source-private] presigned URL received resource=%s exec=%s", + event.SourceResourceID, executionsID, + )) + urlCh <- url + errCh <- nil + case <-time.After(5 * time.Minute): + ferr := fmt.Errorf( + "timeout waiting for presigned URL resource=%s exec=%s", + event.SourceResourceID, executionsID, + ) + logger.Error().Msg(ferr.Error()) + urlCh <- "" + errCh <- ferr + } +} + +// ── Nommage du Secret ───────────────────────────────────────────────────────── + +// secretNameFor génère un nom de Secret K8s valide (63 chars max, alphanum+-) +// à partir d'un nom de step et d'un execution ID. +func secretNameFor(stepBaseName, executionID string) string { + base := strings.ToLower(strings.ReplaceAll(stepBaseName, "_", "-")) + if len(base) > 30 { + base = base[:30] + } + suffix := executionID + if len(suffix) > 8 { + suffix = suffix[:8] + } + name := "oc-src-" + base + "-" + suffix + + // Éliminer les caractères non autorisés par K8s (alphanum + '-'). + var clean strings.Builder + for _, c := range name { + if (c >= 'a' && c <= 'z') || (c >= '0' && c <= '9') || c == '-' { + clean.WriteRune(c) + } + } + s := strings.Trim(clean.String(), "-") + if len(s) > 63 { + s = s[:63] + } + return s +} + +// ── Injection step Processing (source privée) ───────────────────────────────── + +// injectPrivateProcessingStep ajoute dans le DAG et dans les templates Argo une +// step wrapper pour un Processing à source privée. +// +// Le script sh : +// 1. Lit l'URL pré-signée depuis le Secret monté à /var/oc-secrets/presigned-url +// 2. Télécharge le binaire dans /dev/shm/.exec (RAM — jamais sur disque) +// 3. Le rend exécutable, le lance en background, supprime le fichier, attend la fin +// +// Le Secret est déclaré dans spec.volumes (ExistingVolume) et monté en lecture +// seule dans le container. +// +// Retourne le nom Argo de la step créée. +func (b *ArgoBuilder) injectPrivateProcessingStep( + stepBaseName string, + secretName string, + dependsOn []string, +) string { + stepName := stepBaseName + "-prv-fetch" + volName := strings.ReplaceAll(secretName, ".", "-") + + script := `PRESIGNED=$(cat /var/oc-secrets/presigned-url) +curl -fsSL "$PRESIGNED" -o /dev/shm/.exec +chmod +x /dev/shm/.exec +/dev/shm/.exec & +PID=$! +rm -f /dev/shm/.exec +wait $PID` + + // Volume Secret dans le spec du workflow (partagé entre toutes les steps). + b.addSecretVolumeIfAbsent(volName, secretName) + + fetchTask := Task{ + Name: stepName, + Template: stepName, + Dependencies: dependsOn, + } + b.Workflow.getDag().Tasks = append(b.Workflow.getDag().Tasks, fetchTask) + + fetchTemplate := Template{ + Name: stepName, + Container: Container{ + Image: curlImage, + ImagePullPolicy: "IfNotPresent", + Command: []string{"sh", "-c"}, + Args: []string{script}, + VolumeMounts: []VolumeMount{ + {Name: volName, MountPath: "/var/oc-secrets", ReadOnly: true}, + }, + }, + } + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, fetchTemplate) + + logger.Info().Msg(fmt.Sprintf( + "[source-private] injected private processing step '%s' (secret=%s)", + stepName, secretName, + )) + return stepName +} + +// ── Injection step Data (source privée) ────────────────────────────────────── + +// injectPrivateDataStep ajoute dans le DAG une step de téléchargement sécurisé +// pour une Data à source privée vers le storage lié (pas de /dev/shm, pas de rm). +func (b *ArgoBuilder) injectPrivateDataStep( + stepBaseName string, + secretName string, + destPath string, + filename string, + dependsOn []string, +) string { + stepName := stepBaseName + "-prv-fetch" + volName := strings.ReplaceAll(secretName, ".", "-") + fullDest := destPath + "/" + filename + + script := fmt.Sprintf( + "PRESIGNED=$(cat /var/oc-secrets/presigned-url)\ncurl -fsSL \"$PRESIGNED\" -o '%s'", + fullDest, + ) + + b.addSecretVolumeIfAbsent(volName, secretName) + + fetchTask := Task{ + Name: stepName, + Template: stepName, + Dependencies: dependsOn, + } + b.Workflow.getDag().Tasks = append(b.Workflow.getDag().Tasks, fetchTask) + + fetchTemplate := Template{ + Name: stepName, + Container: Container{ + Image: curlImage, + ImagePullPolicy: "IfNotPresent", + Command: []string{"sh", "-c"}, + Args: []string{script}, + VolumeMounts: []VolumeMount{ + {Name: volName, MountPath: "/var/oc-secrets", ReadOnly: true}, + }, + }, + } + b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, fetchTemplate) + + logger.Info().Msg(fmt.Sprintf( + "[source-private] injected private data step '%s' → %s (secret=%s)", + stepName, fullDest, secretName, + )) + return stepName +} + +// addSecretVolumeIfAbsent déclare un volume de type Secret dans spec.volumes +// uniquement s'il n'est pas déjà présent (déduplication par nom). +func (b *ArgoBuilder) addSecretVolumeIfAbsent(volName, secretName string) { + for _, v := range b.Workflow.Spec.ExistingVolumes { + if v.Name == volName { + return + } + } + b.Workflow.Spec.ExistingVolumes = append(b.Workflow.Spec.ExistingVolumes, ExistingVolume{ + Name: volName, + Secret: &SecretRef{SecretName: secretName}, + }) +} + +// ── handlePrivateProcessingSource ──────────────────────────────────────────── + +// handlePrivateProcessingSource gère le cas où un ProcessingInstance a une source +// privée (access.HasSource() && !access.IsReachable). +// +// Orchestration : +// 1. Demande de l'URL pré-signée via NATS (waitForPresignedURL) +// 2. Création du Secret K8s éphémère +// 3. Injection de la step wrapper dans le DAG +// 4. Recâblage des dépendances (processing dépend du step wrapper) +func (b *ArgoBuilder) handlePrivateProcessingSource( + exec *workflow_execution.WorkflowExecution, + graphID string, + procResource *resources.ProcessingResource, + procInst *resources.ProcessingInstance, + argoStepName string, + namespace string, +) error { + access := procInst.Access + if !access.HasSource() || access.Source.IsReachable { + return nil + } + + self, err := oclib.GetMySelf() + if err != nil { + return fmt.Errorf("[source-private] cannot get local peer ID: %w", err) + } + + // Demande de l'URL pré-signée au peer propriétaire. + var wg sync.WaitGroup + urlCh := make(chan string, 1) + errCh := make(chan error, 1) + wg.Add(1) + go waitForPresignedURL(exec.ExecutionsID, ArgoKubeEvent{ + ExecutionsID: exec.ExecutionsID, + Type: octools.PROCESSING_RESOURCE, + SourcePeerID: procResource.GetCreatorID(), + DestPeerID: self.GetID(), + OriginID: conf.GetConfig().PeerID, + SourceResourceID: procResource.GetID(), + }, &wg, urlCh, errCh) + wg.Wait() + close(urlCh) + close(errCh) + + presignedURL := <-urlCh + if ferr := <-errCh; ferr != nil || presignedURL == "" { + if ferr == nil { + ferr = fmt.Errorf("empty presigned URL for processing '%s'", procResource.GetName()) + } + return ferr + } + + // Création du Secret K8s contenant l'URL. + secretName := secretNameFor(argoStepName, exec.ExecutionsID) + kube, kerr := tools.NewKubernetesTool() + if kerr != nil { + return fmt.Errorf("[source-private] cannot create K8s client: %w", kerr) + } + if kerr = kube.CreateSourceSecret(secretName, presignedURL, exec.ExecutionsID, namespace); kerr != nil { + return fmt.Errorf("[source-private] CreateSourceSecret: %w", kerr) + } + + // Dépendances actuelles de la step processing (seront portées par le step wrapper). + existingDeps := b.getArgoDependencies(exec, graphID) + + // Injection de la step wrapper. + fetchStepName := b.injectPrivateProcessingStep(argoStepName, secretName, existingDeps) + + // Recâblage : la step processing ne dépend plus que du step wrapper. + dag := b.Workflow.getDag() + for i, task := range dag.Tasks { + if task.Name == argoStepName { + dag.Tasks[i].Dependencies = []string{fetchStepName} + break + } + } + + logger.Info().Msg(fmt.Sprintf( + "[source-private] processing '%s' wired: %v → %s → %s", + procResource.GetName(), existingDeps, fetchStepName, argoStepName, + )) + return nil +} + +// ── HandlePrivateDataSources ───────────────────────────────────────────────── + +// HandlePrivateDataSources parcourt tous les items Data dont une instance a +// une source privée (access.HasSource() && !access.IsReachable) et injecte +// pour chacun une step de téléchargement sécurisé dans le storage lié. +// +// Appelé depuis HandleDataSources() en fin de createTemplates(). +func (b *ArgoBuilder) HandlePrivateDataSources( + exec *workflow_execution.WorkflowExecution, + namespace string, +) { + self, err := oclib.GetMySelf() + if err != nil { + logger.Error().Msg("[source-private] cannot get local peer ID: " + err.Error()) + return + } + + for itemID, item := range b.OriginWorkflow.Graph.Items { + if !b.OriginWorkflow.Graph.IsData(item) || item.ItemResource.Data == nil { + continue + } + + var sourceURL string + var resourceID string + var creatorID string + + for _, inst := range item.ItemResource.Data.Instances { + if inst == nil || !inst.Access.HasSource() || inst.Access.Source.IsReachable { + continue + } + sourceURL = inst.Access.Source.Source + resourceID = item.ItemResource.Data.GetID() + creatorID = item.ItemResource.Data.GetCreatorID() + break + } + if sourceURL == "" { + continue + } + + // Storage lié à cette Data. + linkedStorageIDs := b.OriginWorkflow.Graph.GetLinkedStorageForData(itemID) + if len(linkedStorageIDs) == 0 { + logger.Error().Msg(fmt.Sprintf( + "[source-private] data '%s' has private source but no storage linked — skipping", + item.ItemResource.Data.GetName(), + )) + continue + } + storageItem, ok := b.OriginWorkflow.Graph.Items[linkedStorageIDs[0]] + if !ok || storageItem.ItemResource.Storage == nil || len(storageItem.ItemResource.Storage.Instances) == 0 { + continue + } + mountPath := storageItem.ItemResource.Storage.Instances[0].Source + if mountPath == "" { + logger.Error().Msg(fmt.Sprintf( + "[source-private] storage linked to data '%s' has no mount path — skipping", + item.ItemResource.Data.GetName(), + )) + continue + } + + // Demande de l'URL pré-signée. + var wg sync.WaitGroup + urlCh := make(chan string, 1) + errCh := make(chan error, 1) + wg.Add(1) + go waitForPresignedURL(exec.ExecutionsID, ArgoKubeEvent{ + ExecutionsID: exec.ExecutionsID, + Type: octools.PROCESSING_RESOURCE, + SourcePeerID: creatorID, + DestPeerID: self.GetID(), + OriginID: conf.GetConfig().PeerID, + SourceResourceID: resourceID, + }, &wg, urlCh, errCh) + wg.Wait() + close(urlCh) + close(errCh) + + presignedURL := <-urlCh + if ferr := <-errCh; ferr != nil || presignedURL == "" { + logger.Error().Msg(fmt.Sprintf( + "[source-private] data '%s': failed to get presigned URL: %v", + item.ItemResource.Data.GetName(), ferr, + )) + continue + } + + // Création du Secret K8s. + fetchBaseName := strings.ToLower(strings.ReplaceAll(item.ItemResource.Data.GetName(), " ", "-")) + secretName := secretNameFor(fetchBaseName, exec.ExecutionsID) + kube, kerr := tools.NewKubernetesTool() + if kerr != nil { + logger.Error().Msg("[source-private] cannot create K8s client: " + kerr.Error()) + continue + } + if kerr = kube.CreateSourceSecret(secretName, presignedURL, exec.ExecutionsID, namespace); kerr != nil { + logger.Error().Msg("[source-private] cannot create source secret: " + kerr.Error()) + continue + } + + // Injection pour chaque processing aval lisant ce storage. + downstreamProcIDs := b.processingsThatReadStorage(linkedStorageIDs[0]) + filename := sourceFilename(sourceURL) + + for _, procItemID := range downstreamProcIDs { + procItem := b.OriginWorkflow.Graph.Items[procItemID] + if procItem.ItemResource.Processing == nil { + continue + } + + existingDeps := b.getArgoDependencies(exec, procItemID) + fetchStepName := b.injectPrivateDataStep( + fetchBaseName, secretName, mountPath, filename, existingDeps, + ) + + // Recâblage des steps processing aval. + dag := b.Workflow.getDag() + for _, pb := range getAllPeersForItem(exec, procItemID) { + procArgoName := getArgoName(procItem.ItemResource.Processing.GetName(), pb.BookingID) + for i, task := range dag.Tasks { + if task.Name == procArgoName { + dag.Tasks[i].Dependencies = []string{fetchStepName} + break + } + } + } + } + } +}