diff --git a/conf/config.go b/conf/config.go index d16f9d2..1b6fa6a 100644 --- a/conf/config.go +++ b/conf/config.go @@ -17,6 +17,11 @@ type Config struct { MinioRootSecret string MonitorMode string MonitorAddress string + // SourceKeyStorePath est le chemin du fichier JSON persistant qui stocke + // la table privée opaque_key → { real_path, resource_id }. + // Ce fichier doit être sur un volume monté pour survivre aux redémarrages. + // Valeur par défaut : /data/source-keys.json + SourceKeyStorePath string } var instance *Config diff --git a/controllers/datacenter.go b/controllers/datacenter.go index dd9e5c1..d37210c 100644 --- a/controllers/datacenter.go +++ b/controllers/datacenter.go @@ -29,6 +29,9 @@ func resourceTypeEnum(t string, special bool) []oclib.LibDataEnum { if t == "storage" || t == "live" { e = append(e, oclib.LibDataEnum(oclib.LIVE_STORAGE)) } + if t == "service" || t == "live" { + e = append(e, oclib.LibDataEnum(oclib.LIVE_SERVICE)) + } if t == "datacenter" || t == "live" { e = append(e, oclib.LibDataEnum(oclib.LIVE_DATACENTER)) } @@ -92,6 +95,44 @@ func (o *DatacenterController) Search() { o.ServeJSON() } +// @Title GetAllCompatible +// @Description find booking by id +// @Param type path string true "the word type you want to get" +// @Param is_draft query string false "draft wished" +// @Param offset query string false +// @Param limit query string false +// @Success 200 {booking} models.booking +// @router /compatible/:type [put] +func (o *DatacenterController) GetAllCompatible() { + user, peerID, groups := oclib.ExtractTokenInfo(*o.Ctx.Request) + isDraft := o.Ctx.Input.Query("is_draft") + offset, _ := strconv.Atoi(o.Ctx.Input.Query("offset")) + limit, _ := strconv.Atoi(o.Ctx.Input.Query("limit")) + + var res map[string]interface{} + json.Unmarshal(o.Ctx.Input.CopyBody(100000), &res) + + m := map[string][]utils.ShallowDBObject{} + for _, col := range o.collection(false) { + if m[col.String()] == nil { + m[col.String()] = []utils.ShallowDBObject{} + } + s := oclib.NewRequest(oclib.LibDataEnum(col), user, peerID, groups, nil).LoadAll(isDraft == "true", int64(offset), int64(limit)) + for _, ss := range s.Data { + if ss.(live.LiveInterface).IsCompatible(res) { + m[col.String()] = append(m[col.String()], ss) + } + } + } + fmt.Println(m) + o.Data["json"] = map[string]interface{}{ + "data": m, + "code": 200, + "err": nil, + } + o.ServeJSON() +} + // @Title GetAll // @Description find booking by id // @Param type path string true "the word type you want to get" diff --git a/docker-compose.yml b/docker-compose.yml index 34fe11a..002de94 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,11 +5,18 @@ services: env_file: - path: ./env.env required: false - environment: + environment: - MONGO_DATABASE=DC_myDC + # Chemin du fichier JSON persistant contenant la table opaque_key → real_path. + # Doit pointer vers le volume monté ci-dessous pour survivre aux redémarrages. + - SOURCE_KEY_STORE_PATH=/data/source-keys.json image: 'oc-datacenter:latest' ports: - 8092:8080 + volumes: + # Table privée opaque_key → real_path (ne jamais effacer ce volume). + # Même exigence de conservation que la base de données. + - oc-source-keys:/data labels: - "traefik.enable=true" - "traefik.http.routers.datacenter.entrypoints=web" @@ -18,15 +25,21 @@ services: - "traefik.http.middlewares.datacenter-rewrite.replacepathregex.regex=^/datacenter(.*)" - "traefik.http.middlewares.datacenter-rewrite.replacepathregex.replacement=/oc$$1" - "traefik.http.routers.datacenter.middlewares=datacenter-rewrite,auth-datacenter" - + - "traefik.http.middlewares.auth-datacenter.forwardauth.address=http://oc-auth:8080/oc/forward" - "traefik.http.middlewares.auth-datacenter.forwardauth.trustForwardHeader=true" - "traefik.http.middlewares.auth-datacenter.forwardauth.authResponseHeaders=X-Auth-Request-User,X-Auth-Request-Email" container_name: oc-datacenter - networks: + networks: - oc -networks: +volumes: + # Volume créé et écrit par oc-catalog, monté ici en lecture. + # Démarrer oc-catalog avant oc-datacenter pour que le volume existe. + oc-source-keys: + external: true + +networks: oc: - external: true \ No newline at end of file + external: true diff --git a/go.mod b/go.mod index 928d97e..b8d1701 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( ) require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20260408134044-284533ad1d7b // indirect + cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/biter777/countries v1.7.5 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect diff --git a/go.sum b/go.sum index cc86af7..d9c37b6 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,14 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20260407090927-6fe91eda875d h1:54Vl14gurwAkm cloud.o-forge.io/core/oc-lib v0.0.0-20260407090927-6fe91eda875d/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= cloud.o-forge.io/core/oc-lib v0.0.0-20260408134044-284533ad1d7b h1:mOU+tc87/KEQgFmw1RcQ9E9Rbz8Q2jLOh5Cpu6po9Ww= cloud.o-forge.io/core/oc-lib v0.0.0-20260408134044-284533ad1d7b/go.mod h1:+ENuvBfZdESSvecoqGY/wSvRlT3vinEolxKgwbOhUpA= +cloud.o-forge.io/core/oc-lib v0.0.0-20260423074839-e70e89b630b7 h1:WdExXiecLnST8a7gAh6Z9Xd1Q+0/EjTy1P+b9ABoga8= +cloud.o-forge.io/core/oc-lib v0.0.0-20260423074839-e70e89b630b7/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260428094823-f926a4206694 h1:HlAzTt6KTtpZIYp76JFZj/eKk1hd4KoNNl0ZvQL3rnM= +cloud.o-forge.io/core/oc-lib v0.0.0-20260428094823-f926a4206694/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260428100553-7a1250653175 h1:BxXCAEkizoHNi/NDHfPtAHcWu0lKRJNGTUabBpEpndI= +cloud.o-forge.io/core/oc-lib v0.0.0-20260428100553-7a1250653175/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b h1:TWhmHeurbBmdyevREh4+mHWOBehO2AK587RCIjCfvOc= +cloud.o-forge.io/core/oc-lib v0.0.0-20260527135023-cef23b5f307b/go.mod h1:JynnOb3eMr9VZW1mHq+Vsl3tzx6gPhPsGKpQD/dtEBc= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0= github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM= diff --git a/infrastructure/nats/nats.go b/infrastructure/nats/nats.go index 009b8c3..0cf4c4a 100644 --- a/infrastructure/nats/nats.go +++ b/infrastructure/nats/nats.go @@ -20,9 +20,10 @@ var roleWaiters sync.Map // ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event. // -// When MinioID is non-empty and Local is false, the event concerns Minio credential provisioning. -// When Local is true, the event concerns local PVC provisioning. -// Otherwise it concerns Admiralty kubeconfig provisioning. +// Dispatch rules (evaluated in order): +// 1. Type == STORAGE_RESOURCE → Minio credentials or PVC +// 2. (Type == PROCESSING_RESOURCE || DATA_RESOURCE) && SourceResourceID != "" → Minio++ pre-signed URL (Phase 4) +// 3. everything else → Admiralty kubeconfig type ArgoKubeEvent struct { ExecutionsID string `json:"executions_id"` DestPeerID string `json:"dest_peer_id"` @@ -36,8 +37,13 @@ type ArgoKubeEvent struct { // response is routed back to this peer once provisioning completes. OriginID string `json:"origin_id,omitempty"` // Images is the list of container images to pre-pull on the target peer - // before the workflow starts. Empty for STORAGE_RESOURCE events. + // before the workflow starts. Empty for STORAGE_RESOURCE / PROCESSING_RESOURCE source events. Images []string `json:"images,omitempty"` + // SourceResourceID is non-empty only for Phase 4 (isReachable=false): + // it identifies the Processing or Data resource whose binary/data must be + // fetched via a pre-signed Minio URL. When set, the event is a Minio++ request, + // NOT an Admiralty compute event. + SourceResourceID string `json:"source_resource_id,omitempty"` } // ListenNATS starts all NATS subscriptions for the infrastructure layer. @@ -54,6 +60,9 @@ func ListenNATS() { } kube := kubernetes.NewKubernetesService(argo.ExecutionsID) + isSourcePresign := argo.SourceResourceID != "" && + (argo.Type == tools.PROCESSING_RESOURCE || argo.Type == tools.DATA_RESOURCE) + if argo.Type == tools.STORAGE_RESOURCE { if argo.Local { fmt.Println("DETECT LOCAL PVC ARGO_KUBE_EVENT") @@ -126,6 +135,40 @@ func ListenNATS() { } } } + } else if isSourcePresign { + fmt.Println("DETECT SOURCE PRESIGN ARGO_KUBE_EVENT", argo.Type, argo.SourceResourceID) + // ── Minio++ : génération d'URL pré-signée pour source privée ── + presigner := storage.NewSourcePresigner(argo.ExecutionsID, argo.SourceResourceID) + event := storage.SourcePresignEvent{ + ExecutionsID: argo.ExecutionsID, + SourceResourceID: argo.SourceResourceID, + SourcePeerID: argo.SourcePeerID, + DestPeerID: argo.DestPeerID, + OriginID: argo.OriginID, + DataType: argo.Type.EnumIndex(), + } + if argo.SourcePeerID == argo.DestPeerID { + // Même peer : génère directement l'URL et émet CONSIDERS_EVENT local. + go presigner.InitializeAsSource(context.Background(), event, true) + } else { + // Cross-peer : route via PROPALGATION_EVENT(PB_SOURCE_PRESIGN) + // vers le peer propriétaire de la ressource. + if b, err := json.Marshal(event); err == nil { + if b2, err := json.Marshal(&tools.PropalgationMessage{ + Payload: b, + Action: tools.PB_SOURCE_PRESIGN, + }); err == nil { + fmt.Println("ROUTE SOURCE PRESIGN TO", argo.SourcePeerID) + go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: argo.Type, + User: resp.User, + Method: int(tools.PROPALGATION_EVENT), + Payload: b2, + }) + } + } + } } else { fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT") // ── Pre-pull + Admiralty kubeconfig provisioning ───────────── @@ -165,6 +208,20 @@ func ListenNATS() { } }, + // ─── SOURCE_PRESIGN_EVENT ──────────────────────────────────────────────────── + // Forwarded by oc-discovery after receiving PB_SOURCE_PRESIGN via libp2p + // ProtocolSourcePresignResource. This peer is the resource owner (Minio source). + // It generates a pre-signed URL and responds via PB_CONSIDERS → OriginID. + tools.SOURCE_PRESIGN_EVENT: func(resp tools.NATSResponse) { + event := storage.SourcePresignEvent{} + if err := json.Unmarshal(resp.Payload, &event); err != nil { + return + } + fmt.Println("SOURCE_PRESIGN_EVENT received resource=", event.SourceResourceID) + presigner := storage.NewSourcePresigner(event.ExecutionsID, event.SourceResourceID) + go presigner.InitializeAsSource(context.Background(), event, false) + }, + // ─── ADMIRALTY_CONFIG_EVENT ───────────────────────────────────────────────── // Forwarded by oc-discovery after receiving via libp2p ProtocolAdmiraltyConfigResource. // Payload is a KubeconfigEvent (phase discriminated by Kubeconfig presence). diff --git a/infrastructure/storage/source_key_store.go b/infrastructure/storage/source_key_store.go new file mode 100644 index 0000000..6b13910 --- /dev/null +++ b/infrastructure/storage/source_key_store.go @@ -0,0 +1,78 @@ +package storage + +// source_key_store.go — lecture de la table opaque_key → real_path (côté oc-datacenter) +// +// oc-catalog est le seul writer : il génère les clés et écrit le fichier JSON. +// oc-datacenter est read-only : il résout une clé opaque avant de générer +// une presigned URL pour s'assurer que la clé est bien connue du peer local +// (protection contre les clés forgées par un peer distant). +// +// Partage de volume : oc-catalog et oc-datacenter montent le même volume Docker +// sur /data. oc-catalog écrit /data/source-keys.json via write-then-rename +// (atomique sur Linux). oc-datacenter lit le fichier à chaque Resolve — pas de +// cache en mémoire, ce qui garantit de voir les clés écrites après le démarrage. +// Le fichier étant petit (centaines d'entrées), le coût I/O est négligeable. + +import ( + "encoding/json" + "fmt" + "os" + "sync" + + oclib "cloud.o-forge.io/core/oc-lib" +) + +// SourceKeyEntry est une entrée du store. +type SourceKeyEntry struct { + RealPath string `json:"real_path"` + ResourceID string `json:"resource_id"` + CreatedAt string `json:"created_at"` +} + +// SourceKeyStore est le lecteur de la table privée. +// Pas de cache — lit le fichier à chaque appel pour voir les nouvelles entrées +// écrites par oc-catalog sans nécessiter de coordination inter-process. +type SourceKeyStore struct { + path string +} + +var ( + globalSourceKeyStore *SourceKeyStore + sourceKeyStoreInitOnce sync.Once +) + +// InitSourceKeyStore initialise le store global avec le chemin du fichier partagé. +// Idempotent. +func InitSourceKeyStore(path string) { + sourceKeyStoreInitOnce.Do(func() { + globalSourceKeyStore = &SourceKeyStore{path: path} + log := oclib.GetLogger() + log.Info().Msgf("SourceKeyStore: watching %s (shared volume with oc-catalog)", path) + }) +} + +// GetSourceKeyStore retourne le store global. +func GetSourceKeyStore() *SourceKeyStore { + return globalSourceKeyStore +} + +// Resolve retourne l'entrée associée à opaqueKey, ou false si inconnue. +// Lit le fichier JSON à chaque appel pour voir les entrées écrites par oc-catalog. +func (s *SourceKeyStore) Resolve(opaqueKey string) (SourceKeyEntry, bool) { + data, err := os.ReadFile(s.path) + if err != nil { + if !os.IsNotExist(err) { + log := oclib.GetLogger() + log.Error().Msg(fmt.Sprintf("SourceKeyStore.Resolve: cannot read %s: %v", s.path, err)) + } + return SourceKeyEntry{}, false + } + var entries map[string]SourceKeyEntry + if err := json.Unmarshal(data, &entries); err != nil { + log := oclib.GetLogger() + log.Error().Msg(fmt.Sprintf("SourceKeyStore.Resolve: cannot parse %s: %v", s.path, err)) + return SourceKeyEntry{}, false + } + e, ok := entries[opaqueKey] + return e, ok +} diff --git a/infrastructure/storage/source_presign.go b/infrastructure/storage/source_presign.go new file mode 100644 index 0000000..0d8a51b --- /dev/null +++ b/infrastructure/storage/source_presign.go @@ -0,0 +1,358 @@ +package storage + +// source_presign.go — Minio++ : génération d'URL pré-signée pour source privée (Phase 4) +// +// Appelé depuis le handler ARGO_KUBE_EVENT quand : +// (Type == PROCESSING_RESOURCE || Type == DATA_RESOURCE) && SourceResourceID != "" +// +// Protocole : +// 1. Le peer local reçoit la demande. +// - Même peer que le propriétaire → génère l'URL directement. +// - Peer distant → route via PROPALGATION_EVENT(PB_SOURCE_PRESIGN) vers le peer source. +// 2. Le peer source génère une URL pré-signée Minio (TTL = 24h). +// 3. Il émet PB_CONSIDERS avec { origin_id, executions_id, peer_id, resource_id, presigned_url }. +// 4. oc-discovery route PB_CONSIDERS vers OriginID → CONSIDERS_EVENT local → oc-monitord. +// +// Convention bucket/key Minio : +// bucket : "oc-resources" +// key : SourceResourceID +// +// Le bucket "oc-resources" est permanent (créé à la publication de la ressource). +// Pas de service account éphémère, pas de bucket par exécution. + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + "time" + + "oc-datacenter/conf" + + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/models/live" + "cloud.o-forge.io/core/oc-lib/models/resources" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" +) + +const ( + // resourcesBucket est le bucket Minio permanent qui stocke les binaires/données + // des ressources à source privée. Il est créé lors de la publication de la ressource. + resourcesBucket = "oc-resources" + // presignTTL est la durée de validité de l'URL pré-signée. + // Suffisamment large pour couvrir l'exécution du workflow + buffer. + presignTTL = 24 * time.Hour +) + +// SourcePresignEvent est le payload NATS échangé entre peers pour une demande +// d'URL pré-signée (routé via PROPALGATION_EVENT action=PB_SOURCE_PRESIGN). +type SourcePresignEvent struct { + ExecutionsID string `json:"executions_id"` + SourceResourceID string `json:"source_resource_id"` + // SourcePeerID est le peer qui héberge la ressource (propriétaire Minio). + SourcePeerID string `json:"source_peer_id"` + // DestPeerID est le peer qui exécute le workflow (oc-monitord). + DestPeerID string `json:"dest_peer_id"` + // OriginID est le peer initiateur de la demande ; la réponse PB_CONSIDERS lui sera renvoyée. + OriginID string `json:"origin_id"` + // DataType encode PROCESSING_RESOURCE ou DATA_RESOURCE pour le logging. + DataType int `json:"data_type"` +} + +// sourceConsidersPayload est le payload PB_CONSIDERS renvoyé par le peer source. +// Le champ presigned_url est lu par oc-monitord (StartConsidersListener / globalSourceCache). +type sourceConsidersPayload struct { + OriginID string `json:"origin_id"` + ExecutionsID string `json:"executions_id"` + // PeerID est le SourcePeerID de l'événement original : + // utilisé pour construire sourceConsidersKey dans oc-monitord. + PeerID string `json:"peer_id"` + ResourceID string `json:"resource_id"` + PresignedURL string `json:"presigned_url,omitempty"` + Error *string `json:"error,omitempty"` +} + +// SourcePresigner porte le contexte d'une demande d'URL pré-signée. +type SourcePresigner struct { + ExecutionsID string + SourceResourceID string +} + +func NewSourcePresigner(executionsID, resourceID string) *SourcePresigner { + return &SourcePresigner{ExecutionsID: executionsID, SourceResourceID: resourceID} +} + +// ── Point d'entrée principal ────────────────────────────────────────────────── + +// InitializeAsSource est appelé sur le peer qui héberge la ressource (Minio source). +// +// Il valide la clé opaque contre le SourceKeyStore local, génère une URL +// pré-signée pour l'objet (resourcesBucket / SourceResourceID) puis émet +// PB_CONSIDERS vers OriginID. +// +// self=true → le peer local est aussi le peer origine (CONSIDERS_EVENT direct). +func (s *SourcePresigner) InitializeAsSource(ctx context.Context, event SourcePresignEvent, self bool) { + logger := oclib.GetLogger() + + // ── Validation de la clé opaque ────────────────────────────────────────── + // Si la clé est inconnue du store local, c'est soit une clé forgée par B, + // soit une clé non encore enregistrée. Dans les deux cas on refuse. + var resolvedEntry SourceKeyEntry + if store := GetSourceKeyStore(); store != nil { + entry, ok := store.Resolve(s.SourceResourceID) + if !ok { + err := fmt.Errorf("unknown opaque key %q — refusing presign request", s.SourceResourceID) + logger.Warn().Msg("SourcePresigner.InitializeAsSource: " + err.Error()) + s.emitConsiders(event, "", err, self) + return + } + resolvedEntry = entry + } + + // ── Vérification des AE (Autorisations d'Exploitation) ─────────────────── + // Charge la ressource et vérifie que le peer demandeur (DestPeerID) est + // autorisé par ses AEs. Seules les contraintes évaluables sans contexte + // de workflow sont vérifiées ici (peer, validité temporelle, révocation). + // Les contraintes de couplage sont vérifiées par oc-schedulerd. + // + // Dégradation gracieuse : si la ressource est introuvable en DB (ex. erreur + // transitoire), on ne bloque pas — oc-schedulerd reste la barrière souveraine. + if resolvedEntry.ResourceID != "" && event.DestPeerID != "" { + if violations := s.checkResourceAE(resolvedEntry.ResourceID, event.DataType, event.DestPeerID); len(violations) > 0 { + msgs := make([]string, 0, len(violations)) + for _, v := range violations { + msgs = append(msgs, string(v.Type)+": "+v.Message) + } + err := fmt.Errorf("AE violation for resource %s peer %s: %s", + resolvedEntry.ResourceID, event.DestPeerID, strings.Join(msgs, "; ")) + logger.Warn().Msg("SourcePresigner.InitializeAsSource: " + err.Error()) + // Signal de comportement frauduleux vers le peer demandeur. + resources.EmitAEBehaviorReport(event.DestPeerID, violations) + s.emitConsiders(event, "", err, self) + return + } + } + + minioURL, err := s.loadMinioURL(event.SourcePeerID) + if err != nil { + logger.Error().Msg("SourcePresigner.InitializeAsSource: " + err.Error()) + s.emitConsiders(event, "", err, self) + return + } + + presignedURL, err := s.generatePresignedURL(ctx, minioURL, s.SourceResourceID) + if err != nil { + logger.Error().Msg("SourcePresigner.InitializeAsSource: failed to generate presigned URL: " + err.Error()) + s.emitConsiders(event, "", err, self) + return + } + + logger.Info().Msg(fmt.Sprintf( + "SourcePresigner: presigned URL generated for resource=%s exec=%s", + s.SourceResourceID, s.ExecutionsID, + )) + s.emitConsiders(event, presignedURL, nil, self) +} + +// ── Génération URL pré-signée ───────────────────────────────────────────────── + +// generatePresignedURL ouvre un client Minio en lecture seule (root credentials) +// et génère une URL pré-signée GET valide pour presignTTL. +// +// Le bucket est "oc-resources" (permanent) ; la clé est resourceID. +func (s *SourcePresigner) generatePresignedURL(ctx context.Context, minioURL, resourceID string) (string, error) { + client, err := minio.New(minioURL, &minio.Options{ + Creds: credentials.NewStaticV4(conf.GetConfig().MinioRootKey, conf.GetConfig().MinioRootSecret, ""), + Secure: false, + }) + if err != nil { + return "", fmt.Errorf("generatePresignedURL: failed to create minio client: %w", err) + } + + reqParams := make(url.Values) + presignedURL, err := client.PresignedGetObject(ctx, resourcesBucket, resourceID, presignTTL, reqParams) + if err != nil { + return "", fmt.Errorf("generatePresignedURL: PresignedGetObject failed bucket=%s key=%s: %w", + resourcesBucket, resourceID, err) + } + + return presignedURL.String(), nil +} + +// ── Émission de la réponse ──────────────────────────────────────────────────── + +// emitConsiders publie la réponse PB_CONSIDERS contenant l'URL pré-signée +// (ou l'erreur) vers le peer OriginID. +// +// self=true → CONSIDERS_EVENT direct sur NATS local (même peer). +// self=false → PROPALGATION_EVENT(PB_CONSIDERS) → oc-discovery route vers OriginID. +func (s *SourcePresigner) emitConsiders(event SourcePresignEvent, presignedURL string, provErr error, self bool) { + var errStr *string + if provErr != nil { + e := provErr.Error() + errStr = &e + } + + payload, _ := json.Marshal(sourceConsidersPayload{ + OriginID: event.OriginID, + ExecutionsID: event.ExecutionsID, + PeerID: event.SourcePeerID, // clé de routage dans globalSourceCache + ResourceID: event.SourceResourceID, + PresignedURL: presignedURL, + Error: errStr, + }) + + if self { + tools.NewNATSCaller().SetNATSPub(tools.CONSIDERS_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: tools.PROCESSING_RESOURCE, + Method: int(tools.CONSIDERS_EVENT), + Payload: payload, + }) + return + } + + // Cross-peer : PB_CONSIDERS → oc-discovery route vers OriginID via ProtocolConsidersResource. + b, _ := json.Marshal(&tools.PropalgationMessage{ + DataType: tools.PROCESSING_RESOURCE.EnumIndex(), + Action: tools.PB_CONSIDERS, + Payload: payload, + }) + tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{ + FromApp: "oc-datacenter", + Datatype: -1, + Method: int(tools.PROPALGATION_EVENT), + Payload: b, + }) +} + +// ── Lookup URL Minio locale ─────────────────────────────────────────────────── + +// loadMinioURL retrouve l'URL du Minio hébergé par peerID en cherchant dans +// les live storages (même logique que MinioSetter.loadMinioURL). +// Pour les sources privées on utilise le premier live storage disponible +// (le Minio "resource store" du peer, pas un storage lié à un workflow). +func (s *SourcePresigner) loadMinioURL(peerID string) (string, error) { + res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false, 0, 10000) + if res.Err != "" { + return "", fmt.Errorf("loadMinioURL: failed to load live storages for peer %s: %s", peerID, res.Err) + } + for _, dbo := range res.Data { + l, ok := dbo.(*live.LiveStorage) + if !ok || l.Source == "" { + continue + } + return l.Source, nil + } + return "", fmt.Errorf("loadMinioURL: no live storage found for peer %s", peerID) +} + +// ── Vérification AE au presign ──────────────────────────────────────────────── + +// checkResourceAE charge la ressource identifiée par resourceID depuis la DB +// locale et vérifie les AEs évaluables sans contexte de workflow : +// - Révocation +// - Validité temporelle (ValidFrom / ValidUntil) +// - Restriction de peer (AllowedPeerIDs) +// +// Les contraintes de couplage (RequiredResourceIDs / ForbiddenResourceIDs) et +// de workflow (AllowedWorkflowIDs) ne sont PAS vérifiées ici — oc-schedulerd +// les vérifie de façon souveraine avant tout lancement d'exécution. +// +// Retourne nil si la ressource est introuvable (dégradation gracieuse). +func (s *SourcePresigner) checkResourceAE(resourceID string, dataType int, consumerPeerID string) []resources.AEViolation { + res := oclib.NewRequestAdmin(oclib.LibDataEnum(dataType), nil).LoadOne(resourceID) + if res.Err != "" || res.Data == nil { + log := oclib.GetLogger() + log.Warn().Msgf("checkResourceAE: cannot load resource %s (type %d): %s — skipping AE check", + resourceID, dataType, res.Err) + return nil // dégradation gracieuse : oc-schedulerd reste la barrière + } + + type hasAE interface { + GetExploitationAuthorizations() []resources.ExploitationAuthorization + } + ra, ok := res.Data.(hasAE) + if !ok { + return nil + } + + now := time.Now().UTC() + var violations []resources.AEViolation + for _, ae := range ra.GetExploitationAuthorizations() { + vs := checkAEPresign(ae, resourceID, consumerPeerID, now) + violations = append(violations, vs...) + } + return violations +} + +// checkAEPresign évalue uniquement les contraintes d'une AE qui sont vérifiables +// sans le contexte complet du workflow (couplage, workflow ID). +func checkAEPresign(ae resources.ExploitationAuthorization, resourceID, consumerPeerID string, now time.Time) []resources.AEViolation { + var vs []resources.AEViolation + add := func(t resources.AEViolationType, msg string) { + vs = append(vs, resources.AEViolation{AEID: ae.ID, ResourceID: resourceID, Type: t, Message: msg}) + } + + if ae.IsRevoked { + add(resources.AEViolationRevoked, + fmt.Sprintf("AE %s for resource %s is revoked", ae.ID, resourceID)) + return vs // revoked → pas la peine de continuer + } + if ae.ValidUntil != nil && now.After(*ae.ValidUntil) { + add(resources.AEViolationExpired, + fmt.Sprintf("AE %s for resource %s expired at %s", ae.ID, resourceID, ae.ValidUntil.Format(time.RFC3339))) + return vs + } + if ae.ValidFrom != nil && now.Before(*ae.ValidFrom) { + add(resources.AEViolationNotYetValid, + fmt.Sprintf("AE %s for resource %s not valid until %s", ae.ID, resourceID, ae.ValidFrom.Format(time.RFC3339))) + return vs + } + // Restriction de peer : AllowedPeerIDs vide = tous les peers autorisés. + if consumerPeerID != "" && len(ae.AllowedPeerIDs) > 0 { + allowed := false + for _, id := range ae.AllowedPeerIDs { + if id == consumerPeerID { + allowed = true + break + } + } + if !allowed { + add(resources.AEViolationPeerNotAllowed, + fmt.Sprintf("peer %s not in allowed list for resource %s (AE %s)", + consumerPeerID, resourceID, ae.ID)) + } + } + return vs +} + +// ── Nettoyage post-exécution ────────────────────────────────────────────────── + +// CleanupSourceSecrets supprime tous les K8s Secrets créés par oc-monitord +// pour les sources privées de cette exécution (label oc-execution-id= +// et oc-secret-type=source-presigned). +// +// Appelé depuis TeardownForExecution() dans watchdog.go. +func CleanupSourceSecrets(ctx context.Context, executionsID string) { + logger := oclib.GetLogger() + + k, err := tools.NewKubernetesService( + conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort, + conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData, + ) + if err != nil { + logger.Error().Msg("CleanupSourceSecrets: failed to create k8s service: " + err.Error()) + return + } + + labelSelector := "oc-execution-id=" + executionsID + ",oc-secret-type=source-presigned" + if err := k.DeleteSecretsByLabel(ctx, executionsID, labelSelector); err != nil { + logger.Error().Msg("CleanupSourceSecrets: " + err.Error()) + return + } + logger.Info().Msg("CleanupSourceSecrets: source secrets removed for exec " + executionsID) +} diff --git a/infrastructure/watchdog.go b/infrastructure/watchdog.go index 479a723..ce94875 100644 --- a/infrastructure/watchdog.go +++ b/infrastructure/watchdog.go @@ -454,5 +454,7 @@ func TeardownForExecution(executionID string, executionsID string) { admiralty.NewAdmiraltySetter(executionsID).TeardownIfRemote(exec, selfPeerID) storage.NewMinioSetter(executionsID, "").TeardownForExecution(ctx, selfPeerID) storage.NewPVCSetter(executionsID, "").TeardownForExecution(ctx, selfPeerID) + // Supprime les Secrets K8s éphémères des sources privées (Phase 4). + storage.CleanupSourceSecrets(ctx, executionsID) kubernetes.NewKubernetesService(executionsID).CleanupImages(ctx) } diff --git a/main.go b/main.go index 02d6b61..194bfa6 100644 --- a/main.go +++ b/main.go @@ -4,9 +4,11 @@ import ( "oc-datacenter/conf" "oc-datacenter/infrastructure" "oc-datacenter/infrastructure/nats" + "oc-datacenter/infrastructure/storage" _ "oc-datacenter/routers" oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/config" beego "github.com/beego/beego/v2/server/web" ) @@ -27,13 +29,20 @@ func main() { conf.GetConfig().MonitorMode = o.GetStringDefault("MONITOR_MODE", "prometheus") conf.GetConfig().MinioRootKey = o.GetStringDefault("MINIO_ADMIN_ACCESS", "") conf.GetConfig().MinioRootSecret = o.GetStringDefault("MINIO_ADMIN_SECRET", "") + conf.GetConfig().SourceKeyStorePath = o.GetStringDefault("SOURCE_KEY_STORE_PATH", "/data/source-keys.json") oclib.InitAPI(appname) + // Initialise le lecteur de la table opaque_key → real_path. + // Le fichier est écrit par oc-catalog sur le volume partagé /data. + storage.InitSourceKeyStore(conf.GetConfig().SourceKeyStorePath) + infrastructure.BootstrapAllowedImages() go nats.ListenNATS() go infrastructure.WatchBookings() go infrastructure.Watch() - beego.Run() + if config.GetConfig().IsApi { + beego.Run() + } } diff --git a/oc-datacenter b/oc-datacenter deleted file mode 100755 index c8910c9..0000000 Binary files a/oc-datacenter and /dev/null differ diff --git a/swagger/swagger.json b/swagger/swagger.json index 9d0c3f2..3c03bd1 100644 --- a/swagger/swagger.json +++ b/swagger/swagger.json @@ -166,6 +166,47 @@ } } }, + "/compatible/{type}": { + "put": { + "tags": [ + "oc-datacenter/controllersDatacenterController" + ], + "description": "find booking by id\n\u003cbr\u003e", + "operationId": "DatacenterController.GetAllCompatible", + "parameters": [ + { + "in": "path", + "name": "type", + "description": "the word type you want to get", + "required": true, + "type": "string" + }, + { + "in": "query", + "name": "is_draft", + "description": "draft wished", + "type": "string" + }, + { + "in": "query", + "name": "offset", + "description": "false", + "type": "string" + }, + { + "in": "query", + "name": "limit", + "description": "false", + "type": "string" + } + ], + "responses": { + "200": { + "description": "{booking} models.booking" + } + } + } + }, "/logs/{id}": { "get": { "tags": [ diff --git a/swagger/swagger.yml b/swagger/swagger.yml index db5592a..922f5f7 100644 --- a/swagger/swagger.yml +++ b/swagger/swagger.yml @@ -259,6 +259,35 @@ paths: description: "" schema: $ref: '#/definitions/allowed_image.AllowedImage' + /compatible/{type}: + put: + tags: + - oc-datacenter/controllersDatacenterController + description: |- + find booking by id +
+ operationId: DatacenterController.GetAllCompatible + parameters: + - in: path + name: type + description: the word type you want to get + required: true + type: string + - in: query + name: is_draft + description: draft wished + type: string + - in: query + name: offset + description: "false" + type: string + - in: query + name: limit + description: "false" + type: string + responses: + "200": + description: '{booking} models.booking' /logs/{id}: get: tags: