minimize code + schedulerd naming + docker
This commit is contained in:
		
							
								
								
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										18
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
										Normal file
									
								
							@@ -0,0 +1,18 @@
 | 
			
		||||
{
 | 
			
		||||
    // Use IntelliSense to learn about possible attributes.
 | 
			
		||||
    // Hover to view descriptions of existing attributes.
 | 
			
		||||
    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
 | 
			
		||||
    "version": "0.2.0",
 | 
			
		||||
    "configurations": [
 | 
			
		||||
        {
 | 
			
		||||
            "name": "Launch Package",
 | 
			
		||||
            "type": "go",
 | 
			
		||||
            "request": "launch",
 | 
			
		||||
            "mode": "auto",
 | 
			
		||||
            "program": "${fileDirname}",
 | 
			
		||||
            "env": {
 | 
			
		||||
                "MONITOR_METHOD" : "local"
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    ]
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										15
									
								
								Dockerfile
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								Dockerfile
									
									
									
									
									
								
							@@ -4,15 +4,20 @@ ENV DOCKER_ENVIRONMENT=true
 | 
			
		||||
WORKDIR /app
 | 
			
		||||
 | 
			
		||||
COPY . .
 | 
			
		||||
COPY conf/docker_scheduler.json /etc/oc/scheduler.json
 | 
			
		||||
 | 
			
		||||
RUN go build .
 | 
			
		||||
 | 
			
		||||
FROM golang:alpine
 | 
			
		||||
FROM oc-monitord:latest AS monitord
 | 
			
		||||
 | 
			
		||||
FROM argoproj/argocd:latest
 | 
			
		||||
 | 
			
		||||
ENV MONITORD_PATH = "./oc-monitord"
 | 
			
		||||
 | 
			
		||||
WORKDIR /app
 | 
			
		||||
 | 
			
		||||
COPY --from=builder /app/oc-scheduler .
 | 
			
		||||
COPY conf/docker_scheduler.json /etc/oc/scheduler.json
 | 
			
		||||
COPY --from=monitord /app/oc-monitord .
 | 
			
		||||
COPY --from=builder /app/oc-schedulerd .
 | 
			
		||||
COPY conf/docker_schedulerd.json /etc/oc/schedulerd.json
 | 
			
		||||
 | 
			
		||||
ENTRYPOINT ["/app/oc-scheduler"]
 | 
			
		||||
ENTRYPOINT ["/app/oc-schedulerd"]
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										33
									
								
								conf/conf.go
									
									
									
									
									
								
							
							
						
						
									
										33
									
								
								conf/conf.go
									
									
									
									
									
								
							@@ -8,27 +8,26 @@ import (
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Config struct {
 | 
			
		||||
	OcCatalogUrl 	string
 | 
			
		||||
	MongoUrl		string
 | 
			
		||||
	DBName			string
 | 
			
		||||
	Logs 			string
 | 
			
		||||
	LokiUrl			string
 | 
			
		||||
	NatsUrl			string
 | 
			
		||||
	MonitorPath  string
 | 
			
		||||
	OcCatalogUrl string
 | 
			
		||||
	MongoUrl     string
 | 
			
		||||
	DBName       string
 | 
			
		||||
	Logs         string
 | 
			
		||||
	LokiUrl      string
 | 
			
		||||
	NatsUrl      string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var instance *Config
 | 
			
		||||
var once sync.Once
 | 
			
		||||
 | 
			
		||||
const defaultConfigFile = "/etc/oc/scheduler.json"
 | 
			
		||||
const localConfigFile = "./conf/local_scheduler.json"
 | 
			
		||||
const defaultConfigFile = "/etc/oc/schedulerd.json"
 | 
			
		||||
const localConfigFile = "./conf/local_schedulerd.json"
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func init(){
 | 
			
		||||
	
 | 
			
		||||
func init() {
 | 
			
		||||
	configFile := ""
 | 
			
		||||
	var o *onion.Onion
 | 
			
		||||
 | 
			
		||||
	l3 := onion.NewEnvLayerPrefix("_", "OCSCHEDULER_")
 | 
			
		||||
	l3 := onion.NewEnvLayerPrefix("_", "OCSCHEDULERD_")
 | 
			
		||||
	l2, err := onion.NewFileLayer(defaultConfigFile, nil)
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		logs.Info("Config file found : " + defaultConfigFile)
 | 
			
		||||
@@ -49,13 +48,13 @@ func init(){
 | 
			
		||||
	} else if l2 == nil {
 | 
			
		||||
		o = onion.New(l1, l3)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	GetConfig().MonitorPath = o.GetStringDefault("MONITORD_PATH", "../oc-monitord/oc-monitord")
 | 
			
		||||
	GetConfig().OcCatalogUrl = o.GetStringDefault("oc-catalog", "https://localhost:49618")
 | 
			
		||||
	GetConfig().Logs = o.GetStringDefault("loglevel", "info")
 | 
			
		||||
	GetConfig().LokiUrl = o.GetStringDefault("loki_url","http://127.0.0.1:3100")
 | 
			
		||||
	GetConfig().NatsUrl = o.GetStringDefault("nats_url","http://127.0.0.1:4222")
 | 
			
		||||
	GetConfig().MongoUrl = o.GetStringDefault("mongo_url","mongodb://127.0.0.1:27017")
 | 
			
		||||
	GetConfig().DBName = o.GetStringDefault("database_name","DC_myDC")
 | 
			
		||||
	GetConfig().LokiUrl = o.GetStringDefault("loki_url", "http://127.0.0.1:3100")
 | 
			
		||||
	GetConfig().NatsUrl = o.GetStringDefault("nats_url", "http://127.0.0.1:4222")
 | 
			
		||||
	GetConfig().MongoUrl = o.GetStringDefault("mongo_url", "mongodb://127.0.0.1:27017")
 | 
			
		||||
	GetConfig().DBName = o.GetStringDefault("database_name", "DC_myDC")
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,47 +1,41 @@
 | 
			
		||||
package daemons
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"oc-scheduler/conf"
 | 
			
		||||
	"oc-scheduler/logger"
 | 
			
		||||
	"oc-schedulerd/conf"
 | 
			
		||||
	"os/exec"
 | 
			
		||||
 | 
			
		||||
	"github.com/rs/zerolog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type LocalMonitor struct{
 | 
			
		||||
	LokiURL 		string
 | 
			
		||||
	KubeURL			string
 | 
			
		||||
	WorkflowName	string
 | 
			
		||||
type LocalMonitor struct {
 | 
			
		||||
	LokiURL      string
 | 
			
		||||
	KubeURL      string
 | 
			
		||||
	WorkflowName string
 | 
			
		||||
	Logger       zerolog.Logger
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lm *LocalMonitor) LaunchLocalMonitor (){
 | 
			
		||||
	if (lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == ""){
 | 
			
		||||
		logger.Logger.Error().Msg("Missing parameter in LocalMonitor")
 | 
			
		||||
func (lm *LocalMonitor) LaunchLocalMonitor() {
 | 
			
		||||
	if lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == "" {
 | 
			
		||||
		lm.Logger.Error().Msg("Missing parameter in LocalMonitor")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// For dev purposes, in prod KubeURL must be a kube API's URL
 | 
			
		||||
	if(lm.KubeURL == "localhost"){
 | 
			
		||||
	if lm.KubeURL == "localhost" {
 | 
			
		||||
		lm.execLocalKube()
 | 
			
		||||
	} else{
 | 
			
		||||
	} else {
 | 
			
		||||
		lm.execRemoteKube()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lm *LocalMonitor) execLocalKube (){
 | 
			
		||||
	// kube_url := ""
 | 
			
		||||
	cmd := exec.Command("../oc-monitor/oc-monitor", "-w",lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl,"-d", conf.GetConfig().DBName)
 | 
			
		||||
	// cmd_ls := exec.Command("ls", "../oc-monitor")
 | 
			
		||||
func (lm *LocalMonitor) execLocalKube() {
 | 
			
		||||
	cmd := exec.Command(conf.GetConfig().MonitorPath, "-w", lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName)
 | 
			
		||||
	err := cmd.Start()
 | 
			
		||||
	// output, err := cmd_ls.CombinedOutput()
 | 
			
		||||
	if err !=nil {
 | 
			
		||||
		logger.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (lm *LocalMonitor) execRemoteKube (){
 | 
			
		||||
// TODO : implement this
 | 
			
		||||
func (lm *LocalMonitor) execRemoteKube() {
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (lm *LocalMonitor) todo (){
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
@@ -1,72 +1,54 @@
 | 
			
		||||
package daemons
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"oc-scheduler/conf"
 | 
			
		||||
	"oc-scheduler/logger"
 | 
			
		||||
	"oc-scheduler/models"
 | 
			
		||||
	"oc-schedulerd/conf"
 | 
			
		||||
	"os"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	oclib "cloud.o-forge.io/core/oc-lib"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ExecutionManager struct {
 | 
			
		||||
	bookings	*models.ScheduledBooking
 | 
			
		||||
	executions	[]models.Booking	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (em *ExecutionManager) SetBookings(b *models.ScheduledBooking){
 | 
			
		||||
	em.bookings = b
 | 
			
		||||
	bookings ScheduledBooking
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Loop every second on the booking's list and move the booking that must start to a new list
 | 
			
		||||
// that will be looped over to start them
 | 
			
		||||
func (em *ExecutionManager) RetrieveNextExecutions(){
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	if(em.bookings == nil){
 | 
			
		||||
		logger.Logger.Error().Msg("booking has not been set in the exection manager")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for(true){
 | 
			
		||||
		logger.Logger.Debug().Msg("New loop")
 | 
			
		||||
func (em *ExecutionManager) RetrieveNextExecutions() {
 | 
			
		||||
	logger := oclib.GetLogger()
 | 
			
		||||
	for {
 | 
			
		||||
		logger.Debug().Msg("New loop")
 | 
			
		||||
		em.bookings.Mu.Lock()
 | 
			
		||||
		bookings :=  em.bookings.Bookings
 | 
			
		||||
		if (len(bookings) > 0){	
 | 
			
		||||
			for i := len( bookings) - 1 ; i  >= 0 ; i--{
 | 
			
		||||
				logger.Logger.Debug().Msg("It should start at " + bookings[i].Start.String() + " and it is now " + time.Now().UTC() .String())
 | 
			
		||||
				if (bookings[i].Start.Before(time.Now().UTC())){
 | 
			
		||||
					logger.Logger.Info().Msg("Will execute " + bookings[i].Workflow + " soon")
 | 
			
		||||
		bookings := em.bookings.Bookings
 | 
			
		||||
		if len(bookings) > 0 {
 | 
			
		||||
			for i := len(bookings) - 1; i >= 0; i-- {
 | 
			
		||||
				logger.Debug().Msg("It should start at " + bookings[i].Start.String() + " and it is now " + time.Now().UTC().String())
 | 
			
		||||
				if bookings[i].Start.Before(time.Now().UTC()) {
 | 
			
		||||
					logger.Info().Msg("Will execute " + bookings[i].Workflow + " soon")
 | 
			
		||||
					go em.executeBooking(bookings[i])
 | 
			
		||||
					bookings = append(bookings[:i], bookings[i+1:]...)
 | 
			
		||||
					em.bookings.Bookings = bookings
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}	 
 | 
			
		||||
		}
 | 
			
		||||
		em.bookings.Mu.Unlock()
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (em *ExecutionManager) executeBooking(booking models.Booking){
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	// start execution 
 | 
			
		||||
func (em *ExecutionManager) executeBooking(booking Booking) {
 | 
			
		||||
	// start execution
 | 
			
		||||
	// create the yaml that describes the pod : filename, path/url to Loki
 | 
			
		||||
 | 
			
		||||
		
 | 
			
		||||
	exec_method := os.Getenv("MONITOR_METHOD")
 | 
			
		||||
 | 
			
		||||
	if exec_method == "local"{
 | 
			
		||||
		logger.Logger.Debug().Msg("Executing oc-monitor localy")
 | 
			
		||||
		monitor := LocalMonitor{LokiURL: conf.GetConfig().LokiUrl,KubeURL: "localhost",WorkflowName: booking.Workflow,}
 | 
			
		||||
	logger := oclib.GetLogger()
 | 
			
		||||
	if exec_method == "k8s" {
 | 
			
		||||
		logger.Error().Msg("TODO : executing oc-monitor in a k8s")
 | 
			
		||||
	} else {
 | 
			
		||||
		logger.Debug().Msg("Executing oc-monitor localy")
 | 
			
		||||
		monitor := LocalMonitor{
 | 
			
		||||
			Logger:  logger,
 | 
			
		||||
			LokiURL: conf.GetConfig().LokiUrl, KubeURL: "localhost",
 | 
			
		||||
			WorkflowName: booking.Workflow}
 | 
			
		||||
		monitor.LaunchLocalMonitor()
 | 
			
		||||
	}else{
 | 
			
		||||
		logger.Logger.Error().Msg("TODO : executing oc-monitor in a k8s")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -3,45 +3,78 @@ package daemons
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"oc-scheduler/logger"
 | 
			
		||||
	"oc-scheduler/models"
 | 
			
		||||
 | 
			
		||||
	oclib "cloud.o-forge.io/core/oc-lib"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/dbs"
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
 | 
			
		||||
	"github.com/nats-io/nats.go"
 | 
			
		||||
	"github.com/rs/zerolog"
 | 
			
		||||
	"go.mongodb.org/mongo-driver/bson/primitive"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type Booking struct {
 | 
			
		||||
	Start    time.Time
 | 
			
		||||
	Stop     time.Time
 | 
			
		||||
	Duration uint
 | 
			
		||||
	Workflow string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s Booking) Equals(other Booking) bool {
 | 
			
		||||
	return s.Workflow == other.Workflow && s.Start == other.Start && s.Stop == other.Stop
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *Booking) String() string {
 | 
			
		||||
	return fmt.Sprintf("{workflow : %s ,  start_date : %s ,  stop_date :  %s }", b.Workflow, b.Start.Format(time.RFC3339), b.Stop.Format(time.RFC3339))
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ScheduledBooking struct {
 | 
			
		||||
	Bookings []Booking
 | 
			
		||||
	Mu       sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) AddSchedule(new_booking Booking, logger zerolog.Logger) {
 | 
			
		||||
	if !sb.scheduleAlreadyExists(new_booking) {
 | 
			
		||||
		sb.Bookings = append(sb.Bookings, new_booking)
 | 
			
		||||
		logger.Info().Msg("Updated list schedules : \n " + sb.String())
 | 
			
		||||
	}
 | 
			
		||||
	logger.Debug().Msg("Workflow received not added")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) scheduleAlreadyExists(new_booking Booking) bool {
 | 
			
		||||
	for _, booking := range sb.Bookings {
 | 
			
		||||
		if booking.Equals(new_booking) {
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) String() string {
 | 
			
		||||
	var str string
 | 
			
		||||
	for _, booking := range sb.Bookings {
 | 
			
		||||
		str += fmt.Sprintf("%s\n", booking.String())
 | 
			
		||||
	}
 | 
			
		||||
	return str
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NATS daemon listens to subject " workflowsUpdate "
 | 
			
		||||
// workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }'
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
type ScheduleManager struct {
 | 
			
		||||
	Api_url		string
 | 
			
		||||
	bookings	*models.ScheduledBooking	
 | 
			
		||||
	ws			models.HttpQuery
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){
 | 
			
		||||
	s.bookings = b
 | 
			
		||||
	Logger   zerolog.Logger
 | 
			
		||||
	bookings ScheduledBooking
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Goroutine listening to a NATS server for updates
 | 
			
		||||
// on workflows' scheduling. Messages must contain 
 | 
			
		||||
// on workflows' scheduling. Messages must contain
 | 
			
		||||
// workflow execution ID, to allow retrieval of execution infos
 | 
			
		||||
func (s *ScheduleManager) ListenForWorkflowSubmissions(){
 | 
			
		||||
	
 | 
			
		||||
	if(s.bookings == nil){
 | 
			
		||||
		logger.Logger.Error().Msg("booking has not been set in the schedule manager")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
func (s *ScheduleManager) ListenForWorkflowSubmissions() {
 | 
			
		||||
	nc, err := nats.Connect(nats.DefaultURL)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Logger.Error().Msg("Could not connect to NATS")
 | 
			
		||||
		s.Logger.Error().Msg("Could not connect to NATS")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -49,34 +82,31 @@ func (s *ScheduleManager) ListenForWorkflowSubmissions(){
 | 
			
		||||
 | 
			
		||||
	ch := make(chan *nats.Msg, 64)
 | 
			
		||||
 | 
			
		||||
	subs , err := nc.ChanSubscribe("workflowsUpdate", ch)
 | 
			
		||||
	subs, err := nc.ChanSubscribe("workflowsUpdate", ch)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Logger.Error().Msg("Error listening to NATS")
 | 
			
		||||
		s.Logger.Error().Msg("Error listening to NATS")
 | 
			
		||||
	}
 | 
			
		||||
	defer subs.Unsubscribe()
 | 
			
		||||
 | 
			
		||||
	for msg := range(ch){
 | 
			
		||||
	for msg := range ch {
 | 
			
		||||
		fmt.Println("Waiting...")
 | 
			
		||||
 | 
			
		||||
		map_mess := retrieveMapFromSub(msg.Data)
 | 
			
		||||
	
 | 
			
		||||
		s.bookings.Mu.Lock()
 | 
			
		||||
		
 | 
			
		||||
		wf_exec := getWorkflowExecution(map_mess["workflow"])
 | 
			
		||||
 | 
			
		||||
		s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate })
 | 
			
		||||
		s.bookings.Mu.Lock()
 | 
			
		||||
 | 
			
		||||
		wf_exec := s.getWorkflowExecution(map_mess["workflow"])
 | 
			
		||||
 | 
			
		||||
		s.bookings.AddSchedule(Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate}, s.Logger)
 | 
			
		||||
		s.bookings.Mu.Unlock()
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution {
 | 
			
		||||
 | 
			
		||||
	res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec_id)
 | 
			
		||||
 | 
			
		||||
func (s *ScheduleManager) getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution {
 | 
			
		||||
	res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec_id)
 | 
			
		||||
	if res.Code != 200 {
 | 
			
		||||
		logger.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
 | 
			
		||||
		s.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -93,62 +123,57 @@ func retrieveMapFromSub(message []byte) (result_map map[string]string) {
 | 
			
		||||
 | 
			
		||||
// Used at launch of the component to retrieve the next scheduled workflows
 | 
			
		||||
// and then every X minutes in case some workflows were scheduled before launch
 | 
			
		||||
func (s *ScheduleManager) SchedulePolling (){
 | 
			
		||||
func (s *ScheduleManager) SchedulePolling() {
 | 
			
		||||
	var sleep_time float64 = 1
 | 
			
		||||
	for(true){
 | 
			
		||||
	for {
 | 
			
		||||
		s.getNextScheduledWorkflows(3)
 | 
			
		||||
		
 | 
			
		||||
		logger.Logger.Info().Msg("Current list of schedules")
 | 
			
		||||
 | 
			
		||||
		s.Logger.Info().Msg("Current list of schedules")
 | 
			
		||||
		fmt.Println(s.bookings.Bookings)
 | 
			
		||||
 | 
			
		||||
		time.Sleep(time.Minute * time.Duration(sleep_time))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) {
 | 
			
		||||
	
 | 
			
		||||
	f := dbs.Filters{
 | 
			
		||||
		And: map[string][]dbs.Filter{
 | 
			
		||||
			"execution_date" : {{Operator : dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value:  primitive.NewDateTimeFromTime(to)}},
 | 
			
		||||
			"state": {{Operator: dbs.EQUAL.String(), Value: 1}},
 | 
			
		||||
			"execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}},
 | 
			
		||||
			"state":          {{Operator: dbs.EQUAL.String(), Value: 1}},
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 	res := oclib.Search(&f,"",oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION))
 | 
			
		||||
	res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION))
 | 
			
		||||
	if res.Code != 200 {
 | 
			
		||||
		logger.Logger.Error().Msg("Error loading")
 | 
			
		||||
		s.Logger.Error().Msg("Error loading")
 | 
			
		||||
		return nil, nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, exec := range(res.Data){
 | 
			
		||||
		lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec.GetID())
 | 
			
		||||
	for _, exec := range res.Data {
 | 
			
		||||
		lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec.GetID())
 | 
			
		||||
		exec_obj := lib_data.ToWorkflowExecution()
 | 
			
		||||
		exec_list = append(exec_list, *exec_obj)
 | 
			
		||||
	}
 | 
			
		||||
	return exec_list, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TODO : refactor to implement oclib.Search
 | 
			
		||||
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64)  {
 | 
			
		||||
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
 | 
			
		||||
	start := time.Now().UTC()
 | 
			
		||||
	end := start.Add(time.Minute * time.Duration(minutes)).UTC()
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String())
 | 
			
		||||
 | 
			
		||||
	next_wf_exec, err := s.getWorfklowExecution(start,end)
 | 
			
		||||
	next_wf_exec, err := s.getWorfklowExecution(start, end)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Logger.Error().Msg("Could not retrieve next schedules")
 | 
			
		||||
		s.Logger.Error().Msg("Could not retrieve next schedules")
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	s.bookings.Mu.Lock()
 | 
			
		||||
	defer s.bookings.Mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	for _, exec := range(next_wf_exec){
 | 
			
		||||
	for _, exec := range next_wf_exec {
 | 
			
		||||
		exec_start := exec.ExecDate
 | 
			
		||||
		exec_stop := exec.EndDate
 | 
			
		||||
 | 
			
		||||
		s.bookings.AddSchedule(models.Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop}) 
 | 
			
		||||
		s.bookings.AddSchedule(Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop}, s.Logger)
 | 
			
		||||
	}
 | 
			
		||||
	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -10,6 +10,7 @@ services:
 | 
			
		||||
      - "--debug"
 | 
			
		||||
    networks: 
 | 
			
		||||
      - scheduler
 | 
			
		||||
      - catalog
 | 
			
		||||
  loki:
 | 
			
		||||
    image: 'grafana/loki'
 | 
			
		||||
    container_name: loki
 | 
			
		||||
@@ -33,4 +34,6 @@ services:
 | 
			
		||||
 | 
			
		||||
networks: 
 | 
			
		||||
  scheduler:
 | 
			
		||||
    external: true
 | 
			
		||||
  catalog:
 | 
			
		||||
    external: true
 | 
			
		||||
@@ -1,14 +0,0 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"oc-scheduler/daemons"
 | 
			
		||||
	"oc-scheduler/models"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestCreateManifest(t *testing.T){
 | 
			
		||||
	em := daemons.ExecutionManager{}
 | 
			
		||||
	em.CreateManifest(models.Booking{},"fessity-chlics_23_07_2024_154326")
 | 
			
		||||
 | 
			
		||||
	
 | 
			
		||||
}
 | 
			
		||||
@@ -1,13 +0,0 @@
 | 
			
		||||
package logger
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"cloud.o-forge.io/core/oc-lib/logs"
 | 
			
		||||
	"github.com/rs/zerolog"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var Logger zerolog.Logger
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	logs.SetAppName("oc-scheduler")
 | 
			
		||||
	Logger = logs.CreateLogger("","")
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										45
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										45
									
								
								main.go
									
									
									
									
									
								
							@@ -3,54 +3,23 @@ package main
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
 | 
			
		||||
	conf "oc-scheduler/conf"
 | 
			
		||||
	"oc-scheduler/models"
 | 
			
		||||
 | 
			
		||||
	"oc-scheduler/daemons"
 | 
			
		||||
	conf "oc-schedulerd/conf"
 | 
			
		||||
	"oc-schedulerd/daemons"
 | 
			
		||||
 | 
			
		||||
	oclib "cloud.o-forge.io/core/oc-lib"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// var log zerolog.Logger
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
	var bookings models.ScheduledBooking
 | 
			
		||||
	
 | 
			
		||||
	oclib.SetConfig(conf.GetConfig().MongoUrl,"DC_myDC")
 | 
			
		||||
	oclib.Init("oc-scheduler") 
 | 
			
		||||
	oclib.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC")
 | 
			
		||||
	oclib.Init("oc-schedulerd")
 | 
			
		||||
 | 
			
		||||
	app_conf := conf.GetConfig()
 | 
			
		||||
	apiurl := app_conf.OcCatalogUrl
 | 
			
		||||
 | 
			
		||||
	sch_mngr := daemons.ScheduleManager{Api_url: apiurl}
 | 
			
		||||
	sch_mngr.SetBookings(&bookings)
 | 
			
		||||
	sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()}
 | 
			
		||||
	exe_mngr := daemons.ExecutionManager{}
 | 
			
		||||
	exe_mngr.SetBookings(&bookings)
 | 
			
		||||
 | 
			
		||||
	go sch_mngr.ListenForWorkflowSubmissions()
 | 
			
		||||
	
 | 
			
		||||
	go sch_mngr.SchedulePolling()
 | 
			
		||||
 | 
			
		||||
	exe_mngr.RetrieveNextExecutions()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	// method in Schedule manager that checks the first Schedule object for its start date and exe 
 | 
			
		||||
 | 
			
		||||
	// var g Graph
 | 
			
		||||
 | 
			
		||||
	// list, err := g.GetGraphList(apiurl)
 | 
			
		||||
	// if err != nil {
 | 
			
		||||
	// 	log.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + apiurl)
 | 
			
		||||
	// }
 | 
			
		||||
	
 | 
			
		||||
	// println("Available workspaces :")
 | 
			
		||||
	// for workspace, _ := range list {
 | 
			
		||||
	// 	println(workspace)
 | 
			
		||||
	// }
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	// g.LoadFrom(list["test-alpr"])
 | 
			
		||||
	// g.ExportToArgo("test-alpr")
 | 
			
		||||
 | 
			
		||||
 	fmt.Print("stop")
 | 
			
		||||
 | 
			
		||||
	fmt.Print("stop")
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -1,53 +0,0 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"io"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"net/http/cookiejar"
 | 
			
		||||
	"net/url"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type HttpQuery struct {
 | 
			
		||||
	baseurl string
 | 
			
		||||
	jar     http.CookieJar
 | 
			
		||||
	Cookies map[string]string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *HttpQuery) Init(url string) {
 | 
			
		||||
	h.baseurl = url
 | 
			
		||||
	h.jar, _ = cookiejar.New(nil)
 | 
			
		||||
	h.Cookies = make(map[string]string)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *HttpQuery) Get(url string) ([]byte, error) {
 | 
			
		||||
	client := &http.Client{Jar: h.jar}
 | 
			
		||||
	resp, err := client.Get(h.baseurl + url)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	// store received cookies
 | 
			
		||||
	for _, cookie := range h.jar.Cookies(resp.Request.URL) {
 | 
			
		||||
		h.Cookies[cookie.Name] = cookie.Value
 | 
			
		||||
	}
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	body, err := io.ReadAll(resp.Body)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	return body, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (h *HttpQuery) Post(url string, data url.Values) (*http.Response, error) {
 | 
			
		||||
	client := &http.Client{Jar: h.jar}
 | 
			
		||||
	resp, err := client.PostForm(h.baseurl+url, data)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	// store received cookies
 | 
			
		||||
	for _, cookie := range h.jar.Cookies(resp.Request.URL) {
 | 
			
		||||
		h.Cookies[cookie.Name] = cookie.Value
 | 
			
		||||
	}
 | 
			
		||||
	return resp, err
 | 
			
		||||
}
 | 
			
		||||
@@ -1,71 +0,0 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"oc-scheduler/logger"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Is duration really important ?
 | 
			
		||||
 | 
			
		||||
type Booking struct {
 | 
			
		||||
	Start		time.Time
 | 
			
		||||
	Stop		time.Time
 | 
			
		||||
	Duration	uint
 | 
			
		||||
	Workflow	string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type ScheduledBooking struct {
 | 
			
		||||
	Bookings	[]Booking
 | 
			
		||||
	Mu			sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s Booking) Equals(other Booking) bool {
 | 
			
		||||
    return s.Workflow == other.Workflow && s.Start == other.Start && s.Stop == other.Stop
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) AddSchedule(new_booking Booking){
 | 
			
		||||
	if(!sb.scheduleAlreadyExists(new_booking)){
 | 
			
		||||
		sb.Bookings = append(sb.Bookings,new_booking) 
 | 
			
		||||
		logger.Logger.Info().Msg("Updated list schedules : \n " + sb.String())
 | 
			
		||||
	} else {
 | 
			
		||||
		// Debug condition : delete once this feature is ready to be implemented
 | 
			
		||||
		logger.Logger.Debug().Msg("Workflow received not added")
 | 
			
		||||
		logger.Logger.Debug().Msg("current schedule contains")
 | 
			
		||||
		for _, booking := range(sb.Bookings){
 | 
			
		||||
			logger.Logger.Debug().Msg(booking.String())
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) GetListNames()(list_names []string ){
 | 
			
		||||
	for _, schedule := range(sb.Bookings){
 | 
			
		||||
		list_names = append(list_names, schedule.Workflow)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) scheduleAlreadyExists(new_booking Booking) bool {
 | 
			
		||||
	for _, booking :=  range(sb.Bookings){
 | 
			
		||||
		if booking.Equals(new_booking){
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *Booking) String() string {
 | 
			
		||||
	return fmt.Sprintf("{workflow : %s ,  start_date : %s ,  stop_date :  %s }", b.Workflow, b.Start.Format(time.RFC3339), b.Stop.Format(time.RFC3339))
 | 
			
		||||
	
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (sb *ScheduledBooking) String() string {
 | 
			
		||||
	var str string
 | 
			
		||||
	for _, booking := range(sb.Bookings){
 | 
			
		||||
		str += fmt.Sprintf("%s\n", booking.String())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return str
 | 
			
		||||
}
 | 
			
		||||
@@ -1,40 +0,0 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
type Parameter struct {
 | 
			
		||||
	Name  string `yaml:"name,omitempty"`
 | 
			
		||||
	Value string `yaml:"value,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Container struct {
 | 
			
		||||
	Image        string        `yaml:"image"`
 | 
			
		||||
	Command      []string      `yaml:"command,omitempty,flow"`
 | 
			
		||||
	Args         []string      `yaml:"args,omitempty,flow"`
 | 
			
		||||
	VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type VolumeMount struct {
 | 
			
		||||
	Name      string `yaml:"name"`
 | 
			
		||||
	MountPath string `yaml:"mountPath"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Task struct {
 | 
			
		||||
	Name         string   `yaml:"name"`
 | 
			
		||||
	Template     string   `yaml:"template"`
 | 
			
		||||
	Dependencies []string `yaml:"dependencies,omitempty"`
 | 
			
		||||
	Arguments    struct {
 | 
			
		||||
		Parameters []Parameter `yaml:"parameters,omitempty"`
 | 
			
		||||
	} `yaml:"arguments,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Dag struct {
 | 
			
		||||
	Tasks []Task `yaml:"tasks,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Template struct {
 | 
			
		||||
	Name      string    `yaml:"name"`
 | 
			
		||||
	Inputs    struct {
 | 
			
		||||
		Parameters []Parameter `yaml:"parameters"`
 | 
			
		||||
	} `yaml:"inputs,omitempty"`
 | 
			
		||||
	Container Container `yaml:"container,omitempty"`
 | 
			
		||||
	Dag       Dag       `yaml:"dag,omitempty"`
 | 
			
		||||
}
 | 
			
		||||
@@ -1,19 +0,0 @@
 | 
			
		||||
package models
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
type VolumeClaimTemplate struct {
 | 
			
		||||
    Metadata struct {
 | 
			
		||||
        Name string 	`yaml:"name"`
 | 
			
		||||
    } 					`yaml:"metadata"`
 | 
			
		||||
    Spec	VolumeSpec	`yaml:"spec"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type VolumeSpec struct {
 | 
			
		||||
	AccessModes []string 	`yaml:"accessModes,flow"`
 | 
			
		||||
	Resources   struct {
 | 
			
		||||
		Requests struct {
 | 
			
		||||
			Storage string 	`yaml:"storage"`
 | 
			
		||||
		} 					`yaml:"requests"`
 | 
			
		||||
	} 						`yaml:"resources"`
 | 
			
		||||
} 
 | 
			
		||||
							
								
								
									
										
											BIN
										
									
								
								oc-scheduler
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								oc-scheduler
									
									
									
									
									
										Executable file
									
								
							
										
											Binary file not shown.
										
									
								
							
		Reference in New Issue
	
	Block a user