Datacenter Update to Ws

This commit is contained in:
mr
2026-04-09 07:49:35 +02:00
parent c87245e83f
commit 74919994c2
20 changed files with 633 additions and 922 deletions

View File

@@ -274,7 +274,7 @@ func provisionPVCsForTarget(ctx context.Context, targetNS string, sourceExecutio
"executions_id": {{Operator: dbs.EQUAL.String(), Value: sourceExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
}, "", false, 0, 1000)
if res.Err != "" || len(res.Data) == 0 {
return
@@ -424,7 +424,7 @@ func (s *AdmiraltySetter) TeardownIfRemote(exec *workflow_execution.WorkflowExec
"executions_id": {{Operator: dbs.EQUAL.String(), Value: exec.ExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.COMPUTE_RESOURCE.EnumIndex()}},
},
}, "", false)
}, "", false, 0, 1000)
if res.Err != "" || len(res.Data) == 0 {
return

View File

@@ -29,7 +29,7 @@ func BootstrapAllowedImages() {
And: map[string][]dbs.Filter{
"image": {{Operator: dbs.EQUAL.String(), Value: img.Image}},
},
}, "", false)
}, "", false, 0, 1)
if existing.Err != "" || len(existing.Data) > 0 {
continue // déjà présente ou erreur de recherche : on passe
}

View File

@@ -1,112 +0,0 @@
package infrastructure
import (
"encoding/json"
"fmt"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// processedBookings tracks booking IDs already handled this process lifetime.
var processedBookings sync.Map
// closingStates is the set of terminal booking states.
var ClosingStates = map[enum.BookingStatus]bool{
enum.FAILURE: true,
enum.SUCCESS: true,
enum.FORGOTTEN: true,
enum.CANCELLED: true,
}
// WatchBookings runs the booking watchdog loop as a safety net for when
// oc-monitord fails to launch. Once per minute it re-scans for bookings
// stuck past their expected_start_date; instead of touching the database
// it lets scanStaleBookings emit WORKFLOW_STEP_DONE_EVENT FAILURE on NATS,
// keeping oc-scheduler the single source of truth for booking state.
// Scan failures are logged and the loop keeps running.
//
// Must be launched in a goroutine from main.
func WatchBookings() {
	log := oclib.GetLogger()
	log.Info().Msg("BookingWatchdog: started")

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	for {
		<-tick.C
		if scanErr := scanStaleBookings(); scanErr != nil {
			log.Error().Msg("BookingWatchdog: " + scanErr.Error())
		}
	}
}
// scanStaleBookings queries all bookings whose ExpectedStartDate passed more than
// 1 minute ago. Non-terminal ones get a WORKFLOW_STEP_DONE_EVENT FAILURE emitted
// on NATS so oc-scheduler closes them.
func scanStaleBookings() error {
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
deadline := time.Now().UTC().Add(-time.Minute)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"expected_start_date": {{
Operator: dbs.LTE.String(),
Value: primitive.NewDateTimeFromTime(deadline),
}},
},
}, "", false)
if res.Err != "" {
return fmt.Errorf("stale booking search failed: %s", res.Err)
}
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok {
continue
}
go emitWatchdogFailure(b)
}
return nil
}
// emitWatchdogFailure publishes a WORKFLOW_STEP_DONE_EVENT FAILURE for a stale
// booking. oc-scheduler is the single authority for booking state transitions.
// Deduplication claims the booking ID atomically via LoadOrStore so that
// concurrent calls (one goroutine per booking per scan tick) cannot both emit.
func emitWatchdogFailure(b *bookingmodel.Booking) {
	logger := oclib.GetLogger()
	id := b.GetID()
	// Atomically claim this booking. The previous Load-then-Store pair was a
	// check-then-act race: two goroutines could both pass the check and emit twice.
	if _, already := processedBookings.LoadOrStore(id, struct{}{}); already {
		return
	}
	// Booking already terminal: nothing to emit; the claim stands.
	if ClosingStates[b.State] {
		return
	}
	now := time.Now().UTC()
	payload, err := json.Marshal(tools.WorkflowLifecycleEvent{
		BookingID: id,
		State:     enum.FAILURE.EnumIndex(),
		RealEnd:   &now,
	})
	if err != nil {
		// Release the claim so the next scan retries instead of losing the event.
		processedBookings.Delete(id)
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{
		FromApp: "oc-datacenter",
		Method:  int(tools.WORKFLOW_STEP_DONE_EVENT),
		Payload: payload,
	})
	logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", id)
}

View File

@@ -152,7 +152,7 @@ func (s *KubernetesService) filterNonAllowed(images []string) []string {
And: map[string][]dbs.Filter{
"image": {{Operator: dbs.EQUAL.String(), Value: name}},
},
}, "", false)
}, "", false, 0, 1000)
if len(res.Data) == 0 {
toRemove = append(toRemove, img)

View File

@@ -57,7 +57,7 @@ func Call(book *booking.Booking,
"source": {{Operator: dbs.EQUAL.String(), Value: instance.Source}},
"abstractlive.resources_id": {{Operator: dbs.EQUAL.String(), Value: computeRes.GetID()}},
},
}, "", false)
}, "", false, 0, 1000)
if res.Err != "" {
continue
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"oc-datacenter/infrastructure"
"oc-datacenter/infrastructure/admiralty"
"oc-datacenter/infrastructure/kubernetes"
"oc-datacenter/infrastructure/kubernetes/models"
@@ -237,7 +238,7 @@ func ListenNATS() {
if err := json.Unmarshal(resp.Payload, &evt); err != nil || evt.ExecutionsID == "" {
return
}
go kubernetes.NewKubernetesService(evt.ExecutionsID).TeardownForExecution(evt.ExecutionID)
go infrastructure.TeardownForExecution(evt.ExecutionID, evt.ExecutionsID)
},
// ─── REMOVE_RESOURCE ────────────────────────────────────────────────────────

View File

@@ -298,7 +298,7 @@ func (m *MinioSetter) TeardownAsSource(ctx context.Context, event MinioDeleteEve
// loadMinioURL searches through all live storages accessible by peerID to find
// the one that references MinioID, and returns its endpoint URL.
func (m *MinioSetter) loadMinioURL(peerID string) (string, error) {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false, 0, 10000)
if res.Err != "" {
return "", fmt.Errorf("loadMinioURL: failed to load live storages: %s", res.Err)
}
@@ -324,7 +324,7 @@ func (m *MinioSetter) TeardownForExecution(ctx context.Context, localPeerID stri
"executions_id": {{Operator: dbs.EQUAL.String(), Value: m.ExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
}, "", false, 0, 10000)
if res.Err != "" || len(res.Data) == 0 {
return

View File

@@ -160,7 +160,7 @@ func (p *PVCSetter) TeardownAsSource(ctx context.Context, event PVCDeleteEvent)
// ResolveStorageName returns the live storage name for a given storageID, or "" if not found.
func ResolveStorageName(storageID, peerID string) string {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false, 0, 10000)
if res.Err != "" {
return ""
}
@@ -175,7 +175,7 @@ func ResolveStorageName(storageID, peerID string) string {
// loadStorageSize looks up the SizeGB for this storage in live storages.
func (p *PVCSetter) loadStorageSize(peerID string) (string, error) {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false)
res := oclib.NewRequest(oclib.LibDataEnum(oclib.LIVE_STORAGE), "", peerID, []string{}, nil).LoadAll(false, 0, 10000)
if res.Err != "" {
return "", fmt.Errorf("loadStorageSize: %s", res.Err)
}
@@ -199,7 +199,7 @@ func (p *PVCSetter) TeardownForExecution(ctx context.Context, localPeerID string
"executions_id": {{Operator: dbs.EQUAL.String(), Value: p.ExecutionsID}},
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
}, "", false, 0, 10000)
if res.Err != "" || len(res.Data) == 0 {
return

View File

@@ -1,27 +1,126 @@
package kubernetes
package infrastructure
import (
"context"
"encoding/json"
"fmt"
"oc-datacenter/conf"
"oc-datacenter/infrastructure/admiralty"
"oc-datacenter/infrastructure/kubernetes"
"oc-datacenter/infrastructure/storage"
"regexp"
"strings"
"sync"
"time"
"oc-datacenter/conf"
"oc-datacenter/infrastructure"
"oc-datacenter/infrastructure/admiralty"
"oc-datacenter/infrastructure/storage"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// uuidNsPattern matches Kubernetes namespace names that are execution UUIDs.
// processedBookings tracks booking IDs already handled this process lifetime.
var processedBookings sync.Map
// closingStates is the set of terminal booking states.
var ClosingStates = map[enum.BookingStatus]bool{
enum.FAILURE: true,
enum.SUCCESS: true,
enum.FORGOTTEN: true,
enum.CANCELLED: true,
}
// WatchBookings is a safety-net fallback for when oc-monitord fails to launch.
// Every minute it detects bookings past expected_start_date that are still in
// a non-terminal state. Rather than writing to the database directly, the scan
// emits WORKFLOW_STEP_DONE_EVENT with State=FAILURE on NATS so that
// oc-scheduler performs the transition — one source of truth for booking state.
//
// Must be launched in a goroutine from main.
func WatchBookings() {
	log := oclib.GetLogger()
	log.Info().Msg("BookingWatchdog: started")

	const interval = time.Minute
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for range ticker.C {
		err := scanStaleBookings()
		if err == nil {
			continue
		}
		log.Error().Msg("BookingWatchdog: " + err.Error())
	}
}
// scanStaleBookings looks up every booking whose expected_start_date lies more
// than one minute in the past and hands each result to emitWatchdogFailure in
// its own goroutine, which publishes the FAILURE event on NATS for
// oc-scheduler to act on. Returns an error when the local peer cannot be
// resolved or the booking search fails.
func scanStaleBookings() error {
	self, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	// Cutoff: expected start at least one minute ago, in UTC.
	cutoff := primitive.NewDateTimeFromTime(time.Now().UTC().Add(-time.Minute))
	filters := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"expected_start_date": {
				{Operator: dbs.LTE.String(), Value: cutoff},
			},
		},
	}
	req := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", self.GetID(), []string{}, nil)
	res := req.Search(filters, "", false, 0, 10000)
	if res.Err != "" {
		return fmt.Errorf("stale booking search failed: %s", res.Err)
	}
	for _, item := range res.Data {
		if booking, ok := item.(*bookingmodel.Booking); ok {
			go emitWatchdogFailure(booking)
		}
	}
	return nil
}
// emitWatchdogFailure publishes a WORKFLOW_STEP_DONE_EVENT FAILURE for a stale
// booking. oc-scheduler is the single authority for booking state transitions.
// Deduplication claims the booking ID atomically via LoadOrStore so that
// concurrent calls (one goroutine per booking per scan tick) cannot both emit.
func emitWatchdogFailure(b *bookingmodel.Booking) {
	logger := oclib.GetLogger()
	id := b.GetID()
	// Atomically claim this booking. The previous Load-then-Store pair was a
	// check-then-act race: two goroutines could both pass the check and emit twice.
	if _, already := processedBookings.LoadOrStore(id, struct{}{}); already {
		return
	}
	// Booking already terminal: nothing to emit; the claim stands.
	if ClosingStates[b.State] {
		return
	}
	now := time.Now().UTC()
	payload, err := json.Marshal(tools.WorkflowLifecycleEvent{
		BookingID: id,
		State:     enum.FAILURE.EnumIndex(),
		RealEnd:   &now,
	})
	if err != nil {
		// Release the claim so the next scan retries instead of losing the event.
		processedBookings.Delete(id)
		return
	}
	tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{
		FromApp: "oc-datacenter",
		Method:  int(tools.WORKFLOW_STEP_DONE_EVENT),
		Payload: payload,
	})
	logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", id)
}
var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)
// WatchInfra is a safety-net watchdog that periodically scans Kubernetes for
@@ -30,22 +129,22 @@ var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-
// missed due to oc-monitord or oc-datacenter crash/restart).
//
// Must be launched in a goroutine from main.
func (s *KubernetesService) Watch() {
func Watch() {
logger := oclib.GetLogger()
logger.Info().Msg("InfraWatchdog: started")
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
if err := s.scanOrphaned(); err != nil {
if err := scanOrphaned(); err != nil {
logger.Error().Msg("InfraWatchdog: " + err.Error())
}
if err := s.scanOrphanedMinio(); err != nil {
if err := scanOrphanedMinio(); err != nil {
logger.Error().Msg("InfraWatchdog(minio): " + err.Error())
}
if err := s.scanOrphanedAdmiraltyNodes(); err != nil {
if err := scanOrphanedAdmiraltyNodes(); err != nil {
logger.Error().Msg("InfraWatchdog(admiralty-nodes): " + err.Error())
}
if err := s.scanOrphanedPVC(); err != nil {
if err := scanOrphanedPVC(); err != nil {
logger.Error().Msg("InfraWatchdog(pvc): " + err.Error())
}
}
@@ -54,7 +153,7 @@ func (s *KubernetesService) Watch() {
// scanOrphanedInfra lists all UUID-named Kubernetes namespaces, looks up their
// WorkflowExecution in the DB, and triggers teardown for any that are in a
// terminal state. Namespaces already in Terminating phase are skipped.
func (s *KubernetesService) scanOrphaned() error {
func scanOrphaned() error {
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
@@ -98,7 +197,7 @@ func (s *KubernetesService) scanOrphaned() error {
logger.Info().Msgf("InfraWatchdog: orphaned infra detected for execution %s (state=%v) → teardown",
executionsID, exec.State)
go s.TeardownForExecution(exec.GetID())
go TeardownForExecution(exec.GetID(), exec.ExecutionsID)
}
return nil
}
@@ -107,7 +206,7 @@ func (s *KubernetesService) scanOrphaned() error {
// terminal state and triggers Minio teardown for each unique executionsID found.
// This covers the case where the Kubernetes namespace is already gone (manual
// deletion, prior partial teardown) but Minio SA and bucket were never revoked.
func (s *KubernetesService) scanOrphanedMinio() error {
func scanOrphanedMinio() error {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
@@ -121,7 +220,7 @@ func (s *KubernetesService) scanOrphanedMinio() error {
And: map[string][]dbs.Filter{
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
}, "", false, 0, 10000)
if res.Err != "" {
return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err)
@@ -175,7 +274,7 @@ func (s *KubernetesService) scanOrphanedMinio() error {
// This covers the gap where the namespace is already gone (or Terminating) but
// the virtual node was never cleaned up by the Admiralty controller — which can
// happen when the node goes NotReady before the AdmiraltyTarget CRD is deleted.
func (s *KubernetesService) scanOrphanedAdmiraltyNodes() error {
func scanOrphanedAdmiraltyNodes() error {
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
@@ -251,7 +350,7 @@ func (s *KubernetesService) scanOrphanedAdmiraltyNodes() error {
//
// A LIVE_STORAGE booking is treated as a local PVC only when ResolveStorageName
// returns a non-empty name — the same guard used by teardownPVCForExecution.
func (s *KubernetesService) scanOrphanedPVC() error {
func scanOrphanedPVC() error {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
@@ -265,7 +364,7 @@ func (s *KubernetesService) scanOrphanedPVC() error {
And: map[string][]dbs.Filter{
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
}, "", false)
}, "", false, 0, 10000)
if res.Err != "" {
return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err)
@@ -314,7 +413,7 @@ func findTerminalExecution(executionsID string, peerID string) *workflow_executi
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}},
},
}, "", false)
}, "", false, 0, 10000)
if res.Err != "" || len(res.Data) == 0 {
return nil
@@ -325,7 +424,7 @@ func findTerminalExecution(executionsID string, peerID string) *workflow_executi
return nil
}
if !infrastructure.ClosingStates[exec.State] {
if !ClosingStates[exec.State] {
return nil
}
return exec
@@ -334,7 +433,7 @@ func findTerminalExecution(executionsID string, peerID string) *workflow_executi
// teardownInfraForExecution handles infrastructure cleanup when a workflow terminates.
// oc-datacenter is responsible only for infra here — booking/execution state
// is managed by oc-scheduler.
func (s *KubernetesService) TeardownForExecution(executionID string) {
func TeardownForExecution(executionID string, executionsID string) {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
@@ -352,8 +451,8 @@ func (s *KubernetesService) TeardownForExecution(executionID string) {
exec := res.(*workflow_execution.WorkflowExecution)
ctx := context.Background()
admiralty.NewAdmiraltySetter(s.ExecutionsID).TeardownIfRemote(exec, selfPeerID)
storage.NewMinioSetter(s.ExecutionsID, "").TeardownForExecution(ctx, selfPeerID)
storage.NewPVCSetter(s.ExecutionsID, "").TeardownForExecution(ctx, selfPeerID)
s.CleanupImages(ctx)
admiralty.NewAdmiraltySetter(executionsID).TeardownIfRemote(exec, selfPeerID)
storage.NewMinioSetter(executionsID, "").TeardownForExecution(ctx, selfPeerID)
storage.NewPVCSetter(executionsID, "").TeardownForExecution(ctx, selfPeerID)
kubernetes.NewKubernetesService(executionsID).CleanupImages(ctx)
}