Files
oc-datacenter/infrastructure/watchdog.go

459 lines
14 KiB
Go
Raw Normal View History

2026-04-09 07:49:35 +02:00
package infrastructure
2026-03-24 10:50:36 +01:00
import (
"context"
2026-04-09 07:49:35 +02:00
"encoding/json"
2026-03-24 10:50:36 +01:00
"fmt"
"oc-datacenter/conf"
"oc-datacenter/infrastructure/admiralty"
2026-04-09 07:49:35 +02:00
"oc-datacenter/infrastructure/kubernetes"
2026-03-24 10:50:36 +01:00
"oc-datacenter/infrastructure/storage"
2026-04-09 07:49:35 +02:00
"regexp"
"strings"
"sync"
"time"
2026-03-24 10:50:36 +01:00
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
2026-04-09 07:49:35 +02:00
"cloud.o-forge.io/core/oc-lib/models/common/enum"
2026-03-24 10:50:36 +01:00
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
2026-04-09 07:49:35 +02:00
"go.mongodb.org/mongo-driver/bson/primitive"
2026-03-24 10:50:36 +01:00
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
2026-04-09 07:49:35 +02:00
// processedBookings tracks booking IDs already handled this process lifetime.
// Keys are booking IDs (the value of Booking.GetID()); values are empty
// structs, so the map is used purely as a concurrency-safe set. It is read and
// filled by emitWatchdogFailure so that a booking already handled is skipped
// on later scans.
// NOTE(review): nothing in this file removes entries — confirm the set cannot
// grow without bound in a long-lived process.
var processedBookings sync.Map
// ClosingStates is the set of terminal states. It is indexed both with a
// booking's state (emitWatchdogFailure) and with a workflow execution's state
// (findTerminalExecution). A state that is absent from the map reads as false,
// i.e. it is treated as non-terminal.
var ClosingStates = map[enum.BookingStatus]bool{
enum.FAILURE: true,
enum.SUCCESS: true,
enum.FORGOTTEN: true,
enum.CANCELLED: true,
}
// WatchBookings runs the booking watchdog loop: once per minute it calls
// scanStaleBookings and logs any error that scan returns. It is a fallback for
// bookings whose expected start date has passed without them reaching a
// terminal state (for instance when oc-monitord failed to launch). State
// changes are requested through NATS events rather than written to the
// database from here, so oc-scheduler stays the single authority on booking
// state.
//
// The loop only ends if the ticker channel is closed, which this function
// never does, so it must be started in its own goroutine.
func WatchBookings() {
	log := oclib.GetLogger()
	log.Info().Msg("BookingWatchdog: started")

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	for range tick.C {
		err := scanStaleBookings()
		if err == nil {
			continue
		}
		log.Error().Msg("BookingWatchdog: " + err.Error())
	}
}
// scanStaleBookings searches the bookings whose expected_start_date is at
// least one minute in the past and hands every booking that still needs
// attention to emitWatchdogFailure, each in its own goroutine.
//
// Bookings that were already processed, or that are already in a terminal
// state, are filtered out here before any goroutine is started. The search
// returns every past booking on every tick, so without this filter each scan
// would start one short-lived goroutine per historical booking.
// emitWatchdogFailure performs the same checks itself, so the events emitted
// are unchanged.
//
// It returns an error when the local peer cannot be resolved or when the
// booking search fails.
func scanStaleBookings() error {
	myself, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	peerID := myself.GetID()

	// A booking is only considered once its start date is a full minute old.
	deadline := time.Now().UTC().Add(-time.Minute)
	// NOTE(review): the trailing 0, 10000 arguments are presumably offset and
	// limit — confirm against the oc-lib Search signature.
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
		Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				"expected_start_date": {{
					Operator: dbs.LTE.String(),
					Value:    primitive.NewDateTimeFromTime(deadline),
				}},
			},
		}, "", false, 0, 10000)
	if res.Err != "" {
		return fmt.Errorf("stale booking search failed: %s", res.Err)
	}

	for _, dbo := range res.Data {
		b, ok := dbo.(*bookingmodel.Booking)
		if !ok {
			continue
		}
		id := b.GetID()
		if _, done := processedBookings.Load(id); done {
			continue
		}
		if ClosingStates[b.State] {
			// Already terminal: remember it so later scans skip it cheaply.
			processedBookings.Store(id, struct{}{})
			continue
		}
		go emitWatchdogFailure(b)
	}
	return nil
}
// emitWatchdogFailure publishes a WORKFLOW_STEP_DONE_EVENT carrying
// State=FAILURE on NATS for the given stale booking, then records the booking
// in processedBookings so it is not emitted again. oc-scheduler is the single
// authority for booking state transitions, so nothing is written to the
// database here.
//
// The call is a no-op when the booking was already processed, and it only
// records the booking (without emitting) when the booking is already in a
// terminal state. When the event payload cannot be encoded the error is logged
// and the booking is left unrecorded, so the next scan tries again.
//
// NOTE(review): every non-terminal state qualifies, which would include a
// booking that is legitimately running — confirm that is intended.
func emitWatchdogFailure(b *bookingmodel.Booking) {
	logger := oclib.GetLogger()
	id := b.GetID()
	if _, done := processedBookings.Load(id); done {
		return
	}
	if ClosingStates[b.State] {
		processedBookings.Store(id, struct{}{})
		return
	}

	now := time.Now().UTC()
	payload, err := json.Marshal(tools.WorkflowLifecycleEvent{
		BookingID: id,
		State:     enum.FAILURE.EnumIndex(),
		RealEnd:   &now,
	})
	if err != nil {
		// Previously dropped silently; surface it so a broken payload is visible.
		logger.Error().Msgf("BookingWatchdog: could not encode FAILURE event for booking %s: %v", id, err)
		return
	}

	tools.NewNATSCaller().SetNATSPub(tools.WORKFLOW_STEP_DONE_EVENT, tools.NATSResponse{
		FromApp: "oc-datacenter",
		Method:  int(tools.WORKFLOW_STEP_DONE_EVENT),
		Payload: payload,
	})
	logger.Info().Msgf("BookingWatchdog: booking %s stale → emitting FAILURE", id)
	processedBookings.Store(id, struct{}{})
}
2026-03-24 10:50:36 +01:00
// uuidNsPattern matches a string that consists of exactly one UUID written in
// lowercase hexadecimal, 8-4-4-4-12 form. It is used to recognise execution
// IDs in Kubernetes namespace names and in Admiralty virtual node names.
var uuidNsPattern = regexp.MustCompile(`^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$`)
// WatchInfra is a safety-net watchdog that periodically scans Kubernetes for
// execution namespaces whose WorkflowExecution has reached a terminal state
// but whose infra was never torn down (e.g. because WORKFLOW_DONE_EVENT was
// missed due to oc-monitord or oc-datacenter crash/restart).
//
// Must be launched in a goroutine from main.
2026-04-09 07:49:35 +02:00
func Watch() {
2026-03-24 10:50:36 +01:00
logger := oclib.GetLogger()
logger.Info().Msg("InfraWatchdog: started")
ticker := time.NewTicker(5 * time.Minute)
defer ticker.Stop()
for range ticker.C {
2026-04-09 07:49:35 +02:00
if err := scanOrphaned(); err != nil {
2026-03-24 10:50:36 +01:00
logger.Error().Msg("InfraWatchdog: " + err.Error())
}
2026-04-09 07:49:35 +02:00
if err := scanOrphanedMinio(); err != nil {
2026-03-24 10:50:36 +01:00
logger.Error().Msg("InfraWatchdog(minio): " + err.Error())
}
2026-04-09 07:49:35 +02:00
if err := scanOrphanedAdmiraltyNodes(); err != nil {
2026-03-24 10:50:36 +01:00
logger.Error().Msg("InfraWatchdog(admiralty-nodes): " + err.Error())
}
2026-04-09 07:49:35 +02:00
if err := scanOrphanedPVC(); err != nil {
2026-03-24 10:50:36 +01:00
logger.Error().Msg("InfraWatchdog(pvc): " + err.Error())
}
}
}
// scanOrphanedInfra lists all UUID-named Kubernetes namespaces, looks up their
// WorkflowExecution in the DB, and triggers teardown for any that are in a
// terminal state. Namespaces already in Terminating phase are skipped.
2026-04-09 07:49:35 +02:00
func scanOrphaned() error {
2026-03-24 10:50:36 +01:00
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA,
conf.GetConfig().KubeCert,
conf.GetConfig().KubeData,
)
if err != nil {
return fmt.Errorf("failed to init k8s service: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
nsList, err := serv.Set.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list namespaces: %w", err)
}
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
for _, ns := range nsList.Items {
executionsID := ns.Name
if !uuidNsPattern.MatchString(executionsID) {
continue
}
// Skip namespaces already being deleted by a previous teardown.
if ns.Status.Phase == v1.NamespaceTerminating {
continue
}
exec := findTerminalExecution(executionsID, peerID)
if exec == nil {
continue
}
logger.Info().Msgf("InfraWatchdog: orphaned infra detected for execution %s (state=%v) → teardown",
executionsID, exec.State)
2026-04-09 07:49:35 +02:00
go TeardownForExecution(exec.GetID(), exec.ExecutionsID)
2026-03-24 10:50:36 +01:00
}
return nil
}
// scanOrphanedMinio scans LIVE_STORAGE bookings for executions that are in a
// terminal state and triggers Minio teardown for each unique executionsID found.
// This covers the case where the Kubernetes namespace is already gone (manual
// deletion, prior partial teardown) but Minio SA and bucket were never revoked.
2026-04-09 07:49:35 +02:00
func scanOrphanedMinio() error {
2026-03-24 10:50:36 +01:00
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
2026-04-09 07:49:35 +02:00
}, "", false, 0, 10000)
2026-03-24 10:50:36 +01:00
if res.Err != "" {
return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err)
}
// Collect unique executionsIDs to avoid redundant teardowns.
seen := map[string]bool{}
ctx := context.Background()
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok || seen[b.ExecutionsID] {
continue
}
exec := findTerminalExecution(b.ExecutionsID, peerID)
if exec == nil {
continue
}
seen[b.ExecutionsID] = true
minio := storage.NewMinioSetter(b.ExecutionsID, b.ResourceID)
2026-03-24 10:50:36 +01:00
// Determine this peer's role and call the appropriate teardown.
if b.DestPeerID == peerID {
logger.Info().Msgf("InfraWatchdog(minio): orphaned target resources for exec %s → TeardownAsTarget", b.ExecutionsID)
event := storage.MinioDeleteEvent{
2026-03-24 10:50:36 +01:00
ExecutionsID: b.ExecutionsID,
MinioID: b.ResourceID,
SourcePeerID: b.DestPeerID,
DestPeerID: peerID,
}
go minio.TeardownAsTarget(ctx, event)
2026-03-24 10:50:36 +01:00
} else {
logger.Info().Msgf("InfraWatchdog(minio): orphaned source resources for exec %s → TeardownAsSource", b.ExecutionsID)
event := storage.MinioDeleteEvent{
2026-03-24 10:50:36 +01:00
ExecutionsID: b.ExecutionsID,
MinioID: b.ResourceID,
SourcePeerID: peerID,
DestPeerID: b.DestPeerID,
}
go minio.TeardownAsSource(ctx, event)
2026-03-24 10:50:36 +01:00
}
}
return nil
}
// scanOrphanedAdmiraltyNodes lists all Kubernetes nodes, identifies Admiralty
// virtual nodes (name prefix "admiralty-{UUID}-") that are NotReady, and
// explicitly deletes them when their WorkflowExecution is in a terminal state.
//
// This covers the gap where the namespace is already gone (or Terminating) but
// the virtual node was never cleaned up by the Admiralty controller — which can
// happen when the node goes NotReady before the AdmiraltyTarget CRD is deleted.
2026-04-09 07:49:35 +02:00
func scanOrphanedAdmiraltyNodes() error {
2026-03-24 10:50:36 +01:00
logger := oclib.GetLogger()
serv, err := tools.NewKubernetesService(
conf.GetConfig().KubeHost+":"+conf.GetConfig().KubePort,
conf.GetConfig().KubeCA,
conf.GetConfig().KubeCert,
conf.GetConfig().KubeData,
)
if err != nil {
return fmt.Errorf("failed to init k8s service: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
nodeList, err := serv.Set.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("failed to list nodes: %w", err)
}
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
for _, node := range nodeList.Items {
// Admiralty virtual nodes are named: admiralty-{executionID}-target-{...}
rest := strings.TrimPrefix(node.Name, "admiralty-")
if rest == node.Name {
continue // not an admiralty node
}
// UUID is exactly 36 chars: 8-4-4-4-12
if len(rest) < 36 {
continue
}
executionsID := rest[:36]
if !uuidNsPattern.MatchString(executionsID) {
continue
}
// Only act on NotReady nodes.
ready := false
for _, cond := range node.Status.Conditions {
if cond.Type == v1.NodeReady {
ready = cond.Status == v1.ConditionTrue
break
}
}
if ready {
continue
}
exec := findTerminalExecution(executionsID, peerID)
if exec == nil {
continue
}
logger.Info().Msgf("InfraWatchdog(admiralty-nodes): NotReady orphaned node %s for terminal execution %s → deleting",
node.Name, executionsID)
if delErr := serv.Set.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}); delErr != nil {
logger.Error().Msgf("InfraWatchdog(admiralty-nodes): failed to delete node %s: %v", node.Name, delErr)
}
}
return nil
}
// scanOrphanedPVC scans LIVE_STORAGE bookings for executions that are in a
// terminal state and triggers PVC teardown for each one where this peer holds
// the local storage. This covers the case where the Kubernetes namespace was
// already deleted (or its teardown was partial) but the PersistentVolume
// (cluster-scoped) was never reclaimed.
//
// A LIVE_STORAGE booking is treated as a local PVC only when ResolveStorageName
// returns a non-empty name — the same guard used by teardownPVCForExecution.
2026-04-09 07:49:35 +02:00
func scanOrphanedPVC() error {
2026-03-24 10:50:36 +01:00
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
if err != nil {
return fmt.Errorf("could not resolve local peer: %w", err)
}
peerID := myself.GetID()
res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
},
2026-04-09 07:49:35 +02:00
}, "", false, 0, 10000)
2026-03-24 10:50:36 +01:00
if res.Err != "" {
return fmt.Errorf("failed to search LIVE_STORAGE bookings: %s", res.Err)
}
seen := map[string]bool{}
ctx := context.Background()
for _, dbo := range res.Data {
b, ok := dbo.(*bookingmodel.Booking)
if !ok || seen[b.ExecutionsID+b.ResourceID] {
continue
}
storageName := storage.ResolveStorageName(b.ResourceID, peerID)
if storageName == "" {
continue // not a local PVC booking
}
exec := findTerminalExecution(b.ExecutionsID, peerID)
if exec == nil {
continue
}
seen[b.ExecutionsID+b.ResourceID] = true
logger.Info().Msgf("InfraWatchdog(pvc): orphaned PVC for exec %s storage %s → TeardownAsSource",
b.ExecutionsID, b.ResourceID)
event := storage.PVCDeleteEvent{
ExecutionsID: b.ExecutionsID,
StorageID: b.ResourceID,
StorageName: storageName,
SourcePeerID: peerID,
DestPeerID: b.DestPeerID,
}
go storage.NewPVCSetter(b.ExecutionsID, b.ResourceID).TeardownAsSource(ctx, event)
}
return nil
}
// findTerminalExecution returns the WorkflowExecution for the given executionsID
// if it exists in the DB and is in a terminal state, otherwise nil.
func findTerminalExecution(executionsID string, peerID string) *workflow_execution.WorkflowExecution {
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil).
Search(&dbs.Filters{
And: map[string][]dbs.Filter{
"executions_id": {{Operator: dbs.EQUAL.String(), Value: executionsID}},
},
2026-04-09 07:49:35 +02:00
}, "", false, 0, 10000)
2026-03-24 10:50:36 +01:00
if res.Err != "" || len(res.Data) == 0 {
return nil
}
exec, ok := res.Data[0].(*workflow_execution.WorkflowExecution)
if !ok {
return nil
}
2026-04-09 07:49:35 +02:00
if !ClosingStates[exec.State] {
2026-03-24 10:50:36 +01:00
return nil
}
return exec
}
// teardownInfraForExecution handles infrastructure cleanup when a workflow terminates.
// oc-datacenter is responsible only for infra here — booking/execution state
// is managed by oc-scheduler.
2026-04-09 07:49:35 +02:00
func TeardownForExecution(executionID string, executionsID string) {
logger := oclib.GetLogger()
myself, err := oclib.GetMySelf()
if err != nil || myself == nil {
return
}
selfPeerID := myself.GetID()
adminReq := &tools.APIRequest{Admin: true}
res, _, loadErr := workflow_execution.NewAccessor(adminReq).LoadOne(executionID)
if loadErr != nil || res == nil {
logger.Warn().Msgf("teardownInfraForExecution: execution %s not found", executionID)
return
}
exec := res.(*workflow_execution.WorkflowExecution)
ctx := context.Background()
2026-04-09 07:49:35 +02:00
admiralty.NewAdmiraltySetter(executionsID).TeardownIfRemote(exec, selfPeerID)
storage.NewMinioSetter(executionsID, "").TeardownForExecution(ctx, selfPeerID)
storage.NewPVCSetter(executionsID, "").TeardownForExecution(ctx, selfPeerID)
kubernetes.NewKubernetesService(executionsID).CleanupImages(ctx)
}