From 139b249a7cd53e76356f1a793a1b80422e00c889 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 18:37:48 +0200 Subject: [PATCH 01/10] modify Docker related files to adapt to new architecture --- Dockerfile | 2 +- docker-compose.yml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Dockerfile b/Dockerfile index d24d39f..15c0bac 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,7 +19,7 @@ COPY --from=builder /app/oc-schedulerd /usr/bin/oc-schedulerd COPY docker_schedulerd.json /etc/oc/schedulerd.json -COPY argo_workflows . +# COPY argo_workflows . EXPOSE 8080 diff --git a/docker-compose.yml b/docker-compose.yml index 2407c4a..c03d76b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,8 +12,8 @@ services: - 9001:8080 container_name: oc-schedulerd networks: - - catalog + - oc networks: - catalog: + oc: external: true \ No newline at end of file From a59a48cc667f00771d0ef755e19bf266e3f48785 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 18:39:34 +0200 Subject: [PATCH 02/10] modify Docker related files to adapt to new architecture --- docker_schedulerd.json | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docker_schedulerd.json b/docker_schedulerd.json index ef0ce51..e14ee0c 100644 --- a/docker_schedulerd.json +++ b/docker_schedulerd.json @@ -6,8 +6,7 @@ "MONITORD_PATH": "oc-monitord", "KUBERNETES_SERVICE_HOST" : "192.168.47.41", "MODE": "kubernetes", - "KUBE_CA" : "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFTVlk3ZHZhNEdYTVdkMy9jMlhLN3JLYjlnWXgyNSthaEE0NmkyNVBkSFAKRktQL2UxSVMyWVF0dzNYZW1TTUQxaStZdzJSaVppNUQrSVZUamNtNHdhcnFvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVWtlUVJpNFJiODduME5yRnZaWjZHClc2SU55NnN3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUlnRXA5ck04WmdNclRZSHYxZjNzOW5DZXZZeWVVa3lZUk4KWjUzazdoaytJS1FDSVFDbk05TnVGKzlTakIzNDFacGZ5ays2NEpWdkpSM3BhcmVaejdMd2lhNm9kdz09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", - "KUBE_CERT":"LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", - "KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" + "KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUL1NDWEMycjFTWGdza0FvTGJKSEtIem4zQXYva2t0ZElpSk42WlBsWVEKY3p0dXV5K3JBMHJ5VUlkZnIyK3VCRS9VN0NjSlhPL004QVdyODFwVklzVmdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVFHOVBQQ0g0c1lMbFkvQk5CdnN5CklEam1PK0l3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUtJeFc4NERQTW1URXVVN0Z3ek44SFB6ZHdldWh6U20KVzNYMU9tczFSQVNRQWlFQXI4UTJZSGtNQndSOThhcWtTa2JqU1dhejg0OEY2VkZLWjFacXpNbDFZaTg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", + "KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJZWFxQUp2bHhmYzh3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOelF6TmpjME56ZzVNQjRYRFRJMU1EUXdNekV3TURZeU9Wb1hEVEkyTURRdwpNekV3TURZeU9Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJJelpGSlJUVHJmYXlNNFoKTjlRclN4MC9wbDdoZGdvWFM5bGEydmFFRkhlYVFaalRML2NZd1dMUnhoOWVOa01SRDZjTk4reWZkSXE2aWo1SQo5RTlENGdLalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFFzUkZXUlNweDV0RGZnZDh1UTdweUw0ZERMVEFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQStXZTlBVXJRUm5pWjVCUERELzJwWjA3TzFQWWFIc01ycTZZcVB4VlV5cGdDSUhrRE8rcVlMYUhkUEhXZgpWUGszNXJmejM0Qk4xN2VyaEVxRjF0U0c1MWFqCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUNDF1NXIzM0JyenZ3ZXZaWHM2TEg3T1k4NGhOOGRrODdnTlhaUndBdWkKdXJBaU45TFdYcmYxeFoyaXp5d0FiVGk1ZVc2Q1hIMjhDdEVSWUlrcjNoTXdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBMRVJWa1VxY2ViUTM0SGZMa082CmNpK0hReTB3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUpLWGZLdXBzdklONEtQVW50c1lPNXhiaGhSQmhSYlIKN3JyeWs2VHpZMU5JQWlBVktKWis3UUxzeGFyQktORnI3eTVYYlNGanI3Y1gyQmhOYy9wdnFLcWtFUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", + "KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVJd01wVjdzMHc2S0VTQ2FBWDhvSVZPUHloa2U0Q3duNWZQZnhOaUYyM3JvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFak5rVWxGTk90OXJJemhrMzFDdExIVCttWHVGMkNoZEwyVnJhOW9RVWQ1cEJtTk12OXhqQgpZdEhHSDE0MlF4RVBwdzAzN0o5MGlycUtQa2owVDBQaUFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" } - From 012c8a83cbd4998e8d4d0681189aa441926e1ed3 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 19:57:55 +0200 Subject: [PATCH 03/10] checking execution more often for dev purposes --- daemons/schedule_manager.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daemons/schedule_manager.go b/daemons/schedule_manager.go index d353955..a300f50 100644 --- a/daemons/schedule_manager.go +++ b/daemons/schedule_manager.go @@ -111,11 +111,11 @@ func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete // Used at launch of the component to retrieve the next scheduled workflows // and then every X minutes in case some workflows were scheduled before launch func (s *ScheduleManager) SchedulePolling() { - var sleep_time float64 = 1 + var sleep_time float64 = 20 for { s.getNextScheduledWorkflows(1) s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Executions.Execs))) - time.Sleep(time.Minute * time.Duration(sleep_time)) + time.Sleep(time.Second * time.Duration(sleep_time)) } } func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) { From 0f6213cd14c1b75846664328182e745b09e4b307 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 19:58:19 +0200 Subject: [PATCH 04/10] Added some test and logging on the path to execute monitord --- main.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index 7ba18c6..9b0a36f 100644 --- a/main.go +++ b/main.go @@ -4,13 +4,14 @@ import ( "fmt" "oc-schedulerd/conf" "oc-schedulerd/daemons" + "os" oclib "cloud.o-forge.io/core/oc-lib" ) func main() { - oclib.InitDaemon("oc-schedulerd") + l := oclib.GetLogger() o := oclib.GetConfLoader() c := oclib.SetConfig( @@ -20,6 +21,7 @@ func main() { o.GetStringDefault("LOKI_URL", ""), o.GetStringDefault("LOG_LEVEL", "info"), ) + conf.GetConfig().DBName = c.MongoDatabase conf.GetConfig().MongoUrl = c.MongoUrl conf.GetConfig().NatsUrl = c.NATSUrl @@ -32,6 +34,22 @@ func main() { conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "") conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "") + // Test if oc-monitor binary is reachable + // For local executions + if _, err := os.Stat("../oc-monitord/oc-monitord"); err == nil { + conf.GetConfig().MonitorPath = "../oc-monitord/oc-monitord" + } + // For container executions + if _, err := os.Stat("/usr/bin/oc-monitord"); conf.GetConfig().MonitorPath == "" && err == nil { + conf.GetConfig().MonitorPath = "/usr/bin/oc-monitord" + } + + if conf.GetConfig().MonitorPath == "" { + l.Fatal().Msg("Could not find oc-monitord binary") + } + + l.Info().Msg("oc-monitord binary at " + conf.GetConfig().MonitorPath ) + sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()} exe_mngr := daemons.ExecutionManager{} From 494ba2f3619d63eb6ed3e5377ceb9b6a96082339 Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 19:58:59 +0200 Subject: [PATCH 05/10] updated the value of ExecutionID in LocalMonitor object --- daemons/execution_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index e117534..352087c 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -51,7 +51,7 @@ func (em *ExecutionManager) executeExecution(Execution *workflow_execution.Workf monitor := LocalMonitor{ Logger: logger, Duration: duration, - ExecutionID: Execution.UUID, + ExecutionID: Execution.ExecutionsID, PeerID: Execution.CreatorID, } monitor.LaunchLocalMonitor() From d94f9603e842b9ed6f71f1316c014965bdfa4c5f Mon Sep 17 00:00:00 2001 From: pb Date: Thu, 17 Apr 2025 19:59:33 +0200 Subject: [PATCH 06/10] added logging to see if monitord is running well --- daemons/execute_monitor_local.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index f58cc79..c2eabf4 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -1,10 +1,12 @@ package daemons import ( + "bufio" "fmt" "oc-schedulerd/conf" "os/exec" + oclib "cloud.o-forge.io/core/oc-lib" "github.com/rs/zerolog" ) @@ -23,6 +25,9 @@ func (lm *LocalMonitor) LaunchLocalMonitor() { } func (lm *LocalMonitor) execKube() { + + l := oclib.GetLogger() + args := []string{ "-e", lm.ExecutionID, "-p", lm.PeerID, "-u", conf.GetConfig().LokiUrl, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName, @@ -35,10 +40,24 @@ func (lm *LocalMonitor) execKube() { if lm.Duration > 0 { args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) } + cmd := exec.Command(conf.GetConfig().MonitorPath, args...) fmt.Printf("Command : %v\n", cmd) - err := cmd.Start() + + stdoutMonitord, err := cmd.StdoutPipe(); + if err != nil { + l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error()) + return + } + + err = cmd.Start() if err != nil { lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) } + + scanner := bufio.NewScanner(stdoutMonitord) + for scanner.Scan() { + output := scanner.Text() + l.Debug().Msg(output) + } } From b43cb6d758fbad9d0bf483146622a9e2b3b96992 Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 18 Apr 2025 11:27:24 +0200 Subject: [PATCH 07/10] cleaned README, need to add more documentation in it --- README.md | 24 ++++++------------------ 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 47a7126..275a637 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,11 @@ # oc-scheduler -OC-Scheduler retrieves the content of submitted workflows and prepare them to be executed. +oc-schedulerd is a daemon performing to actions at the same time : +- subscribing to the local NATS instance' custom channels for message commanding either the scheduling or the removing of an execution. +- polling oc-catalog for scheduled executions -make dev +Depending on the environment it is running in, oc-schedulerd will either : +- execute the oc-monitord binary +- run an oc-monitord container -## Parsing -From a workflow's name we retrieve the xml graph associated and parse it in order to create the object representing each componant. -Each object is linked to another, represented by a links object with the two object IDs has attributes. - -TODO : -- [x] Retrieve the user input's for each component. - -## Organising - -TODO : -- [ ] create an argo file from the graph/worfklow - - [ ] Create a different entry for each component - - [ ] execute each element in the right order - -## CHANGE ENV FOR KUBE -Add your proper CA, Cert & Data + external IP for kube config. \ No newline at end of file From 90fa0b8edd79298380e51d4456db801d940ec57c Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 25 Apr 2025 11:14:54 +0200 Subject: [PATCH 08/10] Divided the execution between local and container and created an interface responsible for preparing and launching the execution --- conf/conf.go | 24 ++--- daemons/execute_monitor_container.go | 132 +++++++++++++++++++++++++++ daemons/execute_monitor_local.go | 56 +++++++----- daemons/execution_manager.go | 56 ++++++++---- daemons/interface.go | 21 +++++ 5 files changed, 237 insertions(+), 52 deletions(-) create mode 100644 daemons/execute_monitor_container.go create mode 100644 daemons/interface.go diff --git a/conf/conf.go b/conf/conf.go index ed29337..3e03005 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -8,18 +8,18 @@ import ( ) type Config struct { - MonitorPath string - MongoUrl string - DBName string - Logs string - LokiUrl string - NatsUrl string - Mode string - KubeHost string - KubePort string - KubeCA string - KubeCert string - KubeData string + MonitorPath string + MongoUrl string + DBName string + Logs string + LokiUrl string + NatsUrl string + Mode string + KubeHost string + KubePort string + KubeCA string + KubeCert string + KubeData string } var instance *Config diff --git a/daemons/execute_monitor_container.go b/daemons/execute_monitor_container.go new file mode 100644 index 0000000..57b5642 --- /dev/null +++ b/daemons/execute_monitor_container.go @@ -0,0 +1,132 @@ +package daemons + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "oc-schedulerd/conf" + + "github.com/rs/zerolog" +) + +type ContainerMonitor struct { + Monitor LocalMonitor + KubeCA string + KubeCert string + KubeData string + KubeHost string + KubePort string +} + +func NewContainerMonitor(executionsId string, peerId string, duration int) (Executor){ + return &ContainerMonitor{ + Monitor: LocalMonitor{ + ExecutionID: executionsId, + PeerID: peerId, + Duration: duration, + LokiUrl: conf.GetConfig().LokiUrl, + MongoUrl: conf.GetConfig().MongoUrl, + DBName: conf.GetConfig().DBName, + }, + KubeCA: conf.GetConfig().KubeCA, + KubeCert: conf.GetConfig().KubeCert, + KubeData: conf.GetConfig().KubeData, + KubeHost: conf.GetConfig().KubeHost, + KubePort: conf.GetConfig().KubePort, + } +} + +func (cm *ContainerMonitor) PrepareMonitorExec() []string { + + args := []string{ + "-e", cm.Monitor.ExecutionID, + "-p", cm.Monitor.PeerID, + "-u", cm.Monitor.LokiUrl, + "-m", cm.Monitor.MongoUrl, + "-d", cm.Monitor.DBName, + "-M", "kubernetes", + "-H", cm.KubeHost, + "-P", cm.KubePort, + "-C", cm.KubeCert, + "-D", cm.KubeData, + "-c", cm.KubeCA, + } + + if cm.Monitor.Duration > 0 { + args = append(args, "-t", fmt.Sprintf("%d", cm.Monitor.Duration)) + } + + return args + +} + +// Contact the docker's API at the KubeHost's URL to : +// - Check if the image exists +// - Create the container +// - Start the container +func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) { + + var containerID string + imageName := "oc-monitord" + url := "http://" + conf.GetConfig().KubeHost + ":2375" + + resp, err := http.Get(url + "/images/" + imageName + "/json") + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + } + + if resp.StatusCode != http.StatusOK { + d, _ := io.ReadAll(resp.Body) + l.Fatal().Msg("Couldn't find the oc-monitord image : " + string(d)) + } + + dataCreation := map[string]interface{}{"Image": imageName, "Cmd" : args} + byteData, err := json.Marshal(dataCreation) + if err != nil { + l.Fatal().Msg("Error when contacting the creating request body : " + err.Error()) + } + + r, _ := http.NewRequest("POST",url + "/containers/create", bytes.NewBuffer(byteData)) + r.Header.Add("Content-Type","application/json") + resp, err = http.DefaultClient.Do(r) + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + } + + if resp.StatusCode == 201 { + var d map[string]interface{} + + b, err := io.ReadAll(resp.Body) + if err != nil { + l.Fatal().Msg(err.Error()) + } + + err = json.Unmarshal(b, &d) + if err != nil { + l.Fatal().Msg(err.Error()) + } + + containerID = d["Id"].(string) + } else { + d, _ := io.ReadAll(resp.Body) + l.Fatal().Msg("Error when creating the container on " + conf.GetConfig().KubeHost + "\n " + string(d)) + } + + + resp, err = http.Post( url + "/containers/" + containerID + "/start", "", nil) + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + } + + if resp.StatusCode >= 300 { + d, _ := io.ReadAll(resp.Body) + l.Fatal().Msg("Error when starting the container on " + conf.GetConfig().KubeHost + "\n " + string(d)) + } + + l.Info().Msg("Started container " + containerID) + // we can add logging with GET /containers/id/logs?stdout=true&follow=true + + // logExecution(stdoutMonitord, l) +} \ No newline at end of file diff --git a/daemons/execute_monitor_local.go b/daemons/execute_monitor_local.go index c2eabf4..33c57bf 100644 --- a/daemons/execute_monitor_local.go +++ b/daemons/execute_monitor_local.go @@ -1,12 +1,10 @@ package daemons import ( - "bufio" "fmt" "oc-schedulerd/conf" "os/exec" - oclib "cloud.o-forge.io/core/oc-lib" "github.com/rs/zerolog" ) @@ -14,50 +12,62 @@ type LocalMonitor struct { ExecutionID string PeerID string Duration int - Logger zerolog.Logger + LokiUrl string + MongoUrl string + DBName string + } -func (lm *LocalMonitor) LaunchLocalMonitor() { - if lm.ExecutionID == "" { - lm.Logger.Error().Msg("Missing parameter in LocalMonitor") +func NewLocalMonitor(executionsId string, peerId string, duration int) (Executor){ + return &LocalMonitor{ + ExecutionID: executionsId, + PeerID: peerId, + Duration: duration, + LokiUrl: conf.GetConfig().LokiUrl, + MongoUrl: conf.GetConfig().MongoUrl, + DBName: conf.GetConfig().DBName, } - lm.execKube() } -func (lm *LocalMonitor) execKube() { +// func (lm *LocalMonitor) LaunchLocalMonitor() { +// if lm.ExecutionID == "" { +// lm.Logger.Error().Msg("Missing parameter in LocalMonitor") +// } - l := oclib.GetLogger() +// } + +func (lm *LocalMonitor) PrepareMonitorExec() []string { args := []string{ - "-e", lm.ExecutionID, "-p", lm.PeerID, "-u", conf.GetConfig().LokiUrl, "-m", conf.GetConfig().MongoUrl, - "-d", conf.GetConfig().DBName, + "-e", lm.ExecutionID, + "-p", lm.PeerID, + "-u", lm.LokiUrl, + "-m", lm.MongoUrl, + "-d", lm.DBName, } - if conf.GetConfig().Mode == "kubernetes" { - args = append(args, []string{"-M", conf.GetConfig().Mode, "-H", conf.GetConfig().KubeHost, "-P", conf.GetConfig().KubePort, - "-C", conf.GetConfig().KubeCert, "-D", conf.GetConfig().KubeData, "-c", conf.GetConfig().KubeCA}...) - } + if lm.Duration > 0 { args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) } + return args +} + +func (lm *LocalMonitor) LaunchMonitor(args []string, l zerolog.Logger) { cmd := exec.Command(conf.GetConfig().MonitorPath, args...) fmt.Printf("Command : %v\n", cmd) - stdoutMonitord, err := cmd.StdoutPipe(); + stdoutMonitord, err := cmd.StdoutPipe() if err != nil { l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error()) - return } err = cmd.Start() if err != nil { - lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) + l.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) } - scanner := bufio.NewScanner(stdoutMonitord) - for scanner.Scan() { - output := scanner.Text() - l.Debug().Msg(output) - } + logExecution(stdoutMonitord, l) } + diff --git a/daemons/execution_manager.go b/daemons/execution_manager.go index 352087c..fcb48d4 100644 --- a/daemons/execution_manager.go +++ b/daemons/execution_manager.go @@ -2,7 +2,7 @@ package daemons import ( "fmt" - "os" + "oc-schedulerd/conf" "time" oclib "cloud.o-forge.io/core/oc-lib" @@ -38,22 +38,44 @@ func (em *ExecutionManager) RetrieveNextExecutions() { func (em *ExecutionManager) executeExecution(Execution *workflow_execution.WorkflowExecution) { // start execution // create the yaml that describes the pod : filename, path/url to Loki - exec_method := os.Getenv("MONITOR_METHOD") + var executor Executor + // exec_method := os.Getenv("MONITOR_METHOD") logger := oclib.GetLogger() - if exec_method == "k8s" { - logger.Error().Msg("TODO : executing oc-monitor in a k8s") - } else { - logger.Debug().Msg("Executing oc-monitor localy") - duration := 0 - if Execution.EndDate != nil { - duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds()) - } - monitor := LocalMonitor{ - Logger: logger, - Duration: duration, - ExecutionID: Execution.ExecutionsID, - PeerID: Execution.CreatorID, - } - monitor.LaunchLocalMonitor() + duration := 0 + if Execution.EndDate != nil { + duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds()) } + + if conf.GetConfig().Mode == "local" { + executor = NewLocalMonitor(Execution.ExecutionsID, Execution.CreatorID, duration) + } + + if conf.GetConfig().Mode == "container" { + executor = NewContainerMonitor(Execution.ExecutionsID, Execution.CreatorID, duration) + } + + if executor == nil { + logger.Fatal().Msg("Could not create logger") + } + args := executor.PrepareMonitorExec() + executor.LaunchMonitor(args,logger) + + // if exec_method == "k8s" { + // logger.Error().Msg("TODO : executing oc-monitor in a k8s") + // } else { + // logger.Debug().Msg("Executing oc-monitor localy") + // duration := 0 + // if Execution.EndDate != nil { + // duration = int(Execution.EndDate.Sub(Execution.ExecDate).Seconds()) + // } + // monitor := LocalMonitor{ + // Logger: logger, + // Duration: duration, + // ExecutionID: Execution.ExecutionsID, + // PeerID: Execution.CreatorID, + // LokiUrl: conf.GetConfig().LokiUrl, + + // } + // monitor.LaunchLocalMonitor() + // } } diff --git a/daemons/interface.go b/daemons/interface.go new file mode 100644 index 0000000..7b5f4fe --- /dev/null +++ b/daemons/interface.go @@ -0,0 +1,21 @@ +package daemons + +import ( + "bufio" + "io" + + "github.com/rs/zerolog" +) + +type Executor interface { + PrepareMonitorExec() []string + LaunchMonitor(args []string, l zerolog.Logger) +} + +func logExecution(reader io.ReadCloser, l zerolog.Logger) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + output := scanner.Text() + l.Debug().Msg(output) + } +} \ No newline at end of file From 6c3a20999ba16530f113163f142a8e5371234eac Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 25 Apr 2025 11:33:48 +0200 Subject: [PATCH 09/10] misc --- README.md | 23 +++++++++++++++++++++++ schedulerd.json | 12 ++++++++++++ 2 files changed, 35 insertions(+) create mode 100644 schedulerd.json diff --git a/README.md b/README.md index 275a637..96440cb 100644 --- a/README.md +++ b/README.md @@ -8,4 +8,27 @@ Depending on the environment it is running in, oc-schedulerd will either : - execute the oc-monitord binary - run an oc-monitord container +## Parameters +oc-schedulerd uses json files to load its configuration. The template for this configuration file is below + +```json +{ + "LOKI_URL" : "http://[IP/URL]:3100", + "MONGO_URL":"mongodb://[IP/URL]:27017/", + "NATS_URL":"nats://[IP/URL]:4222", + "MONGO_DATABASE":"", + "MONITORD_PATH": "", + "KUBERNETES_SERVICE_HOST" : "[IP/URL]", + "MONITOR_MODE": "", + "KUBE_CA": "", + "KUBE_CERT": "", + "KUBE_DATA": "" +} +``` + +**monitor_mode** : should be either "local","container", "" + +## TODO + +- [ ] Implement the discovery of current mode : local, local in container, as a container \ No newline at end of file diff --git a/schedulerd.json b/schedulerd.json new file mode 100644 index 0000000..7bec63b --- /dev/null +++ b/schedulerd.json @@ -0,0 +1,12 @@ +{ + "LOKI_URL" : "http://172.16.0.181:3100", + "MONGO_URL":"mongodb://172.16.0.181:27017/", + "NATS_URL":"nats://172.16.0.181:4222", + "MONGO_DATABASE":"DC_myDC", + "MONITORD_PATH": "../oc-monitord/oc-monitord", + "KUBERNETES_SERVICE_HOST" : "172.16.0.181", + "MODE": "container", + "KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUL1NDWEMycjFTWGdza0FvTGJKSEtIem4zQXYva2t0ZElpSk42WlBsWVEKY3p0dXV5K3JBMHJ5VUlkZnIyK3VCRS9VN0NjSlhPL004QVdyODFwVklzVmdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVFHOVBQQ0g0c1lMbFkvQk5CdnN5CklEam1PK0l3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUtJeFc4NERQTW1URXVVN0Z3ek44SFB6ZHdldWh6U20KVzNYMU9tczFSQVNRQWlFQXI4UTJZSGtNQndSOThhcWtTa2JqU1dhejg0OEY2VkZLWjFacXpNbDFZaTg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", + "KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJZWFxQUp2bHhmYzh3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOelF6TmpjME56ZzVNQjRYRFRJMU1EUXdNekV3TURZeU9Wb1hEVEkyTURRdwpNekV3TURZeU9Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJJelpGSlJUVHJmYXlNNFoKTjlRclN4MC9wbDdoZGdvWFM5bGEydmFFRkhlYVFaalRML2NZd1dMUnhoOWVOa01SRDZjTk4reWZkSXE2aWo1SQo5RTlENGdLalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFFzUkZXUlNweDV0RGZnZDh1UTdweUw0ZERMVEFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQStXZTlBVXJRUm5pWjVCUERELzJwWjA3TzFQWWFIc01ycTZZcVB4VlV5cGdDSUhrRE8rcVlMYUhkUEhXZgpWUGszNXJmejM0Qk4xN2VyaEVxRjF0U0c1MWFqCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUNDF1NXIzM0JyenZ3ZXZaWHM2TEg3T1k4NGhOOGRrODdnTlhaUndBdWkKdXJBaU45TFdYcmYxeFoyaXp5d0FiVGk1ZVc2Q1hIMjhDdEVSWUlrcjNoTXdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBMRVJWa1VxY2ViUTM0SGZMa082CmNpK0hReTB3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUpLWGZLdXBzdklONEtQVW50c1lPNXhiaGhSQmhSYlIKN3JyeWs2VHpZMU5JQWlBVktKWis3UUxzeGFyQktORnI3eTVYYlNGanI3Y1gyQmhOYy9wdnFLcWtFUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K", + "KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVJd01wVjdzMHc2S0VTQ2FBWDhvSVZPUHloa2U0Q3duNWZQZnhOaUYyM3JvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFak5rVWxGTk90OXJJemhrMzFDdExIVCttWHVGMkNoZEwyVnJhOW9RVWQ1cEJtTk12OXhqQgpZdEhHSDE0MlF4RVBwdzAzN0o5MGlycUtQa2owVDBQaUFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=" +} From 1b21c142f1c4026a7d299fd2beded9a118bb4df5 Mon Sep 17 00:00:00 2001 From: pb Date: Fri, 25 Apr 2025 15:42:21 +0200 Subject: [PATCH 10/10] added a step to connect the monitord container to the 'oc' network --- daemons/execute_monitor_container.go | 28 +++++++++++++++++++++------- main.go | 13 ++++++++----- 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/daemons/execute_monitor_container.go b/daemons/execute_monitor_container.go index 57b5642..c643f33 100644 --- a/daemons/execute_monitor_container.go +++ b/daemons/execute_monitor_container.go @@ -70,11 +70,11 @@ func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) { var containerID string imageName := "oc-monitord" - url := "http://" + conf.GetConfig().KubeHost + ":2375" + url := "http://" + cm.KubeHost + ":2375" resp, err := http.Get(url + "/images/" + imageName + "/json") if err != nil { - l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error()) } if resp.StatusCode != http.StatusOK { @@ -92,7 +92,7 @@ func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) { r.Header.Add("Content-Type","application/json") resp, err = http.DefaultClient.Do(r) if err != nil { - l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error()) } if resp.StatusCode == 201 { @@ -111,18 +111,32 @@ func (cm *ContainerMonitor) LaunchMonitor(args []string, l zerolog.Logger) { containerID = d["Id"].(string) } else { d, _ := io.ReadAll(resp.Body) - l.Fatal().Msg("Error when creating the container on " + conf.GetConfig().KubeHost + "\n " + string(d)) + l.Fatal().Msg("Error when creating the container on " + url + "\n " + string(d)) + } + + networkName := "oc" + + dataNetwork, _ := json.Marshal(map[string]string{"Container" : containerID}) + r, _ = http.NewRequest("POST",url + "/networks/" + networkName + "/connect", bytes.NewBuffer(dataNetwork)) + r.Header.Add("Content-Type","application/json") + resp, err = http.DefaultClient.Do(r) + if err != nil { + l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error()) + } + if resp.StatusCode != 200 { + d, _ := io.ReadAll(resp.Body) + l.Error().Msg("Error when adding container to the network : " + string(d)) + return } - resp, err = http.Post( url + "/containers/" + containerID + "/start", "", nil) if err != nil { - l.Fatal().Msg("Error when contacting the docker API on " + conf.GetConfig().KubeHost + ": " + err.Error()) + l.Fatal().Msg("Error when contacting the docker API on " + url + ": " + err.Error()) } if resp.StatusCode >= 300 { d, _ := io.ReadAll(resp.Body) - l.Fatal().Msg("Error when starting the container on " + conf.GetConfig().KubeHost + "\n " + string(d)) + l.Fatal().Msg("Error when starting the container on " + url + "\n " + string(d)) } l.Info().Msg("Started container " + containerID) diff --git a/main.go b/main.go index 9b0a36f..05d956d 100644 --- a/main.go +++ b/main.go @@ -27,12 +27,15 @@ func main() { conf.GetConfig().NatsUrl = c.NATSUrl conf.GetConfig().LokiUrl = c.LokiUrl conf.GetConfig().Mode = o.GetStringDefault("MODE", "") - conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", "") - conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443") - conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "") - conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "") - conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "") + if conf.GetConfig().Mode == "container"{ + conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", "") + conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443") + + conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "") + conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "") + conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "") + } // Test if oc-monitor binary is reachable // For local executions