355 lines
12 KiB
Go
355 lines
12 KiB
Go
package workflow_builder
|
|
|
|
// source_fetch.go — Phase 3 : gestion des sources tierces (isReachable = true)
|
|
//
|
|
// Pour chaque ressource (Processing ou Data) dont l'instance expose une source
|
|
// publique (access.Container == nil, access.Source != "", access.IsReachable),
|
|
// le builder injecte une step Argo de téléchargement (curl) AVANT la step qui
|
|
// consomme la ressource.
|
|
//
|
|
// Garde critique : si la step aval (processing) contient déjà un curl ciblant
|
|
// la même URL dans sa commande de container, on n'injecte PAS de step
|
|
// supplémentaire — ce serait un double téléchargement.
|
|
|
|
import (
|
|
"fmt"
|
|
"net/url"
|
|
"path"
|
|
"strings"
|
|
|
|
. "oc-monitord/models"
|
|
|
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
|
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
|
)
|
|
|
|
// curlImage est l'image utilisée pour la step de téléchargement.
|
|
// alpine dispose de wget ; on installe curl à la volée, ou on utilise
|
|
// directement wget. Utiliser curlimages/curl évite l'installation.
|
|
const curlImage = "curlimages/curl:latest"
|
|
|
|
// ── Garde ────────────────────────────────────────────────────────────────────
|
|
|
|
// sourceAlreadyFetchedByStep retourne true si le container du processing
|
|
// identifié par processingItemID contient déjà un appel curl/wget ciblant
|
|
// sourceURL dans sa commande ou ses arguments.
|
|
//
|
|
// Si c'est le cas on NE doit PAS injecter une step curl supplémentaire :
|
|
// le processing gère lui-même le téléchargement et injecter une step
|
|
// amont serait un double téléchargement.
|
|
func (b *ArgoBuilder) sourceAlreadyFetchedByStep(
|
|
exec *workflow_execution.WorkflowExecution,
|
|
processingItemID string,
|
|
sourceURL string,
|
|
) bool {
|
|
item, ok := b.OriginWorkflow.Graph.Items[processingItemID]
|
|
if !ok || item.ItemResource.Processing == nil {
|
|
return false
|
|
}
|
|
index := 0
|
|
if d, ok := exec.SelectedInstances[item.ItemResource.Processing.GetID()]; ok {
|
|
index = d
|
|
}
|
|
inst := item.ItemResource.Processing.GetSelectedInstance(&index)
|
|
if inst == nil {
|
|
return false
|
|
}
|
|
procInst, ok := inst.(*resources.ProcessingInstance)
|
|
if !ok || procInst.Access == nil || procInst.Access.Container == nil {
|
|
// Pas de container → le step sera lui-même construit depuis la source,
|
|
// pas de double téléchargement possible.
|
|
return false
|
|
}
|
|
fullCmd := procInst.Access.Container.Command + " " + procInst.Access.Container.Args
|
|
hasFetch := strings.Contains(fullCmd, "curl") || strings.Contains(fullCmd, "wget")
|
|
hasURL := strings.Contains(fullCmd, sourceURL)
|
|
return hasFetch && hasURL
|
|
}
|
|
|
|
// ── Injection de la step curl ─────────────────────────────────────────────────
|
|
func (b *ArgoBuilder) injectSourceFetchStep(
|
|
stepBaseName string,
|
|
sourceURL string,
|
|
destPath string,
|
|
isExecutable bool,
|
|
dependsOn []string,
|
|
) string {
|
|
curlStepName := stepBaseName + "-src-fetch"
|
|
|
|
filename := sourceFilename(sourceURL)
|
|
fullDest := destPath + "/" + filename
|
|
|
|
var script string
|
|
if isExecutable {
|
|
script = fmt.Sprintf(
|
|
"curl -fsSL '%s' -o '%s' && chmod +x '%s'",
|
|
sourceURL, fullDest, fullDest,
|
|
)
|
|
} else {
|
|
script = fmt.Sprintf("curl -fsSL '%s' -o '%s'", sourceURL, fullDest)
|
|
}
|
|
|
|
// Tâche dans le DAG.
|
|
fetchTask := Task{
|
|
Name: curlStepName,
|
|
Template: curlStepName,
|
|
Dependencies: dependsOn,
|
|
}
|
|
b.Workflow.getDag().Tasks = append(b.Workflow.getDag().Tasks, fetchTask)
|
|
|
|
// Template Argo correspondant.
|
|
fetchTemplate := Template{
|
|
Name: curlStepName,
|
|
Container: Container{
|
|
Image: curlImage,
|
|
ImagePullPolicy: "IfNotPresent",
|
|
Command: []string{"sh", "-c"},
|
|
Args: []string{script},
|
|
},
|
|
}
|
|
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, fetchTemplate)
|
|
|
|
logger.Info().Msg(fmt.Sprintf(
|
|
"[source-fetch] injected curl step '%s' → %s → %s",
|
|
curlStepName, sourceURL, fullDest,
|
|
))
|
|
return curlStepName
|
|
}
|
|
|
|
// ── Traitement Processing source (isReachable = true) ────────────────────────
|
|
|
|
// handleProcessingSource gère le cas où un ProcessingInstance a une source
|
|
// publique (access.HasSource() && access.IsReachable) sans container associé.
|
|
//
|
|
// Elle injecte une step curl avant la step processing dans le DAG, puis
|
|
// modifie le template du processing pour exécuter le binaire téléchargé
|
|
// depuis le storage lié.
|
|
//
|
|
// Retourne une erreur si aucun storage n'est lié (prérequis obligatoire).
|
|
func (b *ArgoBuilder) handleProcessingSource(
|
|
exec *workflow_execution.WorkflowExecution,
|
|
graphID string,
|
|
procResource *resources.ProcessingResource,
|
|
procInst *resources.ProcessingInstance,
|
|
argoStepName string,
|
|
template *Template,
|
|
) error {
|
|
access := procInst.Access
|
|
if !access.HasSource() || !access.Source.IsReachable {
|
|
return nil
|
|
}
|
|
|
|
// Récupérer le storage lié à ce processing.
|
|
related := b.OriginWorkflow.GetByRelatedProcessing(graphID, b.OriginWorkflow.Graph.IsStorage)
|
|
if len(related) == 0 {
|
|
return fmt.Errorf(
|
|
"processing '%s' has source '%s' but no storage linked — cannot inject fetch step",
|
|
procResource.GetName(), access.Source,
|
|
)
|
|
}
|
|
|
|
// On utilise le premier storage lié (cas nominal).
|
|
var mountPath string
|
|
for _, r := range related {
|
|
n := r.Node
|
|
storage := n.(*resources.StorageResource)
|
|
if len(storage.Instances) > 0 && storage.Instances[0].Source != "" {
|
|
mountPath = storage.Instances[0].Source
|
|
break
|
|
}
|
|
}
|
|
if mountPath == "" {
|
|
return fmt.Errorf(
|
|
"processing '%s': linked storage has no mount path configured",
|
|
procResource.GetName(),
|
|
)
|
|
}
|
|
|
|
// Dépendances courantes de la step processing (pour les câbler sur la step curl).
|
|
existingDeps := b.getArgoDependencies(exec, graphID)
|
|
|
|
// Injection de la step curl.
|
|
fetchStepName := b.injectSourceFetchStep(
|
|
argoStepName,
|
|
access.Source.Source,
|
|
mountPath,
|
|
true, // binaire exécutable
|
|
existingDeps,
|
|
)
|
|
|
|
// La step processing dépend maintenant de la step curl.
|
|
// On met à jour la tâche DAG existante.
|
|
dag := b.Workflow.getDag()
|
|
for i, task := range dag.Tasks {
|
|
if task.Name == argoStepName {
|
|
dag.Tasks[i].Dependencies = []string{fetchStepName}
|
|
break
|
|
}
|
|
}
|
|
|
|
// Le template processing doit exécuter le binaire téléchargé.
|
|
filename := sourceFilename(access.Source.Source)
|
|
binaryPath := mountPath + "/" + filename
|
|
template.Container = Container{
|
|
Image: "alpine:latest",
|
|
ImagePullPolicy: "IfNotPresent",
|
|
Command: []string{"sh", "-c"},
|
|
Args: []string{binaryPath},
|
|
}
|
|
// Propagation des paramètres d'entrée/sortie du workflow.
|
|
for _, v := range procResource.GetEnv() {
|
|
template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name})
|
|
}
|
|
for _, v := range b.OriginWorkflow.Env[graphID] {
|
|
template.Inputs.Parameters = AppendParamIfAbsent(template.Inputs.Parameters, Parameter{Name: v.Name})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ── Traitement Data source (isReachable = true) ───────────────────────────────
|
|
|
|
// HandleDataSources parcourt tous les items Data du graphe dont une instance
|
|
// expose une source publique (access.HasSource() && access.IsReachable) et
|
|
// injecte pour chacun une step curl de téléchargement dans le storage lié.
|
|
//
|
|
// Les sources privées (isReachable=false) sont gérées par HandlePrivateDataSources
|
|
// (Phase 4), appelée en fin de cette fonction.
|
|
//
|
|
// Garde : si la step processing aval contient déjà un curl ciblant la même URL,
|
|
// on saute l'injection pour ce processing.
|
|
//
|
|
// Cette fonction est appelée depuis createTemplates() après la boucle principale.
|
|
func (b *ArgoBuilder) HandleDataSources(exec *workflow_execution.WorkflowExecution, namespace string) {
|
|
for itemID, item := range b.OriginWorkflow.Graph.Items {
|
|
if !b.OriginWorkflow.Graph.IsData(item) || item.ItemResource.Data == nil {
|
|
continue
|
|
}
|
|
|
|
// Chercher une instance avec source PUBLIQUE (isReachable=true).
|
|
// Les sources privées sont traitées par HandlePrivateDataSources.
|
|
var sourceURL string
|
|
var mountPath string
|
|
|
|
for _, inst := range item.ItemResource.Data.Instances {
|
|
if inst == nil || !inst.Access.HasSource() || !inst.Access.Source.IsReachable {
|
|
continue
|
|
}
|
|
sourceURL = inst.Access.Source.Source
|
|
break
|
|
}
|
|
if sourceURL == "" {
|
|
continue
|
|
}
|
|
|
|
// Storage lié à cette Data (ValidateIntegrity garantit qu'il en existe un).
|
|
linkedStorageIDs := b.OriginWorkflow.Graph.GetLinkedStorageForData(itemID)
|
|
if len(linkedStorageIDs) == 0 {
|
|
logger.Error().Msg(fmt.Sprintf(
|
|
"[source-fetch] data '%s' has source but no storage linked — skipping",
|
|
item.ItemResource.Data.GetName(),
|
|
))
|
|
continue
|
|
}
|
|
|
|
storageItemID := linkedStorageIDs[0]
|
|
storageItem, ok := b.OriginWorkflow.Graph.Items[storageItemID]
|
|
if !ok || storageItem.ItemResource.Storage == nil || len(storageItem.ItemResource.Storage.Instances) == 0 {
|
|
continue
|
|
}
|
|
mountPath = storageItem.ItemResource.Storage.Instances[0].Source
|
|
if mountPath == "" {
|
|
logger.Error().Msg(fmt.Sprintf(
|
|
"[source-fetch] storage linked to data '%s' has no mount path — skipping",
|
|
item.ItemResource.Data.GetName(),
|
|
))
|
|
continue
|
|
}
|
|
|
|
// Trouver tous les processings qui lisent depuis ce storage.
|
|
downstreamProcIDs := b.processingsThatReadStorage(storageItemID)
|
|
|
|
// Pour chaque processing aval, appliquer la garde puis injecter si nécessaire.
|
|
for _, procItemID := range downstreamProcIDs {
|
|
if b.sourceAlreadyFetchedByStep(exec, procItemID, sourceURL) {
|
|
logger.Info().Msg(fmt.Sprintf(
|
|
"[source-fetch] data '%s': downstream processing '%s' already curls source — skipping injection",
|
|
item.ItemResource.Data.GetName(), procItemID,
|
|
))
|
|
continue
|
|
}
|
|
|
|
procItem := b.OriginWorkflow.Graph.Items[procItemID]
|
|
if procItem.ItemResource.Processing == nil {
|
|
continue
|
|
}
|
|
|
|
// Dépendances courantes de la step processing aval.
|
|
existingDeps := b.getArgoDependencies(exec, procItemID)
|
|
|
|
// Nom de la step curl : basé sur le nom de la Data + storage.
|
|
fetchBaseName := strings.ToLower(strings.ReplaceAll(item.ItemResource.Data.GetName(), " ", "-")) +
|
|
"-" + strings.ToLower(strings.ReplaceAll(storageItem.ItemResource.Storage.GetName(), " ", "-"))
|
|
|
|
fetchStepName := b.injectSourceFetchStep(
|
|
fetchBaseName,
|
|
sourceURL,
|
|
mountPath,
|
|
false, // donnée, pas un binaire
|
|
existingDeps,
|
|
)
|
|
|
|
// Ajouter la step curl comme dépendance de CHAQUE instance (peer) du processing aval.
|
|
dag := b.Workflow.getDag()
|
|
for _, pb := range getAllPeersForItem(exec, procItemID) {
|
|
procArgoName := getArgoName(procItem.ItemResource.Processing.GetName(), pb.BookingID)
|
|
for i, task := range dag.Tasks {
|
|
if task.Name == procArgoName {
|
|
// Remplacer les dépendances existantes par [fetchStepName].
|
|
// Les anciennes dépendances sont déjà portées par la step curl.
|
|
dag.Tasks[i].Dependencies = []string{fetchStepName}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Phase 4 — sources privées (isReachable=false).
|
|
b.HandlePrivateDataSources(exec, namespace)
|
|
}
|
|
|
|
// ── Helpers ───────────────────────────────────────────────────────────────────
|
|
|
|
// sourceFilename extrait le nom de fichier depuis une URL source.
|
|
// Fallback : "source-binary" si l'URL n'a pas de chemin exploitable.
|
|
func sourceFilename(sourceURL string) string {
|
|
u, err := url.Parse(sourceURL)
|
|
if err == nil && u.Path != "" {
|
|
if base := path.Base(u.Path); base != "." && base != "/" {
|
|
return base
|
|
}
|
|
}
|
|
return "source-binary"
|
|
}
|
|
|
|
// processingsThatReadStorage retourne les IDs des items Processing
|
|
// connectés (via un lien quelconque) au storage identifié par storageItemID.
|
|
func (b *ArgoBuilder) processingsThatReadStorage(storageItemID string) []string {
|
|
var result []string
|
|
for _, link := range b.OriginWorkflow.Graph.Links {
|
|
var otherID string
|
|
if link.Source.ID == storageItemID {
|
|
otherID = link.Destination.ID
|
|
} else if link.Destination.ID == storageItemID {
|
|
otherID = link.Source.ID
|
|
} else {
|
|
continue
|
|
}
|
|
if other, ok := b.OriginWorkflow.Graph.Items[otherID]; ok && b.OriginWorkflow.Graph.IsProcessing(other) {
|
|
result = append(result, otherID)
|
|
}
|
|
}
|
|
return result
|
|
}
|