Datacenter no longer handles booking itself but is fully in charge of Kube & Minio allocation via NATS

This commit is contained in:
mr
2026-02-25 09:08:40 +01:00
parent 41750dc054
commit b45c795002
19 changed files with 1494 additions and 2109 deletions

348
infrastructure/admiralty.go Normal file
View File

@@ -0,0 +1,348 @@
package infrastructure
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"time"
"oc-datacenter/conf"
"oc-datacenter/models"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// kubeconfigChannels holds channels waiting for kubeconfig delivery (keyed by executionID).
var kubeconfigChannels sync.Map

// KubeconfigEvent is the NATS payload used to transfer the kubeconfig from the source peer to the target peer.
type KubeconfigEvent struct {
	DestPeerID   string `json:"dest_peer_id"`   // peer acting as the target cluster
	ExecutionsID string `json:"executions_id"`  // execution ID, also used as the Kubernetes namespace
	Kubeconfig   string `json:"kubeconfig"`     // base64-encoded JSON kubeconfig built by the source peer
	SourcePeerID string `json:"source_peer_id"` // peer acting as the source cluster
	// OriginID is the peer that initiated the provisioning request.
	// The PB_CONSIDERS response is routed back to this peer.
	OriginID string `json:"origin_id"`
}

// admiraltyConsidersPayload is the PB_CONSIDERS payload emitted after admiralty provisioning.
type admiraltyConsidersPayload struct {
	OriginID     string  `json:"origin_id"`
	ExecutionsID string  `json:"executions_id"`
	Secret       string  `json:"secret,omitempty"` // base64-encoded kubeconfig, empty on failure
	Error        *string `json:"error,omitempty"`  // provisioning error message, nil on success
}
// emitAdmiraltyConsiders publishes a PB_CONSIDERS back to OriginID with the result
// of the admiralty provisioning. secret is the base64-encoded kubeconfig; err is nil on success.
func emitAdmiraltyConsiders(executionsID, originID, secret string, provErr error) {
var errStr *string
if provErr != nil {
s := provErr.Error()
errStr = &s
}
payload, _ := json.Marshal(admiraltyConsidersPayload{
OriginID: originID,
ExecutionsID: executionsID,
Secret: secret,
Error: errStr,
})
b, _ := json.Marshal(&tools.PropalgationMessage{
DataType: tools.COMPUTE_RESOURCE.EnumIndex(),
Action: tools.PB_CONSIDERS,
Payload: payload,
})
go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
FromApp: "oc-datacenter",
Datatype: -1,
Method: int(tools.PROPALGATION_EVENT),
Payload: b,
})
}
// AdmiraltySetter carries the execution context for an admiralty pairing.
type AdmiraltySetter struct {
	ExecutionsID string // execution ID, used as the Kubernetes namespace
	NodeName     string // name of the virtual node created by Admiralty on the target cluster
}

// NewAdmiraltySetter returns a setter bound to the given execution ID.
// NodeName is filled in later, once the virtual node appears.
func NewAdmiraltySetter(execIDS string) *AdmiraltySetter {
	setter := &AdmiraltySetter{}
	setter.ExecutionsID = execIDS
	return setter
}
// InitializeAsSource is called on the peer that acts as the SOURCE cluster (compute provider).
// It creates the AdmiraltySource resource, generates a kubeconfig for the target peer,
// and publishes it on NATS so the target peer can complete its side of the setup.
// When destPeerID is the local peer, the target-side setup is invoked directly
// without a NATS round-trip.
func (s *AdmiraltySetter) InitializeAsSource(ctx context.Context, localPeerID string, destPeerID string, originID string) {
	logger := oclib.GetLogger()
	serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
	if err != nil {
		logger.Error().Msg("InitializeAsSource: failed to create service: " + err.Error())
		return
	}
	// Create the AdmiraltySource resource on this cluster; an already-existing one is fine.
	logger.Info().Msg("Creating AdmiraltySource ns-" + s.ExecutionsID)
	_, err = serv.CreateAdmiraltySource(ctx, s.ExecutionsID)
	if err != nil && !apierrors.IsAlreadyExists(err) {
		logger.Error().Msg("InitializeAsSource: failed to create source: " + err.Error())
		return
	}
	// Generate a service-account token for the namespace (3600 s validity).
	token, err := serv.GenerateToken(ctx, s.ExecutionsID, 3600)
	if err != nil {
		logger.Error().Msg("InitializeAsSource: failed to generate token for ns-" + s.ExecutionsID + ": " + err.Error())
		return
	}
	kubeconfig, err := buildHostKubeWithToken(token)
	if err != nil {
		logger.Error().Msg("InitializeAsSource: " + err.Error())
		return
	}
	b, err := json.Marshal(kubeconfig)
	if err != nil {
		logger.Error().Msg("InitializeAsSource: failed to marshal kubeconfig: " + err.Error())
		return
	}
	encodedKubeconfig := base64.StdEncoding.EncodeToString(b)
	kube := KubeconfigEvent{
		ExecutionsID: s.ExecutionsID,
		Kubeconfig:   encodedKubeconfig,
		SourcePeerID: localPeerID,
		DestPeerID:   destPeerID,
		OriginID:     originID,
	}
	// Local shortcut: source and target are the same peer, skip NATS.
	if destPeerID == localPeerID {
		s.InitializeAsTarget(ctx, kube)
		return
	}
	// Publish the kubeconfig on NATS so the target peer can proceed.
	payload, err := json.Marshal(kube)
	if err != nil {
		logger.Error().Msg("InitializeAsSource: failed to marshal kubeconfig event: " + err.Error())
		return
	}
	msg, err := json.Marshal(&tools.PropalgationMessage{
		DataType: -1,
		Action:   tools.PB_ADMIRALTY_CONFIG,
		Payload:  payload,
	})
	if err != nil {
		// Previously this error was silently swallowed and success was logged anyway.
		logger.Error().Msg("InitializeAsSource: failed to marshal propagation message: " + err.Error())
		return
	}
	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-datacenter",
		Datatype: -1,
		User:     "",
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  msg,
	})
	logger.Info().Msg("InitializeAsSource: kubeconfig published for ns-" + s.ExecutionsID)
}
// InitializeAsTarget is called on the peer that acts as the TARGET cluster (scheduler).
// It creates, in order: the execution namespace, the ServiceAccount, the Role and
// RoleBinding, the Secret holding the source peer's kubeconfig, and the AdmiraltyTarget
// resource, then polls until the virtual node appears. On every failure path a
// PB_CONSIDERS carrying the error is emitted back to kubeconfigObj.OriginID; on success
// the PB_CONSIDERS carries the base64 kubeconfig as secret.
func (s *AdmiraltySetter) InitializeAsTarget(ctx context.Context, kubeconfigObj KubeconfigEvent) {
	logger := oclib.GetLogger()
	// Drop any waiter channel registered for this execution once setup finishes.
	defer kubeconfigChannels.Delete(s.ExecutionsID)
	logger.Info().Msg("InitializeAsTarget: waiting for kubeconfig from source peer ns-" + s.ExecutionsID)
	// Base64-encoded kubeconfig produced by the source peer.
	kubeconfigData := kubeconfigObj.Kubeconfig
	serv, err := tools.NewKubernetesService(conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData)
	if err != nil {
		logger.Error().Msg("InitializeAsTarget: failed to create service: " + err.Error())
		return
	}
	// 1. Create the namespace (already-exists is tolerated).
	logger.Info().Msg("InitializeAsTarget: creating Namespace " + s.ExecutionsID)
	if err := serv.CreateNamespace(ctx, s.ExecutionsID); err != nil && !apierrors.IsAlreadyExists(err) {
		logger.Error().Msg("InitializeAsTarget: failed to create namespace: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
		return
	}
	// 2. Create the ServiceAccount sa-{executionID}
	logger.Info().Msg("InitializeAsTarget: creating ServiceAccount sa-" + s.ExecutionsID)
	if err := serv.CreateServiceAccount(ctx, s.ExecutionsID); err != nil && !apierrors.IsAlreadyExists(err) {
		logger.Error().Msg("InitializeAsTarget: failed to create service account: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
		return
	}
	// 3. Create the Role. The three parallel slices pair up as rules:
	//    leases: get/create/update; secrets: get; pods: patch.
	roleName := "role-" + s.ExecutionsID
	logger.Info().Msg("InitializeAsTarget: creating Role " + roleName)
	if err := serv.CreateRole(ctx, s.ExecutionsID, roleName,
		[][]string{
			{"coordination.k8s.io"},
			{""},
			{""}},
		[][]string{
			{"leases"},
			{"secrets"},
			{"pods"}},
		[][]string{
			{"get", "create", "update"},
			{"get"},
			{"patch"}},
	); err != nil && !apierrors.IsAlreadyExists(err) {
		logger.Error().Msg("InitializeAsTarget: failed to create role: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
		return
	}
	// 4. Create the RoleBinding tying the Role to the ServiceAccount.
	rbName := "rb-" + s.ExecutionsID
	logger.Info().Msg("InitializeAsTarget: creating RoleBinding " + rbName)
	if err := serv.CreateRoleBinding(ctx, s.ExecutionsID, rbName, roleName); err != nil && !apierrors.IsAlreadyExists(err) {
		logger.Error().Msg("InitializeAsTarget: failed to create role binding: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
		return
	}
	// 5. Create the Secret from the source peer's kubeconfig.
	logger.Info().Msg("InitializeAsTarget: creating Secret ns-" + s.ExecutionsID)
	if _, err := serv.CreateKubeconfigSecret(ctx, kubeconfigData, s.ExecutionsID, kubeconfigObj.SourcePeerID); err != nil {
		logger.Error().Msg("InitializeAsTarget: failed to create kubeconfig secret: " + err.Error())
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
		return
	}
	// 6. Create the AdmiraltyTarget resource; a nil response is treated as failure.
	logger.Info().Msg("InitializeAsTarget: creating AdmiraltyTarget ns-" + s.ExecutionsID)
	resp, err := serv.CreateAdmiraltyTarget(ctx, s.ExecutionsID, kubeconfigObj.SourcePeerID)
	if err != nil || resp == nil {
		logger.Error().Msg(fmt.Sprintf("InitializeAsTarget: failed to create admiralty target: %v", err))
		if err == nil {
			err = fmt.Errorf("CreateAdmiraltyTarget returned nil response")
		}
		emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, "", err)
		return
	}
	// 7. Poll until the virtual node appears, then report success.
	logger.Info().Msg("InitializeAsTarget: waiting for virtual node ns-" + s.ExecutionsID)
	s.waitForNode(ctx, serv, kubeconfigObj.SourcePeerID)
	emitAdmiraltyConsiders(s.ExecutionsID, kubeconfigObj.OriginID, kubeconfigData, nil)
}
// waitForNode polls GetOneNode until the Admiralty virtual node appears on this cluster.
// It makes up to 5 attempts, sleeping 10 seconds before each one, and records the node
// name in s.NodeName on success.
func (s *AdmiraltySetter) waitForNode(ctx context.Context, serv *tools.KubernetesService, sourcePeerID string) {
	logger := oclib.GetLogger()
	const maxAttempts = 5
	for attempt := 0; attempt < maxAttempts; attempt++ {
		time.Sleep(10 * time.Second)
		node, err := serv.GetOneNode(ctx, s.ExecutionsID, sourcePeerID)
		if err == nil && node != nil {
			s.NodeName = node.Name
			logger.Info().Msg("waitForNode: node ready: " + s.NodeName)
			return
		}
		if attempt == maxAttempts-1 {
			logger.Error().Msg("waitForNode: node never appeared for ns-" + s.ExecutionsID)
			return
		}
		logger.Info().Msg("waitForNode: node not ready yet, retrying...")
	}
}
// TeardownAsTarget destroys all Admiralty resources created by InitializeAsTarget on the
// target (scheduler) cluster by deleting the execution namespace; the AdmiraltyTarget CRD,
// ServiceAccount, Role and RoleBinding are removed along with it.
// NOTE(review): originID is currently unused — confirm whether it should gate the teardown.
func (s *AdmiraltySetter) TeardownAsTarget(ctx context.Context, originID string) {
	logger := oclib.GetLogger()
	cfg := conf.GetConfig()
	serv, err := tools.NewKubernetesService(cfg.KubeHost+":"+cfg.KubePort, cfg.KubeCA, cfg.KubeCert, cfg.KubeData)
	if err != nil {
		logger.Error().Msg("TeardownAsTarget: failed to create k8s service: " + err.Error())
		return
	}
	onDeleted := func() {
		logger.Info().Msg("TeardownAsTarget: namespace " + s.ExecutionsID + " deleted")
	}
	if err := serv.DeleteNamespace(ctx, s.ExecutionsID, onDeleted); err != nil {
		logger.Error().Msg("TeardownAsTarget: " + err.Error())
		return
	}
}
// TeardownAsSource destroys all Admiralty resources created by InitializeAsSource on the
// source (compute) cluster: first the AdmiraltySource CRD, then the namespace, whose
// deletion cascades the ServiceAccount, Role and RoleBinding.
func (s *AdmiraltySetter) TeardownAsSource(ctx context.Context) {
	logger := oclib.GetLogger()
	cfg := conf.GetConfig()
	host := cfg.KubeHost + ":" + cfg.KubePort
	ca, cert, key := cfg.KubeCA, cfg.KubeCert, cfg.KubeData
	// Step 1: delete the AdmiraltySource CRD via the dynamic client.
	sourcesGVR := schema.GroupVersionResource{
		Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "sources",
	}
	dyn, err := tools.NewDynamicClient(host, ca, cert, key)
	if err != nil {
		logger.Error().Msg("TeardownAsSource: failed to create dynamic client: " + err.Error())
	} else {
		delErr := dyn.Resource(sourcesGVR).Namespace(s.ExecutionsID).Delete(
			ctx, "source-"+s.ExecutionsID, metav1.DeleteOptions{},
		)
		if delErr != nil {
			logger.Error().Msg("TeardownAsSource: failed to delete AdmiraltySource: " + delErr.Error())
		}
	}
	// Step 2: delete the namespace (cascades SA, Role, RoleBinding).
	serv, err := tools.NewKubernetesService(host, ca, cert, key)
	if err != nil {
		logger.Error().Msg("TeardownAsSource: failed to create k8s service: " + err.Error())
		return
	}
	if err := serv.Set.CoreV1().Namespaces().Delete(ctx, s.ExecutionsID, metav1.DeleteOptions{}); err != nil {
		logger.Error().Msg("TeardownAsSource: failed to delete namespace: " + err.Error())
		return
	}
	logger.Info().Msg("TeardownAsSource: namespace " + s.ExecutionsID + " deleted")
}
// buildHostKubeWithToken builds a kubeconfig pointing to this peer's cluster,
// authenticated with the provided service-account token. An empty token is rejected.
func buildHostKubeWithToken(token string) (*models.KubeConfigValue, error) {
	if token == "" {
		return nil, fmt.Errorf("buildHostKubeWithToken: empty token")
	}
	caB64 := base64.StdEncoding.EncodeToString([]byte(conf.GetConfig().KubeCA))
	cluster := models.KubeconfigNamedCluster{
		Name: "default",
		Cluster: models.KubeconfigCluster{
			// NOTE(review): port 6443 is hardcoded here while other call sites use
			// conf KubePort — confirm this is the intended external API port.
			Server:                   "https://" + conf.GetConfig().KubeHost + ":6443",
			CertificateAuthorityData: caB64,
		},
	}
	context := models.KubeconfigNamedContext{
		Name:    "default",
		Context: models.KubeconfigContext{Cluster: "default", User: "default"},
	}
	user := models.KubeconfigUser{
		Name: "default",
		User: models.KubeconfigUserKeyPair{Token: token},
	}
	cfg := &models.KubeConfigValue{
		APIVersion:     "v1",
		CurrentContext: "default",
		Kind:           "Config",
		Preferences:    struct{}{},
		Clusters:       []models.KubeconfigNamedCluster{cluster},
		Contexts:       []models.KubeconfigNamedContext{context},
		Users:          []models.KubeconfigUser{user},
	}
	return cfg, nil
}

View File

@@ -0,0 +1,244 @@
package infrastructure
import (
"context"
"fmt"
"sync"
"time"
"oc-datacenter/infrastructure/minio"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// processedBookings tracks booking IDs whose start-expiry has already been handled.
// Resets on restart; teardown methods are idempotent so duplicate runs are safe.
var processedBookings sync.Map

// processedEndBookings tracks executionsIDs whose end-of-execution Admiralty cleanup
// has already been triggered in this process lifetime.
var processedEndBookings sync.Map

// closingStates is the set of terminal booking states after which infra must be torn down.
// Bookings already in one of these states skip the state transition but still get teardown.
var closingStates = map[enum.BookingStatus]bool{
	enum.FAILURE:   true,
	enum.SUCCESS:   true,
	enum.FORGOTTEN: true,
	enum.CANCELLED: true,
}
// WatchBookings starts a passive loop that ticks every minute, scans bookings whose
// ExpectedStartDate + 1 min has passed, transitions them to terminal states when needed,
// and tears down the associated Kubernetes / Minio infrastructure.
// Must be launched in a goroutine from main; it never returns.
func WatchBookings() {
	logger := oclib.GetLogger()
	logger.Info().Msg("BookingWatchdog: started")
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()
	for {
		<-ticker.C
		// Each pass runs both scans; a failure in one does not block the other.
		if err := scanExpiredBookings(); err != nil {
			logger.Error().Msg("BookingWatchdog: " + err.Error())
		}
		if err := scanEndedExec(); err != nil {
			logger.Error().Msg("BookingWatchdog: " + err.Error())
		}
	}
}
// scanExpiredBookings queries every booking whose expected start date lies more than
// one minute in the past and dispatches each hit to processExpiredBooking in its own
// goroutine.
func scanExpiredBookings() error {
	self, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	localPeer := self.GetID()
	cutoff := primitive.NewDateTimeFromTime(time.Now().Add(-time.Minute))
	filters := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"expected_start_date": {{
				Operator: dbs.LTE.String(),
				Value:    cutoff,
			}},
		},
	}
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeer, []string{}, nil).
		Search(filters, "", false)
	if res.Err != "" {
		return fmt.Errorf("booking search failed: %s", res.Err)
	}
	for _, raw := range res.Data {
		if booking, ok := raw.(*bookingmodel.Booking); ok {
			go processExpiredBooking(booking, localPeer)
		}
	}
	return nil
}
// processExpiredBooking transitions the booking to a terminal state when applicable,
// then tears down infrastructure based on the resource type:
//   - LIVE_DATACENTER / COMPUTE_RESOURCE → Admiralty (as source) + Minio (as target)
//   - LIVE_STORAGE / STORAGE_RESOURCE → Minio (as source)
func processExpiredBooking(b *bookingmodel.Booking, peerID string) {
	logger := oclib.GetLogger()
	ctx := context.Background()
	// Skip bookings already handled during this process lifetime.
	if _, done := processedBookings.Load(b.GetID()); done {
		return
	}
	// Transition non-terminal bookings.
	if !closingStates[b.State] {
		var newState enum.BookingStatus
		switch b.State {
		case enum.DRAFT, enum.DELAYED:
			// DRAFT: never launched; DELAYED: was SCHEDULED but start never arrived.
			newState = enum.FORGOTTEN
		case enum.SCHEDULED:
			// Passed its start date without ever being launched.
			newState = enum.FAILURE
		case enum.STARTED:
			// A running booking is never auto-closed by the watchdog.
			return
		default:
			return
		}
		upd := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
			UpdateOne(map[string]any{"state": newState.EnumIndex()}, b.GetID())
		if upd.Err != "" {
			logger.Error().Msgf("BookingWatchdog: failed to update booking %s: %s", b.GetID(), upd.Err)
			return
		}
		b.State = newState
		logger.Info().Msgf("BookingWatchdog: booking %s (exec=%s, type=%s) → %s",
			b.GetID(), b.ExecutionsID, b.ResourceType, b.State)
	}
	// Mark as handled before triggering async teardown (avoids double-trigger on next tick).
	processedBookings.Store(b.GetID(), struct{}{})
	// Tear down infrastructure according to resource type.
	switch b.ResourceType {
	case tools.LIVE_DATACENTER, tools.COMPUTE_RESOURCE:
		logger.Info().Msgf("BookingWatchdog: tearing down compute infra exec=%s", b.ExecutionsID)
		go NewAdmiraltySetter(b.ExecutionsID).TeardownAsSource(ctx) // this peer is the compute provider
		go teardownMinioForComputeBooking(ctx, b, peerID)
	case tools.LIVE_STORAGE, tools.STORAGE_RESOURCE:
		logger.Info().Msgf("BookingWatchdog: tearing down storage infra exec=%s", b.ExecutionsID)
		go teardownMinioSourceBooking(ctx, b, peerID)
	}
}
// scanEndedExec queries WORKFLOW_EXECUTION records whose state index is greater
// than 2 (presumably the terminal execution states — TODO confirm against the
// enum declared in oc-lib) and triggers the Admiralty target-side teardown for
// each one via teardownAdmiraltyTarget.
func scanEndedExec() error {
	myself, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	peerID := myself.GetID()
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil).
		Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				// state > 2: executions past their active phase (see enum in oc-lib).
				"state": {{
					Operator: dbs.GT.String(),
					Value:    2,
				}},
			},
		}, "", false)
	if res.Err != "" {
		return fmt.Errorf("ended-booking search failed: %s", res.Err)
	}
	for _, dbo := range res.Data {
		b, ok := dbo.(*workflow_execution.WorkflowExecution)
		if !ok {
			continue
		}
		go teardownAdmiraltyTarget(b)
	}
	return nil
}
// teardownAdmiraltyTarget triggers TeardownAsTarget for the scheduler-side namespace
// of an execution that has reached a terminal state.
func teardownAdmiraltyTarget(b *workflow_execution.WorkflowExecution) {
	logger := oclib.GetLogger()
	// Each executionsID is processed at most once per process lifetime.
	if _, done := processedEndBookings.Load(b.ExecutionsID); done {
		return
	}
	processedEndBookings.Store(b.ExecutionsID, struct{}{})
	logger.Info().Msgf("BookingWatchdog: tearing down Admiralty source exec=%s (booking=%s)",
		b.ExecutionsID, b.GetID())
	// The local peer ID is passed as originID; teardown is skipped if the peer
	// cannot be resolved.
	if p, err := oclib.GetMySelf(); err == nil {
		NewAdmiraltySetter(b.ExecutionsID).TeardownAsTarget(context.Background(), p.GetID())
	}
}
// teardownMinioForComputeBooking looks up the LIVE_STORAGE bookings attached to the same
// execution as computeBooking and runs the Minio-as-target teardown for each one
// (K8s secret + configmap). The Minio-as-source side is handled separately by the
// storage booking's own watchdog pass.
func teardownMinioForComputeBooking(ctx context.Context, computeBooking *bookingmodel.Booking, localPeerID string) {
	logger := oclib.GetLogger()
	filters := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"executions_id": {{Operator: dbs.EQUAL.String(), Value: computeBooking.ExecutionsID}},
			"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
		},
	}
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil).
		Search(filters, "", false)
	if res.Err != "" || len(res.Data) == 0 {
		logger.Warn().Msgf("BookingWatchdog: no storage booking found for exec=%s", computeBooking.ExecutionsID)
		return
	}
	for _, raw := range res.Data {
		storageBooking, ok := raw.(*bookingmodel.Booking)
		if !ok {
			continue
		}
		event := minio.MinioDeleteEvent{
			ExecutionsID: computeBooking.ExecutionsID,
			MinioID:      storageBooking.ResourceID,
			SourcePeerID: storageBooking.DestPeerID, // peer hosting Minio
			DestPeerID:   localPeerID,               // this peer (compute/target)
			OriginID:     "",
		}
		minio.NewMinioSetter(computeBooking.ExecutionsID, storageBooking.ResourceID).TeardownAsTarget(ctx, event)
	}
}
// teardownMinioSourceBooking runs the Minio-as-source teardown for a storage booking:
// it revokes the scoped service account and removes the execution bucket on this Minio host.
func teardownMinioSourceBooking(ctx context.Context, b *bookingmodel.Booking, localPeerID string) {
	setter := minio.NewMinioSetter(b.ExecutionsID, b.ResourceID)
	setter.TeardownAsSource(ctx, minio.MinioDeleteEvent{
		ExecutionsID: b.ExecutionsID,
		MinioID:      b.ResourceID,
		SourcePeerID: localPeerID, // this peer IS the Minio host
		DestPeerID:   b.DestPeerID,
		OriginID:     "",
	})
}

View File

@@ -1,47 +0,0 @@
package infrastructure
import (
"context"
"errors"
"oc-datacenter/conf"
v1 "k8s.io/api/core/v1"
)
// Infrastructure abstracts the container-orchestration backend used by the datacenter:
// namespace/service-account/RBAC lifecycle, Admiralty source/target wiring, kubeconfig
// secret handling, and node/namespace lookups.
type Infrastructure interface {
	CreateNamespace(ctx context.Context, ns string) error
	DeleteNamespace(ctx context.Context, ns string) error
	// GenerateToken returns a service-account token for namespace ns, valid for duration seconds.
	GenerateToken(ctx context.Context, ns string, duration int) (string, error)
	CreateServiceAccount(ctx context.Context, ns string) error
	CreateRoleBinding(ctx context.Context, ns string, roleBinding string, role string) error
	// CreateRole builds one PolicyRule per index from the parallel groups/resources/verbs slices.
	CreateRole(ctx context.Context, ns string, role string, groups [][]string, resources [][]string, verbs [][]string) error
	GetTargets(ctx context.Context) ([]string, error)
	CreateAdmiraltySource(context context.Context, executionId string) ([]byte, error)
	CreateKubeconfigSecret(context context.Context, kubeconfig string, executionId string, peerId string) ([]byte, error)
	GetKubeconfigSecret(context context.Context, executionId string, peerId string) ([]byte, error)
	CreateAdmiraltyTarget(context context.Context, executionId string, peerId string) ([]byte, error)
	GetOneNode(context context.Context, executionID string, peerId string) (*v1.Node, error)
	GetNamespace(context context.Context, executionID string) (*v1.Namespace, error)
	CreateSecret(context context.Context, minioId string, executionID string, access string, secret string) error
	CheckHealth() error
}

// _service maps a backend name to its factory. Only "kubernetes" is registered.
var _service = map[string]func() (Infrastructure, error){
	"kubernetes": NewKubernetesService,
}
// NewServiceByType returns the Infrastructure implementation registered under the
// given backend name (e.g. "kubernetes").
func NewServiceByType(t string) (Infrastructure, error) {
	if factory, ok := _service[t]; ok {
		return factory()
	}
	return nil, errors.New("service not found")
}

// NewService returns the Infrastructure implementation selected by the Mode field
// of the application configuration.
func NewService() (Infrastructure, error) {
	if factory, ok := _service[conf.GetConfig().Mode]; ok {
		return factory()
	}
	return nil, errors.New("service not found")
}

View File

@@ -1,616 +0,0 @@
package infrastructure
import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"oc-datacenter/conf"
"oc-datacenter/infrastructure/monitor"
"strings"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
authv1 "k8s.io/api/authentication/v1"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
apply "k8s.io/client-go/applyconfigurations/core/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// GroupVersionResources for the Admiralty multicluster CRDs used by this service.
var gvrSources = schema.GroupVersionResource{Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "sources"}
var gvrTargets = schema.GroupVersionResource{Group: "multicluster.admiralty.io", Version: "v1alpha1", Resource: "targets"}

// KubernetesService implements Infrastructure on top of a Kubernetes clientset.
type KubernetesService struct {
	Set *kubernetes.Clientset
}
// NewDynamicClient builds a dynamic Kubernetes client from the configured host,
// port and TLS material.
func NewDynamicClient() (*dynamic.DynamicClient, error) {
	cfg := conf.GetConfig()
	restCfg := &rest.Config{
		Host: cfg.KubeHost + ":" + cfg.KubePort,
		TLSClientConfig: rest.TLSClientConfig{
			CAData:   []byte(cfg.KubeCA),
			CertData: []byte(cfg.KubeCert),
			KeyData:  []byte(cfg.KubeData),
		},
	}
	client, err := dynamic.NewForConfig(restCfg)
	if err != nil {
		return nil, errors.New("Error creating Dynamic client: " + err.Error())
	}
	if client == nil {
		return nil, errors.New("Error creating Dynamic client: dynamicClient is nil")
	}
	return client, nil
}
// NewKubernetesService builds an Infrastructure backed by a Kubernetes clientset,
// using the host, port and TLS material from the application configuration.
// The stray debug Println of the clientset was removed.
func NewKubernetesService() (Infrastructure, error) {
	config := &rest.Config{
		Host: conf.GetConfig().KubeHost + ":" + conf.GetConfig().KubePort,
		TLSClientConfig: rest.TLSClientConfig{
			CAData:   []byte(conf.GetConfig().KubeCA),
			CertData: []byte(conf.GetConfig().KubeCert),
			KeyData:  []byte(conf.GetConfig().KubeData),
		},
	}
	// Create clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, errors.New("Error creating Kubernetes client: " + err.Error())
	}
	if clientset == nil {
		return nil, errors.New("Error creating Kubernetes client: clientset is nil")
	}
	return &KubernetesService{
		Set: clientset,
	}, nil
}
// NewRemoteKubernetesService builds an Infrastructure for a REMOTE cluster reachable at
// url:6443, using base64-encoded CA/cert/key material. Decode errors are now surfaced
// instead of being silently ignored, and the debug Println was removed.
func NewRemoteKubernetesService(url string, ca string, cert string, key string) (Infrastructure, error) {
	decodedCa, err := base64.StdEncoding.DecodeString(ca)
	if err != nil {
		return nil, errors.New("Error decoding CA data: " + err.Error())
	}
	decodedCert, err := base64.StdEncoding.DecodeString(cert)
	if err != nil {
		return nil, errors.New("Error decoding certificate data: " + err.Error())
	}
	decodedKey, err := base64.StdEncoding.DecodeString(key)
	if err != nil {
		return nil, errors.New("Error decoding key data: " + err.Error())
	}
	config := &rest.Config{
		Host: url + ":6443",
		TLSClientConfig: rest.TLSClientConfig{
			CAData:   decodedCa,
			CertData: decodedCert,
			KeyData:  decodedKey,
		},
	}
	// Create clientset
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, errors.New("Error creating Kubernetes client: " + err.Error())
	}
	if clientset == nil {
		return nil, errors.New("Error creating Kubernetes client: clientset is nil")
	}
	return &KubernetesService{
		Set: clientset,
	}, nil
}
// CreateNamespace creates namespace ns, labelled so the Admiralty multicluster
// scheduler picks it up. The three debug Printlns were removed.
func (k *KubernetesService) CreateNamespace(ctx context.Context, ns string) error {
	namespace := &v1.Namespace{
		ObjectMeta: metav1.ObjectMeta{
			Name: ns,
			Labels: map[string]string{
				// Label consumed by the Admiralty multicluster scheduler.
				"multicluster-scheduler": "enabled",
			},
		},
	}
	if _, err := k.Set.CoreV1().Namespaces().Create(ctx, namespace, metav1.CreateOptions{}); err != nil {
		return errors.New("Error creating namespace: " + err.Error())
	}
	return nil
}
// CreateServiceAccount creates the ServiceAccount "sa-<ns>" inside namespace ns.
func (k *KubernetesService) CreateServiceAccount(ctx context.Context, ns string) error {
	sa := &v1.ServiceAccount{
		ObjectMeta: metav1.ObjectMeta{Name: "sa-" + ns, Namespace: ns},
	}
	if _, err := k.Set.CoreV1().ServiceAccounts(ns).Create(ctx, sa, metav1.CreateOptions{}); err != nil {
		return errors.New("Failed to create ServiceAccount: " + err.Error())
	}
	return nil
}
// CreateRole creates a Role named role in namespace ns. groups, resources and verbs
// are parallel slices: index i of each together defines one PolicyRule.
func (k *KubernetesService) CreateRole(ctx context.Context, ns string, role string, groups [][]string, resources [][]string, verbs [][]string) error {
	if len(groups) != len(resources) || len(resources) != len(verbs) {
		return errors.New("Invalid input: groups, resources, and verbs must have the same length")
	}
	rules := make([]rbacv1.PolicyRule, 0, len(groups))
	for i := range groups {
		rules = append(rules, rbacv1.PolicyRule{
			APIGroups: groups[i],
			Resources: resources[i],
			Verbs:     verbs[i],
		})
	}
	newRole := &rbacv1.Role{
		ObjectMeta: metav1.ObjectMeta{Name: role, Namespace: ns},
		Rules:      rules,
	}
	if _, err := k.Set.RbacV1().Roles(ns).Create(ctx, newRole, metav1.CreateOptions{}); err != nil {
		return errors.New("Failed to create Role: " + err.Error())
	}
	return nil
}
// CreateRoleBinding binds Role role to ServiceAccount "sa-<ns>" inside namespace ns,
// under the name roleBinding.
func (k *KubernetesService) CreateRoleBinding(ctx context.Context, ns string, roleBinding string, role string) error {
	subject := rbacv1.Subject{
		Kind:      "ServiceAccount",
		Name:      "sa-" + ns,
		Namespace: ns,
	}
	binding := &rbacv1.RoleBinding{
		ObjectMeta: metav1.ObjectMeta{Name: roleBinding, Namespace: ns},
		Subjects:   []rbacv1.Subject{subject},
		RoleRef: rbacv1.RoleRef{
			Kind:     "Role",
			Name:     role,
			APIGroup: "rbac.authorization.k8s.io",
		},
	}
	if _, err := k.Set.RbacV1().RoleBindings(ns).Create(ctx, binding, metav1.CreateOptions{}); err != nil {
		return errors.New("Failed to create RoleBinding: " + err.Error())
	}
	return nil
}
// DeleteNamespace tears down the Admiralty target, the service account and finally the
// namespace ns, then cancels any monitoring stream for it. Fixes: the caller's ctx is
// now used everywhere (previously context.TODO() was passed to the deletes), the
// package-level gvrTargets is reused instead of a duplicate literal, and the debug
// Println was removed.
func (k *KubernetesService) DeleteNamespace(ctx context.Context, ns string) error {
	// Delete the Admiralty Target first.
	dyn, err := NewDynamicClient()
	if err != nil {
		return err
	}
	if err := dyn.Resource(gvrTargets).Namespace(ns).Delete(ctx, "target-"+ns, metav1.DeleteOptions{}); err != nil {
		return err
	}
	if err := k.Set.CoreV1().ServiceAccounts(ns).Delete(ctx, "sa-"+ns, metav1.DeleteOptions{}); err != nil {
		return err
	}
	// Delete the namespace itself.
	if err := k.Set.CoreV1().Namespaces().Delete(ctx, ns, metav1.DeleteOptions{}); err != nil {
		return errors.New("Error deleting namespace: " + err.Error())
	}
	// Stop any monitoring stream attached to this namespace.
	monitor.StreamRegistry.Cancel(ns)
	return nil
}
// GenerateToken returns the token generated for the serviceAccount named sa-`ns`
// in the namespace identified by `ns`, valid for `duration` seconds.
func (k *KubernetesService) GenerateToken(ctx context.Context, ns string, duration int) (string, error) {
	// Build the TokenRequest with the requested validity window.
	d := int64(duration)
	tokenRequest := &authv1.TokenRequest{
		Spec: authv1.TokenRequestSpec{
			ExpirationSeconds: &d, // validity in seconds, taken from the duration parameter
		},
	}
	// Generate the token
	token, err := k.Set.CoreV1().
		ServiceAccounts(ns).
		CreateToken(ctx, "sa-"+ns, tokenRequest, metav1.CreateOptions{})
	if err != nil {
		return "", errors.New("Failed to create token for ServiceAccount: " + err.Error())
	}
	return token.Status.Token, nil
}
// GetTargets lists the names of all Admiralty Target resources visible through the raw
// CRD API. Fixes: debug Printlns removed, and the previously unchecked type assertions
// on the decoded JSON (which could panic on an unexpected response shape) now return
// errors instead.
func (k *KubernetesService) GetTargets(ctx context.Context) ([]string, error) {
	resp, err := getCDRapiKube(*k.Set, ctx, "/apis/multicluster.admiralty.io/v1alpha1/targets")
	if err != nil {
		return nil, err
	}
	var targetDict map[string]interface{}
	if err := json.Unmarshal(resp, &targetDict); err != nil {
		return nil, err
	}
	items, ok := targetDict["items"].([]interface{})
	if !ok {
		return nil, errors.New("unexpected targets response: missing or malformed items list")
	}
	var listTargets []string
	for _, raw := range items {
		item, ok := raw.(map[string]interface{})
		if !ok {
			return nil, errors.New("unexpected targets response: item is not an object")
		}
		// Round-trip the metadata field through JSON into a typed ObjectMeta.
		byteMetada, err := json.Marshal(item["metadata"])
		if err != nil {
			return nil, err
		}
		var metadata metav1.ObjectMeta
		if err := json.Unmarshal(byteMetada, &metadata); err != nil {
			return nil, err
		}
		listTargets = append(listTargets, metadata.Name)
	}
	return listTargets, nil
}
// CreateAdmiraltyTarget creates an Admiralty Target, which allows this cluster to deploy
// pods to a remote cluster.
//
// The remote cluster must:
//
//   - have declared a Source resource
//
//   - have declared the same namespace as the one where the pods are created in the local cluster
//
//   - have declared a serviceAccount with sufficient permission to create pods
//
// The kubeconfig Secret for (peerId, executionId) must already exist in the namespace;
// otherwise (nil, nil) is returned without creating anything.
func (k *KubernetesService) CreateAdmiraltyTarget(context context.Context, executionId string, peerId string) ([]byte, error) {
	// The Target references a kubeconfig Secret, so verify it exists first.
	exists, err := k.GetKubeconfigSecret(context, executionId, peerId)
	if err != nil {
		fmt.Println("Error verifying kube-secret before creating target")
		return nil, err
	}
	if exists == nil {
		fmt.Println("Target needs to be binded to a secret in namespace ", executionId)
		return nil, nil // Maybe we could create a wrapper for errors and add more info to have
	}
	targetName := "target-" + oclib.GetConcatenatedName(peerId, executionId)
	// Unstructured Target manifest, applied through the dynamic client.
	target := map[string]interface{}{
		"apiVersion": "multicluster.admiralty.io/v1alpha1",
		"kind":       "Target",
		"metadata": map[string]interface{}{
			"name":      targetName,
			"namespace": executionId,
			// "labels": map[string]interface{}{
			// 	"peer": peerId,
			// },
		},
		"spec": map[string]interface{}{
			"kubeconfigSecret": map[string]string{
				"name": "kube-secret-" + oclib.GetConcatenatedName(peerId, executionId),
			},
		},
	}
	res, err := dynamicClientApply(executionId, targetName, gvrTargets, context, target)
	if err != nil {
		return nil, errors.New("Error when trying to apply Target definition :" + err.Error())
	}
	return res, nil
}
// CreateAdmiraltySource creates an Admiralty Source resource, which allows this
// cluster to receive pods from a remote cluster.
//
// The source is associated to the "sa-<executionId>" serviceAccount, which will
// execute the pods locally and must therefore have sufficient permission to
// create and patch pods.
//
// This method is temporary to implement the use of Admiralty, but must be
// edited to rather contact the oc-datacenter from the remote cluster to create
// the source locally and retrieve the token for the serviceAccount.
func (k *KubernetesService) CreateAdmiraltySource(context context.Context, executionId string) ([]byte, error) {
	sourceName := "source-" + executionId
	definition := map[string]interface{}{
		"apiVersion": "multicluster.admiralty.io/v1alpha1",
		"kind":       "Source",
		"metadata": map[string]interface{}{
			"name":      sourceName,
			"namespace": executionId,
		},
		"spec": map[string]interface{}{
			"serviceAccountName": "sa-" + executionId,
		},
	}
	applied, err := dynamicClientApply(executionId, sourceName, gvrSources, context, definition)
	if err != nil {
		return nil, errors.New("Error when trying to apply Source definition :" + err.Error())
	}
	return applied, nil
}
// CreateKubeconfigSecret stores a base64-encoded kubeconfig as a Kubernetes
// secret in the execution namespace. This secret is the one an Admiralty
// Target binds to, and must therefore contain the remote serviceAccount's
// token value.
//
// The secret is named "kube-secret-<peer>-<exec>" and written with server-side
// apply, so a pre-existing secret managed by the same field manager is updated
// in place. It returns the applied secret marshalled as JSON.
func (k *KubernetesService) CreateKubeconfigSecret(context context.Context, kubeconfig string, executionId string, peerId string) ([]byte, error) {
	// The kubeconfig travels base64-encoded; decode it before storing.
	config, err := base64.StdEncoding.DecodeString(kubeconfig)
	if err != nil {
		// Bug fix: the previous message said "encoding" while this decodes.
		fmt.Println("Error while decoding kubeconfig")
		fmt.Println(err)
		return nil, err
	}
	secretName := "kube-secret-" + oclib.GetConcatenatedName(peerId, executionId)
	secretApplyConfig := apply.Secret(secretName, executionId).
		WithData(map[string][]byte{
			"config": config,
		})
	resp, err := k.Set.CoreV1().
		Secrets(executionId).
		Apply(context,
			secretApplyConfig,
			metav1.ApplyOptions{
				FieldManager: "admiralty-manager",
			})
	if err != nil {
		// Bug fix: the previous log named "kube-secret-<exec>" which is not
		// the actual secret name (the peer prefix was missing).
		fmt.Println("Error while trying to contact API to apply secret " + secretName)
		fmt.Println(err)
		return nil, err
	}
	data, err := json.Marshal(resp)
	if err != nil {
		fmt.Println("Couldn't marshal resp from : ", data)
		fmt.Println(err)
		return nil, err
	}
	return data, nil
}
// GetKubeconfigSecret fetches the "kube-secret-<peer>-<exec>" secret from the
// execution namespace and returns it marshalled as JSON.
//
// It returns (nil, nil) when the secret does not exist, so callers can use a
// nil payload as an "absent" signal without error handling.
func (k *KubernetesService) GetKubeconfigSecret(context context.Context, executionId string, peerId string) ([]byte, error) {
	secretName := "kube-secret-" + oclib.GetConcatenatedName(peerId, executionId)
	resp, err := k.Set.CoreV1().
		Secrets(executionId).
		Get(context, secretName, metav1.GetOptions{})
	if err != nil {
		if apierrors.IsNotFound(err) {
			fmt.Println("kube-secret not found for execution", executionId)
			return nil, nil
		}
		// Bug fix: the previous log named "kube-secret-<exec>" which is not
		// the actual secret name (the peer prefix was missing).
		fmt.Println("Error while trying to contact API to get secret " + secretName)
		fmt.Println(err)
		return nil, err
	}
	data, err := json.Marshal(resp)
	if err != nil {
		fmt.Println("Couldn't marshal resp from : ", data)
		fmt.Println(err)
		return nil, err
	}
	return data, nil
}
// DeleteKubeConfigSecret removes the kubeconfig secret for the given execution.
//
// NOTE(review): unimplemented stub — it always returns an empty payload and a
// nil error without deleting anything. Confirm whether teardown flows rely on
// this before treating executions as fully cleaned up.
func (k *KubernetesService) DeleteKubeConfigSecret(executionID string) ([]byte, error) {
	return []byte{}, nil
}
// GetNamespace returns the Namespace object named after executionID, or
// (nil, nil) when no such namespace exists on the cluster.
func (k *KubernetesService) GetNamespace(context context.Context, executionID string) (*v1.Namespace, error) {
	ns, err := k.Set.CoreV1().Namespaces().Get(context, executionID, metav1.GetOptions{})
	switch {
	case apierrors.IsNotFound(err):
		// Absence is not an error for callers: they test the pointer instead.
		return nil, nil
	case err != nil:
		oclib.GetLogger().Error().Msg("An error occured when trying to get namespace " + executionID + " : " + err.Error())
		return nil, err
	}
	return ns, nil
}
// getCDRapiKube performs a raw GET against an arbitrary API-server path, which
// is how CRD endpoints (not covered by the typed clientset) are read.
// See https://stackoverflow.com/questions/60764908/how-to-access-kubernetes-crd-using-client-go
func getCDRapiKube(client kubernetes.Clientset, ctx context.Context, path string) ([]byte, error) {
	body, err := client.RESTClient().
		Get().
		AbsPath(path).
		DoRaw(ctx)
	if err != nil {
		fmt.Println("Error from k8s API when getting "+path+" : ", err)
		return nil, err
	}
	return body, nil
}
// dynamicClientApply server-side-applies an arbitrary unstructured object
// (e.g. an Admiralty Source or Target CRD instance) in the execution namespace.
//
// resourceDefinition selects the GroupVersionResource to apply against.
// It returns the applied resource marshalled as JSON.
func dynamicClientApply(executionId string, resourceName string, resourceDefinition schema.GroupVersionResource, ctx context.Context, object map[string]interface{}) ([]byte, error) {
	cli, err := NewDynamicClient()
	if err != nil {
		return nil, errors.New("Could not retrieve dynamic client when creating Admiralty Source : " + err.Error())
	}
	res, err := cli.Resource(resourceDefinition).
		Namespace(executionId).
		Apply(ctx,
			resourceName,
			&unstructured.Unstructured{Object: object},
			metav1.ApplyOptions{
				FieldManager: "kubectl-client-side-apply",
			},
		)
	if err != nil {
		// Bug fix: the previous branch shadowed err with the result of
		// json.Marshal (usually nil), so the real apply error was dropped and
		// the caller received (nil, nil). Use a distinct variable instead.
		// Also log the GVR actually used, not gvrSources unconditionally.
		o, mErr := json.Marshal(object)
		_ = mErr // best-effort logging only
		fmt.Println("Error from k8s API when applying "+string(o)+" to "+resourceDefinition.String()+" : ", err)
		return nil, err
	}
	// We could add more info to the log with the content of res if not nil.
	resByte, err := json.Marshal(res)
	if err != nil {
		return nil, err
	}
	return resByte, nil
}
// CheckHealth verifies that the cluster is reachable and healthy: the API
// server answers, every node reports Ready, and every pod in kube-system is
// Running or Succeeded. The whole check is bounded by a 5-second timeout.
func (k *KubernetesService) CheckHealth() error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Check API server connectivity.
	if _, err := k.Set.ServerVersion(); err != nil {
		return fmt.Errorf("API server unreachable: %v", err)
	}
	// Check node readiness using the typed core/v1 constants instead of raw
	// string comparisons (same values, compiler-checked).
	nodes, err := k.Set.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list nodes: %v", err)
	}
	for _, node := range nodes.Items {
		for _, condition := range node.Status.Conditions {
			if condition.Type == v1.NodeReady && condition.Status != v1.ConditionTrue {
				return fmt.Errorf("node %s not ready", node.Name)
			}
		}
	}
	// Optional: check that all pods in kube-system are running or completed.
	pods, err := k.Set.CoreV1().Pods("kube-system").List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list pods: %v", err)
	}
	for _, pod := range pods.Items {
		if pod.Status.Phase != v1.PodRunning && pod.Status.Phase != v1.PodSucceeded {
			return fmt.Errorf("pod %s in namespace kube-system is %s", pod.Name, pod.Status.Phase)
		}
	}
	return nil
}
// GetOneNode returns the Kubernetes Node object corresponding to the
// executionID if it exists on this host, or (nil, nil) when none matches.
//
// The node is created when an admiralty Target (on host) can connect to an
// admiralty Source (on remote); its name embeds both the execution namespace
// and the peer/execution concatenation.
func (k *KubernetesService) GetOneNode(context context.Context, executionID string, peerId string) (*v1.Node, error) {
	concatenatedName := oclib.GetConcatenatedName(peerId, executionID)
	// Hoist the expected name fragment out of the loop.
	expected := "admiralty-" + executionID + "-target-" + concatenatedName + "-"
	res, err := k.Set.CoreV1().
		Nodes().
		List(
			context,
			metav1.ListOptions{},
		)
	if err != nil {
		fmt.Println("Error getting the list of nodes from k8s API")
		fmt.Println(err)
		return nil, err
	}
	// Index into the slice instead of ranging by value: Node is a large struct
	// (range would copy each one), and this avoids returning the address of
	// the loop variable.
	for i := range res.Items {
		if strings.Contains(res.Items[i].Name, expected) {
			return &res.Items[i], nil
		}
	}
	return nil, nil
}
// CreateSecret creates an Opaque secret named "<minioId>-secret-s3" in the
// execution namespace, holding Minio service-account credentials under the
// "access-key" and "secret-key" entries.
//
// NOTE(review): the artifact-repository ConfigMap built elsewhere references
// "accesskey"/"secretkey" (no dash) — confirm both sides agree on the key
// names before pods consume these credentials.
func (k *KubernetesService) CreateSecret(context context.Context, minioId string, executionID string, access string, secret string) error {
	data := map[string][]byte{
		"access-key": []byte(access),
		"secret-key": []byte(secret),
	}
	s := v1.Secret{
		Type: v1.SecretTypeOpaque,
		Data: data,
		ObjectMeta: metav1.ObjectMeta{
			Name: minioId + "-secret-s3",
		},
	}
	_, err := k.Set.CoreV1().Secrets(executionID).Create(context, &s, metav1.CreateOptions{})
	if err != nil {
		logger := oclib.GetLogger()
		logger.Error().Msg("An error happened when creating the secret holding minio credentials in namespace " + executionID + " : " + err.Error())
		return err
	}
	return nil
}
// getConcatenatedName returns a concatenation of the peerId and the first two
// dash-separated segments of the namespace, in order for kubernetes resources
// to have a unique name, under 63 characters, and yet identify which peer they
// are created for.
//
// Robustness fix: when the namespace has fewer than two dash-separated
// segments it is used as-is (the previous slice expression [:2] panicked).
func getConcatenatedName(peerId string, namespace string) string {
	parts := strings.Split(namespace, "-")
	if len(parts) < 2 {
		return peerId + "-" + namespace
	}
	return peerId + "-" + parts[0] + "-" + parts[1]
}

View File

@@ -1,126 +0,0 @@
package infrastructure
import (
"context"
"encoding/json"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/minio/madmin-go/v4"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"github.com/necmettindev/randomstring"
)
// MinioService wraps a Minio admin client together with the endpoint URL and
// the root credentials used to administer it.
type MinioService struct {
	Url              string
	RootKey          string
	RootSecret       string
	MinioAdminClient *madmin.AdminClient
}

// StatementEntry is a single statement of an S3 IAM-style policy document.
type StatementEntry struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

// PolicyDocument is the top-level S3 IAM-style policy attached to scoped
// service accounts.
type PolicyDocument struct {
	Version   string           `json:"Version"`
	Statement []StatementEntry `json:"Statement"`
}
// NewMinioService builds a MinioService pointed at url, using the Minio root
// credentials from the application configuration. The admin client is created
// lazily by CreateClient.
func NewMinioService(url string) *MinioService {
	return &MinioService{
		Url:        url,
		RootKey:    conf.GetConfig().MinioRootKey,
		RootSecret: conf.GetConfig().MinioRootSecret,
	}
}
// CreateClient initializes the madmin admin client on the service from the
// stored root credentials. It must be called before any admin operation.
func (m *MinioService) CreateClient() error {
	cred := credentials.NewStaticV4(m.RootKey, m.RootSecret, "")
	cli, err := madmin.NewWithOptions(m.Url, &madmin.Options{Creds: cred, Secure: false}) // Maybe in the future we should use the secure option ?
	if err != nil {
		return err
	}
	m.MinioAdminClient = cli
	return nil
}
// CreateCredentials creates a Minio service account restricted by policy to
// GetObject/PutObject on the executionId bucket, with randomly generated
// access/secret keys, and returns the effective (access, secret) pair.
func (m *MinioService) CreateCredentials(executionId string) (string, string, error) {
	// Scoped policy: read/write only inside the execution's bucket.
	policy := PolicyDocument{
		Version: "2012-10-17",
		Statement: []StatementEntry{
			{
				Effect:   "Allow",
				Action:   []string{"s3:GetObject", "s3:PutObject"},
				Resource: "arn:aws:s3:::" + executionId + "/*",
			},
		},
	}
	p, err := json.Marshal(policy)
	if err != nil {
		return "", "", err
	}
	randAccess, randSecret := getRandomCreds()
	req := madmin.AddServiceAccountReq{
		Policy:     p,
		TargetUser: m.RootKey,
		AccessKey:  randAccess,
		SecretKey:  randSecret,
	}
	res, err := m.MinioAdminClient.AddServiceAccount(context.Background(), req)
	if err != nil {
		return "", "", err
	}
	return res.AccessKey, res.SecretKey, nil
}
// getRandomCreds generates a random 20-character access key and a random
// 40-character secret key.
//
// NOTE(review): generation errors are silently discarded; on failure the
// returned strings may be empty — confirm this is acceptable upstream.
func getRandomCreds() (string, string) {
	opts := randomstring.GenerationOptions{
		Length: 20,
	}
	a, _ := randomstring.GenerateString(opts)
	opts.Length = 40
	s, _ := randomstring.GenerateString(opts)
	return a, s
}
// CreateBucket creates the bucket named after executionId on the Minio data
// plane, authenticating with the root credentials.
func (m *MinioService) CreateBucket(executionId string) error {
	l := oclib.GetLogger()
	cred := credentials.NewStaticV4(m.RootKey, m.RootSecret, "")
	client, err := minio.New(m.Url, &minio.Options{
		Creds:  cred,
		Secure: false,
	})
	if err != nil {
		l.Error().Msg("Error when creating the minio client for the data plane")
		return err
	}
	err = client.MakeBucket(context.Background(), executionId, minio.MakeBucketOptions{})
	if err != nil {
		l.Error().Msg("Error when creating the bucket for namespace " + executionId)
		return err
	}
	l.Info().Msg("Created the bucket " + executionId + " on " + m.Url + " minio")
	return nil
}

View File

@@ -0,0 +1,219 @@
package minio
import (
"context"
"encoding/json"
"fmt"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/minio/madmin-go/v4"
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"github.com/necmettindev/randomstring"
)
// MinioService wraps a Minio admin client together with the endpoint URL and
// the root credentials used to administer it.
type MinioService struct {
	Url              string
	RootKey          string
	RootSecret       string
	MinioAdminClient *madmin.AdminClient
}

// StatementEntry is a single statement of an S3 IAM-style policy document.
type StatementEntry struct {
	Effect   string   `json:"Effect"`
	Action   []string `json:"Action"`
	Resource string   `json:"Resource"`
}

// PolicyDocument is the top-level S3 IAM-style policy attached to scoped
// service accounts.
type PolicyDocument struct {
	Version   string           `json:"Version"`
	Statement []StatementEntry `json:"Statement"`
}
// NewMinioService builds a MinioService pointed at url, using the Minio root
// credentials from the application configuration. The admin client is created
// lazily by CreateClient.
func NewMinioService(url string) *MinioService {
	cfg := conf.GetConfig()
	service := MinioService{
		Url:        url,
		RootKey:    cfg.MinioRootKey,
		RootSecret: cfg.MinioRootSecret,
	}
	return &service
}
// CreateClient initializes the madmin admin client on the service from the
// stored root credentials. It must be called before any admin operation.
func (m *MinioService) CreateClient() error {
	cred := credentials.NewStaticV4(m.RootKey, m.RootSecret, "")
	cli, err := madmin.NewWithOptions(m.Url, &madmin.Options{Creds: cred, Secure: false}) // Maybe in the future we should use the secure option ?
	if err != nil {
		return err
	}
	m.MinioAdminClient = cli
	return nil
}
// CreateCredentials creates a Minio service account restricted by policy to
// GetObject/PutObject on the executionId bucket, with randomly generated
// access/secret keys, and returns the effective (access, secret) pair.
//
// NOTE(review): the policy resource is "arn:aws:s3:::<executionId>/*" but
// CreateBucket now names the bucket "<minioID>-<executionId>" — the scoped
// account may therefore not have access to the created bucket; confirm which
// bucket name the policy should target.
func (m *MinioService) CreateCredentials(executionId string) (string, string, error) {
	policy := PolicyDocument{
		Version: "2012-10-17",
		Statement: []StatementEntry{
			{
				Effect:   "Allow",
				Action:   []string{"s3:GetObject", "s3:PutObject"},
				Resource: "arn:aws:s3:::" + executionId + "/*",
			},
		},
	}
	p, err := json.Marshal(policy)
	if err != nil {
		return "", "", err
	}
	randAccess, randSecret := getRandomCreds()
	req := madmin.AddServiceAccountReq{
		Policy:     p,
		TargetUser: m.RootKey,
		AccessKey:  randAccess,
		SecretKey:  randSecret,
	}
	res, err := m.MinioAdminClient.AddServiceAccount(context.Background(), req)
	if err != nil {
		return "", "", err
	}
	return res.AccessKey, res.SecretKey, nil
}
// getRandomCreds generates a random 20-character access key and a random
// 40-character secret key.
//
// NOTE(review): generation errors are silently discarded; on failure the
// returned strings may be empty — confirm this is acceptable upstream.
func getRandomCreds() (string, string) {
	opts := randomstring.GenerationOptions{
		Length: 20,
	}
	a, _ := randomstring.GenerateString(opts)
	opts.Length = 40
	s, _ := randomstring.GenerateString(opts)
	return a, s
}
// CreateMinioConfigMap creates or updates the "<minioID>artifact-repository"
// ConfigMap in the execution namespace. The ConfigMap holds an artifact
// repository entry pointing at the "<minioID>-<executionId>" bucket on the
// given Minio endpoint, referencing the "<minioID>-secret-s3" secret.
//
// NOTE(review): the secret created by KubernetesService.CreateSecret stores
// its entries under "access-key"/"secret-key", while this entry references
// "accesskey"/"secretkey" — confirm which naming the consumer expects.
func (m *MinioService) CreateMinioConfigMap(minioID string, executionId string, url string) error {
	config, err := rest.InClusterConfig()
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return err
	}
	configMap := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Name:      minioID + "artifact-repository",
			Namespace: executionId,
		},
		Data: map[string]string{
			minioID + "s3-local": fmt.Sprintf(`
    s3:
      bucket: %s
      endpoint: %s
      insecure: true
      accessKeySecret:
        name: %s-secret-s3
        key: accesskey
      secretKeySecret:
        name: %s-secret-s3
        key: secretkey
`, minioID+"-"+executionId, url, minioID, minioID),
		},
	}
	existing, err := clientset.CoreV1().
		ConfigMaps(executionId).
		Get(context.Background(), minioID+"artifact-repository", metav1.GetOptions{})
	if err == nil {
		// Already present: update the data in place.
		existing.Data = configMap.Data
		_, err = clientset.CoreV1().
			ConfigMaps(executionId).
			Update(context.Background(), existing, metav1.UpdateOptions{})
	} else {
		// Not found (or lookup failed): attempt a create.
		_, err = clientset.CoreV1().
			ConfigMaps(executionId).
			Create(context.Background(), configMap, metav1.CreateOptions{})
	}
	// Bug fix: the previous code unconditionally returned nil, silently
	// swallowing create/update failures.
	return err
}
// CreateBucket creates the execution bucket "<minioID>-<executionId>" on the
// Minio data plane, authenticating with the root credentials.
func (m *MinioService) CreateBucket(minioID string, executionId string) error {
	logger := oclib.GetLogger()
	bucketName := minioID + "-" + executionId
	client, err := minio.New(m.Url, &minio.Options{
		Creds:  credentials.NewStaticV4(m.RootKey, m.RootSecret, ""),
		Secure: false,
	})
	if err != nil {
		logger.Error().Msg("Error when creating the minio client for the data plane")
		return err
	}
	if err := client.MakeBucket(context.Background(), bucketName, minio.MakeBucketOptions{}); err != nil {
		logger.Error().Msg("Error when creating the bucket for namespace " + executionId)
		return err
	}
	logger.Info().Msg("Created the bucket " + bucketName + " on " + m.Url + " minio")
	return nil
}
// DeleteCredentials revokes a scoped Minio service account by its access key.
func (m *MinioService) DeleteCredentials(accessKey string) error {
	err := m.MinioAdminClient.DeleteServiceAccount(context.Background(), accessKey)
	if err != nil {
		return fmt.Errorf("DeleteCredentials: %w", err)
	}
	return nil
}
// DeleteBucket removes the execution bucket "<minioID>-<executionId>" from Minio.
func (m *MinioService) DeleteBucket(minioID, executionId string) error {
	logger := oclib.GetLogger()
	client, err := minio.New(m.Url, &minio.Options{Creds: credentials.NewStaticV4(m.RootKey, m.RootSecret, ""), Secure: false})
	if err != nil {
		logger.Error().Msg("Error when creating minio client for bucket deletion")
		return err
	}
	bucketName := minioID + "-" + executionId
	if removeErr := client.RemoveBucket(context.Background(), bucketName); removeErr != nil {
		logger.Error().Msg("Error when deleting bucket " + bucketName)
		return removeErr
	}
	logger.Info().Msg("Deleted bucket " + bucketName + " on " + m.Url)
	return nil
}
// DeleteMinioConfigMap removes the artifact-repository ConfigMap from the execution namespace.
func (m *MinioService) DeleteMinioConfigMap(minioID, executionId string) error {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return err
	}
	cli, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}
	name := minioID + "artifact-repository"
	return cli.CoreV1().ConfigMaps(executionId).Delete(context.Background(), name, metav1.DeleteOptions{})
}

View File

@@ -0,0 +1,297 @@
package minio
import (
"context"
"encoding/json"
"fmt"
"slices"
"oc-datacenter/conf"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/live"
"cloud.o-forge.io/core/oc-lib/tools"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// MinioCredentialEvent is the NATS payload used to transfer Minio credentials between peers.
//
// Two-phase protocol over PROPALGATION_EVENT (Action = PB_MINIO_CONFIG):
//   - Phase 1 role assignment (Access == ""):
//     oc-discovery routes this to the SOURCE peer (Minio host) → InitializeAsSource.
//   - Phase 2 credential delivery (Access != ""):
//     oc-discovery routes this to the TARGET peer (compute host) → InitializeAsTarget.
type MinioCredentialEvent struct {
	ExecutionsID string `json:"executions_id"` // execution this provisioning belongs to
	MinioID      string `json:"minio_id"`      // ID of the Minio storage resource
	Access       string `json:"access"`        // scoped access key ("" during Phase 1)
	Secret       string `json:"secret"`        // scoped secret key ("" during Phase 1)
	SourcePeerID string `json:"source_peer_id"`
	DestPeerID   string `json:"dest_peer_id"`
	URL          string `json:"url"` // Minio endpoint, consumed by the target's ConfigMap
	// OriginID is the peer that initiated the provisioning request.
	// The PB_CONSIDERS response is routed back to this peer.
	OriginID string `json:"origin_id"`
}
// minioConsidersPayload is the PB_CONSIDERS payload emitted after minio provisioning.
type minioConsidersPayload struct {
	OriginID     string  `json:"origin_id"`
	ExecutionsID string  `json:"executions_id"`
	Secret       string  `json:"secret,omitempty"`
	Error        *string `json:"error,omitempty"`
}

// emitConsiders publishes a PB_CONSIDERS back to OriginID with the result of
// the minio provisioning. secret is the provisioned credential; provErr is nil
// on success. The publish itself runs in a goroutine (fire-and-forget); both
// marshals operate on plain string fields, so their errors are ignored.
func emitConsiders(executionsID, originID, secret string, provErr error) {
	var errStr *string
	if provErr != nil {
		s := provErr.Error()
		errStr = &s
	}
	payload, _ := json.Marshal(minioConsidersPayload{
		OriginID:     originID,
		ExecutionsID: executionsID,
		Secret:       secret,
		Error:        errStr,
	})
	b, _ := json.Marshal(&tools.PropalgationMessage{
		DataType: tools.STORAGE_RESOURCE.EnumIndex(),
		Action:   tools.PB_CONSIDERS,
		Payload:  payload,
	})
	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-datacenter",
		Datatype: -1,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
}
// MinioSetter carries the execution context for a Minio credential provisioning.
type MinioSetter struct {
	ExecutionsID string // used as both the bucket name and the K8s namespace suffix
	MinioID      string // ID of the Minio storage resource
}

// NewMinioSetter builds a MinioSetter for the given execution and Minio resource.
func NewMinioSetter(execID, minioID string) *MinioSetter {
	setter := MinioSetter{
		ExecutionsID: execID,
		MinioID:      minioID,
	}
	return &setter
}
// InitializeAsSource is called on the peer that hosts the Minio instance.
//
// It:
//  1. Looks up the live-storage endpoint URL for MinioID.
//  2. Creates a scoped service account (access + secret limited to the execution bucket).
//  3. Creates the execution bucket.
//  4. If source and dest are the same peer, calls InitializeAsTarget directly.
//     Otherwise, publishes a MinioCredentialEvent via NATS (Phase 2) so that
//     oc-discovery can route the credentials to the compute peer.
//
// Consistency fix: every failure is now reported back to originID through
// PB_CONSIDERS (as InitializeAsTarget already does), so the initiating peer is
// not left waiting for a response that never comes.
func (m *MinioSetter) InitializeAsSource(ctx context.Context, localPeerID, destPeerID, originID string) {
	logger := oclib.GetLogger()
	url, err := m.loadMinioURL(localPeerID)
	if err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsSource: " + err.Error())
		emitConsiders(m.ExecutionsID, originID, "", err)
		return
	}
	service := NewMinioService(url)
	if err := service.CreateClient(); err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsSource: failed to create admin client: " + err.Error())
		emitConsiders(m.ExecutionsID, originID, "", err)
		return
	}
	access, secret, err := service.CreateCredentials(m.ExecutionsID)
	if err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsSource: failed to create service account: " + err.Error())
		emitConsiders(m.ExecutionsID, originID, "", err)
		return
	}
	if err := service.CreateBucket(m.MinioID, m.ExecutionsID); err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsSource: failed to create bucket: " + err.Error())
		emitConsiders(m.ExecutionsID, originID, "", err)
		return
	}
	logger.Info().Msg("MinioSetter.InitializeAsSource: bucket and service account ready for " + m.ExecutionsID)
	event := MinioCredentialEvent{
		ExecutionsID: m.ExecutionsID,
		MinioID:      m.MinioID,
		Access:       access,
		Secret:       secret,
		SourcePeerID: localPeerID,
		DestPeerID:   destPeerID,
		OriginID:     originID,
		// Bug fix: URL was previously left empty, so the target peer could
		// not build its artifact-repository ConfigMap from the event.
		URL: url,
	}
	if destPeerID == localPeerID {
		// Same peer: store the secret locally without going through NATS.
		m.InitializeAsTarget(ctx, event)
		return
	}
	// Cross-peer: publish credentials (Phase 2) so oc-discovery routes them to the compute peer.
	payload, err := json.Marshal(event)
	if err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsSource: failed to marshal credential event: " + err.Error())
		emitConsiders(m.ExecutionsID, originID, "", err)
		return
	}
	b, err := json.Marshal(&tools.PropalgationMessage{
		DataType: -1,
		Action:   tools.PB_MINIO_CONFIG,
		Payload:  payload,
	})
	if err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsSource: failed to marshal propagation message: " + err.Error())
		emitConsiders(m.ExecutionsID, originID, "", err)
		return
	}
	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-datacenter",
		Datatype: -1,
		User:     "",
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
	logger.Info().Msg("MinioSetter.InitializeAsSource: credentials published via NATS for " + m.ExecutionsID)
}
// InitializeAsTarget is called on the peer that runs the compute workload.
//
// It stores the Minio credentials received from the source peer (via NATS or
// directly) as a Kubernetes secret inside the execution namespace, creates the
// artifact-repository ConfigMap, and reports the outcome back to the origin
// peer through PB_CONSIDERS.
func (m *MinioSetter) InitializeAsTarget(ctx context.Context, event MinioCredentialEvent) {
	logger := oclib.GetLogger()
	k, err := tools.NewKubernetesService(
		conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
	)
	if err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create k8s service: " + err.Error())
		// Consistency fix: report this failure to the origin peer too,
		// like every other error path in this method.
		emitConsiders(event.ExecutionsID, event.OriginID, "", err)
		return
	}
	if err := k.CreateSecret(ctx, event.MinioID, event.ExecutionsID, event.Access, event.Secret); err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create k8s secret: " + err.Error())
		emitConsiders(event.ExecutionsID, event.OriginID, "", err)
		return
	}
	// Bug fix: the condition was inverted (err == nil), so a *successful*
	// ConfigMap creation was reported as a failure — and logged with a nil
	// error — while a real failure was silently treated as success.
	if err := NewMinioService(event.URL).CreateMinioConfigMap(event.MinioID, event.ExecutionsID, event.URL); err != nil {
		logger.Error().Msg("MinioSetter.InitializeAsTarget: failed to create config map: " + err.Error())
		emitConsiders(event.ExecutionsID, event.OriginID, "", err)
		return
	}
	logger.Info().Msg("MinioSetter.InitializeAsTarget: Minio credentials stored in namespace " + event.ExecutionsID)
	emitConsiders(event.ExecutionsID, event.OriginID, event.Secret, nil)
}
// MinioDeleteEvent is the NATS payload used to tear down Minio resources.
// It mirrors MinioCredentialEvent but carries the access key for revocation.
type MinioDeleteEvent struct {
	ExecutionsID string `json:"executions_id"` // execution whose resources are torn down
	MinioID      string `json:"minio_id"`      // ID of the Minio storage resource
	Access       string `json:"access"`        // service account access key to revoke on the Minio host
	SourcePeerID string `json:"source_peer_id"`
	DestPeerID   string `json:"dest_peer_id"`
	OriginID     string `json:"origin_id"` // peer the PB_CONSIDERS response is routed back to
}
// TeardownAsTarget is called on the peer that runs the compute workload.
// It reads the stored access key from the K8s secret, then removes both the secret
// and the artifact-repository ConfigMap from the execution namespace.
// For same-peer deployments it calls TeardownAsSource directly; otherwise it
// publishes a MinioDeleteEvent via NATS (PB_DELETE) so oc-discovery routes it to
// the Minio host peer.
//
// Deletion errors are logged but do not abort the teardown: each cleanup step
// is best-effort so a missing resource never blocks the rest.
func (m *MinioSetter) TeardownAsTarget(ctx context.Context, event MinioDeleteEvent) {
	logger := oclib.GetLogger()
	k, err := tools.NewKubernetesService(
		conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
		conf.GetConfig().KubeCA, conf.GetConfig().KubeCert, conf.GetConfig().KubeData,
	)
	if err != nil {
		logger.Error().Msg("MinioSetter.TeardownAsTarget: failed to create k8s service: " + err.Error())
		emitConsiders(event.ExecutionsID, event.OriginID, "", err)
		return
	}
	// Read the access key from the K8s secret before deleting it, so the
	// source peer can later revoke the matching service account.
	accessKey := event.Access
	if accessKey == "" {
		if secret, err := k.Set.CoreV1().Secrets(event.ExecutionsID).Get(
			ctx, event.MinioID+"-secret-s3", metav1.GetOptions{},
		); err == nil {
			accessKey = string(secret.Data["access-key"])
		}
	}
	// Delete K8s credentials secret.
	if err := k.Set.CoreV1().Secrets(event.ExecutionsID).Delete(
		ctx, event.MinioID+"-secret-s3", metav1.DeleteOptions{},
	); err != nil {
		logger.Error().Msg("MinioSetter.TeardownAsTarget: failed to delete secret: " + err.Error())
	}
	// Delete artifact-repository ConfigMap. The service URL is irrelevant here
	// (only the in-cluster K8s client is used), hence the empty string.
	if err := NewMinioService("").DeleteMinioConfigMap(event.MinioID, event.ExecutionsID); err != nil {
		logger.Error().Msg("MinioSetter.TeardownAsTarget: failed to delete configmap: " + err.Error())
	}
	logger.Info().Msg("MinioSetter.TeardownAsTarget: K8s resources removed for " + event.ExecutionsID)
	// For same-peer deployments the source cleanup runs directly here so the
	// caller (REMOVE_EXECUTION handler) doesn't have to distinguish roles.
	if event.SourcePeerID == event.DestPeerID {
		event.Access = accessKey
		m.TeardownAsSource(ctx, event)
	}
}
// TeardownAsSource is called on the peer that hosts the Minio instance.
// It revokes the scoped service account (when an access key is known) and
// removes the execution bucket. Both steps are best-effort: failures are
// logged but do not abort the remaining cleanup.
func (m *MinioSetter) TeardownAsSource(ctx context.Context, event MinioDeleteEvent) {
	logger := oclib.GetLogger()
	url, lookupErr := m.loadMinioURL(event.SourcePeerID)
	if lookupErr != nil {
		logger.Error().Msg("MinioSetter.TeardownAsSource: " + lookupErr.Error())
		return
	}
	service := NewMinioService(url)
	if clientErr := service.CreateClient(); clientErr != nil {
		logger.Error().Msg("MinioSetter.TeardownAsSource: failed to create admin client: " + clientErr.Error())
		return
	}
	if event.Access != "" {
		if revokeErr := service.DeleteCredentials(event.Access); revokeErr != nil {
			logger.Error().Msg("MinioSetter.TeardownAsSource: failed to delete service account: " + revokeErr.Error())
		}
	}
	if bucketErr := service.DeleteBucket(event.MinioID, event.ExecutionsID); bucketErr != nil {
		logger.Error().Msg("MinioSetter.TeardownAsSource: failed to delete bucket: " + bucketErr.Error())
	}
	logger.Info().Msg("MinioSetter.TeardownAsSource: Minio resources removed for " + event.ExecutionsID)
}
// loadMinioURL searches through all live storages accessible by peerID to find
// the one that references MinioID, and returns its endpoint URL.
//
// It returns an error when the live storages cannot be loaded or when none of
// them references MinioID.
func (m *MinioSetter) loadMinioURL(peerID string) (string, error) {
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
	if res.Err != "" {
		return "", fmt.Errorf("loadMinioURL: failed to load live storages: %s", res.Err)
	}
	for _, dbo := range res.Data {
		// Robustness fix: guard the type assertion — a foreign element in the
		// result list must not panic the whole provisioning flow.
		l, ok := dbo.(*live.LiveStorage)
		if !ok {
			continue
		}
		if slices.Contains(l.ResourcesID, m.MinioID) {
			return l.Source, nil
		}
	}
	return "", fmt.Errorf("loadMinioURL: no live storage found for minio ID %s", m.MinioID)
}

View File

@@ -0,0 +1,100 @@
# Analyse de `infrastructure/prometheus.go`
## Ce que fait le fichier
Ce fichier implémente un service de monitoring qui interroge une instance **Prometheus** pour collecter des métriques de conteneurs Kubernetes associés à une réservation (Booking).
### Structures de données
| Struct | Role |
|---|---|
| `MetricsSnapshot` | Snapshot de métriques associé à une origine (source). **Note : cette struct locale est déclarée mais jamais utilisée** — le code utilise en réalité `models.MetricsSnapshot` de oc-lib. |
| `Metric` | Paire nom/valeur d'une métrique. **Même remarque** — le code utilise `models.Metric`. |
| `PrometheusResponse` | Mapping de la réponse JSON de l'API Prometheus `/api/v1/query`. |
### Métriques collectées (`queriesMetrics`)
| # | Requête PromQL | Mesure |
|---|---|---|
| 1 | `rate(container_cpu_usage_seconds_total{namespace}[1m]) * 100` | Utilisation CPU (%) |
| 2 | `container_memory_usage_bytes{namespace}` | Mémoire utilisée (bytes) |
| 3 | `container_fs_usage_bytes / container_fs_limit_bytes * 100` | Utilisation disque (%) |
| 4 | `DCGM_FI_DEV_GPU_UTIL{namespace}` | Utilisation GPU (NVIDIA DCGM) |
| 5 | `rate(container_fs_reads_bytes_total[1m])` | Débit lecture disque (bytes/s) |
| 6 | `rate(container_fs_writes_bytes_total[1m])` | Débit écriture disque (bytes/s) |
| 7 | `rate(container_network_receive_bytes_total[1m])` | Bande passante réseau entrante (bytes/s) |
| 8 | `rate(container_network_transmit_bytes_total[1m])` | Bande passante réseau sortante (bytes/s) |
| 9 | `rate(http_requests_total[1m])` | Requêtes HTTP/s |
| 10 | `rate(http_requests_total{status=~"5.."}[1m]) / rate(http_requests_total[1m]) * 100` | Taux d'erreur HTTP 5xx (%) |
Métriques commentées (non actives) : `system_load_average`, `system_network_latency_ms`, `app_mean_time_to_repair_seconds`, `app_mean_time_between_failure_seconds`.
### Méthodes
#### `queryPrometheus(promURL, expr, namespace) Metric`
- Construit une requête GET vers `/api/v1/query` de Prometheus.
- Injecte le namespace dans l'expression PromQL via `fmt.Sprintf`.
- Parse la réponse JSON et extrait la première valeur du premier résultat.
- Retourne `-1` si aucun résultat.
#### `Call(book, user, peerID, groups) (Booking, map[string]MetricsSnapshot)`
- Charge la ressource de calcul (`ComputeResource`) liée au booking.
- Pour chaque instance de la ressource, cherche le `LiveDatacenter` correspondant.
- Lance en **goroutine** (parallèle) l'exécution de toutes les requêtes PromQL pour chaque datacenter ayant un `MonitorPath`.
- Attend toutes les goroutines (`sync.WaitGroup`), puis retourne les métriques groupées par instance.
#### `Stream(bookingID, interval, user, peerID, groups, websocket)`
- Boucle de monitoring en continu jusqu'à `ExpectedEndDate` du booking ou signal de kill.
- A chaque tick (`interval`), appelle `Call()` dans une goroutine.
- Envoie les métriques en temps réel via **WebSocket**.
- Accumule les métriques en mémoire et les persiste dans le booking tous les `max` (100) cycles.
- Supporte un mécanisme de kill via la variable globale `Kill`.
---
## Problèmes et points d'attention
### Bugs potentiels
1. **Race condition dans `Stream`** — Les variables `mets`, `bookIDS`, `book` sont partagées entre la boucle principale et les goroutines lancées à chaque tick, **sans synchronisation** (pas de mutex). Si `interval` est court, plusieurs goroutines peuvent écrire simultanément dans `mets` et `bookIDS`.
2. **Race condition sur `Kill`** — La variable globale `Kill` est lue dans la boucle sans verrouiller `LockKill`. Le mutex n'est utilisé que pour l'écriture.
3. **Structs locales inutilisées**`MetricsSnapshot` et `Metric` (lignes 22-31) sont déclarées localement mais le code utilise `models.MetricsSnapshot` et `models.Metric`. Code mort à nettoyer.
4. **Requête PromQL avec double placeholder** — La requête filesystem (ligne 47) contient deux `%s` mais `queryPrometheus` ne fait qu'un seul `fmt.Sprintf(expr, namespace)`. Cela provoque un **`%!s(MISSING)`** dans la requête. Il faut passer le namespace deux fois ou réécrire la fonction.
5. **Pas de timeout HTTP**`http.Get()` utilise le client par défaut sans timeout. Un Prometheus lent peut bloquer indéfiniment.
6. **Pas de gestion d'erreur sur `WriteJSON`** — Si le WebSocket est fermé côté client, l'écriture échoue silencieusement.
### Améliorations possibles
#### Fiabilité
- **Ajouter un `context.Context`** à `queryPrometheus` et `Call` pour supporter les timeouts et l'annulation.
- **Utiliser un `http.Client` avec timeout** au lieu de `http.Get`.
- **Protéger les accès concurrents** dans `Stream` avec un `sync.Mutex` sur `mets`/`bookIDS`.
- **Remplacer la variable globale `Kill`** par un `context.WithCancel` ou un channel, plus idiomatique en Go.
#### Métriques supplémentaires envisageables
- `container_cpu_cfs_throttled_seconds_total` — Throttling CPU (le container est bridé).
- `kube_pod_container_status_restarts_total` — Nombre de restarts (instabilité).
- `container_memory_working_set_bytes` — Mémoire réelle utilisée (exclut le cache, plus précis que `memory_usage_bytes`).
- `kube_pod_status_phase` — Phase du pod (Running, Pending, Failed...).
- `container_oom_events_total` ou `kube_pod_container_status_last_terminated_reason` — Détection des OOM kills.
- `kubelet_volume_stats_used_bytes` / `kubelet_volume_stats_capacity_bytes` — Utilisation des PVC.
- `DCGM_FI_DEV_MEM_COPY_UTIL` — Utilisation mémoire GPU.
- `DCGM_FI_DEV_GPU_TEMP` — Température GPU.
- `node_cpu_seconds_total` / `node_memory_MemAvailable_bytes` — Métriques au niveau du noeud (vue globale).
#### Architecture
- **Range queries** (`/api/v1/query_range`) — Actuellement seul l'instant query est utilisé. Pour le streaming sur une période, `query_range` permettrait de récupérer des séries temporelles complètes et de calculer des moyennes/percentiles.
- **Labels dans les résultats** — Actuellement seule la première série est lue (`Result[0]`). On perd l'information si plusieurs pods/containers matchent. Agréger ou renvoyer toutes les séries.
- **Noms de métriques lisibles** — Mapper les expressions PromQL vers des noms humains (`cpu_usage_percent`, `memory_bytes`, etc.) au lieu de stocker l'expression brute comme nom.
- **Health check Prometheus** — Ajouter une méthode pour vérifier que Prometheus est accessible (`/-/healthy`).
---
## Résumé
Le fichier est **fonctionnel** pour un cas d'usage basique (collecte one-shot + streaming WebSocket), mais présente des **race conditions** dans `Stream`, un **bug sur la requête filesystem** (double `%s`), et du **code mort**. Les améliorations prioritaires sont la correction des accès concurrents et l'ajout de timeouts HTTP.

158
infrastructure/nats.go Normal file
View File

@@ -0,0 +1,158 @@
package infrastructure
import (
	"context"
	"encoding/json"
	"log"
	"sync"

	"cloud.o-forge.io/core/oc-lib/tools"

	"oc-datacenter/infrastructure/minio"
)
// roleWaiters maps executionID → channel expecting the role-assignment message from OC discovery.
// NOTE(review): not referenced anywhere in this file — presumably consumed by a
// sibling file of this package; confirm a user exists before removing.
var roleWaiters sync.Map
// ArgoKubeEvent carries the peer-routing metadata for a resource provisioning event.
//
// Dispatch is driven by Type: when Type == tools.STORAGE_RESOURCE the event
// concerns Minio credential provisioning (MinioID then identifies the Minio
// instance); any other Type is handled as Admiralty kubeconfig provisioning.
type ArgoKubeEvent struct {
	// ExecutionsID identifies the execution this provisioning belongs to.
	ExecutionsID string `json:"executions_id"`
	// DestPeerID is the peer on the receiving end of the provisioning hand-off
	// (its concrete role — compute vs. host — depends on the resource kind).
	DestPeerID string `json:"dest_peer_id"`
	// Type selects the provisioning kind; see the dispatch note above.
	Type tools.DataType `json:"data_type"`
	// SourcePeerID is the peer on the providing end of the hand-off.
	// When SourcePeerID == DestPeerID provisioning happens on a single peer.
	SourcePeerID string `json:"source_peer_id"`
	// MinioID identifies the Minio instance for storage provisioning;
	// empty for Admiralty events.
	MinioID string `json:"minio_id,omitempty"`
	// OriginID is the peer that initiated the request; the PB_CONSIDERS
	// response is routed back to this peer once provisioning completes.
	OriginID string `json:"origin_id,omitempty"`
}
// publishPropagation marshals msg and publishes it asynchronously on the
// PROPALGATION_EVENT subject, echoing the user of the triggering request so
// oc-discovery can route it. Marshal failures are logged and the event dropped.
func publishPropagation(resp tools.NATSResponse, msg *tools.PropalgationMessage) {
	b, err := json.Marshal(msg)
	if err != nil {
		log.Printf("oc-datacenter: dropping propagation event (action %v): %v", msg.Action, err)
		return
	}
	go tools.NewNATSCaller().SetNATSPub(tools.PROPALGATION_EVENT, tools.NATSResponse{
		FromApp:  "oc-datacenter",
		Datatype: -1,
		User:     resp.User,
		Method:   int(tools.PROPALGATION_EVENT),
		Payload:  b,
	})
}

// ListenNATS starts all NATS subscriptions for the infrastructure layer.
// It blocks inside ListenNats, so it must be launched in a goroutine from main.
func ListenNATS() {
	tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
		// ─── ARGO_KUBE_EVENT ────────────────────────────────────────────────────────
		// Triggered by oc-discovery to notify this peer of a provisioning task.
		// Dispatches to Minio (Type == STORAGE_RESOURCE) or Admiralty otherwise.
		tools.ARGO_KUBE_EVENT: func(resp tools.NATSResponse) {
			argo := &ArgoKubeEvent{}
			if err := json.Unmarshal(resp.Payload, argo); err != nil {
				log.Printf("oc-datacenter: ignoring malformed ARGO_KUBE_EVENT: %v", err)
				return
			}
			if argo.Type == tools.STORAGE_RESOURCE {
				// ── Minio credential provisioning ──────────────────────────────
				if argo.SourcePeerID == argo.DestPeerID {
					// Same peer: source creates credentials and immediately stores them.
					go minio.NewMinioSetter(argo.ExecutionsID, argo.MinioID).
						InitializeAsSource(context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID)
					return
				}
				// Different peers: publish Phase-1 PB_MINIO_CONFIG (Access == "")
				// so oc-discovery routes the role-assignment to the Minio host.
				phase1, err := json.Marshal(minio.MinioCredentialEvent{
					ExecutionsID: argo.ExecutionsID,
					MinioID:      argo.MinioID,
					SourcePeerID: argo.SourcePeerID,
					DestPeerID:   argo.DestPeerID,
					OriginID:     argo.OriginID,
				})
				if err != nil {
					log.Printf("oc-datacenter: cannot marshal phase-1 minio event: %v", err)
					return
				}
				publishPropagation(resp, &tools.PropalgationMessage{
					Payload: phase1,
					Action:  tools.PB_MINIO_CONFIG,
				})
				return
			}
			// ── Admiralty kubeconfig provisioning ──────────────────────────────
			if argo.SourcePeerID == argo.DestPeerID {
				go NewAdmiraltySetter(argo.ExecutionsID).InitializeAsSource(
					context.Background(), argo.SourcePeerID, argo.DestPeerID, argo.OriginID)
				return
			}
			b, err := json.Marshal(argo)
			if err != nil {
				log.Printf("oc-datacenter: cannot marshal admiralty event: %v", err)
				return
			}
			publishPropagation(resp, &tools.PropalgationMessage{
				Payload: b,
				Action:  tools.PB_ADMIRALTY_CONFIG,
			})
		},
		// ─── PROPALGATION_EVENT ─────────────────────────────────────────────────────
		// Routes messages forwarded by oc-discovery to the right handler.
		tools.PROPALGATION_EVENT: func(resp tools.NATSResponse) {
			if resp.FromApp != "oc-discovery" {
				return
			}
			var prop tools.PropalgationMessage
			if err := json.Unmarshal(resp.Payload, &prop); err != nil {
				log.Printf("oc-datacenter: ignoring malformed PROPALGATION_EVENT: %v", err)
				return
			}
			switch prop.Action {
			// ── Admiralty ──────────────────────────────────────────────────────
			case tools.PB_ADMIRALTY_CONFIG:
				var ev KubeconfigEvent
				if err := json.Unmarshal(prop.Payload, &ev); err != nil {
					log.Printf("oc-datacenter: malformed PB_ADMIRALTY_CONFIG payload: %v", err)
					return
				}
				// NOTE(review): both phases run synchronously in the NATS callback,
				// unlike the goroutine-launched paths above — confirm the
				// subscription tolerates long-running handlers.
				if ev.Kubeconfig != "" {
					// Phase 2: kubeconfig present → this peer is the TARGET (scheduler).
					NewAdmiraltySetter(ev.ExecutionsID).InitializeAsTarget(context.Background(), ev)
				} else {
					// Phase 1: no kubeconfig → this peer is the SOURCE (compute).
					NewAdmiraltySetter(ev.ExecutionsID).InitializeAsSource(
						context.Background(), ev.SourcePeerID, ev.DestPeerID, ev.OriginID)
				}
			// ── Minio ──────────────────────────────────────────────────────────
			case tools.PB_MINIO_CONFIG:
				var ev minio.MinioCredentialEvent
				if err := json.Unmarshal(prop.Payload, &ev); err != nil {
					log.Printf("oc-datacenter: malformed PB_MINIO_CONFIG payload: %v", err)
					return
				}
				if ev.Access != "" {
					// Phase 2: credentials present → this peer is the TARGET (compute).
					minio.NewMinioSetter(ev.ExecutionsID, ev.MinioID).InitializeAsTarget(context.Background(), ev)
				} else {
					// Phase 1: no credentials → this peer is the SOURCE (Minio host).
					minio.NewMinioSetter(ev.ExecutionsID, ev.MinioID).InitializeAsSource(
						context.Background(), ev.SourcePeerID, ev.DestPeerID, ev.OriginID)
				}
			// ── Deletion (routed by oc-discovery to the source peer) ───────────
			case tools.PB_DELETE:
				argo := &ArgoKubeEvent{}
				if err := json.Unmarshal(prop.Payload, argo); err != nil || argo.ExecutionsID == "" {
					return
				}
				if argo.Type == tools.STORAGE_RESOURCE {
					// Minio source teardown: revoke credentials + delete bucket.
					var del minio.MinioDeleteEvent
					if err := json.Unmarshal(prop.Payload, &del); err != nil {
						log.Printf("oc-datacenter: malformed PB_DELETE minio payload: %v", err)
						return
					}
					go minio.NewMinioSetter(del.ExecutionsID, del.MinioID).
						TeardownAsSource(context.Background(), del)
				} else {
					// Admiralty source teardown: delete AdmiraltySource + namespace.
					go NewAdmiraltySetter(argo.ExecutionsID).TeardownAsSource(context.Background())
				}
			}
		},
	})
}