diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index 9edbc33..8ae8224 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -11,7 +11,7 @@ import ( type LocalMonitor struct { LokiURL string KubeURL string - ExecutionID string + ExecutionID string Duration int Logger zerolog.Logger } @@ -30,7 +30,7 @@ func (lm *LocalMonitor) LaunchLocalMonitor() { } func (lm *LocalMonitor) execLocalKube() { - args := []string{"-w", lm.ExecutionID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} + args := []string{"-e", lm.ExecutionID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} if lm.Duration > 0 { args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) } diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index f43dc64..52e6086 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -9,7 +9,7 @@ import ( workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" ) -var Bookings = ScheduledBooking{Bookings: map[string]*workflow_execution.WorkflowExecution{}} +var Bookings = ScheduledBooking{Bookings: []*workflow_execution.WorkflowExecution{}} type ExecutionManager struct{} @@ -21,11 +21,12 @@ func (em *ExecutionManager) RetrieveNextExecutions() { logger.Debug().Msg("New loop") Bookings.Mu.Lock() if len(Bookings.Bookings) > 0 { - for k, v := range Bookings.Bookings { - if v.ExecDate.Before(time.Now().UTC()) { - logger.Info().Msg("Will execute " + k + " soon") - go em.executeBooking(v) - delete(Bookings.Bookings, k) + bookings := Bookings.Bookings + for i := len(bookings) - 1; i >= 0; i-- { + if bookings[i].ExecDate.Before(time.Now().UTC()) { + logger.Info().Msg("Will execute " + bookings[i].UUID + " soon") + go em.executeBooking(bookings[i]) + Bookings.Bookings = append(bookings[:i], bookings[i+1:]...) } } } diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go index 60f476d..00a4eec 100644 --- a/daemons/schedule_manager.go +++ b/daemons/schedule_manager.go @@ -6,39 +6,38 @@ import ( "oc-schedulerd/conf" "sync" "time" - "cloud.o-forge.io/core/oc-lib/tools" + oclib "cloud.o-forge.io/core/oc-lib" "cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/models/workflow_execution" + "cloud.o-forge.io/core/oc-lib/tools" "github.com/nats-io/nats.go" "github.com/rs/zerolog" "go.mongodb.org/mongo-driver/bson/primitive" ) type ScheduledBooking struct { - Bookings map[string]*workflow_execution.WorkflowExecution + Bookings []*workflow_execution.WorkflowExecution Mu sync.Mutex } func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) { - toDelete := []string{} - for k, b := range sb.Bookings { - if b.WorkflowID == workflow_id { - toDelete = append(toDelete, k) + toNotDelete := []*workflow_execution.WorkflowExecution{} + for _, b := range sb.Bookings { + if b.WorkflowID != workflow_id { + toNotDelete = append(toNotDelete, b) } } Bookings.Mu.Lock() defer Bookings.Mu.Unlock() - for _, k := range toDelete { - delete(sb.Bookings, k) - } + sb.Bookings = toNotDelete } func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) { Bookings.Mu.Lock() defer Bookings.Mu.Unlock() for _, exec := range new_bookings { - sb.Bookings[exec.GetID()] = exec + sb.Bookings = append(sb.Bookings , exec) } } // NATS daemon listens to subject " workflowsUpdate " diff --git a/go.mod b/go.mod index 5b8b474..083c0dd 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( ) require ( - cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 // indirect + cloud.o-forge.io/core/oc-lib v0.0.0-20240826103423-aff9304b1a71 // indirect github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect github.com/antihax/optional v1.0.0 // indirect github.com/aws/aws-sdk-go v1.36.29 // indirect diff --git a/go.sum b/go.sum index 911c1e4..b1ca284 100644 --- a/go.sum +++ b/go.sum @@ -6,6 +6,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 h1:fdxRsT4eR4v1D cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 h1:sYveE1C/0mpSr+ZmOYxuZ3fTWID7mr5hPiq0jQenv3Q= cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= +cloud.o-forge.io/core/oc-lib v0.0.0-20240826085916-d0e1474f8f34 h1:40XQgwR9HxXSnouY+ZqE/xYCM4qa+U+RLA5GA5JSNyQ= +cloud.o-forge.io/core/oc-lib v0.0.0-20240826085916-d0e1474f8f34/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= +cloud.o-forge.io/core/oc-lib v0.0.0-20240826094730-73dc43b329d7 h1:WrlURBiciau4p6iU3v0nKcQYjBqW8e9Uc2soDDXll28= +cloud.o-forge.io/core/oc-lib v0.0.0-20240826094730-73dc43b329d7/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= +cloud.o-forge.io/core/oc-lib v0.0.0-20240826103423-aff9304b1a71 h1:GodGMXVFgSdd5R1FoUjFAloOS+zOd3j66Wa+jcEPa4c= +cloud.o-forge.io/core/oc-lib v0.0.0-20240826103423-aff9304b1a71/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 h1:n0MD6UkwbgGHtXsmfgVzC2+ZbHzIsScpbq9ZGI18074= github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7/go.mod h1:xdrQDwHlKUmv8yiElMx6W0W10cLkqpeSEUUib8KGtv4= diff --git a/main.go b/main.go index 481c502..c90b5d1 100644 --- a/main.go +++ b/main.go @@ -5,13 +5,14 @@ import ( conf "oc-schedulerd/conf" "oc-schedulerd/daemons" - "cloud.o-forge.io/core/oc-lib/tools" + oclib "cloud.o-forge.io/core/oc-lib" + "cloud.o-forge.io/core/oc-lib/tools" ) func main() { tools.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC", "") - oclib.Init("oc-schedulerd") + oclib.Init("oc-schedulerd","","") sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()} exe_mngr := daemons.ExecutionManager{}