// Package nats registers the NATS subscriptions of the oc-datacenter
// infrastructure layer and dispatches incoming events to the Admiralty,
// Minio, PVC and source-presign provisioners.
package nats

import (
	"context"
	"encoding/json"
	"fmt"
	"oc-datacenter/infrastructure"
	"oc-datacenter/infrastructure/admiralty"
	"oc-datacenter/infrastructure/kubernetes"
	"oc-datacenter/infrastructure/kubernetes/models"
	"oc-datacenter/infrastructure/storage"
	"sync"

	oclib "cloud.o-forge.io/core/oc-lib"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// roleWaiters maps executionID → channel expecting the role-assignment message from OC discovery.
var roleWaiters sync.Map

// ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event.
//
// Dispatch rules (evaluated in order):
//  1. Type == STORAGE_RESOURCE → Minio credentials or PVC
//  2. (Type == PROCESSING_RESOURCE || DATA_RESOURCE) && SourceResourceID != "" → Minio++ pre-signed URL (Phase 4)
//  3. everything else → Admiralty kubeconfig
type ArgoKubeEvent struct {
	ExecutionsID string         `json:"executions_id"`
	DestPeerID   string         `json:"dest_peer_id"`
	Type         tools.DataType `json:"data_type"`
	SourcePeerID string         `json:"source_peer_id"`
	MinioID      string         `json:"minio_id,omitempty"`
	// Local signals that this STORAGE_RESOURCE event is for a local PVC (not Minio).
	Local       bool   `json:"local,omitempty"`
	StorageName string `json:"storage_name,omitempty"`
	// OriginID is the peer that initiated the request; the PB_CONSIDERS
	// response is routed back to this peer once provisioning completes.
	OriginID string `json:"origin_id,omitempty"`
	// Images is the list of container images to pre-pull on the target peer
	// before the workflow starts. Empty for STORAGE_RESOURCE / PROCESSING_RESOURCE source events.
	Images []string `json:"images,omitempty"`
	// SourceResourceID is non-empty only for Phase 4 (isReachable=false):
	// it identifies the Processing or Data resource whose binary/data must be
	// fetched via a pre-signed Minio URL. When set, the event is a Minio++ request,
	// NOT an Admiralty compute event.
	SourceResourceID string `json:"source_resource_id,omitempty"`
}

// ListenNATS starts all NATS subscriptions for the infrastructure layer.
// Must be launched in a goroutine from main.
func ListenNATS() {
	tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
		tools.ARGO_KUBE_EVENT:        handleArgoKubeEvent,
		tools.SOURCE_PRESIGN_EVENT:   handleSourcePresignEvent,
		tools.ADMIRALTY_CONFIG_EVENT: handleAdmiraltyConfigEvent,
		tools.MINIO_CONFIG_EVENT:     handleMinioConfigEvent,
		tools.PVC_CONFIG_EVENT:       handlePVCConfigEvent,
		tools.WORKFLOW_DONE_EVENT:    handleWorkflowDoneEvent,
		tools.REMOVE_RESOURCE:        handleRemoveResource,
	})
}

// logNATSError reports a handler failure through the oc-lib logger.
func logNATSError(format string, args ...interface{}) {
	// The logger is bound to a local first, mirroring how the rest of the
	// package calls it.
	logger := oclib.GetLogger()
	logger.Error().Msgf(format, args...)
}

// createNamespace asks Kubernetes to create the namespace of the given
// execution and prints the outcome. The result is only traced: callers carry
// on with provisioning whatever the returned error is.
func createNamespace(executionsID string) {
	err := kubernetes.NewKubernetesService(executionsID).CreateNamespace()
	fmt.Println("NS", err)
}

// publishPropagation wraps payload into msg (whose Action must already be set)
// and publishes it asynchronously as a PROPALGATION_EVENT.
//
// origin is the NATS message being handled; only its User is forwarded.
// datatype is copied into the outgoing NATSResponse. trace is printed once
// both marshalling steps have succeeded, right before the publication is
// launched. Marshalling failures are logged and abort the publication.
func publishPropagation(origin tools.NATSResponse, datatype tools.DataType, msg tools.PropalgationMessage, payload interface{}, trace ...interface{}) {
	inner, err := json.Marshal(payload)
	if err != nil {
		logNATSError("PROPALGATION_EVENT: marshal payload: %v", err)
		return
	}
	msg.Payload = inner
	outer, err := json.Marshal(&msg)
	if err != nil {
		logNATSError("PROPALGATION_EVENT: marshal message: %v", err)
		return
	}
	fmt.Println(trace...)
	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-datacenter",
		Datatype: datatype,
		User:     origin.User,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  outer,
	})
}

// ─── ARGO_KUBE_EVENT ────────────────────────────────────────────────────────

// handleArgoKubeEvent is triggered by oc-discovery to notify this peer of a
// provisioning task. It decodes the ArgoKubeEvent and dispatches to local PVC,
// Minio, source-presign or Admiralty provisioning following the rules
// documented on ArgoKubeEvent.
func handleArgoKubeEvent(resp tools.NATSResponse) {
	argo := &ArgoKubeEvent{}
	if err := json.Unmarshal(resp.Payload, argo); err != nil {
		logNATSError("ARGO_KUBE_EVENT: invalid payload: %v", err)
		return
	}
	switch {
	case argo.Type == tools.STORAGE_RESOURCE && argo.Local:
		provisionLocalPVC(resp, argo)
	case argo.Type == tools.STORAGE_RESOURCE:
		provisionMinio(resp, argo)
	case argo.SourceResourceID != "" &&
		(argo.Type == tools.PROCESSING_RESOURCE || argo.Type == tools.DATA_RESOURCE):
		provisionSourcePresign(resp, argo)
	default:
		provisionCompute(resp, argo)
	}
}

// provisionLocalPVC handles a STORAGE_RESOURCE event flagged Local: the PVC is
// initialized here when source and destination peers are the same, otherwise
// the request is routed to the destination peer via PB_PVC_CONFIG.
func provisionLocalPVC(resp tools.NATSResponse, argo *ArgoKubeEvent) {
	fmt.Println("DETECT LOCAL PVC ARGO_KUBE_EVENT")
	event := storage.PVCProvisionEvent{
		ExecutionsID: argo.ExecutionsID,
		StorageID:    argo.MinioID,
		StorageName:  argo.StorageName,
		SourcePeerID: argo.SourcePeerID,
		DestPeerID:   argo.DestPeerID,
		OriginID:     argo.OriginID,
	}
	if argo.SourcePeerID != argo.DestPeerID {
		// Cross-peer: route to dest peer via PB_PVC_CONFIG.
		publishPropagation(resp, -1, tools.PropalgationMessage{Action: tools.PB_PVC_CONFIG},
			event, "CONFIG PVC THEM")
		return
	}
	fmt.Println("CONFIG PVC MYSELF")
	setter := storage.NewPVCSetter(argo.ExecutionsID, argo.MinioID)
	createNamespace(argo.ExecutionsID)
	go setter.InitializeAsSource(context.Background(), event, true)
}

// provisionMinio handles a non-local STORAGE_RESOURCE event: Minio credentials
// are initialized here when source and destination peers are the same,
// otherwise a Phase-1 PB_MINIO_CONFIG message is published.
func provisionMinio(resp tools.NATSResponse, argo *ArgoKubeEvent) {
	fmt.Println("DETECT STORAGE ARGO_KUBE_EVENT")
	if argo.SourcePeerID != argo.DestPeerID {
		// Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "")
		// so oc-discovery routes the role-assignment to the Minio host.
		phase1 := storage.MinioCredentialEvent{
			ExecutionsID: argo.ExecutionsID,
			MinioID:      argo.MinioID,
			SourcePeerID: argo.SourcePeerID,
			DestPeerID:   argo.DestPeerID,
			OriginID:     argo.OriginID,
		}
		publishPropagation(resp, -1, tools.PropalgationMessage{Action: tools.PB_MINIO_CONFIG},
			phase1, "CONFIG THEM")
		return
	}
	fmt.Println("CONFIG MYSELF")
	setter := storage.NewMinioSetter(argo.ExecutionsID, argo.MinioID)
	createNamespace(argo.ExecutionsID)
	go setter.InitializeAsSource(context.Background(),
		argo.SourcePeerID, argo.DestPeerID, argo.OriginID, true)
}

// provisionSourcePresign handles a Minio++ request: a pre-signed URL must be
// generated for a private Processing or Data source.
func provisionSourcePresign(resp tools.NATSResponse, argo *ArgoKubeEvent) {
	fmt.Println("DETECT SOURCE PRESIGN ARGO_KUBE_EVENT", argo.Type, argo.SourceResourceID)
	event := storage.SourcePresignEvent{
		ExecutionsID:     argo.ExecutionsID,
		SourceResourceID: argo.SourceResourceID,
		SourcePeerID:     argo.SourcePeerID,
		DestPeerID:       argo.DestPeerID,
		OriginID:         argo.OriginID,
		DataType:         argo.Type.EnumIndex(),
	}
	if argo.SourcePeerID != argo.DestPeerID {
		// Cross-peer: route via PROPALGATION_EVENT(PB_SOURCE_PRESIGN) to the
		// peer that owns the resource. Unlike the other routes, the outgoing
		// message carries the event's own data type.
		publishPropagation(resp, argo.Type, tools.PropalgationMessage{Action: tools.PB_SOURCE_PRESIGN},
			event, "ROUTE SOURCE PRESIGN TO", argo.SourcePeerID)
		return
	}
	// Same peer: generate the URL directly (the trailing true marks the
	// self-provisioning path, as in the other same-peer branches).
	presigner := storage.NewSourcePresigner(argo.ExecutionsID, argo.SourceResourceID)
	go presigner.InitializeAsSource(context.Background(), event, true)
}

// provisionCompute handles every remaining event: image pre-pull followed by
// Admiralty kubeconfig provisioning when source and destination peers are the
// same, otherwise the event is forwarded via PB_ADMIRALTY_CONFIG.
func provisionCompute(resp tools.NATSResponse, argo *ArgoKubeEvent) {
	fmt.Println("DETECT COMPUTE ARGO_KUBE_EVENT")
	fmt.Println(argo.SourcePeerID, argo.DestPeerID)
	if argo.SourcePeerID != argo.DestPeerID {
		publishPropagation(resp, -1, tools.PropalgationMessage{Action: tools.PB_ADMIRALTY_CONFIG},
			argo, "CONFIG THEM")
		return
	}
	fmt.Println("CONFIG MYSELF")
	kube := kubernetes.NewKubernetesService(argo.ExecutionsID)
	err := kube.CreateNamespace()
	fmt.Println("NS", err)
	// The event is passed by value so the goroutine works on its own copy.
	go func(a ArgoKubeEvent) {
		ctx := context.Background()
		// Pre-pull first: the Admiralty source is only initialized afterwards.
		// A pre-pull failure is logged but does not stop provisioning.
		if len(a.Images) > 0 {
			if err := kube.RunPrepull(ctx, a.Images); err != nil {
				logNATSError("RunPrepull local: %v", err)
			}
		}
		admiralty.NewAdmiraltySetter(a.ExecutionsID).InitializeAsSource(
			ctx, a.SourcePeerID, a.DestPeerID, a.OriginID, true, a.Images)
	}(*argo)
}

// ─── SOURCE_PRESIGN_EVENT ───────────────────────────────────────────────────

// handleSourcePresignEvent is forwarded by oc-discovery after receiving
// PB_SOURCE_PRESIGN via libp2p ProtocolSourcePresignResource. This peer is the
// resource owner (Minio source). It generates a pre-signed URL and responds
// via PB_CONSIDERS → OriginID.
func handleSourcePresignEvent(resp tools.NATSResponse) {
	event := storage.SourcePresignEvent{}
	if err := json.Unmarshal(resp.Payload, &event); err != nil {
		logNATSError("SOURCE_PRESIGN_EVENT: invalid payload: %v", err)
		return
	}
	fmt.Println("SOURCE_PRESIGN_EVENT received resource=", event.SourceResourceID)
	presigner := storage.NewSourcePresigner(event.ExecutionsID, event.SourceResourceID)
	go presigner.InitializeAsSource(context.Background(), event, false)
}

// ─── ADMIRALTY_CONFIG_EVENT ─────────────────────────────────────────────────

// handleAdmiraltyConfigEvent is forwarded by oc-discovery after receiving via
// libp2p ProtocolAdmiraltyConfigResource. Payload is a KubeconfigEvent (phase
// discriminated by Kubeconfig presence). The work runs synchronously in the
// handler.
func handleAdmiraltyConfigEvent(resp tools.NATSResponse) {
	kubeconfigEvent := models.KubeconfigEvent{}
	if err := json.Unmarshal(resp.Payload, &kubeconfigEvent); err != nil {
		logNATSError("ADMIRALTY_CONFIG_EVENT: invalid payload: %v", err)
		return
	}
	if kubeconfigEvent.Kubeconfig != "" {
		// Phase 2: kubeconfig present → this peer is the TARGET (scheduler).
		fmt.Println("CreateAdmiraltyTarget")
		admiralty.NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsTarget(
			context.Background(), kubeconfigEvent, false)
		return
	}
	// Phase 1: no kubeconfig → this peer is the SOURCE (compute).
	kube := kubernetes.NewKubernetesService(kubeconfigEvent.ExecutionsID)
	err := kube.CreateNamespace()
	fmt.Println("NS", err)
	if len(kubeconfigEvent.Images) > 0 {
		if err := kube.RunPrepull(context.Background(), kubeconfigEvent.Images); err != nil {
			logNATSError("RunPrepull local: %v", err)
		}
	}
	fmt.Println("CreateAdmiraltySource")
	admiralty.NewAdmiraltySetter(kubeconfigEvent.ExecutionsID).InitializeAsSource(
		context.Background(), kubeconfigEvent.SourcePeerID, kubeconfigEvent.DestPeerID,
		kubeconfigEvent.OriginID, false, kubeconfigEvent.Images)
}

// ─── MINIO_CONFIG_EVENT ─────────────────────────────────────────────────────

// handleMinioConfigEvent is forwarded by oc-discovery after receiving via
// libp2p ProtocolMinioConfigResource. Payload is a MinioCredentialEvent (phase
// discriminated by Access presence).
func handleMinioConfigEvent(resp tools.NATSResponse) {
	minioEvent := storage.MinioCredentialEvent{}
	if err := json.Unmarshal(resp.Payload, &minioEvent); err != nil {
		logNATSError("MINIO_CONFIG_EVENT: invalid payload: %v", err)
		return
	}
	if minioEvent.Access != "" {
		// Phase 2: credentials present → this peer is the TARGET (compute).
		storage.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsTarget(
			context.Background(), minioEvent, false)
		return
	}
	// Phase 1: no credentials → this peer is the SOURCE (Minio host).
	createNamespace(minioEvent.ExecutionsID)
	storage.NewMinioSetter(minioEvent.ExecutionsID, minioEvent.MinioID).InitializeAsSource(
		context.Background(), minioEvent.SourcePeerID, minioEvent.DestPeerID,
		minioEvent.OriginID, false)
}

// ─── PVC_CONFIG_EVENT ───────────────────────────────────────────────────────

// handlePVCConfigEvent is forwarded by oc-discovery for cross-peer local PVC
// provisioning. The dest peer creates the PVC in its own cluster.
func handlePVCConfigEvent(resp tools.NATSResponse) {
	event := storage.PVCProvisionEvent{}
	if err := json.Unmarshal(resp.Payload, &event); err != nil {
		logNATSError("PVC_CONFIG_EVENT: invalid payload: %v", err)
		return
	}
	createNamespace(event.ExecutionsID)
	storage.NewPVCSetter(event.ExecutionsID, event.StorageID).InitializeAsSource(
		context.Background(), event, false)
}

// ─── WORKFLOW_DONE_EVENT ────────────────────────────────────────────────────

// handleWorkflowDoneEvent is emitted by oc-monitord when the top-level Argo
// workflow reaches a terminal phase. oc-datacenter is responsible only for
// infrastructure teardown here: booking/execution state management is handled
// entirely by oc-scheduler.
func handleWorkflowDoneEvent(resp tools.NATSResponse) {
	var evt tools.WorkflowLifecycleEvent
	if err := json.Unmarshal(resp.Payload, &evt); err != nil {
		logNATSError("WORKFLOW_DONE_EVENT: invalid payload: %v", err)
		return
	}
	if evt.ExecutionsID == "" {
		return
	}
	go infrastructure.TeardownForExecution(evt.ExecutionID, evt.ExecutionsID)
}

// ─── REMOVE_RESOURCE ────────────────────────────────────────────────────────

// handleRemoveResource is routed by oc-discovery via ProtocolDeleteResource
// for datacenter teardown. Only STORAGE_RESOURCE and COMPUTE_RESOURCE
// deletions are handled here; any other data type is ignored.
func handleRemoveResource(resp tools.NATSResponse) {
	switch resp.Datatype {
	case tools.STORAGE_RESOURCE:
		// Try PVC delete first: the payload is treated as a PVC deletion when
		// it carries both an executions ID and a storage name. Otherwise fall
		// back to Minio.
		pvcEvent := storage.PVCDeleteEvent{}
		if err := json.Unmarshal(resp.Payload, &pvcEvent); err == nil &&
			pvcEvent.ExecutionsID != "" && pvcEvent.StorageName != "" {
			go storage.NewPVCSetter(pvcEvent.ExecutionsID, pvcEvent.StorageID).
				TeardownAsSource(context.Background(), pvcEvent)
			return
		}
		deleteEvent := storage.MinioDeleteEvent{}
		if err := json.Unmarshal(resp.Payload, &deleteEvent); err != nil {
			logNATSError("REMOVE_RESOURCE: invalid storage payload: %v", err)
			return
		}
		if deleteEvent.ExecutionsID != "" {
			go storage.NewMinioSetter(deleteEvent.ExecutionsID, deleteEvent.MinioID).
				TeardownAsSource(context.Background(), deleteEvent)
		}
	case tools.COMPUTE_RESOURCE:
		argo := &ArgoKubeEvent{}
		if err := json.Unmarshal(resp.Payload, argo); err != nil {
			logNATSError("REMOVE_RESOURCE: invalid compute payload: %v", err)
			return
		}
		if argo.ExecutionsID != "" {
			go admiralty.NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background())
		}
	}
}