This commit is contained in:
pb 2024-08-28 14:03:48 +02:00
parent 7206de35a8
commit a69ecc4ab5
6 changed files with 28 additions and 21 deletions

View File

@ -11,7 +11,7 @@ import (
type LocalMonitor struct { type LocalMonitor struct {
LokiURL string LokiURL string
KubeURL string KubeURL string
ExecutionID string ExecutionID string
Duration int Duration int
Logger zerolog.Logger Logger zerolog.Logger
} }
@ -30,7 +30,7 @@ func (lm *LocalMonitor) LaunchLocalMonitor() {
} }
func (lm *LocalMonitor) execLocalKube() { func (lm *LocalMonitor) execLocalKube() {
args := []string{"-w", lm.ExecutionID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} args := []string{"-e", lm.ExecutionID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName}
if lm.Duration > 0 { if lm.Duration > 0 {
args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) args = append(args, "-t", fmt.Sprintf("%d", lm.Duration))
} }

View File

@ -9,7 +9,7 @@ import (
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
) )
var Bookings = ScheduledBooking{Bookings: map[string]*workflow_execution.WorkflowExecution{}} var Bookings = ScheduledBooking{Bookings: []*workflow_execution.WorkflowExecution{}}
type ExecutionManager struct{} type ExecutionManager struct{}
@ -21,11 +21,12 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
logger.Debug().Msg("New loop") logger.Debug().Msg("New loop")
Bookings.Mu.Lock() Bookings.Mu.Lock()
if len(Bookings.Bookings) > 0 { if len(Bookings.Bookings) > 0 {
for k, v := range Bookings.Bookings { bookings := Bookings.Bookings
if v.ExecDate.Before(time.Now().UTC()) { for i := len(bookings) - 1; i >= 0; i-- {
logger.Info().Msg("Will execute " + k + " soon") if bookings[i].ExecDate.Before(time.Now().UTC()) {
go em.executeBooking(v) logger.Info().Msg("Will execute " + bookings[i].UUID + " soon")
delete(Bookings.Bookings, k) go em.executeBooking(bookings[i])
Bookings.Bookings = append(bookings[:i], bookings[i+1:]...)
} }
} }
} }

View File

@ -6,39 +6,38 @@ import (
"oc-schedulerd/conf" "oc-schedulerd/conf"
"sync" "sync"
"time" "time"
"cloud.o-forge.io/core/oc-lib/tools"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs" "cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution" "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"go.mongodb.org/mongo-driver/bson/primitive" "go.mongodb.org/mongo-driver/bson/primitive"
) )
type ScheduledBooking struct { type ScheduledBooking struct {
Bookings map[string]*workflow_execution.WorkflowExecution Bookings []*workflow_execution.WorkflowExecution
Mu sync.Mutex Mu sync.Mutex
} }
func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) { func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) {
toDelete := []string{} toNotDelete := []*workflow_execution.WorkflowExecution{}
for k, b := range sb.Bookings { for _, b := range sb.Bookings {
if b.WorkflowID == workflow_id { if b.WorkflowID != workflow_id {
toDelete = append(toDelete, k) toNotDelete = append(toNotDelete, b)
} }
} }
Bookings.Mu.Lock() Bookings.Mu.Lock()
defer Bookings.Mu.Unlock() defer Bookings.Mu.Unlock()
for _, k := range toDelete { sb.Bookings = toNotDelete
delete(sb.Bookings, k)
}
} }
func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) { func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) {
Bookings.Mu.Lock() Bookings.Mu.Lock()
defer Bookings.Mu.Unlock() defer Bookings.Mu.Unlock()
for _, exec := range new_bookings { for _, exec := range new_bookings {
sb.Bookings[exec.GetID()] = exec sb.Bookings = append(sb.Bookings , exec)
} }
} }
// NATS daemon listens to subject " workflowsUpdate " // NATS daemon listens to subject " workflowsUpdate "

2
go.mod
View File

@ -17,7 +17,7 @@ require (
) )
require ( require (
cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 // indirect cloud.o-forge.io/core/oc-lib v0.0.0-20240826103423-aff9304b1a71 // indirect
github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect
github.com/antihax/optional v1.0.0 // indirect github.com/antihax/optional v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.36.29 // indirect github.com/aws/aws-sdk-go v1.36.29 // indirect

6
go.sum
View File

@ -6,6 +6,12 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 h1:fdxRsT4eR4v1D
cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo=
cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 h1:sYveE1C/0mpSr+ZmOYxuZ3fTWID7mr5hPiq0jQenv3Q= cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 h1:sYveE1C/0mpSr+ZmOYxuZ3fTWID7mr5hPiq0jQenv3Q=
cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU=
cloud.o-forge.io/core/oc-lib v0.0.0-20240826085916-d0e1474f8f34 h1:40XQgwR9HxXSnouY+ZqE/xYCM4qa+U+RLA5GA5JSNyQ=
cloud.o-forge.io/core/oc-lib v0.0.0-20240826085916-d0e1474f8f34/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU=
cloud.o-forge.io/core/oc-lib v0.0.0-20240826094730-73dc43b329d7 h1:WrlURBiciau4p6iU3v0nKcQYjBqW8e9Uc2soDDXll28=
cloud.o-forge.io/core/oc-lib v0.0.0-20240826094730-73dc43b329d7/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU=
cloud.o-forge.io/core/oc-lib v0.0.0-20240826103423-aff9304b1a71 h1:GodGMXVFgSdd5R1FoUjFAloOS+zOd3j66Wa+jcEPa4c=
cloud.o-forge.io/core/oc-lib v0.0.0-20240826103423-aff9304b1a71/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 h1:n0MD6UkwbgGHtXsmfgVzC2+ZbHzIsScpbq9ZGI18074= github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 h1:n0MD6UkwbgGHtXsmfgVzC2+ZbHzIsScpbq9ZGI18074=
github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7/go.mod h1:xdrQDwHlKUmv8yiElMx6W0W10cLkqpeSEUUib8KGtv4= github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7/go.mod h1:xdrQDwHlKUmv8yiElMx6W0W10cLkqpeSEUUib8KGtv4=

View File

@ -5,13 +5,14 @@ import (
conf "oc-schedulerd/conf" conf "oc-schedulerd/conf"
"oc-schedulerd/daemons" "oc-schedulerd/daemons"
"cloud.o-forge.io/core/oc-lib/tools"
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/tools"
) )
func main() { func main() {
tools.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC", "") tools.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC", "")
oclib.Init("oc-schedulerd") oclib.Init("oc-schedulerd","","")
sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()} sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()}
exe_mngr := daemons.ExecutionManager{} exe_mngr := daemons.ExecutionManager{}