package daemons

import (
	"bytes"
	"os"
	"os/exec"
	"regexp"
	"text/template"
	"time"

	"oc-scheduler/conf"
	"oc-scheduler/logger"
	"oc-scheduler/models"
	"oc-scheduler/workflow_builder"
)

// containerNameRegexp matches the first "<letters>-<letters>" sequence of a
// string. It is compiled once here instead of on every getContainerName call.
var containerNameRegexp = regexp.MustCompile("([a-zA-Z]+-[a-zA-Z]+)")

// ExecutionManager watches a shared list of scheduled bookings and launches
// each booking once its start time has passed.
type ExecutionManager struct {
	// bookings is the shared, mutex-protected list of pending bookings.
	bookings *models.ScheduledBooking
	// executions is not read or written anywhere in this file.
	executions []models.Booking
}

// manifestValues holds the values injected into the pod manifest template
// (conf/monitor_pod_template.yml). The field names are referenced by the
// template and therefore must not be renamed.
type manifestValues struct {
	ARGO_FILE      string
	LOKI_URL       string
	CONTAINER_NAME string
}

// SetBookings registers the booking list that RetrieveNextExecutions polls.
func (em *ExecutionManager) SetBookings(b *models.ScheduledBooking) {
	em.bookings = b
}

// RetrieveNextExecutions loops forever, once per second, over the booking
// list. Every booking whose start time is in the past is handed to
// executeBooking in its own goroutine and removed from the list.
//
// SetBookings must have been called first; otherwise a fatal message is
// logged and the method does not start polling.
func (em *ExecutionManager) RetrieveNextExecutions() {
	if em.bookings == nil {
		logger.Logger.Fatal().Msg("booking has not been set in the exection manager")
		// Defensive: never dereference a nil booking list, even if the
		// logger's Fatal level does not terminate the process.
		return
	}

	for {
		logger.Logger.Debug().Msg("New loop")

		em.bookings.Mu.Lock()
		pending := em.bookings.Bookings
		now := time.Now().UTC()

		// Walk backwards so that removing element i does not shift the
		// elements that still have to be visited.
		for i := len(pending) - 1; i >= 0; i-- {
			logger.Logger.Debug().Msg("It should start at " + pending[i].Start.String() + " and it is now " + now.String())
			if !pending[i].Start.Before(now) {
				continue
			}

			logger.Logger.Info().Msg("Will execute " + pending[i].Workflow + " soon")
			// The booking is passed by value, so the goroutine keeps its own
			// copy after the element is removed from the slice below.
			go em.executeBooking(pending[i])
			pending = append(pending[:i], pending[i+1:]...)
			em.bookings.Bookings = pending
		}
		em.bookings.Mu.Unlock()

		time.Sleep(time.Second)
	}
}

// executeBooking runs one booking: it loads the workflow graph, exports it as
// an Argo file, generates the pod manifest, applies the manifest with kubectl
// and finally copies the Argo file into the pod.
//
// Every step depends on the previous one, so the first failure is logged and
// aborts the execution instead of calling kubectl with empty or invalid
// arguments.
func (em *ExecutionManager) executeBooking(booking models.Booking) {
	// Build the Argo workflow file from the workflow graph.
	graph := workflow_builder.Graph{}
	if err := graph.LoadFrom(booking.Workflow); err != nil {
		logger.Logger.Error().Msg("Could not retrieve workflow " + booking.Workflow + " from oc-catalog API")
		return
	}

	argoFile, err := graph.ExportToArgo()
	if err != nil {
		logger.Logger.Error().Msg("Could not create the Argo file for " + booking.Workflow)
		logger.Logger.Error().Msg(err.Error())
		return
	}
	logger.Logger.Debug().Msg("Created :" + argoFile)

	// Create the YAML that describes the pod: file name, path/URL to Loki.
	manifest, err := em.CreateManifest(booking, argoFile)
	if err != nil {
		logger.Logger.Error().Msg("Could not create manifest " + err.Error())
		return
	}

	// Launch a pod that contains oc-monitor.
	output, err := exec.Command("kubectl", "apply", "-f", "manifests/"+manifest).CombinedOutput()
	// Log the output before checking the error: on failure it carries
	// kubectl's stderr, which is the most useful diagnostic.
	logger.Logger.Debug().Msg("Result from kubectl apply : " + string(output))
	if err != nil {
		logger.Logger.Error().Msg("failed to create new pod for " + booking.Workflow + " :" + err.Error())
		return
	}

	// Transfer the Argo file to the pod.
	// NOTE(review): the target pod name "test-monitor" is hard-coded here;
	// confirm that it matches the name produced by the manifest template.
	output, err = exec.Command(
		"kubectl", "cp",
		"argo_workflows/"+argoFile,
		"pods/test-monitor:/app/workflows",
		"-c", "oc-monitor-"+getContainerName(argoFile),
	).CombinedOutput()
	logger.Logger.Debug().Msg("Result from kubectl cp : " + string(output))
	if err != nil {
		logger.Logger.Error().Msg("failed to copy argo file to " + booking.Workflow + " :" + err.Error())
		return
	}
}

// CreateManifest renders conf/monitor_pod_template.yml with the Argo file
// name, the Loki URL taken from the configuration and the container name
// derived from the Argo file name, then writes the result to
// manifests/<workflow>_manifest.yaml.
//
// It returns the manifest file name (relative to the manifests directory), or
// an empty string and the error if the template cannot be read, parsed or
// executed, or if the output file cannot be written.
func (*ExecutionManager) CreateManifest(booking models.Booking, argo_file string) (manifest_filepath string, err error) {
	templateBytes, err := os.ReadFile("conf/monitor_pod_template.yml")
	if err != nil {
		logger.Logger.Error().Msg("Could not open the k8s template file for " + booking.Workflow)
		return "", err
	}

	tmpl, err := template.New("manifest_template").Parse(string(templateBytes))
	if err != nil {
		logger.Logger.Error().Msg(err.Error())
		return "", err
	}

	values := manifestValues{
		ARGO_FILE:      argo_file,
		LOKI_URL:       conf.GetConfig().Logs,
		CONTAINER_NAME: getContainerName(argo_file),
	}

	var rendered bytes.Buffer
	if err = tmpl.Execute(&rendered, values); err != nil {
		logger.Logger.Error().Msg("Could not complete manifest template for " + booking.Workflow)
		return "", err
	}

	manifest_filepath = booking.Workflow + "_manifest.yaml"
	if err = os.WriteFile("manifests/"+manifest_filepath, rendered.Bytes(), 0644); err != nil {
		logger.Logger.Error().Msg("Could not write the YAML file for " + booking.Workflow + "'s manifest")
		return "", err
	}

	return manifest_filepath, nil
}

// getContainerName returns the first "<letters>-<letters>" sequence found in
// argo_file, or an empty string when there is none.
func getContainerName(argo_file string) string {
	return containerNameRegexp.FindString(argo_file)
}