Files
oc-datacenter/infrastructure/booking_watchdog.go

245 lines
8.3 KiB
Go

package infrastructure
import (
"context"
"fmt"
"sync"
"time"
"oc-datacenter/infrastructure/minio"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
bookingmodel "cloud.o-forge.io/core/oc-lib/models/booking"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// In-memory de-duplication registries for the booking watchdog. Both live only
// for the lifetime of the process and therefore start empty after a restart.
var (
	// processedBookings records the IDs of bookings whose start-expiry has
	// already been handled. Teardown methods are idempotent, so running them
	// again after a restart is safe.
	processedBookings sync.Map

	// processedEndBookings records the executions IDs for which the end-of-run
	// Admiralty cleanup has already been triggered in this process lifetime.
	processedEndBookings sync.Map
)
// closingStates lists the terminal booking states. A booking that is already in
// one of them needs no further state transition before its infrastructure is
// torn down.
var closingStates = map[enum.BookingStatus]bool{
	enum.SUCCESS:   true,
	enum.FAILURE:   true,
	enum.CANCELLED: true,
	enum.FORGOTTEN: true,
}
// WatchBookings runs the booking watchdog loop. Once per minute it executes, in
// order, scanExpiredBookings and scanEndedExec, and logs any error either of
// them returns without stopping the loop.
//
// The function never returns, so it must be launched in its own goroutine
// (typically from main). The first scan happens one minute after the call.
func WatchBookings() {
	lg := oclib.GetLogger()
	lg.Info().Msg("BookingWatchdog: started")

	// Passes executed on every tick, in this order.
	passes := []func() error{
		scanExpiredBookings,
		scanEndedExec,
	}

	tick := time.NewTicker(time.Minute)
	defer tick.Stop()

	for range tick.C {
		for _, pass := range passes {
			if err := pass(); err != nil {
				lg.Error().Msg("BookingWatchdog: " + err.Error())
			}
		}
	}
}
// scanExpiredBookings searches for every booking whose expected_start_date is
// at least one minute in the past and dispatches each one that has not yet been
// handled in this process to processExpiredBooking, in its own goroutine.
//
// It returns an error when the local peer cannot be resolved or when the
// booking search fails. Nothing is reported back from the dispatched
// goroutines.
func scanExpiredBookings() error {
	myself, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	peerID := myself.GetID()

	// Bookings get one minute of grace after their expected start date.
	deadline := time.Now().Add(-time.Minute)
	filters := &dbs.Filters{
		And: map[string][]dbs.Filter{
			"expected_start_date": {{
				Operator: dbs.LTE.String(),
				Value:    primitive.NewDateTimeFromTime(deadline),
			}},
		},
	}
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
		Search(filters, "", false)
	if res.Err != "" {
		return fmt.Errorf("booking search failed: %s", res.Err)
	}

	for _, dbo := range res.Data {
		b, ok := dbo.(*bookingmodel.Booking)
		if !ok {
			continue
		}
		// The filter matches every past booking on every tick, so skip the ones
		// already handled here instead of spawning a goroutine that would
		// return immediately.
		if _, done := processedBookings.Load(b.GetID()); done {
			continue
		}
		go processExpiredBooking(b, peerID)
	}
	return nil
}
// processExpiredBooking handles one booking whose start deadline has passed.
//
// A booking that is not yet in a closing state is first transitioned:
//   - DRAFT, DELAYED → FORGOTTEN
//   - SCHEDULED → FAILURE
//   - STARTED or any other state: left untouched; the function returns and the
//     booking is evaluated again on a later scan.
//
// Once the booking is in a closing state, teardown is started according to
// b.ResourceType:
//   - LIVE_DATACENTER / COMPUTE_RESOURCE: NewAdmiraltySetter(...).TeardownAsSource
//     and teardownMinioForComputeBooking
//   - LIVE_STORAGE / STORAGE_RESOURCE: teardownMinioSourceBooking
//
// Every teardown runs in its own goroutine with a background context and is not
// waited for.
//
// peerID is the local peer ID; it is used for the booking update request and
// forwarded to the Minio teardown helpers.
//
// NOTE(review): an earlier version of this comment described the Admiralty
// teardown as "as target", while the code calls TeardownAsSource — confirm
// which side is intended.
func processExpiredBooking(b *bookingmodel.Booking, peerID string) {
	logger := oclib.GetLogger()
	ctx := context.Background()
	id := b.GetID()

	// Claim the booking atomically: a separate Load followed by a later Store
	// would let two concurrent passes over the same booking (e.g. a slow update
	// spanning two ticks) both run the transition and the teardown.
	if _, claimed := processedBookings.LoadOrStore(id, struct{}{}); claimed {
		return
	}

	// Transition non-terminal bookings.
	if !closingStates[b.State] {
		newState, ok := expiredBookingTargetState(b.State)
		if !ok {
			// Not closable yet: release the claim so a later scan re-evaluates it.
			processedBookings.Delete(id)
			return
		}
		upd := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", peerID, []string{}, nil).
			UpdateOne(map[string]any{"state": newState.EnumIndex()}, id)
		if upd.Err != "" {
			// Release the claim so the update is retried on the next tick.
			processedBookings.Delete(id)
			logger.Error().Msgf("BookingWatchdog: failed to update booking %s: %s", id, upd.Err)
			return
		}
		b.State = newState
		logger.Info().Msgf("BookingWatchdog: booking %s (exec=%s, type=%s) → %s",
			id, b.ExecutionsID, b.ResourceType, b.State)
	}

	// Tear down infrastructure according to resource type.
	switch b.ResourceType {
	case tools.LIVE_DATACENTER, tools.COMPUTE_RESOURCE:
		logger.Info().Msgf("BookingWatchdog: tearing down compute infra exec=%s", b.ExecutionsID)
		go NewAdmiraltySetter(b.ExecutionsID).TeardownAsSource(ctx) // i'm the compute units.
		go teardownMinioForComputeBooking(ctx, b, peerID)
	case tools.LIVE_STORAGE, tools.STORAGE_RESOURCE:
		logger.Info().Msgf("BookingWatchdog: tearing down storage infra exec=%s", b.ExecutionsID)
		go teardownMinioSourceBooking(ctx, b, peerID)
	}
}

// expiredBookingTargetState maps the state of a booking whose start deadline
// has passed to the terminal state the watchdog assigns to it. The boolean is
// false when the watchdog must leave the booking alone (STARTED, or any state
// not listed here).
func expiredBookingTargetState(state enum.BookingStatus) (enum.BookingStatus, bool) {
	switch state {
	case enum.DRAFT, enum.DELAYED:
		// DRAFT: never launched; DELAYED: was SCHEDULED but start never arrived.
		return enum.FORGOTTEN, true
	case enum.SCHEDULED:
		// Passed its start date without ever being launched.
		return enum.FAILURE, true
	default:
		// Includes STARTED: a running booking is never auto-closed by the watchdog.
		return state, false
	}
}
// scanEndedExec searches for workflow executions whose state index is greater
// than 2 and dispatches each one whose executions ID has not yet been handled
// in this process to teardownAdmiraltyTarget, in its own goroutine.
//
// It returns an error when the local peer cannot be resolved or when the search
// fails. Nothing is reported back from the dispatched goroutines.
//
// NOTE(review): presumably state indexes above 2 are the finished execution
// states — confirm against the enum definition.
func scanEndedExec() error {
	myself, err := oclib.GetMySelf()
	if err != nil {
		return fmt.Errorf("could not resolve local peer: %w", err)
	}
	peerID := myself.GetID()

	filters := &dbs.Filters{
		And: map[string][]dbs.Filter{
			// Select executions by state index only; no date filter is applied.
			"state": {{
				Operator: dbs.GT.String(),
				Value:    2,
			}},
		},
	}
	res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", peerID, []string{}, nil).
		Search(filters, "", false)
	if res.Err != "" {
		return fmt.Errorf("ended-booking search failed: %s", res.Err)
	}

	for _, dbo := range res.Data {
		exec, ok := dbo.(*workflow_execution.WorkflowExecution)
		if !ok {
			continue
		}
		// The filter matches every matching execution on every tick, so skip
		// the ones already handled instead of spawning a goroutine that would
		// return immediately.
		if _, done := processedEndBookings.Load(exec.ExecutionsID); done {
			continue
		}
		go teardownAdmiraltyTarget(exec)
	}
	return nil
}
// teardownAdmiraltyTarget calls TeardownAsTarget on the Admiralty setter of the
// given workflow execution, passing the local peer ID.
//
// Each executions ID is processed at most once per process lifetime. If the
// local peer cannot be resolved, the error is logged and the executions ID is
// released so a later scan retries it.
//
// NOTE(review): the info log below still says "Admiralty source" although the
// call is TeardownAsTarget — confirm the intended wording before changing it.
func teardownAdmiraltyTarget(b *workflow_execution.WorkflowExecution) {
	logger := oclib.GetLogger()

	// LoadOrStore makes the "already handled?" check and the claim one atomic
	// step, so two concurrent calls for the same execution cannot both proceed.
	if _, done := processedEndBookings.LoadOrStore(b.ExecutionsID, struct{}{}); done {
		return
	}

	p, err := oclib.GetMySelf()
	if err != nil {
		// Nothing was torn down: release the claim instead of silently marking
		// the execution as handled.
		processedEndBookings.Delete(b.ExecutionsID)
		logger.Error().Msgf("BookingWatchdog: could not resolve local peer for exec=%s: %v",
			b.ExecutionsID, err)
		return
	}

	logger.Info().Msgf("BookingWatchdog: tearing down Admiralty source exec=%s (booking=%s)",
		b.ExecutionsID, b.GetID())
	NewAdmiraltySetter(b.ExecutionsID).TeardownAsTarget(context.Background(), p.GetID())
}
// teardownMinioForComputeBooking searches for the LIVE_STORAGE bookings that
// share the compute booking's executions ID and calls TeardownAsTarget on a
// Minio setter for each of them, with this peer as the event's DestPeerID.
//
// ctx is forwarded to every TeardownAsTarget call. localPeerID is used both for
// the search request and as the DestPeerID of the delete event.
//
// A failed search and an empty result are logged separately; in both cases the
// function returns without tearing anything down.
//
// NOTE(review): only LIVE_STORAGE bookings are matched, not STORAGE_RESOURCE —
// confirm this is intended.
func teardownMinioForComputeBooking(ctx context.Context, computeBooking *bookingmodel.Booking, localPeerID string) {
	logger := oclib.GetLogger()
	execID := computeBooking.ExecutionsID

	res := oclib.NewRequest(oclib.LibDataEnum(oclib.BOOKING), "", localPeerID, []string{}, nil).
		Search(&dbs.Filters{
			And: map[string][]dbs.Filter{
				"executions_id": {{Operator: dbs.EQUAL.String(), Value: execID}},
				"resource_type": {{Operator: dbs.EQUAL.String(), Value: tools.LIVE_STORAGE.EnumIndex()}},
			},
		}, "", false)
	if res.Err != "" {
		// Report the real failure instead of disguising it as "nothing found".
		logger.Error().Msgf("BookingWatchdog: storage booking search failed for exec=%s: %s", execID, res.Err)
		return
	}
	if len(res.Data) == 0 {
		logger.Warn().Msgf("BookingWatchdog: no storage booking found for exec=%s", execID)
		return
	}

	for _, dbo := range res.Data {
		sb, ok := dbo.(*bookingmodel.Booking)
		if !ok {
			continue
		}
		event := minio.MinioDeleteEvent{
			ExecutionsID: execID,
			MinioID:      sb.ResourceID,
			SourcePeerID: sb.DestPeerID, // peer hosting Minio
			DestPeerID:   localPeerID,   // this peer (compute/target)
			OriginID:     "",
		}
		minio.NewMinioSetter(execID, sb.ResourceID).TeardownAsTarget(ctx, event)
	}
}
// teardownMinioSourceBooking calls TeardownAsSource on the Minio setter built
// from the storage booking's executions ID and resource ID. The delete event
// names this peer as the source and the booking's DestPeerID as the
// destination.
//
// ctx is forwarded to TeardownAsSource; localPeerID is the ID of this peer.
func teardownMinioSourceBooking(ctx context.Context, b *bookingmodel.Booking, localPeerID string) {
	execID, minioID := b.ExecutionsID, b.ResourceID
	setter := minio.NewMinioSetter(execID, minioID)
	setter.TeardownAsSource(ctx, minio.MinioDeleteEvent{
		ExecutionsID: execID,
		MinioID:      minioID,
		SourcePeerID: localPeerID, // the local peer acts as the Minio host here
		DestPeerID:   b.DestPeerID,
		OriginID:     "",
	})
}