359 lines
15 KiB
Go
359 lines
15 KiB
Go
|
|
package storage
|
||
|
|
|
||
|
|
// source_presign.go — Minio++ : génération d'URL pré-signée pour source privée (Phase 4)
|
||
|
|
//
|
||
|
|
// Appelé depuis le handler ARGO_KUBE_EVENT quand :
|
||
|
|
// (Type == PROCESSING_RESOURCE || Type == DATA_RESOURCE) && SourceResourceID != ""
|
||
|
|
//
|
||
|
|
// Protocole :
|
||
|
|
// 1. Le peer local reçoit la demande.
|
||
|
|
// - Même peer que le propriétaire → génère l'URL directement.
|
||
|
|
// - Peer distant → route via PROPALGATION_EVENT(PB_SOURCE_PRESIGN) vers le peer source.
|
||
|
|
// 2. Le peer source génère une URL pré-signée Minio (TTL = 24h).
|
||
|
|
// 3. Il émet PB_CONSIDERS avec { origin_id, executions_id, peer_id, resource_id, presigned_url }.
|
||
|
|
// 4. oc-discovery route PB_CONSIDERS vers OriginID → CONSIDERS_EVENT local → oc-monitord.
|
||
|
|
//
|
||
|
|
// Convention bucket/key Minio :
|
||
|
|
// bucket : "oc-resources"
|
||
|
|
// key : SourceResourceID
|
||
|
|
//
|
||
|
|
// Le bucket "oc-resources" est permanent (créé à la publication de la ressource).
|
||
|
|
// Pas de service account éphémère, pas de bucket par exécution.
|
||
|
|
|
||
|
|
import (
|
||
|
|
"context"
|
||
|
|
"encoding/json"
|
||
|
|
"fmt"
|
||
|
|
"net/url"
|
||
|
|
"strings"
|
||
|
|
"time"
|
||
|
|
|
||
|
|
"oc-datacenter/conf"
|
||
|
|
|
||
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/models/live"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||
|
|
"cloud.o-forge.io/core/oc-lib/tools"
|
||
|
|
"github.com/minio/minio-go/v7"
|
||
|
|
"github.com/minio/minio-go/v7/pkg/credentials"
|
||
|
|
)
|
||
|
|
|
||
|
|
const (
|
||
|
|
// resourcesBucket est le bucket Minio permanent qui stocke les binaires/données
|
||
|
|
// des ressources à source privée. Il est créé lors de la publication de la ressource.
|
||
|
|
resourcesBucket = "oc-resources"
|
||
|
|
// presignTTL est la durée de validité de l'URL pré-signée.
|
||
|
|
// Suffisamment large pour couvrir l'exécution du workflow + buffer.
|
||
|
|
presignTTL = 24 * time.Hour
|
||
|
|
)
|
||
|
|
|
||
|
|
// SourcePresignEvent est le payload NATS échangé entre peers pour une demande
|
||
|
|
// d'URL pré-signée (routé via PROPALGATION_EVENT action=PB_SOURCE_PRESIGN).
|
||
|
|
type SourcePresignEvent struct {
|
||
|
|
ExecutionsID string `json:"executions_id"`
|
||
|
|
SourceResourceID string `json:"source_resource_id"`
|
||
|
|
// SourcePeerID est le peer qui héberge la ressource (propriétaire Minio).
|
||
|
|
SourcePeerID string `json:"source_peer_id"`
|
||
|
|
// DestPeerID est le peer qui exécute le workflow (oc-monitord).
|
||
|
|
DestPeerID string `json:"dest_peer_id"`
|
||
|
|
// OriginID est le peer initiateur de la demande ; la réponse PB_CONSIDERS lui sera renvoyée.
|
||
|
|
OriginID string `json:"origin_id"`
|
||
|
|
// DataType encode PROCESSING_RESOURCE ou DATA_RESOURCE pour le logging.
|
||
|
|
DataType int `json:"data_type"`
|
||
|
|
}
|
||
|
|
|
||
|
|
// sourceConsidersPayload est le payload PB_CONSIDERS renvoyé par le peer source.
|
||
|
|
// Le champ presigned_url est lu par oc-monitord (StartConsidersListener / globalSourceCache).
|
||
|
|
type sourceConsidersPayload struct {
|
||
|
|
OriginID string `json:"origin_id"`
|
||
|
|
ExecutionsID string `json:"executions_id"`
|
||
|
|
// PeerID est le SourcePeerID de l'événement original :
|
||
|
|
// utilisé pour construire sourceConsidersKey dans oc-monitord.
|
||
|
|
PeerID string `json:"peer_id"`
|
||
|
|
ResourceID string `json:"resource_id"`
|
||
|
|
PresignedURL string `json:"presigned_url,omitempty"`
|
||
|
|
Error *string `json:"error,omitempty"`
|
||
|
|
}
|
||
|
|
|
||
|
|
// SourcePresigner porte le contexte d'une demande d'URL pré-signée.
|
||
|
|
type SourcePresigner struct {
|
||
|
|
ExecutionsID string
|
||
|
|
SourceResourceID string
|
||
|
|
}
|
||
|
|
|
||
|
|
func NewSourcePresigner(executionsID, resourceID string) *SourcePresigner {
|
||
|
|
return &SourcePresigner{ExecutionsID: executionsID, SourceResourceID: resourceID}
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Point d'entrée principal ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// InitializeAsSource est appelé sur le peer qui héberge la ressource (Minio source).
|
||
|
|
//
|
||
|
|
// Il valide la clé opaque contre le SourceKeyStore local, génère une URL
|
||
|
|
// pré-signée pour l'objet (resourcesBucket / SourceResourceID) puis émet
|
||
|
|
// PB_CONSIDERS vers OriginID.
|
||
|
|
//
|
||
|
|
// self=true → le peer local est aussi le peer origine (CONSIDERS_EVENT direct).
|
||
|
|
func (s *SourcePresigner) InitializeAsSource(ctx context.Context, event SourcePresignEvent, self bool) {
|
||
|
|
logger := oclib.GetLogger()
|
||
|
|
|
||
|
|
// ── Validation de la clé opaque ──────────────────────────────────────────
|
||
|
|
// Si la clé est inconnue du store local, c'est soit une clé forgée par B,
|
||
|
|
// soit une clé non encore enregistrée. Dans les deux cas on refuse.
|
||
|
|
var resolvedEntry SourceKeyEntry
|
||
|
|
if store := GetSourceKeyStore(); store != nil {
|
||
|
|
entry, ok := store.Resolve(s.SourceResourceID)
|
||
|
|
if !ok {
|
||
|
|
err := fmt.Errorf("unknown opaque key %q — refusing presign request", s.SourceResourceID)
|
||
|
|
logger.Warn().Msg("SourcePresigner.InitializeAsSource: " + err.Error())
|
||
|
|
s.emitConsiders(event, "", err, self)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
resolvedEntry = entry
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Vérification des AE (Autorisations d'Exploitation) ───────────────────
|
||
|
|
// Charge la ressource et vérifie que le peer demandeur (DestPeerID) est
|
||
|
|
// autorisé par ses AEs. Seules les contraintes évaluables sans contexte
|
||
|
|
// de workflow sont vérifiées ici (peer, validité temporelle, révocation).
|
||
|
|
// Les contraintes de couplage sont vérifiées par oc-schedulerd.
|
||
|
|
//
|
||
|
|
// Dégradation gracieuse : si la ressource est introuvable en DB (ex. erreur
|
||
|
|
// transitoire), on ne bloque pas — oc-schedulerd reste la barrière souveraine.
|
||
|
|
if resolvedEntry.ResourceID != "" && event.DestPeerID != "" {
|
||
|
|
if violations := s.checkResourceAE(resolvedEntry.ResourceID, event.DataType, event.DestPeerID); len(violations) > 0 {
|
||
|
|
msgs := make([]string, 0, len(violations))
|
||
|
|
for _, v := range violations {
|
||
|
|
msgs = append(msgs, string(v.Type)+": "+v.Message)
|
||
|
|
}
|
||
|
|
err := fmt.Errorf("AE violation for resource %s peer %s: %s",
|
||
|
|
resolvedEntry.ResourceID, event.DestPeerID, strings.Join(msgs, "; "))
|
||
|
|
logger.Warn().Msg("SourcePresigner.InitializeAsSource: " + err.Error())
|
||
|
|
// Signal de comportement frauduleux vers le peer demandeur.
|
||
|
|
resources.EmitAEBehaviorReport(event.DestPeerID, violations)
|
||
|
|
s.emitConsiders(event, "", err, self)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
minioURL, err := s.loadMinioURL(event.SourcePeerID)
|
||
|
|
if err != nil {
|
||
|
|
logger.Error().Msg("SourcePresigner.InitializeAsSource: " + err.Error())
|
||
|
|
s.emitConsiders(event, "", err, self)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
presignedURL, err := s.generatePresignedURL(ctx, minioURL, s.SourceResourceID)
|
||
|
|
if err != nil {
|
||
|
|
logger.Error().Msg("SourcePresigner.InitializeAsSource: failed to generate presigned URL: " + err.Error())
|
||
|
|
s.emitConsiders(event, "", err, self)
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
logger.Info().Msg(fmt.Sprintf(
|
||
|
|
"SourcePresigner: presigned URL generated for resource=%s exec=%s",
|
||
|
|
s.SourceResourceID, s.ExecutionsID,
|
||
|
|
))
|
||
|
|
s.emitConsiders(event, presignedURL, nil, self)
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Génération URL pré-signée ─────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// generatePresignedURL ouvre un client Minio en lecture seule (root credentials)
|
||
|
|
// et génère une URL pré-signée GET valide pour presignTTL.
|
||
|
|
//
|
||
|
|
// Le bucket est "oc-resources" (permanent) ; la clé est resourceID.
|
||
|
|
func (s *SourcePresigner) generatePresignedURL(ctx context.Context, minioURL, resourceID string) (string, error) {
|
||
|
|
client, err := minio.New(minioURL, &minio.Options{
|
||
|
|
Creds: credentials.NewStaticV4(conf.GetConfig().MinioRootKey, conf.GetConfig().MinioRootSecret, ""),
|
||
|
|
Secure: false,
|
||
|
|
})
|
||
|
|
if err != nil {
|
||
|
|
return "", fmt.Errorf("generatePresignedURL: failed to create minio client: %w", err)
|
||
|
|
}
|
||
|
|
|
||
|
|
reqParams := make(url.Values)
|
||
|
|
presignedURL, err := client.PresignedGetObject(ctx, resourcesBucket, resourceID, presignTTL, reqParams)
|
||
|
|
if err != nil {
|
||
|
|
return "", fmt.Errorf("generatePresignedURL: PresignedGetObject failed bucket=%s key=%s: %w",
|
||
|
|
resourcesBucket, resourceID, err)
|
||
|
|
}
|
||
|
|
|
||
|
|
return presignedURL.String(), nil
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Émission de la réponse ────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// emitConsiders publie la réponse PB_CONSIDERS contenant l'URL pré-signée
|
||
|
|
// (ou l'erreur) vers le peer OriginID.
|
||
|
|
//
|
||
|
|
// self=true → CONSIDERS_EVENT direct sur NATS local (même peer).
|
||
|
|
// self=false → PROPALGATION_EVENT(PB_CONSIDERS) → oc-discovery route vers OriginID.
|
||
|
|
func (s *SourcePresigner) emitConsiders(event SourcePresignEvent, presignedURL string, provErr error, self bool) {
|
||
|
|
var errStr *string
|
||
|
|
if provErr != nil {
|
||
|
|
e := provErr.Error()
|
||
|
|
errStr = &e
|
||
|
|
}
|
||
|
|
|
||
|
|
payload, _ := json.Marshal(sourceConsidersPayload{
|
||
|
|
OriginID: event.OriginID,
|
||
|
|
ExecutionsID: event.ExecutionsID,
|
||
|
|
PeerID: event.SourcePeerID, // clé de routage dans globalSourceCache
|
||
|
|
ResourceID: event.SourceResourceID,
|
||
|
|
PresignedURL: presignedURL,
|
||
|
|
Error: errStr,
|
||
|
|
})
|
||
|
|
|
||
|
|
if self {
|
||
|
|
tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{
|
||
|
|
FromApp: "oc-datacenter",
|
||
|
|
Datatype: tools.PROCESSING_RESOURCE,
|
||
|
|
Method: int(tools.CONSIDERS_EVENT),
|
||
|
|
Payload: payload,
|
||
|
|
})
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
// Cross-peer : PB_CONSIDERS → oc-discovery route vers OriginID via ProtocolConsidersResource.
|
||
|
|
b, _ := json.Marshal(&tools.PropalgationMessage{
|
||
|
|
DataType: tools.PROCESSING_RESOURCE.EnumIndex(),
|
||
|
|
Action: tools.PB_CONSIDERS,
|
||
|
|
Payload: payload,
|
||
|
|
})
|
||
|
|
tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
|
||
|
|
FromApp: "oc-datacenter",
|
||
|
|
Datatype: -1,
|
||
|
|
Method: int(tools.PROPALGATION_EVENT),
|
||
|
|
Payload: b,
|
||
|
|
})
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Lookup URL Minio locale ───────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// loadMinioURL retrouve l'URL du Minio hébergé par peerID en cherchant dans
|
||
|
|
// les live storages (même logique que MinioSetter.loadMinioURL).
|
||
|
|
// Pour les sources privées on utilise le premier live storage disponible
|
||
|
|
// (le Minio "resource store" du peer, pas un storage lié à un workflow).
|
||
|
|
func (s *SourcePresigner) loadMinioURL(peerID string) (string, error) {
|
||
|
|
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false, 0, 10000)
|
||
|
|
if res.Err != "" {
|
||
|
|
return "", fmt.Errorf("loadMinioURL: failed to load live storages for peer %s: %s", peerID, res.Err)
|
||
|
|
}
|
||
|
|
for _, dbo := range res.Data {
|
||
|
|
l, ok := dbo.(*live.LiveStorage)
|
||
|
|
if !ok || l.Source == "" {
|
||
|
|
continue
|
||
|
|
}
|
||
|
|
return l.Source, nil
|
||
|
|
}
|
||
|
|
return "", fmt.Errorf("loadMinioURL: no live storage found for peer %s", peerID)
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Vérification AE au presign ────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// checkResourceAE charge la ressource identifiée par resourceID depuis la DB
|
||
|
|
// locale et vérifie les AEs évaluables sans contexte de workflow :
|
||
|
|
// - Révocation
|
||
|
|
// - Validité temporelle (ValidFrom / ValidUntil)
|
||
|
|
// - Restriction de peer (AllowedPeerIDs)
|
||
|
|
//
|
||
|
|
// Les contraintes de couplage (RequiredResourceIDs / ForbiddenResourceIDs) et
|
||
|
|
// de workflow (AllowedWorkflowIDs) ne sont PAS vérifiées ici — oc-schedulerd
|
||
|
|
// les vérifie de façon souveraine avant tout lancement d'exécution.
|
||
|
|
//
|
||
|
|
// Retourne nil si la ressource est introuvable (dégradation gracieuse).
|
||
|
|
func (s *SourcePresigner) checkResourceAE(resourceID string, dataType int, consumerPeerID string) []resources.AEViolation {
|
||
|
|
res := oclib.NewRequestAdmin(oclib.LibDataEnum(dataType), nil).LoadOne(resourceID)
|
||
|
|
if res.Err != "" || res.Data == nil {
|
||
|
|
log := oclib.GetLogger()
|
||
|
|
log.Warn().Msgf("checkResourceAE: cannot load resource %s (type %d): %s — skipping AE check",
|
||
|
|
resourceID, dataType, res.Err)
|
||
|
|
return nil // dégradation gracieuse : oc-schedulerd reste la barrière
|
||
|
|
}
|
||
|
|
|
||
|
|
type hasAE interface {
|
||
|
|
GetExploitationAuthorizations() []resources.ExploitationAuthorization
|
||
|
|
}
|
||
|
|
ra, ok := res.Data.(hasAE)
|
||
|
|
if !ok {
|
||
|
|
return nil
|
||
|
|
}
|
||
|
|
|
||
|
|
now := time.Now().UTC()
|
||
|
|
var violations []resources.AEViolation
|
||
|
|
for _, ae := range ra.GetExploitationAuthorizations() {
|
||
|
|
vs := checkAEPresign(ae, resourceID, consumerPeerID, now)
|
||
|
|
violations = append(violations, vs...)
|
||
|
|
}
|
||
|
|
return violations
|
||
|
|
}
|
||
|
|
|
||
|
|
// checkAEPresign évalue uniquement les contraintes d'une AE qui sont vérifiables
|
||
|
|
// sans le contexte complet du workflow (couplage, workflow ID).
|
||
|
|
func checkAEPresign(ae resources.ExploitationAuthorization, resourceID, consumerPeerID string, now time.Time) []resources.AEViolation {
|
||
|
|
var vs []resources.AEViolation
|
||
|
|
add := func(t resources.AEViolationType, msg string) {
|
||
|
|
vs = append(vs, resources.AEViolation{AEID: ae.ID, ResourceID: resourceID, Type: t, Message: msg})
|
||
|
|
}
|
||
|
|
|
||
|
|
if ae.IsRevoked {
|
||
|
|
add(resources.AEViolationRevoked,
|
||
|
|
fmt.Sprintf("AE %s for resource %s is revoked", ae.ID, resourceID))
|
||
|
|
return vs // revoked → pas la peine de continuer
|
||
|
|
}
|
||
|
|
if ae.ValidUntil != nil && now.After(*ae.ValidUntil) {
|
||
|
|
add(resources.AEViolationExpired,
|
||
|
|
fmt.Sprintf("AE %s for resource %s expired at %s", ae.ID, resourceID, ae.ValidUntil.Format(time.RFC3339)))
|
||
|
|
return vs
|
||
|
|
}
|
||
|
|
if ae.ValidFrom != nil && now.Before(*ae.ValidFrom) {
|
||
|
|
add(resources.AEViolationNotYetValid,
|
||
|
|
fmt.Sprintf("AE %s for resource %s not valid until %s", ae.ID, resourceID, ae.ValidFrom.Format(time.RFC3339)))
|
||
|
|
return vs
|
||
|
|
}
|
||
|
|
// Restriction de peer : AllowedPeerIDs vide = tous les peers autorisés.
|
||
|
|
if consumerPeerID != "" && len(ae.AllowedPeerIDs) > 0 {
|
||
|
|
allowed := false
|
||
|
|
for _, id := range ae.AllowedPeerIDs {
|
||
|
|
if id == consumerPeerID {
|
||
|
|
allowed = true
|
||
|
|
break
|
||
|
|
}
|
||
|
|
}
|
||
|
|
if !allowed {
|
||
|
|
add(resources.AEViolationPeerNotAllowed,
|
||
|
|
fmt.Sprintf("peer %s not in allowed list for resource %s (AE %s)",
|
||
|
|
consumerPeerID, resourceID, ae.ID))
|
||
|
|
}
|
||
|
|
}
|
||
|
|
return vs
|
||
|
|
}
|
||
|
|
|
||
|
|
// ── Nettoyage post-exécution ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
// CleanupSourceSecrets supprime tous les K8s Secrets créés par oc-monitord
|
||
|
|
// pour les sources privées de cette exécution (label oc-execution-id=<execID>
|
||
|
|
// et oc-secret-type=source-presigned).
|
||
|
|
//
|
||
|
|
// Appelé depuis TeardownForExecution() dans watchdog.go.
|
||
|
|
func CleanupSourceSecrets(ctx context.Context, executionsID string) {
|
||
|
|
logger := oclib.GetLogger()
|
||
|
|
|
||
|
|
k, err := tools.NewKubernetesService(
|
||
|
|
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
|
||
|
|
conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
|
||
|
|
)
|
||
|
|
if err != nil {
|
||
|
|
logger.Error().Msg("CleanupSourceSecrets: failed to create k8s service: " + err.Error())
|
||
|
|
return
|
||
|
|
}
|
||
|
|
|
||
|
|
labelSelector := "oc-execution-id=" + executionsID + ",oc-secret-type=source-presigned"
|
||
|
|
if err := k.DeleteSecretsByLabel(ctx, executionsID, labelSelector); err != nil {
|
||
|
|
logger.Error().Msg("CleanupSourceSecrets: " + err.Error())
|
||
|
|
return
|
||
|
|
}
|
||
|
|
logger.Info().Msg("CleanupSourceSecrets: source secrets removed for exec " + executionsID)
|
||
|
|
}
|