// Package workflow_builder traduit les informations du graphe d'un Workflow // (ses composants, ses liens) en un fichier YAML Argo Workflow prêt à être // soumis à un cluster Kubernetes. Le point d'entrée principal est ArgoBuilder. package workflow_builder import ( "encoding/json" "fmt" "oc-monitord/conf" . "oc-monitord/models" "os" "strings" "time" oclib "cloud.o-forge.io/core/oc-lib" oclib_config "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources/native_tools" w "cloud.o-forge.io/core/oc-lib/models/workflow" "cloud.o-forge.io/core/oc-lib/models/workflow/graph" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/tools" "github.com/nats-io/nats.go" "github.com/nwtgck/go-fakelish" "github.com/rs/zerolog" "gopkg.in/yaml.v3" ) // logger est le logger zerolog partagé au sein du package, initialisé à // chaque appel de CreateDAG pour récupérer la configuration courante. var logger zerolog.Logger // ArgoBuilder est le constructeur principal du fichier Argo Workflow. // Il porte l'état de la construction (workflow source, templates générés, // services k8s à créer, timeout global, liste des peers distants impliqués). type ArgoBuilder struct { // OriginWorkflow est le workflow métier Open Cloud dont on construit la représentation Argo. OriginWorkflow *w.Workflow // Workflow est la structure YAML Argo en cours de construction. Workflow Workflow // Services liste les services Kubernetes à exposer pour les processings "IsService". Services []*Service // Timeout est la durée maximale d'exécution en secondes (activeDeadlineSeconds). Timeout int // RemotePeers contient les IDs des peers distants détectés via Admiralty. RemotePeers []string } // Workflow est la structure racine du fichier YAML Argo Workflow. // Elle correspond exactement au format attendu par le contrôleur Argo. type Workflow struct { ApiVersion string `yaml:"apiVersion"` Kind string `yaml:"kind"` Metadata struct { Name string `yaml:"name"` } `yaml:"metadata"` Spec Spec `yaml:"spec,omitempty"` } // getDag retourne le pointeur sur le template "dag" du workflow. // S'il n'existe pas encore, il est créé et ajouté à la liste des templates. func (b *Workflow) getDag() *Dag { for _, t := range b.Spec.Templates { if t.Name == "dag" { return t.Dag } } b.Spec.Templates = append(b.Spec.Templates, Template{Name: "dag", Dag: &Dag{}}) return b.Spec.Templates[len(b.Spec.Templates)-1].Dag } // Spec contient la spécification complète du workflow Argo : // compte de service, point d'entrée, volumes, templates et timeout. type Spec struct { ArtifactRepositoryRef ServiceAccountName string `yaml:"serviceAccountName,omitempty"` Entrypoint string `yaml:"entrypoint"` Arguments []Parameter `yaml:"arguments,omitempty"` Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` Templates []Template `yaml:"templates"` Timeout int `yaml:"activeDeadlineSeconds,omitempty"` } // CreateDAG est le point d'entrée de la construction du DAG Argo. // Il crée tous les templates (un par processing / native tool / sous-workflow), // configure les volumes persistants, positionne les métadonnées globales du // workflow et retourne : // - le nombre de tâches dans le DAG, // - les noms des premières tâches (sans dépendances), // - les noms des dernières tâches (dont personne ne dépend), // - une éventuelle erreur. // // Le paramètre write est conservé pour usage futur (écriture effective du YAML). // TODO: gérer S3, GCS, Azure selon le type de stockage lié au processing. func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, namespace string, write bool) (int, []string, []string, error) { logger = logs.GetLogger() logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) // Crée un template Argo pour chaque nœud du graphe et collecte les volumes. firstItems, lastItems, volumes := b.createTemplates(exec, namespace) b.createVolumes(exec, volumes) if b.Timeout > 0 { b.Workflow.Spec.Timeout = b.Timeout } b.Workflow.Spec.ServiceAccountName = "sa-" + namespace b.Workflow.Spec.Entrypoint = "dag" b.Workflow.ApiVersion = "argoproj.io/v1alpha1" b.Workflow.Kind = "Workflow" if !write { return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil } // createTemplates parcourt tous les nœuds du graphe (processings, native tools, // sous-workflows) et génère les templates Argo correspondants. // Elle gère également le recâblage des dépendances DAG entre sous-workflows // imbriqués, et l'ajout du pod de service si nécessaire. // Retourne les premières tâches, les dernières tâches et les volumes à créer. func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution, namespace string) ([]string, []string, []VolumeMount) { volumes := []VolumeMount{} firstItems := []string{} lastItems := []string{} // --- Processings --- for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { index := 0 _, res := item.GetResource() if d, ok := exec.SelectedInstances[res.GetID()]; ok { index = d } instance := item.Processing.GetSelectedInstance(&index) logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) return firstItems, lastItems, volumes } volumes, firstItems, lastItems = b.createArgoTemplates(exec, namespace, item.ID, item.Processing, volumes, firstItems, lastItems) } // --- Native Tools de type WORKFLOW_EVENT uniquement --- for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) { if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) { continue } index := 0 _, res := item.GetResource() if d, ok := exec.SelectedInstances[res.GetID()]; ok { index = d } instance := item.NativeTool.GetSelectedInstance(&index) logger.Info().Msg(fmt.Sprint("Creating template for", item.NativeTool.GetName(), instance)) volumes, firstItems, lastItems = b.createArgoTemplates(exec, namespace, item.ID, item.NativeTool, volumes, firstItems, lastItems) } // --- Sous-workflows : chargement, construction récursive et fusion du DAG --- firstWfTasks := map[string][]string{} latestWfTasks := map[string][]string{} relatedWfTasks := map[string][]string{} for _, wf := range b.OriginWorkflow.Workflows { realWorkflow, code, err := w.NewAccessor(nil).LoadOne(wf) if code != 200 { logger.Error().Msg("Error loading the workflow : " + err.Error()) continue } subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} _, fi, li, err := subBuilder.CreateDAG(exec, namespace, false) if err != nil { logger.Error().Msg("Error creating the subworkflow : " + err.Error()) continue } firstWfTasks[wf] = fi if ok, depsOfIds := subBuilder.isArgoDependancy(wf); ok { // le sous-workflow est une dépendance d'autre chose latestWfTasks[wf] = li relatedWfTasks[wf] = depsOfIds } // Fusion des tâches, templates, volumes et arguments du sous-workflow dans le DAG principal. subDag := subBuilder.Workflow.getDag() d := b.Workflow.getDag() d.Tasks = append(d.Tasks, subDag.Tasks...) b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...) b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...) b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...) b.Services = append(b.Services, subBuilder.Services...) } // Recâblage : les tâches qui dépendaient du sous-workflow dépendent désormais // de sa dernière tâche réelle (latestWfTasks). for wfID, depsOfIds := range relatedWfTasks { for _, dep := range depsOfIds { for _, task := range b.Workflow.getDag().Tasks { if strings.Contains(task.Name, dep) { index := -1 for i, depp := range task.Dependencies { if strings.Contains(depp, wfID) { index = i break } } if index != -1 { task.Dependencies = append(task.Dependencies[:index], task.Dependencies[index+1:]...) } task.Dependencies = append(task.Dependencies, latestWfTasks[wfID]...) } } } } // Les premières tâches du sous-workflow héritent des dépendances // que le sous-workflow avait vis-à-vis du DAG principal. for wfID, fi := range firstWfTasks { deps := b.getArgoDependencies(wfID) if len(deps) > 0 { for _, dep := range fi { for _, task := range b.Workflow.getDag().Tasks { if strings.Contains(task.Name, dep) { task.Dependencies = append(task.Dependencies, deps...) } } } } } // Si des services Kubernetes sont nécessaires, on ajoute le pod dédié. if b.Services != nil { dag := b.Workflow.getDag() dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"}) b.addServiceToArgo() } return firstItems, lastItems, volumes } // createArgoTemplates crée le template Argo pour un nœud du graphe (processing // ou native tool). Il : // 1. Ajoute la tâche au DAG avec ses dépendances. // 2. Crée le template de container (ou d'événement pour les native tools). // 3. Ajoute les annotations Admiralty si le processing est hébergé sur un peer distant. // 4. Crée un service Kubernetes si le processing est déclaré IsService. // 5. Configure les annotations de stockage (S3, volumes locaux). func (b *ArgoBuilder) createArgoTemplates( exec *workflow_execution.WorkflowExecution, namespace string, id string, obj resources.ResourceInterface, volumes []VolumeMount, firstItems []string, lastItems []string) ([]VolumeMount, []string, []string) { _, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, obj, firstItems, lastItems) template := &Template{Name: getArgoName(obj.GetName(), id)} logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) // Vérifie si le processing est sur un peer distant (Admiralty). isReparted, peer := b.isReparted(obj, id) if obj.GetType() == tools.PROCESSING_RESOURCE.String() { template.CreateContainer(exec, obj.(*resources.ProcessingResource), b.Workflow.getDag()) } else if obj.GetType() == tools.NATIVE_TOOL.String() { template.CreateEventContainer(exec, obj.(*resources.NativeTool), b.Workflow.getDag()) } if isReparted { logger.Debug().Msg("Reparted processing, on " + peer.GetID()) b.RemotePeers = append(b.RemotePeers, peer.GetID()) template.AddAdmiraltyAnnotations(peer.GetID()) } // Si le processing expose un service Kubernetes, on l'enregistre et on // applique le label "app" pour que le Service puisse le sélectionner. if obj.GetType() == tools.PROCESSING_RESOURCE.String() && obj.(*resources.ProcessingResource).IsService { b.CreateService(exec, id, obj) template.Metadata.Labels = make(map[string]string) template.Metadata.Labels["app"] = "oc-service-" + obj.GetName() } volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes) b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) return volumes, firstItems, lastItems } // addStorageAnnotations parcourt tous les nœuds de stockage liés au processing // identifié par id. Pour chaque lien de stockage : // - Construit le nom de l'artefact Argo (lecture ou écriture). // - Pour les stockages S3 : appelle waitForConsiders (STORAGE_RESOURCE) pour // attendre la validation PB_CONSIDERS avant de configurer les annotations S3. // - Pour les volumes locaux : ajoute un VolumeMount dans le container. func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExecution, id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount { // Récupère tous les nœuds de stockage connectés au processing courant. related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) for _, r := range related { storage := r.Node.(*resources.StorageResource) for _, linkToStorage := range r.Links { for _, rw := range linkToStorage.StorageLinkInfos { var art Artifact // Le nom de l'artefact doit être alphanumérique + '-' ou '_'. artifactBaseName := strings.Join(strings.Split(storage.GetName(), " "), "-") + "-" + strings.Replace(rw.FileName, ".", "-", -1) if rw.Write { // Écriture vers S3 : Path = chemin du fichier dans le pod. art = Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)} art.Name = artifactBaseName + "-input-write" } else { // Lecture depuis S3 : Path = destination dans le pod. art = Artifact{Path: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env)} art.Name = artifactBaseName + "-input-read" } if storage.StorageType == enum.S3 { // Pour chaque ressource de compute liée à ce stockage S3, // on notifie via NATS et on attend la validation PB_CONSIDERS // avec DataType = STORAGE_RESOURCE avant de continuer. for _, r := range b.getStorageRelatedProcessing(storage.GetID()) { waitForConsiders(exec.ExecutionsID, tools.STORAGE_RESOURCE, ArgoKubeEvent{ ExecutionsID: exec.ExecutionsID, DestPeerID: r.GetID(), Type: tools.STORAGE_RESOURCE, SourcePeerID: storage.GetCreatorID(), OriginID: conf.GetConfig().PeerID, }) } // Configure la référence au dépôt d'artefacts S3 dans le Spec. b.addS3annotations(storage, namespace) } if rw.Write { template.Outputs.Artifacts = append(template.Outputs.Artifacts, art) } else { template.Inputs.Artifacts = append(template.Inputs.Artifacts, art) } } } // Si l'instance de stockage est locale, on monte un volume persistant. index := 0 if s, ok := exec.SelectedInstances[storage.GetID()]; ok { index = s } s := storage.Instances[index] if s.Local { volumes = template.Container.AddVolumeMount(VolumeMount{ Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"), MountPath: s.Source, Storage: storage, }, volumes) } } return volumes } // getStorageRelatedProcessing retourne la liste des ressources de compute // connectées (via un processing intermédiaire) au stockage identifié par storageId. // Ces ressources sont utilisées pour construire les ArgoKubeEvent destinés // à la validation NATS. func (b *ArgoBuilder) getStorageRelatedProcessing(storageId string) (res []resources.ResourceInterface) { var storageLinks []graph.GraphLink // On ne conserve que les liens impliquant ce stockage. for _, link := range b.OriginWorkflow.Graph.Links { if link.Destination.ID == storageId || link.Source.ID == storageId { storageLinks = append(storageLinks, link) } } for _, link := range storageLinks { var resourceId string // L'opposé du lien est soit la source soit la destination selon la direction. if link.Source.ID != storageId { resourceId = link.Source.ID } else { resourceId = link.Destination.ID } // Si l'opposé est un processing, on récupère ses ressources de compute. if b.OriginWorkflow.Graph.IsProcessing(b.OriginWorkflow.Graph.Items[resourceId]) { res = append(res, b.getComputeProcessing(resourceId)...) } } return } // getComputeProcessing retourne toutes les ressources de compute attachées // au processing identifié par processingId dans le graphe du workflow. func (b *ArgoBuilder) getComputeProcessing(processingId string) (res []resources.ResourceInterface) { arr := []resources.ResourceInterface{} computeRel := b.OriginWorkflow.GetByRelatedProcessing(processingId, b.OriginWorkflow.Graph.IsCompute) for _, rel := range computeRel { arr = append(arr, rel.Node) } return arr } // addS3annotations configure la référence au dépôt d'artefacts S3 dans le Spec // du workflow Argo. La ConfigMap et la clé sont dérivées de l'ID du stockage. // Le namespace est conservé en signature pour une évolution future. func (b *ArgoBuilder) addS3annotations(storage *resources.StorageResource, namespace string) { b.Workflow.Spec.ArtifactRepositoryRef = ArtifactRepositoryRef{ ConfigMap: storage.GetID() + "-artifact-repository", Key: storage.GetID() + "-s3-local", } } // addTaskToArgo ajoute une tâche au DAG Argo pour le nœud graphItemID. // Elle résout les dépendances DAG, propage les paramètres d'environnement, // d'entrée et de sortie de l'instance sélectionnée, et met à jour les listes // firstItems / lastItems utilisées pour le recâblage des sous-workflows. func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, dag *Dag, graphItemID string, processing resources.ResourceInterface, firstItems []string, lastItems []string) (*Dag, []string, []string) { unique_name := getArgoName(processing.GetName(), graphItemID) step := Task{Name: unique_name, Template: unique_name} index := 0 if d, ok := exec.SelectedInstances[processing.GetID()]; ok { index = d } instance := processing.GetSelectedInstance(&index) if instance != nil { // Propagation des variables d'environnement, entrées et sorties // de l'instance vers les paramètres de la tâche Argo. for _, value := range instance.(*resources.ProcessingInstance).Env { step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ Name: value.Name, Value: value.Value, }) } for _, value := range instance.(*resources.ProcessingInstance).Inputs { step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ Name: value.Name, Value: value.Value, }) } for _, value := range instance.(*resources.ProcessingInstance).Outputs { step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ Name: value.Name, Value: value.Value, }) } } step.Dependencies = b.getArgoDependencies(graphItemID) // Détermine si ce nœud est une première ou une dernière tâche du DAG. name := "" if b.OriginWorkflow.Graph.Items[graphItemID].Processing != nil { name = b.OriginWorkflow.Graph.Items[graphItemID].Processing.GetName() } if b.OriginWorkflow.Graph.Items[graphItemID].Workflow != nil { name = b.OriginWorkflow.Graph.Items[graphItemID].Workflow.GetName() } if len(step.Dependencies) == 0 && name != "" { firstItems = append(firstItems, getArgoName(name, graphItemID)) } if ok, _ := b.isArgoDependancy(graphItemID); !ok && name != "" { lastItems = append(lastItems, getArgoName(name, graphItemID)) } dag.Tasks = append(dag.Tasks, step) return dag, firstItems, lastItems } // createVolumes crée les PersistentVolumeClaims Argo (volumeClaimTemplates) // pour chaque volume local référencé dans les templates de processing. // TODO: gérer les volumes distants. func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { for _, volume := range volumes { index := 0 if s, ok := exec.SelectedInstances[volume.Storage.GetID()]; ok { index = s } storage := volume.Storage.Instances[index] new_volume := VolumeClaimTemplate{} new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-") new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} new_volume.Spec.Resources.Requests.Storage = fmt.Sprintf("%v", storage.SizeGB) + storage.SizeType.ToArgo() b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) } } // isArgoDependancy vérifie si le nœud identifié par id est une dépendance // d'au moins un autre nœud du DAG (i.e. s'il existe un lien sortant vers // un processing ou un workflow). // Retourne true + la liste des noms Argo des nœuds qui en dépendent. func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) { dependancyOfIDs := []string{} isDeps := false for _, link := range b.OriginWorkflow.Graph.Links { if _, ok := b.OriginWorkflow.Graph.Items[link.Destination.ID]; !ok { logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Destination.ID)) continue } source := b.OriginWorkflow.Graph.Items[link.Destination.ID].Processing if id == link.Source.ID && source != nil { isDeps = true dependancyOfIDs = append(dependancyOfIDs, getArgoName(source.GetName(), link.Destination.ID)) } wourceWF := b.OriginWorkflow.Graph.Items[link.Destination.ID].Workflow if id == link.Source.ID && wourceWF != nil { isDeps = true dependancyOfIDs = append(dependancyOfIDs, getArgoName(wourceWF.GetName(), link.Destination.ID)) } } return isDeps, dependancyOfIDs } // getArgoDependencies retourne la liste des noms de tâches Argo dont dépend // le nœud identifié par id (liens entrants depuis des processings). func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) { for _, link := range b.OriginWorkflow.Graph.Links { if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok { logger.Info().Msg(fmt.Sprint("Could not find the source of the link", link.Source.ID)) continue } source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing if id == link.Destination.ID && source != nil { dependency_name := getArgoName(source.GetName(), link.Source.ID) dependencies = append(dependencies, dependency_name) continue } } return } // getArgoName construit le nom unique d'une tâche / template Argo à partir // du nom humain de la ressource et de son ID dans le graphe. // Les espaces sont remplacés par des tirets et tout est mis en minuscules. func getArgoName(raw_name string, component_id string) (formatedName string) { formatedName = strings.ReplaceAll(raw_name, " ", "-") formatedName += "-" + component_id formatedName = strings.ToLower(formatedName) return } // isReparted vérifie si le processing est hébergé sur un Compute appartenant // à un peer distant (Relation != 1, i.e. pas le peer local). // Si c'est le cas, elle retourne true et le Peer concerné pour qu'Admiralty // puisse router les pods vers le bon cluster. func (b *ArgoBuilder) isReparted(processing resources.ResourceInterface, graphID string) (bool, *peer.Peer) { computeAttached := b.retrieveProcessingCompute(graphID) if computeAttached == nil { logger.Error().Msg("No compute was found attached to processing " + processing.GetName() + " : " + processing.GetID()) panic(0) } // Résolution du Peer propriétaire du Compute via l'API oc-lib. req := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil) if req == nil { fmt.Println("TODO : handle error when trying to create a request on the Peer Collection") return false, nil } res := req.LoadOne(computeAttached.CreatorID) if res.Err != "" { fmt.Print("TODO : handle error when requesting PeerID") fmt.Print(res.Err) return false, nil } peer := res.ToPeer() // Relation == 1 signifie "moi-même" : le processing est local. isNotReparted := peer.Relation == 1 logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted)) return !isNotReparted, peer } // retrieveProcessingCompute parcourt les liens du graphe pour retrouver // la ressource de Compute directement connectée au nœud graphID. // Retourne nil si aucun Compute n'est trouvé. func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.ComputeResource { for _, link := range b.OriginWorkflow.Graph.Links { var oppositeId string if link.Source.ID == graphID { oppositeId = link.Destination.ID } else if link.Destination.ID == graphID { oppositeId = link.Source.ID } if oppositeId != "" { dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) if dt == oclib.COMPUTE_RESOURCE { return res.(*resources.ComputeResource) } else { continue } } } return nil } // waitForConsiders publie un ArgoKubeEvent sur le canal NATS ARGO_KUBE_EVENT // puis se bloque jusqu'à réception d'un PropalgationMessage vérifiant : // - Action == PB_CONSIDERS // - DataType == dataType (COMPUTE_RESOURCE ou STORAGE_RESOURCE) // - Payload décodé en JSON contenant "executions_id" == executionsId // // Cela garantit que l'infrastructure distante (Admiralty ou Minio) a bien // pris en compte la demande avant que la construction du workflow continue. // Un timeout de 5 minutes est appliqué pour éviter un blocage indéfini. func waitForConsiders(executionsId string, dataType tools.DataType, event ArgoKubeEvent) { // Sérialise l'événement et le publie sur ARGO_KUBE_EVENT. b, err := json.Marshal(event) if err != nil { logger.Error().Msg("Cannot marshal ArgoKubeEvent: " + err.Error()) return } tools.NewNATSCaller().SetNATSPub(tools.ARGO_KUBE_EVENT, tools.NATSResponse{ FromApp: "oc-monitord", Datatype: dataType, User: "root", Method: int(tools.PROPALGATION_EVENT), Payload: b, }) // Connexion NATS pour écouter la réponse PB_CONSIDERS. natsURL := oclib_config.GetConfig().NATSUrl if natsURL == "" { logger.Error().Msg("NATS_SERVER not set, skipping PB_CONSIDERS wait") return } nc, err := nats.Connect(natsURL) if err != nil { logger.Error().Msg("NATS connect error waiting for PB_CONSIDERS: " + err.Error()) return } defer nc.Close() // Souscription au canal PROPALGATION_EVENT avec un buffer de 64 messages. ch := make(chan *nats.Msg, 64) sub, err := nc.ChanSubscribe(tools.PROPALGATION_EVENT.GenerateKey(), ch) if err != nil { logger.Error().Msg("NATS subscribe error waiting for PB_CONSIDERS: " + err.Error()) return } defer sub.Unsubscribe() timeout := time.After(5 * time.Minute) for { select { case msg := <-ch: // Désérialise le message en PropalgationMessage. var pm tools.PropalgationMessage if err := json.Unmarshal(msg.Data, &pm); err != nil { continue } // Filtre : action, type de données. if pm.Action != tools.PB_CONSIDERS || pm.DataType != int(dataType) { continue } // Filtre : executions_id dans le Payload du PropalgationMessage. var body struct { ExecutionsID string `json:"executions_id"` } if err := json.Unmarshal(pm.Payload, &body); err != nil { continue } if body.ExecutionsID != executionsId { continue } logger.Info().Msg(fmt.Sprintf("PB_CONSIDERS received for executions_id=%s datatype=%s", executionsId, dataType.String())) return case <-timeout: logger.Warn().Msg(fmt.Sprintf("Timeout waiting for PB_CONSIDERS executions_id=%s datatype=%s", executionsId, dataType.String())) return } } } // ArgoKubeEvent est la structure publiée sur NATS lors de la demande de // provisionnement d'une ressource distante (Admiralty ou stockage S3). // Le champ OriginID identifie le peer initiateur : c'est vers lui que la // réponse PB_CONSIDERS sera routée par le système de propagation. type ArgoKubeEvent struct { // ExecutionsID est l'identifiant de l'exécution de workflow en cours. ExecutionsID string `json:"executions_id"` // DestPeerID est le peer de destination (compute ou peer S3 cible). DestPeerID string `json:"dest_peer_id"` // Type indique la nature de la ressource : COMPUTE_RESOURCE ou STORAGE_RESOURCE. Type tools.DataType `json:"data_type"` // SourcePeerID est le peer source de la ressource demandée. SourcePeerID string `json:"source_peer_id"` // OriginID est le peer qui a initié la demande de provisionnement ; // la réponse PB_CONSIDERS lui sera renvoyée. OriginID string `json:"origin_id"` } // CompleteBuild finalise la construction du workflow Argo après la génération // du DAG. Elle effectue dans l'ordre : // 1. Pour chaque peer distant (Admiralty) : publie un ArgoKubeEvent de type // COMPUTE_RESOURCE et attend la validation PB_CONSIDERS via waitForConsiders. // 2. Met à jour les annotations Admiralty des templates avec le nom de cluster // construit à partir du peerId et de l'executionsId. // 3. Sérialise le workflow en YAML et l'écrit dans ./argo_workflows/. // // Retourne le chemin du fichier YAML généré. func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) { logger.Info().Msg("DEV :: Completing build") // --- Étape 1 : validation Admiralty pour chaque peer distant --- for _, peer := range b.RemotePeers { logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) // Publie l'événement COMPUTE_RESOURCE et attend PB_CONSIDERS (bloquant). waitForConsiders(executionsId, tools.COMPUTE_RESOURCE, ArgoKubeEvent{ ExecutionsID: executionsId, Type: tools.COMPUTE_RESOURCE, DestPeerID: conf.GetConfig().PeerID, SourcePeerID: peer, OriginID: conf.GetConfig().PeerID, }) } // --- Étape 2 : mise à jour du nom de cluster Admiralty --- // Le nom final du cluster cible est "target--". for _, template := range b.Workflow.Spec.Templates { if len(template.Metadata.Annotations) > 0 { if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { template.Metadata.Annotations["multicluster.admiralty.io/clustername"] = "target-" + tools.GetConcatenatedName(peerId, executionsId) } } } // --- Étape 3 : génération et écriture du fichier YAML --- random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8) b.Workflow.Metadata.Name = "oc-monitor-" + random_name logger = oclib.GetLogger() yamlified, err := yaml.Marshal(b.Workflow) if err != nil { logger.Error().Msg("Could not transform object to yaml file") return "", err } // Nom de fichier horodaté au format DD_MM_YYYY_hhmmss. current_timestamp := time.Now().Format("02_01_2006_150405") file_name := random_name + "_" + current_timestamp + ".yml" workflows_dir := "./argo_workflows/" err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) if err != nil { logger.Error().Msg("Could not write the yaml file") return "", err } return workflows_dir + file_name, nil }