integrated oclib
This commit is contained in:
@@ -1,3 +1,67 @@
|
||||
package daemons
|
||||
|
||||
// copy/transfer the argo_file to the created pod
|
||||
// type manifestValues struct{
|
||||
// ARGO_FILE string
|
||||
// LOKI_URL string
|
||||
// CONTAINER_NAME string
|
||||
// }
|
||||
|
||||
// manifest, err := em.CreateManifest(booking, argo_file_path)
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not create manifest " + err.Error())
|
||||
// }
|
||||
|
||||
// // launch a pod that contains oc-monitor, give it the name of the workflow
|
||||
// cmd := exec.Command("kubectl","apply","-f", "manifests/"+manifest)
|
||||
// output , err := cmd.CombinedOutput()
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("failed to create new pod for " + booking.Workflow + " :" + err.Error())
|
||||
// return
|
||||
// }
|
||||
|
||||
// func (*ExecutionManager) CreateManifest(booking models.Booking, argo_file string) (manifest_filepath string, err error) {
|
||||
|
||||
// var filled_template bytes.Buffer
|
||||
|
||||
// manifest_file, err := os.ReadFile("conf/monitor_pod_template.yml")
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not open the k8s template file for " + booking.Workflow)
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// container_name := getContainerName(argo_file)
|
||||
// tmpl, err := template.New("manifest_template").Parse(string(manifest_file))
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg(err.Error())
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// manifest_data := manifestValues{
|
||||
// ARGO_FILE: argo_file,
|
||||
// LOKI_URL: conf.GetConfig().Logs,
|
||||
// CONTAINER_NAME: container_name,
|
||||
// }
|
||||
|
||||
// err = tmpl.Execute(&filled_template, manifest_data)
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not complete manifest template for " + booking.Workflow)
|
||||
// return "", err }
|
||||
|
||||
// manifest_filepath = booking.Workflow + "_manifest.yaml"
|
||||
|
||||
// err = os.WriteFile("manifests/" + manifest_filepath, filled_template.Bytes(),0644)
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not write the YAML file for " + booking.Workflow + "'s manifest")
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// return manifest_filepath, nil
|
||||
// }
|
||||
|
||||
// func getContainerName(argo_file string) string {
|
||||
// regex := "([a-zA-Z]+-[a-zA-Z]+)"
|
||||
// re := regexp.MustCompile(regex)
|
||||
|
||||
// container_name := re.FindString(argo_file)
|
||||
// return container_name
|
||||
// }
|
||||
@@ -1,32 +1,45 @@
|
||||
package daemons
|
||||
|
||||
import "oc-scheduler/logger"
|
||||
import (
|
||||
"fmt"
|
||||
"oc-scheduler/logger"
|
||||
"os/exec"
|
||||
)
|
||||
|
||||
type LocalMonitor struct{
|
||||
LokiURL string
|
||||
KubeURL string
|
||||
ArgoFile string
|
||||
LokiURL string
|
||||
KubeURL string
|
||||
WorkflowName string
|
||||
}
|
||||
|
||||
func (lm *LocalMonitor) LaunchLocalMonitor (){
|
||||
if (lm.LokiURL == "" || lm.KubeURL == "" || lm.ArgoFile == ""){
|
||||
if (lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == ""){
|
||||
logger.Logger.Error().Msg("Missing parameter in LocalMonitor")
|
||||
}
|
||||
|
||||
// For dev purposes, in prod KubeURL must be a kube API's URL
|
||||
if(lm.KubeURL == "localhost"){
|
||||
lm.ExecLocalKube()
|
||||
lm.execLocalKube()
|
||||
} else{
|
||||
lm.ExecRemoteKube()
|
||||
lm.execRemoteKube()
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *LocalMonitor) ExecLocalKube (){
|
||||
func (lm *LocalMonitor) execLocalKube (){
|
||||
// kube_url := ""
|
||||
cmd := exec.Command("../oc-monitor/oc-monitor", "-w",lm.WorkflowName, "-u", lm.LokiURL)
|
||||
// cmd_ls := exec.Command("ls", "../oc-monitor")
|
||||
output, err := cmd.CombinedOutput()
|
||||
// output, err := cmd_ls.CombinedOutput()
|
||||
fmt.Println(string(output))
|
||||
if err !=nil {
|
||||
logger.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
func (lm *LocalMonitor) ExecRemoteKube (){
|
||||
func (lm *LocalMonitor) execRemoteKube (){
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -1,15 +1,10 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"oc-scheduler/conf"
|
||||
"oc-scheduler/logger"
|
||||
"oc-scheduler/models"
|
||||
"oc-scheduler/workflow_builder"
|
||||
"os"
|
||||
"os/exec"
|
||||
"regexp"
|
||||
"text/template"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -18,11 +13,7 @@ type ExecutionManager struct {
|
||||
executions []models.Booking
|
||||
}
|
||||
|
||||
type manifestValues struct{
|
||||
ARGO_FILE string
|
||||
LOKI_URL string
|
||||
CONTAINER_NAME string
|
||||
}
|
||||
|
||||
|
||||
func (em *ExecutionManager) SetBookings(b *models.ScheduledBooking){
|
||||
em.bookings = b
|
||||
@@ -34,7 +25,8 @@ func (em *ExecutionManager) RetrieveNextExecutions(){
|
||||
|
||||
|
||||
if(em.bookings == nil){
|
||||
logger.Logger.Fatal().Msg("booking has not been set in the exection manager")
|
||||
logger.Logger.Error().Msg("booking has not been set in the exection manager")
|
||||
return
|
||||
}
|
||||
|
||||
for(true){
|
||||
@@ -55,101 +47,26 @@ func (em *ExecutionManager) RetrieveNextExecutions(){
|
||||
em.bookings.Mu.Unlock()
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (em *ExecutionManager) executeBooking(booking models.Booking){
|
||||
|
||||
// create argo
|
||||
new_graph := workflow_builder.Graph{}
|
||||
|
||||
err := new_graph.LoadFrom(booking.Workflow)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not retrieve workflow " + booking.Workflow + " from oc-catalog API")
|
||||
}
|
||||
|
||||
argo_file_path, err := new_graph.ExportToArgo()
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not create the Argo file for " + booking.Workflow )
|
||||
logger.Logger.Error().Msg(err.Error())
|
||||
}
|
||||
|
||||
logger.Logger.Debug().Msg("Created :" + argo_file_path)
|
||||
// start execution
|
||||
// create the yaml that describes the pod : filename, path/url to Loki
|
||||
manifest, err := em.CreateManifest(booking, argo_file_path)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not create manifest " + err.Error())
|
||||
|
||||
|
||||
exec_method := os.Getenv("MONITOR_METHOD")
|
||||
|
||||
if exec_method == "local"{
|
||||
logger.Logger.Debug().Msg("Executing oc-monitor localy")
|
||||
monitor := LocalMonitor{LokiURL: conf.GetConfig().LokiUrl,KubeURL: "localhost",WorkflowName: booking.Workflow}
|
||||
monitor.LaunchLocalMonitor()
|
||||
}else{
|
||||
logger.Logger.Error().Msg("TODO : executing oc-monitor in a k8s")
|
||||
}
|
||||
|
||||
// launch a pod that contains oc-monitor, give it the name of the workflow
|
||||
cmd := exec.Command("kubectl","apply","-f", "manifests/"+manifest)
|
||||
output , err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("failed to create new pod for " + booking.Workflow + " :" + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
logger.Logger.Debug().Msg("Result from kubectl apply : " + string(output))
|
||||
|
||||
// Transfer the argo file to the pod
|
||||
|
||||
cmd = exec.Command("kubectl","cp", "argo_workflows/" + argo_file_path, "pods/test-monitor:/app/workflows", "-c", "oc-monitor-" + getContainerName(argo_file_path))
|
||||
output, err = cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("failed to copy argo file to " + booking.Workflow + " :" + err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logger.Logger.Debug().Msg("Result from kubectl cp : " + string(output))
|
||||
|
||||
}
|
||||
|
||||
func (*ExecutionManager) CreateManifest(booking models.Booking, argo_file string) (manifest_filepath string, err error) {
|
||||
|
||||
var filled_template bytes.Buffer
|
||||
|
||||
manifest_file, err := os.ReadFile("conf/monitor_pod_template.yml")
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not open the k8s template file for " + booking.Workflow)
|
||||
return "", err
|
||||
}
|
||||
|
||||
container_name := getContainerName(argo_file)
|
||||
tmpl, err := template.New("manifest_template").Parse(string(manifest_file))
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg(err.Error())
|
||||
return "", err
|
||||
}
|
||||
|
||||
manifest_data := manifestValues{
|
||||
ARGO_FILE: argo_file,
|
||||
LOKI_URL: conf.GetConfig().Logs,
|
||||
CONTAINER_NAME: container_name,
|
||||
}
|
||||
|
||||
err = tmpl.Execute(&filled_template, manifest_data)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not complete manifest template for " + booking.Workflow)
|
||||
return "", err }
|
||||
|
||||
|
||||
manifest_filepath = booking.Workflow + "_manifest.yaml"
|
||||
|
||||
err = os.WriteFile("manifests/" + manifest_filepath, filled_template.Bytes(),0644)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not write the YAML file for " + booking.Workflow + "'s manifest")
|
||||
return "", err
|
||||
}
|
||||
|
||||
return manifest_filepath, nil
|
||||
}
|
||||
|
||||
func getContainerName(argo_file string) string {
|
||||
regex := "([a-zA-Z]+-[a-zA-Z]+)"
|
||||
re := regexp.MustCompile(regex)
|
||||
|
||||
container_name := re.FindString(argo_file)
|
||||
return container_name
|
||||
|
||||
}
|
||||
@@ -3,13 +3,16 @@ package daemons
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"oc-scheduler/logger"
|
||||
"oc-scheduler/models"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"github.com/nats-io/nats.go"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
// NATS daemon listens to subject " workflowsUpdate "
|
||||
@@ -29,23 +32,26 @@ func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){
|
||||
|
||||
// Goroutine listening to a NATS server for updates
|
||||
// on workflows' scheduling. Messages must contain
|
||||
// workflow's name, start_date and stop_date while there
|
||||
// is no way to get scheduling infos for a specific workflow
|
||||
// workflow execution ID, to allow retrieval of execution infos
|
||||
func (s *ScheduleManager) ListenForWorkflowSubmissions(){
|
||||
|
||||
if(s.bookings == nil){
|
||||
logger.Logger.Fatal().Msg("booking has not been set in the schedule manager")
|
||||
logger.Logger.Error().Msg("booking has not been set in the schedule manager")
|
||||
}
|
||||
|
||||
nc, _ := nats.Connect(nats.DefaultURL)
|
||||
defer nc.Close()
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not connect to NATS")
|
||||
return
|
||||
}
|
||||
|
||||
defer nc.Close()
|
||||
|
||||
ch := make(chan *nats.Msg, 64)
|
||||
|
||||
subs , err := nc.ChanSubscribe("workflowsUpdate", ch)
|
||||
if err != nil {
|
||||
logger.Logger.Fatal().Msg("Error listening to NATS")
|
||||
logger.Logger.Error().Msg("Error listening to NATS")
|
||||
}
|
||||
defer subs.Unsubscribe()
|
||||
|
||||
@@ -56,21 +62,29 @@ func (s *ScheduleManager) ListenForWorkflowSubmissions(){
|
||||
|
||||
s.bookings.Mu.Lock()
|
||||
|
||||
start, err := time.Parse(time.RFC3339,map_mess["start_date"])
|
||||
if err != nil{
|
||||
logger.Logger.Error().Msg(err.Error())
|
||||
}
|
||||
stop, err := time.Parse(time.RFC3339,map_mess["stop_date"])
|
||||
if err != nil{
|
||||
logger.Logger.Error().Msg(err.Error())
|
||||
}
|
||||
wf_exec := getWorkflowExecution(map_mess["workflow"])
|
||||
|
||||
s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: start, Stop: stop })
|
||||
s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate })
|
||||
s.bookings.Mu.Unlock()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution {
|
||||
|
||||
res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec_id)
|
||||
|
||||
if res.Code != 200 {
|
||||
logger.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
|
||||
return nil
|
||||
}
|
||||
|
||||
wf_exec := res.ToWorkflowExecution()
|
||||
|
||||
return wf_exec
|
||||
}
|
||||
|
||||
// At the moment very simplistic, but could be useful if we send bigger messages
|
||||
func retrieveMapFromSub(message []byte) (result_map map[string]string) {
|
||||
json.Unmarshal(message, &result_map)
|
||||
@@ -80,44 +94,63 @@ func retrieveMapFromSub(message []byte) (result_map map[string]string) {
|
||||
// Used at launch of the component to retrieve the next scheduled workflows
|
||||
// and then every X minutes in case some workflows were scheduled before launch
|
||||
func (s *ScheduleManager) SchedulePolling (){
|
||||
var sleep_time float64 = 1
|
||||
for(true){
|
||||
err := s.getNextScheduledWorkflows(s.Api_url, 0.3)
|
||||
if err != nil {
|
||||
logger.Logger.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + s.Api_url)
|
||||
}
|
||||
s.getNextScheduledWorkflows(1800)
|
||||
|
||||
logger.Logger.Info().Msg("Current list of schedules")
|
||||
fmt.Println(s.bookings.Bookings)
|
||||
|
||||
time.Sleep(time.Minute * 5)
|
||||
time.Sleep(time.Minute * time.Duration(sleep_time))
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ScheduleManager) getNextScheduledWorkflows(apiurl string, hours float64) (error) {
|
||||
s.ws.Init(apiurl)
|
||||
params := url.Values{}
|
||||
start := time.Now().UTC()
|
||||
params.Add("start_date", start.Format(time.RFC3339))
|
||||
time_span := time.Hour * time.Duration(hours)
|
||||
params.Add("stop_date",start.Add(time_span).Format(time.RFC3339))
|
||||
body, err := s.ws.Get("v1/schedule?" + params.Encode())
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) {
|
||||
|
||||
f := dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"execution_date" : {{Operator : dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}},
|
||||
"end_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}},
|
||||
// "state": {{Operator: dbs.EQUAL.String(), Value: 1}},
|
||||
},
|
||||
}
|
||||
var workflows []map[string]string
|
||||
json.Unmarshal(body,&workflows)
|
||||
|
||||
res := oclib.Search(&f,"",oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION))
|
||||
if res.Code != 200 {
|
||||
logger.Logger.Error().Msg("Error loading")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, exec := range(res.Data){
|
||||
lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec.GetID())
|
||||
exec_obj := lib_data.ToWorkflowExecution()
|
||||
exec_list = append(exec_list, *exec_obj)
|
||||
}
|
||||
return exec_list, nil
|
||||
}
|
||||
|
||||
// TODO : refactor to implement oclib.Search
|
||||
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
|
||||
start := time.Now().UTC()
|
||||
end := start.Add(time.Minute * time.Duration(minutes)).UTC()
|
||||
|
||||
fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String())
|
||||
|
||||
next_wf_exec, err := s.getWorfklowExecution(start,end)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not retrieve next schedules")
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
s.bookings.Mu.Lock()
|
||||
defer s.bookings.Mu.Unlock()
|
||||
|
||||
for _, workflow := range(workflows){
|
||||
start, _ := time.Parse(time.RFC3339,workflow["start_date"])
|
||||
stop, _ := time.Parse(time.RFC3339,workflow["stop_date"])
|
||||
for _, exec := range(next_wf_exec){
|
||||
exec_start := exec.ExecDate
|
||||
exec_stop := exec.EndDate
|
||||
|
||||
s.bookings.AddSchedule(models.Booking{Workflow: workflow["Workflow"], Start: start, Stop: stop})
|
||||
s.bookings.AddSchedule(models.Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user