Files
2026-05-27 16:09:45 +02:00

355 lines
12 KiB
Go

package workflow_builder
// source_fetch.go — Phase 3 : gestion des sources tierces (isReachable = true)
//
// Pour chaque ressource (Processing ou Data) dont l'instance expose une source
// publique (access.Container == nil, access.Source != "", access.IsReachable),
// le builder injecte une step Argo de téléchargement (curl) AVANT la step qui
// consomme la ressource.
//
// Garde critique : si la step aval (processing) contient déjà un curl ciblant
// la même URL dans sa commande de container, on n'injecte PAS de step
// supplémentaire — ce serait un double téléchargement.
import (
"fmt"
"net/url"
"path"
"strings"
. "oc-monitord/models"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
)
// curlImage est l'image utilisée pour la step de téléchargement.
// alpine dispose de wget ; on installe curl à la volée, ou on utilise
// directement wget. Utiliser curlimages/curl évite l'installation.
const curlImage = "curlimages/curl:latest"
// ── Garde ────────────────────────────────────────────────────────────────────
// sourceAlreadyFetchedByStep retourne true si le container du processing
// identifié par processingItemID contient déjà un appel curl/wget ciblant
// sourceURL dans sa commande ou ses arguments.
//
// Si c'est le cas on NE doit PAS injecter une step curl supplémentaire :
// le processing gère lui-même le téléchargement et injecter une step
// amont serait un double téléchargement.
func (b *ArgoBuilder) sourceAlreadyFetchedByStep(
exec *workflow_execution.WorkflowExecution,
processingItemID string,
sourceURL string,
) bool {
item, ok := b.OriginWorkflow.Graph.Items[processingItemID]
if !ok || item.ItemResource.Processing == nil {
return false
}
index := 0
if d, ok := exec.SelectedInstances[item.ItemResource.Processing.GetID()]; ok {
index = d
}
inst := item.ItemResource.Processing.GetSelectedInstance(&index)
if inst == nil {
return false
}
procInst, ok := inst.(*resources.ProcessingInstance)
if !ok || procInst.Access == nil || procInst.Access.Container == nil {
// Pas de container → le step sera lui-même construit depuis la source,
// pas de double téléchargement possible.
return false
}
fullCmd := procInst.Access.Container.Command + " " + procInst.Access.Container.Args
hasFetch := strings.Contains(fullCmd, "curl") || strings.Contains(fullCmd, "wget")
hasURL := strings.Contains(fullCmd, sourceURL)
return hasFetch && hasURL
}
// ── Injection de la step curl ─────────────────────────────────────────────────
func (b *ArgoBuilder) injectSourceFetchStep(
stepBaseName string,
sourceURL string,
destPath string,
isExecutable bool,
dependsOn []string,
) string {
curlStepName := stepBaseName + "-src-fetch"
filename := sourceFilename(sourceURL)
fullDest := destPath + "/" + filename
var script string
if isExecutable {
script = fmt.Sprintf(
"curl -fsSL '%s' -o '%s' && chmod +x '%s'",
sourceURL, fullDest, fullDest,
)
} else {
script = fmt.Sprintf("curl -fsSL '%s' -o '%s'", sourceURL, fullDest)
}
// Tâche dans le DAG.
fetchTask := Task{
Name: curlStepName,
Template: curlStepName,
Dependencies: dependsOn,
}
b.Workflow.getDag().Tasks = append(b.Workflow.getDag().Tasks, fetchTask)
// Template Argo correspondant.
fetchTemplate := Template{
Name: curlStepName,
Container: Container{
Image: curlImage,
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c"},
Args: []string{script},
},
}
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, fetchTemplate)
logger.Info().Msg(fmt.Sprintf(
"[source-fetch] injected curl step '%s' → %s → %s",
curlStepName, sourceURL, fullDest,
))
return curlStepName
}
// ── Traitement Processing source (isReachable = true) ────────────────────────
// handleProcessingSource gère le cas où un ProcessingInstance a une source
// publique (access.HasSource() && access.IsReachable) sans container associé.
//
// Elle injecte une step curl avant la step processing dans le DAG, puis
// modifie le template du processing pour exécuter le binaire téléchargé
// depuis le storage lié.
//
// Retourne une erreur si aucun storage n'est lié (prérequis obligatoire).
func (b *ArgoBuilder) handleProcessingSource(
exec *workflow_execution.WorkflowExecution,
graphID string,
procResource *resources.ProcessingResource,
procInst *resources.ProcessingInstance,
argoStepName string,
template *Template,
) error {
access := procInst.Access
if !access.HasSource() || !access.Source.IsReachable {
return nil
}
// Récupérer le storage lié à ce processing.
related := b.OriginWorkflow.GetByRelatedProcessing(graphID, b.OriginWorkflow.Graph.IsStorage)
if len(related) == 0 {
return fmt.Errorf(
"processing '%s' has source '%s' but no storage linked — cannot inject fetch step",
procResource.GetName(), access.Source,
)
}
// On utilise le premier storage lié (cas nominal).
var mountPath string
for _, r := range related {
n := r.Node
storage := n.(*resources.StorageResource)
if len(storage.Instances) > 0 && storage.Instances[0].Source != "" {
mountPath = storage.Instances[0].Source
break
}
}
if mountPath == "" {
return fmt.Errorf(
"processing '%s': linked storage has no mount path configured",
procResource.GetName(),
)
}
// Dépendances courantes de la step processing (pour les câbler sur la step curl).
existingDeps := b.getArgoDependencies(exec, graphID)
// Injection de la step curl.
fetchStepName := b.injectSourceFetchStep(
argoStepName,
access.Source.Source,
mountPath,
true, // binaire exécutable
existingDeps,
)
// La step processing dépend maintenant de la step curl.
// On met à jour la tâche DAG existante.
dag := b.Workflow.getDag()
for i, task := range dag.Tasks {
if task.Name == argoStepName {
dag.Tasks[i].Dependencies = []string{fetchStepName}
break
}
}
// Le template processing doit exécuter le binaire téléchargé.
filename := sourceFilename(access.Source.Source)
binaryPath := mountPath + "/" + filename
template.Container = Container{
Image: "alpine:latest",
ImagePullPolicy: "IfNotPresent",
Command: []string{"sh", "-c"},
Args: []string{binaryPath},
}
// Propagation des paramètres d'entrée/sortie du workflow.
for _, v := range procResource.GetEnv() {
template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name})
}
for _, v := range b.OriginWorkflow.Env[graphID] {
template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name})
}
return nil
}
// ── Traitement Data source (isReachable = true) ───────────────────────────────
// HandleDataSources parcourt tous les items Data du graphe dont une instance
// expose une source publique (access.HasSource() && access.IsReachable) et
// injecte pour chacun une step curl de téléchargement dans le storage lié.
//
// Les sources privées (isReachable=false) sont gérées par HandlePrivateDataSources
// (Phase 4), appelée en fin de cette fonction.
//
// Garde : si la step processing aval contient déjà un curl ciblant la même URL,
// on saute l'injection pour ce processing.
//
// Cette fonction est appelée depuis createTemplates() après la boucle principale.
func (b *ArgoBuilder) HandleDataSources(exec *workflow_execution.WorkflowExecution, namespace string) {
for itemID, item := range b.OriginWorkflow.Graph.Items {
if !b.OriginWorkflow.Graph.IsData(item) || item.ItemResource.Data == nil {
continue
}
// Chercher une instance avec source PUBLIQUE (isReachable=true).
// Les sources privées sont traitées par HandlePrivateDataSources.
var sourceURL string
var mountPath string
for _, inst := range item.ItemResource.Data.Instances {
if inst == nil || !inst.Access.HasSource() || !inst.Access.Source.IsReachable {
continue
}
sourceURL = inst.Access.Source.Source
break
}
if sourceURL == "" {
continue
}
// Storage lié à cette Data (ValidateIntegrity garantit qu'il en existe un).
linkedStorageIDs := b.OriginWorkflow.Graph.GetLinkedStorageForData(itemID)
if len(linkedStorageIDs) == 0 {
logger.Error().Msg(fmt.Sprintf(
"[source-fetch] data '%s' has source but no storage linked — skipping",
item.ItemResource.Data.GetName(),
))
continue
}
storageItemID := linkedStorageIDs[0]
storageItem, ok := b.OriginWorkflow.Graph.Items[storageItemID]
if !ok || storageItem.ItemResource.Storage == nil || len(storageItem.ItemResource.Storage.Instances) == 0 {
continue
}
mountPath = storageItem.ItemResource.Storage.Instances[0].Source
if mountPath == "" {
logger.Error().Msg(fmt.Sprintf(
"[source-fetch] storage linked to data '%s' has no mount path — skipping",
item.ItemResource.Data.GetName(),
))
continue
}
// Trouver tous les processings qui lisent depuis ce storage.
downstreamProcIDs := b.processingsThatReadStorage(storageItemID)
// Pour chaque processing aval, appliquer la garde puis injecter si nécessaire.
for _, procItemID := range downstreamProcIDs {
if b.sourceAlreadyFetchedByStep(exec, procItemID, sourceURL) {
logger.Info().Msg(fmt.Sprintf(
"[source-fetch] data '%s': downstream processing '%s' already curls source — skipping injection",
item.ItemResource.Data.GetName(), procItemID,
))
continue
}
procItem := b.OriginWorkflow.Graph.Items[procItemID]
if procItem.ItemResource.Processing == nil {
continue
}
// Dépendances courantes de la step processing aval.
existingDeps := b.getArgoDependencies(exec, procItemID)
// Nom de la step curl : basé sur le nom de la Data + storage.
fetchBaseName := strings.ToLower(strings.ReplaceAll(item.ItemResource.Data.GetName(), " ", "-")) +
"-" + strings.ToLower(strings.ReplaceAll(storageItem.ItemResource.Storage.GetName(), " ", "-"))
fetchStepName := b.injectSourceFetchStep(
fetchBaseName,
sourceURL,
mountPath,
false, // donnée, pas un binaire
existingDeps,
)
// Ajouter la step curl comme dépendance de CHAQUE instance (peer) du processing aval.
dag := b.Workflow.getDag()
for _, pb := range getAllPeersForItem(exec, procItemID) {
procArgoName := getArgoName(procItem.ItemResource.Processing.GetName(), pb.BookingID)
for i, task := range dag.Tasks {
if task.Name == procArgoName {
// Remplacer les dépendances existantes par [fetchStepName].
// Les anciennes dépendances sont déjà portées par la step curl.
dag.Tasks[i].Dependencies = []string{fetchStepName}
break
}
}
}
}
}
// Phase 4 — sources privées (isReachable=false).
b.HandlePrivateDataSources(exec, namespace)
}
// ── Helpers ───────────────────────────────────────────────────────────────────
// sourceFilename extrait le nom de fichier depuis une URL source.
// Fallback : "source-binary" si l'URL n'a pas de chemin exploitable.
func sourceFilename(sourceURL string) string {
u, err := url.Parse(sourceURL)
if err == nil && u.Path != "" {
if base := path.Base(u.Path); base != "." && base != "/" {
return base
}
}
return "source-binary"
}
// processingsThatReadStorage retourne les IDs des items Processing
// connectés (via un lien quelconque) au storage identifié par storageItemID.
func (b *ArgoBuilder) processingsThatReadStorage(storageItemID string) []string {
var result []string
for _, link := range b.OriginWorkflow.Graph.Links {
var otherID string
if link.Source.ID == storageItemID {
otherID = link.Destination.ID
} else if link.Destination.ID == storageItemID {
otherID = link.Source.ID
} else {
continue
}
if other, ok := b.OriginWorkflow.Graph.Items[otherID]; ok && b.OriginWorkflow.Graph.IsProcessing(other) {
result = append(result, otherID)
}
}
return result
}