Oclib major new version
This commit is contained in:
		| @@ -11,13 +11,13 @@ import ( | |||||||
| type LocalMonitor struct { | type LocalMonitor struct { | ||||||
| 	LokiURL      string | 	LokiURL      string | ||||||
| 	KubeURL      string | 	KubeURL      string | ||||||
| 	WorkflowName string | 	ExecutionID string | ||||||
| 	Duration     int | 	Duration     int | ||||||
| 	Logger       zerolog.Logger | 	Logger       zerolog.Logger | ||||||
| } | } | ||||||
|  |  | ||||||
| func (lm *LocalMonitor) LaunchLocalMonitor() { | func (lm *LocalMonitor) LaunchLocalMonitor() { | ||||||
| 	if lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == "" { | 	if lm.LokiURL == "" || lm.KubeURL == "" || lm.ExecutionID == "" { | ||||||
| 		lm.Logger.Error().Msg("Missing parameter in LocalMonitor") | 		lm.Logger.Error().Msg("Missing parameter in LocalMonitor") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -30,7 +30,7 @@ func (lm *LocalMonitor) LaunchLocalMonitor() { | |||||||
| } | } | ||||||
|  |  | ||||||
| func (lm *LocalMonitor) execLocalKube() { | func (lm *LocalMonitor) execLocalKube() { | ||||||
| 	args := []string{"-w", lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} | 	args := []string{"-w", lm.ExecutionID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName} | ||||||
| 	if lm.Duration > 0 { | 	if lm.Duration > 0 { | ||||||
| 		args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) | 		args = append(args, "-t", fmt.Sprintf("%d", lm.Duration)) | ||||||
| 	} | 	} | ||||||
| @@ -38,7 +38,7 @@ func (lm *LocalMonitor) execLocalKube() { | |||||||
| 	fmt.Println("CMD", cmd) | 	fmt.Println("CMD", cmd) | ||||||
| 	err := cmd.Start() | 	err := cmd.Start() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error()) | 		lm.Logger.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error()) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -6,9 +6,10 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | 	oclib "cloud.o-forge.io/core/oc-lib" | ||||||
|  | 	workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| var Bookings = ScheduledBooking{Bookings: []Booking{}} | var Bookings = ScheduledBooking{Bookings: map[string]*workflow_execution.WorkflowExecution{}} | ||||||
|  |  | ||||||
| type ExecutionManager struct{} | type ExecutionManager struct{} | ||||||
|  |  | ||||||
| @@ -20,12 +21,11 @@ func (em *ExecutionManager) RetrieveNextExecutions() { | |||||||
| 		logger.Debug().Msg("New loop") | 		logger.Debug().Msg("New loop") | ||||||
| 		Bookings.Mu.Lock() | 		Bookings.Mu.Lock() | ||||||
| 		if len(Bookings.Bookings) > 0 { | 		if len(Bookings.Bookings) > 0 { | ||||||
| 			for i := len(Bookings.Bookings) - 1; i >= 0; i-- { | 			for k, v := range Bookings.Bookings { | ||||||
| 				logger.Debug().Msg("It should start at " + Bookings.Bookings[i].Start.String() + " and it is now " + time.Now().UTC().String()) | 				if v.ExecDate.Before(time.Now().UTC()) { | ||||||
| 				if Bookings.Bookings[i].Start.Before(time.Now().UTC()) { | 					logger.Info().Msg("Will execute " + k + " soon") | ||||||
| 					logger.Info().Msg("Will execute " + Bookings.Bookings[i].Workflow + " soon") | 					go em.executeBooking(v) | ||||||
| 					go em.executeBooking(Bookings.Bookings[i]) | 					delete(Bookings.Bookings, k) | ||||||
| 					Bookings.Bookings = append(Bookings.Bookings[:i], Bookings.Bookings[i+1:]...) |  | ||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| @@ -34,7 +34,7 @@ func (em *ExecutionManager) RetrieveNextExecutions() { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (em *ExecutionManager) executeBooking(booking Booking) { | func (em *ExecutionManager) executeBooking(booking *workflow_execution.WorkflowExecution) { | ||||||
| 	// start execution | 	// start execution | ||||||
| 	// create the yaml that describes the pod : filename, path/url to Loki | 	// create the yaml that describes the pod : filename, path/url to Loki | ||||||
| 	exec_method := os.Getenv("MONITOR_METHOD") | 	exec_method := os.Getenv("MONITOR_METHOD") | ||||||
| @@ -44,15 +44,15 @@ func (em *ExecutionManager) executeBooking(booking Booking) { | |||||||
| 	} else { | 	} else { | ||||||
| 		logger.Debug().Msg("Executing oc-monitor localy") | 		logger.Debug().Msg("Executing oc-monitor localy") | ||||||
| 		duration := 0 | 		duration := 0 | ||||||
| 		if booking.Stop != nil && booking.Start != nil { | 		if booking.EndDate != nil && booking.ExecDate != nil { | ||||||
| 			duration = int(booking.Stop.Sub(*booking.Start).Seconds()) | 			duration = int(booking.EndDate.Sub(*booking.ExecDate).Seconds()) | ||||||
| 		} | 		} | ||||||
| 		monitor := LocalMonitor{ | 		monitor := LocalMonitor{ | ||||||
| 			Logger:  logger, | 			Logger:  logger, | ||||||
| 			Duration: duration, | 			Duration: duration, | ||||||
| 			LokiURL: conf.GetConfig().LokiUrl,  | 			LokiURL: conf.GetConfig().LokiUrl,  | ||||||
| 			KubeURL: "localhost", | 			KubeURL: "localhost", | ||||||
| 			WorkflowName: booking.Workflow, | 			ExecutionID: booking.UUID, | ||||||
| 		} | 		} | ||||||
| 		monitor.LaunchLocalMonitor() | 		monitor.LaunchLocalMonitor() | ||||||
| 	} | 	} | ||||||
|   | |||||||
| @@ -6,7 +6,7 @@ import ( | |||||||
| 	"oc-schedulerd/conf" | 	"oc-schedulerd/conf" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  | 	"cloud.o-forge.io/core/oc-lib/tools" | ||||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | 	oclib "cloud.o-forge.io/core/oc-lib" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/dbs" | 	"cloud.o-forge.io/core/oc-lib/dbs" | ||||||
| 	"cloud.o-forge.io/core/oc-lib/models/workflow_execution" | 	"cloud.o-forge.io/core/oc-lib/models/workflow_execution" | ||||||
| @@ -15,108 +15,92 @@ import ( | |||||||
| 	"go.mongodb.org/mongo-driver/bson/primitive" | 	"go.mongodb.org/mongo-driver/bson/primitive" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| type Booking struct { |  | ||||||
| 	Start    *time.Time |  | ||||||
| 	Stop     *time.Time |  | ||||||
| 	Duration uint |  | ||||||
| 	Workflow string |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (s Booking) Equals(other Booking) bool { |  | ||||||
| 	return s.Workflow == other.Workflow && s.Start == other.Start |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (b *Booking) String() string { |  | ||||||
| 	stop := "nil" |  | ||||||
| 	if b.Stop != nil { |  | ||||||
| 		stop = b.Stop.Format(time.RFC3339) |  | ||||||
| 	} |  | ||||||
| 	start := "nil" |  | ||||||
| 	if b.Start != nil { |  | ||||||
| 		start = b.Start.Format(time.RFC3339) |  | ||||||
| 	} |  | ||||||
| 	return fmt.Sprintf("{workflow : %s ,  start_date : %s ,  stop_date :  %s }", b.Workflow, start, stop) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| type ScheduledBooking struct { | type ScheduledBooking struct { | ||||||
| 	Bookings []Booking | 	Bookings map[string]*workflow_execution.WorkflowExecution | ||||||
| 	Mu       sync.Mutex | 	Mu       sync.Mutex | ||||||
| } | } | ||||||
|  |  | ||||||
| func (sb *ScheduledBooking) AddSchedule(new_booking Booking, logger zerolog.Logger) { | func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) { | ||||||
| 	found := false | 	toDelete := []string{} | ||||||
| 	for _, booking := range sb.Bookings { | 	for k, b := range sb.Bookings { | ||||||
| 		if booking.Equals(new_booking) { | 		if b.WorkflowID == workflow_id { | ||||||
| 			found = true | 			toDelete = append(toDelete, k) | ||||||
| 			break |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	if !found { | 	Bookings.Mu.Lock() | ||||||
| 		sb.Bookings = append(sb.Bookings, new_booking) | 	defer Bookings.Mu.Unlock() | ||||||
| 		logger.Info().Msg("Updated list schedules : \n " + new_booking.String()) | 	for _, k := range toDelete { | ||||||
|  | 		delete(sb.Bookings, k) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| func (sb *ScheduledBooking) String() string { | func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) { | ||||||
| 	var str string | 	Bookings.Mu.Lock() | ||||||
| 	for _, booking := range sb.Bookings { | 	defer Bookings.Mu.Unlock() | ||||||
| 		str += fmt.Sprintf("%s\n", booking.String()) | 	for _, exec := range new_bookings { | ||||||
|  | 		sb.Bookings[exec.GetID()] = exec | ||||||
| 	} | 	} | ||||||
| 	return str |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // NATS daemon listens to subject " workflowsUpdate " | // NATS daemon listens to subject " workflowsUpdate " | ||||||
| // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' | // workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }' | ||||||
|  |  | ||||||
| type ScheduleManager struct { | type ScheduleManager struct { | ||||||
| 	Logger zerolog.Logger | 	Logger zerolog.Logger | ||||||
| } | } | ||||||
|  |  | ||||||
| // Goroutine listening to a NATS server for updates | // Goroutine listening to a NATS server for updates | ||||||
| // on workflows' scheduling. Messages must contain | // on workflows' scheduling. Messages must contain | ||||||
| // workflow execution ID, to allow retrieval of execution infos | // workflow execution ID, to allow retrieval of execution infos | ||||||
| func (s *ScheduleManager) ListenForWorkflowSubmissions() { | func (s *ScheduleManager) ListenNATS() { | ||||||
| 	nc, err := nats.Connect(conf.GetConfig().NatsUrl) | 	nc, err := nats.Connect(conf.GetConfig().NatsUrl) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		s.Logger.Error().Msg("Could not connect to NATS") | 		s.Logger.Error().Msg("Could not connect to NATS") | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	defer nc.Close() | 	defer nc.Close() | ||||||
|  | 	var wg sync.WaitGroup | ||||||
|  | 	wg.Add(2) | ||||||
|  | 	go s.listenForChange(nc, tools.REMOVE.GenerateKey(oclib.WORKFLOW.String()), true, wg) | ||||||
|  | 	go s.listenForChange(nc, tools.CREATE.GenerateKey(oclib.WORKFLOW.String()), false, wg) | ||||||
|  | 	wg.Wait() | ||||||
|  |  | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Goroutine listening to a NATS server for updates | ||||||
|  | // on workflows' scheduling. Messages must contain | ||||||
|  | // workflow execution ID, to allow retrieval of execution infos | ||||||
|  | func (s *ScheduleManager) listenForChange(nc *nats.Conn, chanName string, delete bool, wg sync.WaitGroup) { | ||||||
|  | 	defer wg.Done() | ||||||
| 	ch := make(chan *nats.Msg, 64) | 	ch := make(chan *nats.Msg, 64) | ||||||
|  | 	fmt.Println("Listening to " + chanName) | ||||||
| 	subs, err := nc.ChanSubscribe("workflowsUpdate", ch) | 	subs, err := nc.ChanSubscribe(chanName, ch) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		s.Logger.Error().Msg("Error listening to NATS") | 		s.Logger.Error().Msg("Error listening to NATS : " + err.Error()) | ||||||
| 	} | 	} | ||||||
| 	defer subs.Unsubscribe() | 	defer subs.Unsubscribe() | ||||||
|  |  | ||||||
| 	for msg := range ch { | 	for msg := range ch { | ||||||
| 		map_mess := retrieveMapFromSub(msg.Data) | 		map_mess := map[string]string{} | ||||||
| 		fmt.Println("Catching new workflow... " + map_mess["workflow_id"]) | 		json.Unmarshal(msg.Data, &map_mess) | ||||||
| 		s.getNextScheduledWorkflows(1) | 		fmt.Println("Catching new workflow... " + map_mess["id"]) | ||||||
|  | 		if delete { | ||||||
|  | 			Bookings.DeleteSchedules(map_mess["id"]) | ||||||
|  | 		} else { | ||||||
|  | 			s.getNextScheduledWorkflows(1) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // At the moment very simplistic, but could be useful if we send bigger messages |  | ||||||
| func retrieveMapFromSub(message []byte) (result_map map[string]string) { |  | ||||||
| 	json.Unmarshal(message, &result_map) |  | ||||||
| 	return |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Used at launch of the component to retrieve the next scheduled workflows | // Used at launch of the component to retrieve the next scheduled workflows | ||||||
| // and then every X minutes in case some workflows were scheduled before launch | // and then every X minutes in case some workflows were scheduled before launch | ||||||
| func (s *ScheduleManager) SchedulePolling() { | func (s *ScheduleManager) SchedulePolling() { | ||||||
| 	var sleep_time float64 = 1 | 	var sleep_time float64 = 1 | ||||||
| 	for { | 	for { | ||||||
| 		s.getNextScheduledWorkflows(1) | 		s.getNextScheduledWorkflows(1) | ||||||
|  |  | ||||||
| 		s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Bookings.Bookings))) | 		s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Bookings.Bookings))) | ||||||
| 		time.Sleep(time.Minute * time.Duration(sleep_time)) | 		time.Sleep(time.Minute * time.Duration(sleep_time)) | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
| func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) { | func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) { | ||||||
|  | 	fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String()) | ||||||
| 	f := dbs.Filters{ | 	f := dbs.Filters{ | ||||||
| 		And: map[string][]dbs.Filter{ | 		And: map[string][]dbs.Filter{ | ||||||
| 			"execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}}, | 			"execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}}, | ||||||
| @@ -126,33 +110,22 @@ func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (ex | |||||||
| 	res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION)) | 	res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION)) | ||||||
| 	if res.Code != 200 { | 	if res.Code != 200 { | ||||||
| 		s.Logger.Error().Msg("Error loading") | 		s.Logger.Error().Msg("Error loading") | ||||||
| 		return nil, nil | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, exec := range res.Data { | 	for _, exec := range res.Data { | ||||||
| 		lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), exec.GetID()) | 		exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecution)) | ||||||
| 		exec_obj := lib_data.ToWorkflowExecution() |  | ||||||
| 		exec_list = append(exec_list, *exec_obj) |  | ||||||
| 	} | 	} | ||||||
| 	return exec_list, nil | 	return | ||||||
| } | } | ||||||
|  |  | ||||||
| func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { | func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) { | ||||||
| 	start := time.Now().UTC() | 	start := time.Now().UTC() | ||||||
| 	end := start.Add(time.Minute * time.Duration(minutes)).UTC() | 	if next_wf_exec, err := s.getExecution( | ||||||
| 	start = start.Add(time.Second * time.Duration(-1)).UTC() | 		start.Add(time.Second * time.Duration(-1)).UTC(),  | ||||||
| 	fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String()) | 		start.Add(time.Minute * time.Duration(minutes)).UTC(), | ||||||
|  | 	); err != nil { | ||||||
| 	next_wf_exec, err := s.getWorfklowExecution(start, end) |  | ||||||
| 	if err != nil { |  | ||||||
| 		s.Logger.Error().Msg("Could not retrieve next schedules") | 		s.Logger.Error().Msg("Could not retrieve next schedules") | ||||||
| 		return | 	} else { | ||||||
| 	} | 		Bookings.AddSchedules(next_wf_exec, s.Logger) | ||||||
|  |  | ||||||
| 	Bookings.Mu.Lock() |  | ||||||
| 	defer Bookings.Mu.Unlock() |  | ||||||
|  |  | ||||||
| 	for _, exec := range next_wf_exec { |  | ||||||
| 		Bookings.AddSchedule(Booking{Workflow: exec.UUID, Start: exec.ExecDate, Stop: exec.EndDate}, s.Logger) |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|   | |||||||
							
								
								
									
										6
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								go.mod
									
									
									
									
									
								
							| @@ -8,7 +8,7 @@ require ( | |||||||
| 	github.com/beego/beego v1.12.12 | 	github.com/beego/beego v1.12.12 | ||||||
| 	github.com/beego/beego/v2 v2.2.2 | 	github.com/beego/beego/v2 v2.2.2 | ||||||
| 	github.com/goraz/onion v0.1.3 | 	github.com/goraz/onion v0.1.3 | ||||||
| 	github.com/nats-io/nats.go v1.9.1 | 	github.com/nats-io/nats.go v1.37.0 | ||||||
| 	github.com/nwtgck/go-fakelish v0.1.3 | 	github.com/nwtgck/go-fakelish v0.1.3 | ||||||
| 	github.com/rs/zerolog v1.33.0 | 	github.com/rs/zerolog v1.33.0 | ||||||
| 	github.com/tidwall/gjson v1.17.1 | 	github.com/tidwall/gjson v1.17.1 | ||||||
| @@ -17,7 +17,7 @@ require ( | |||||||
| ) | ) | ||||||
|  |  | ||||||
| require ( | require ( | ||||||
| 	cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 // indirect | 	cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 // indirect | ||||||
| 	github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect | 	github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect | ||||||
| 	github.com/antihax/optional v1.0.0 // indirect | 	github.com/antihax/optional v1.0.0 // indirect | ||||||
| 	github.com/aws/aws-sdk-go v1.36.29 // indirect | 	github.com/aws/aws-sdk-go v1.36.29 // indirect | ||||||
| @@ -56,7 +56,7 @@ require ( | |||||||
| 	github.com/montanaflynn/stats v0.7.1 // indirect | 	github.com/montanaflynn/stats v0.7.1 // indirect | ||||||
| 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect | 	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect | ||||||
| 	github.com/nats-io/jwt v0.3.2 // indirect | 	github.com/nats-io/jwt v0.3.2 // indirect | ||||||
| 	github.com/nats-io/nkeys v0.1.3 // indirect | 	github.com/nats-io/nkeys v0.4.7 // indirect | ||||||
| 	github.com/nats-io/nuid v1.0.1 // indirect | 	github.com/nats-io/nuid v1.0.1 // indirect | ||||||
| 	github.com/pkg/errors v0.9.1 // indirect | 	github.com/pkg/errors v0.9.1 // indirect | ||||||
| 	github.com/prometheus/client_golang v1.19.0 // indirect | 	github.com/prometheus/client_golang v1.19.0 // indirect | ||||||
|   | |||||||
							
								
								
									
										6
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								go.sum
									
									
									
									
									
								
							| @@ -4,6 +4,8 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20240808075405-f45ad91687c4 h1:3xqz2s6r/PONq | |||||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20240808075405-f45ad91687c4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= | cloud.o-forge.io/core/oc-lib v0.0.0-20240808075405-f45ad91687c4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= | ||||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 h1:fdxRsT4eR4v1DM3FpTPi9AKxB5oIw3XgLu9ByNipj4I= | cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 h1:fdxRsT4eR4v1DM3FpTPi9AKxB5oIw3XgLu9ByNipj4I= | ||||||
| cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= | cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4/go.mod h1:V5EL+NV2s9P1/BcFm3/icfLeBYVVMLl1Z0F0eecJZGo= | ||||||
|  | cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06 h1:sYveE1C/0mpSr+ZmOYxuZ3fTWID7mr5hPiq0jQenv3Q= | ||||||
|  | cloud.o-forge.io/core/oc-lib v0.0.0-20240821093044-f64563c9ff06/go.mod h1:1hhYh5QWAbYw9cKplQ0ZD9PMgU8t6gPqiYF8sldv1HU= | ||||||
| github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= | ||||||
| github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 h1:n0MD6UkwbgGHtXsmfgVzC2+ZbHzIsScpbq9ZGI18074= | github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 h1:n0MD6UkwbgGHtXsmfgVzC2+ZbHzIsScpbq9ZGI18074= | ||||||
| github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7/go.mod h1:xdrQDwHlKUmv8yiElMx6W0W10cLkqpeSEUUib8KGtv4= | github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7/go.mod h1:xdrQDwHlKUmv8yiElMx6W0W10cLkqpeSEUUib8KGtv4= | ||||||
| @@ -370,9 +372,13 @@ github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDH | |||||||
| github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= | github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= | ||||||
| github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= | github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= | ||||||
| github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= | github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= | ||||||
|  | github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE= | ||||||
|  | github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8= | ||||||
| github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= | github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= | ||||||
| github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= | github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= | ||||||
| github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= | github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= | ||||||
|  | github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI= | ||||||
|  | github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc= | ||||||
| github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= | github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= | ||||||
| github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= | github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= | ||||||
| github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= | github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= | ||||||
|   | |||||||
							
								
								
									
										6
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								main.go
									
									
									
									
									
								
							| @@ -5,18 +5,18 @@ import ( | |||||||
|  |  | ||||||
| 	conf "oc-schedulerd/conf" | 	conf "oc-schedulerd/conf" | ||||||
| 	"oc-schedulerd/daemons" | 	"oc-schedulerd/daemons" | ||||||
|  | 	"cloud.o-forge.io/core/oc-lib/tools" | ||||||
| 	oclib "cloud.o-forge.io/core/oc-lib" | 	oclib "cloud.o-forge.io/core/oc-lib" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| func main() { | func main() { | ||||||
| 	oclib.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC") | 	tools.SetConfig(conf.GetConfig().MongoUrl, "DC_myDC", "") | ||||||
| 	oclib.Init("oc-schedulerd") | 	oclib.Init("oc-schedulerd") | ||||||
|  |  | ||||||
| 	sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()} | 	sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()} | ||||||
| 	exe_mngr := daemons.ExecutionManager{} | 	exe_mngr := daemons.ExecutionManager{} | ||||||
|  |  | ||||||
| 	go sch_mngr.ListenForWorkflowSubmissions() | 	go sch_mngr.ListenNATS() | ||||||
| 	go sch_mngr.SchedulePolling() | 	go sch_mngr.SchedulePolling() | ||||||
|  |  | ||||||
| 	exe_mngr.RetrieveNextExecutions() | 	exe_mngr.RetrieveNextExecutions() | ||||||
|   | |||||||
							
								
								
									
										
											BIN
										
									
								
								oc-schedulerd
									
									
									
									
									
								
							
							
						
						
									
										
											BIN
										
									
								
								oc-schedulerd
									
									
									
									
									
								
							
										
											Binary file not shown.
										
									
								
							
		Reference in New Issue
	
	Block a user