3 Commits

Author SHA1 Message Date
mr
6621d14d74 neo oclib 2025-02-06 08:54:00 +01:00
mr
e4305bdbd1 Adapt to new inputs env struct + instance of a resource 2025-02-05 08:38:50 +01:00
mr
0de2f9842b working oc-schedulerd 2025-01-17 17:21:17 +01:00
25 changed files with 973 additions and 78 deletions

View File

@@ -1,32 +1,22 @@
FROM golang:alpine AS deps
WORKDIR /app
COPY go.mod go.sum ./
RUN sed -i '/replace/d' go.mod
RUN go mod download -x
#----------------------------------------------------------------------------------------------
FROM golang:alpine AS builder
LABEL maintainer="IRT PFN"
ENV DOCKER_ENVIRONMENT=true
WORKDIR /app
COPY --from=deps /go/pkg /go/pkg
COPY --from=deps /app/go.mod /app/go.sum ./
COPY . .
RUN go build .
#----------------------------------------------------------------------------------------------
FROM oc-monitord:latest AS monitord
FROM argoproj/argocd:latest
ENV MONITORD_PATH = "./oc-monitord"
WORKDIR /app
COPY conf/docker_schedulerd.json /etc/oc/schedulerd.json
COPY --from=monitord /app/oc-monitord .
COPY --from=builder /app/oc-schedulerd .
COPY conf/docker_schedulerd.json /etc/oc/schedulerd.json

View File

@@ -1,24 +0,0 @@
.DEFAULT_GOAL := all
build: clean
go build .
run:
./oc-schedulerd
clean:
rm -rf oc-schedulerd
docker:
DOCKER_BUILDKIT=1 docker build -t oc/oc-schedulerd:0.0.1 -f Dockerfile .
docker tag oc/oc-schedulerd:0.0.1 oc/oc-schedulerd:latest
publish-kind:
kind load docker-image oc/oc-schedulerd:0.0.1 --name opencloud
publish-registry:
@echo "TODO"
all: docker publish-kind publish-registry
.PHONY: build run clean docker publish-kind publish-registry

View File

@@ -38,6 +38,12 @@ func init() {
o = onion.New(l2, l3)
}
GetConfig().MonitorPath = o.GetStringDefault("MONITORD_PATH", "../oc-monitord/oc-monitord")
GetConfig().Logs = o.GetStringDefault("LOG_LEVEL", "info")
GetConfig().LokiUrl = o.GetStringDefault("LOKI_URL", "http://127.0.0.1:3100")
GetConfig().NatsUrl = o.GetStringDefault("NATS_URL", "http://127.0.0.1:4222")
GetConfig().MongoUrl = o.GetStringDefault("MONGO_URL", "mongodb://127.0.0.1:27017")
GetConfig().DBName = o.GetStringDefault("MONGO_DATABASE", "DC_myDC")
}
func GetConfig() *Config {

View File

@@ -5,7 +5,6 @@ import (
"oc-schedulerd/conf"
"os/exec"
oclib "cloud.o-forge.io/core/oc-lib"
"github.com/rs/zerolog"
)
@@ -13,6 +12,7 @@ type LocalMonitor struct {
LokiURL string
KubeURL string
ExecutionID string
PeerID string
Duration int
Logger zerolog.Logger
}
@@ -31,7 +31,7 @@ func (lm *LocalMonitor) LaunchLocalMonitor() {
}
func (lm *LocalMonitor) execLocalKube() {
args := []string{"-e", lm.ExecutionID, "-u", lm.LokiURL, "-m", oclib.GetConfig().MongoUrl, "-d", oclib.GetConfig().MongoDatabase}
args := []string{"-e", lm.ExecutionID, "-p", lm.PeerID, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl, "-d", conf.GetConfig().DBName}
if lm.Duration > 0 {
args = append(args, "-t", fmt.Sprintf("%d", lm.Duration))
}

View File

@@ -1,6 +1,8 @@
package daemons
import (
"fmt"
"oc-schedulerd/conf"
"os"
"time"
@@ -8,7 +10,7 @@ import (
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
)
var Bookings = ScheduledBooking{Bookings: []*workflow_execution.WorkflowExecution{}}
var Bookings = ScheduledBooking{Bookings: []*workflow_execution.WorkflowExecutions{}}
type ExecutionManager struct{}
@@ -17,7 +19,7 @@ type ExecutionManager struct{}
func (em *ExecutionManager) RetrieveNextExecutions() {
logger := oclib.GetLogger()
for {
logger.Debug().Msg("New loop")
fmt.Println("Checking for bookings", len(Bookings.Bookings))
Bookings.Mu.Lock()
if len(Bookings.Bookings) > 0 {
bookings := Bookings.Bookings
@@ -34,7 +36,7 @@ func (em *ExecutionManager) RetrieveNextExecutions() {
}
}
func (em *ExecutionManager) executeBooking(booking *workflow_execution.WorkflowExecution) {
func (em *ExecutionManager) executeBooking(booking *workflow_execution.WorkflowExecutions) {
// start execution
// create the yaml that describes the pod : filename, path/url to Loki
exec_method := os.Getenv("MONITOR_METHOD")
@@ -44,15 +46,16 @@ func (em *ExecutionManager) executeBooking(booking *workflow_execution.WorkflowE
} else {
logger.Debug().Msg("Executing oc-monitor localy")
duration := 0
if booking.EndDate != nil && booking.ExecDate != nil {
duration = int(booking.EndDate.Sub(*booking.ExecDate).Seconds())
if booking.EndDate != nil {
duration = int(booking.EndDate.Sub(booking.ExecDate).Seconds())
}
monitor := LocalMonitor{
Logger: logger,
Duration: duration,
LokiURL: oclib.GetConfig().LokiUrl,
LokiURL: conf.GetConfig().LokiUrl,
KubeURL: "localhost",
ExecutionID: booking.UUID,
PeerID: booking.CreatorID,
}
monitor.LaunchLocalMonitor()
}

View File

@@ -3,11 +3,13 @@ package daemons
import (
"encoding/json"
"fmt"
"oc-schedulerd/conf"
"sync"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/dbs"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/nats-io/nats.go"
@@ -16,12 +18,12 @@ import (
)
type ScheduledBooking struct {
Bookings []*workflow_execution.WorkflowExecution
Bookings []*workflow_execution.WorkflowExecutions
Mu sync.Mutex
}
func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) {
toNotDelete := []*workflow_execution.WorkflowExecution{}
toNotDelete := []*workflow_execution.WorkflowExecutions{}
for _, b := range sb.Bookings {
if b.WorkflowID != workflow_id {
toNotDelete = append(toNotDelete, b)
@@ -32,17 +34,18 @@ func (sb *ScheduledBooking) DeleteSchedules(workflow_id string) {
sb.Bookings = toNotDelete
}
func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecution, logger zerolog.Logger) {
func (sb *ScheduledBooking) AddSchedules(new_bookings []*workflow_execution.WorkflowExecutions, logger zerolog.Logger) {
Bookings.Mu.Lock()
defer Bookings.Mu.Unlock()
for _, exec := range new_bookings {
fmt.Println("Adding "+exec.UUID, !sb.execIsSet(exec))
if !sb.execIsSet(exec) {
sb.Bookings = append(sb.Bookings, exec)
}
}
}
func (sb *ScheduledBooking) execIsSet(exec *workflow_execution.WorkflowExecution) bool {
func (sb *ScheduledBooking) execIsSet(exec *workflow_execution.WorkflowExecutions) bool {
for _, b := range sb.Bookings {
if b.Equals(exec) {
return true
@@ -63,7 +66,7 @@ type ScheduleManager struct {
// on workflows' scheduling. Messages must contain
// workflow execution ID, to allow retrieval of execution infos
func (s *ScheduleManager) ListenNATS() {
nc, err := nats.Connect(oclib.GetConfig().NATSUrl)
nc, err := nats.Connect(conf.GetConfig().NatsUrl)
if err != nil {
s.Logger.Error().Msg("Could not connect to NATS")
return
@@ -116,27 +119,32 @@ func (s *ScheduleManager) SchedulePolling() {
time.Sleep(time.Minute * time.Duration(sleep_time))
}
}
func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) {
func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecutions, err error) {
fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String())
f := dbs.Filters{
And: map[string][]dbs.Filter{
"execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}},
"state": {{Operator: dbs.EQUAL.String(), Value: 1}},
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
},
}
res := oclib.Search(&f, "", oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION))
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false)
if res.Code != 200 {
s.Logger.Error().Msg("Error loading")
return
}
for _, exec := range res.Data {
exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecution))
exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecutions))
}
fmt.Println("Found "+fmt.Sprintf("%v", len(exec_list))+" workflows", res)
return
}
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
start := time.Now().UTC()
fmt.Println(s.getExecution(
start.Add(time.Second*time.Duration(-1)).UTC(),
start.Add(time.Minute*time.Duration(minutes)).UTC(),
))
if next_wf_exec, err := s.getExecution(
start.Add(time.Second*time.Duration(-1)).UTC(),
start.Add(time.Minute*time.Duration(minutes)).UTC(),

View File

@@ -26,10 +26,10 @@ services:
- catalog
volumes:
- ./conf/grafana_data_source.yml:/etc/grafana/provisioning/datasources/datasource.yml
#environment:
#- GF_SECURITY_ADMIN_PASSWORD=pfnirt # Change this to anything but admin to not have a password change page at startup
#- GF_SECURITY_ADMIN_USER=admin
#- GF_SECURITY_DISABLE_INITIAL_ADMIN_PASSWORD_CHANGE=true
environment:
- GF_SECURITY_ADMIN_PASSWORD=pfnirt # Change this to anything but admin to not have a password change page at startup
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_DISABLE_INITIAL_ADMIN_PASSWORD_CHANGE=true
networks:
catalog:

26
go.mod
View File

@@ -5,17 +5,24 @@ go 1.22.0
toolchain go1.22.5
require (
cloud.o-forge.io/core/oc-lib v0.0.0-20250110163958-fd1c579ec418
github.com/beego/beego v1.12.14
github.com/beego/beego v1.12.12
github.com/beego/beego/v2 v2.3.1
github.com/goraz/onion v0.1.3
github.com/nats-io/nats.go v1.37.0
github.com/nwtgck/go-fakelish v0.1.3
github.com/rs/zerolog v1.33.0
github.com/tidwall/gjson v1.17.1
gopkg.in/yaml.v3 v3.0.1
k8s.io/client-go v0.30.3
)
require (
github.com/beego/beego/v2 v2.3.1 // indirect
cloud.o-forge.io/core/oc-lib v0.0.0-20250205160221-88b7cfe2fd0f // indirect
github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect
github.com/antihax/optional v1.0.0 // indirect
github.com/aws/aws-sdk-go v1.36.29 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/biter777/countries v1.7.5 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
@@ -27,6 +34,7 @@ require (
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
@@ -35,11 +43,13 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/imdario/mergo v0.3.8 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/marcinwyszynski/geopoint v0.0.0-20140302213024-cf2a6f750c5b // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
@@ -47,18 +57,27 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nats-io/jwt v0.3.2 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
github.com/vk496/cron v1.2.0 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
go.mongodb.org/mongo-driver v1.17.1 // indirect
golang.org/x/crypto v0.28.0 // indirect
@@ -73,7 +92,6 @@ require (
google.golang.org/protobuf v1.35.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.30.3 // indirect
k8s.io/apimachinery v0.30.3 // indirect
k8s.io/klog/v2 v2.120.1 // indirect

753
go.sum

File diff suppressed because it is too large Load Diff

13
main.go
View File

@@ -3,21 +3,20 @@ package main
import (
"fmt"
"oc-schedulerd/conf"
"oc-schedulerd/daemons"
oclib "cloud.o-forge.io/core/oc-lib"
)
func main() {
oclib.InitDaemon("oc-schedulerd")
conf := oclib.GetConfLoader()
oclib.SetConfig(
conf.GetStringDefault("MONGO_URL", "mongodb://127.0.0.1:27017"),
conf.GetStringDefault("MONGO_DATABASE", "DC_myDC"),
conf.GetStringDefault("NATS_URL", "nats://localhost:4222"),
conf.GetStringDefault("LOKI_URL", ""),
conf.GetStringDefault("LOG_LEVEL", "info"),
conf.GetConfig().MongoUrl,
conf.GetConfig().DBName,
conf.GetConfig().NatsUrl,
conf.GetConfig().LokiUrl,
conf.GetConfig().Logs,
)
sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()}
exe_mngr := daemons.ExecutionManager{}

Binary file not shown.

BIN
swagger/favicon-16x16.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 665 B

BIN
swagger/favicon-32x32.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 628 B

60
swagger/index.html Normal file
View File

@@ -0,0 +1,60 @@
<!-- HTML for static distribution bundle build -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Swagger UI</title>
<link rel="stylesheet" type="text/css" href="./swagger-ui.css" />
<link rel="icon" type="image/png" href="./favicon-32x32.png" sizes="32x32" />
<link rel="icon" type="image/png" href="./favicon-16x16.png" sizes="16x16" />
<style>
html
{
box-sizing: border-box;
overflow: -moz-scrollbars-vertical;
overflow-y: scroll;
}
*,
*:before,
*:after
{
box-sizing: inherit;
}
body
{
margin:0;
background: #fafafa;
}
</style>
</head>
<body>
<div id="swagger-ui"></div>
<script src="./swagger-ui-bundle.js" charset="UTF-8"> </script>
<script src="./swagger-ui-standalone-preset.js" charset="UTF-8"> </script>
<script>
window.onload = function() {
// Begin Swagger UI call region
const ui = SwaggerUIBundle({
url: "https://petstore.swagger.io/v2/swagger.json",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout"
});
// End Swagger UI call region
window.ui = ui;
};
</script>
</body>
</html>

View File

@@ -0,0 +1,79 @@
<!doctype html>
<html lang="en-US">
<head>
<title>Swagger UI: OAuth2 Redirect</title>
</head>
<body>
<script>
'use strict';
function run () {
var oauth2 = window.opener.swaggerUIRedirectOauth2;
var sentState = oauth2.state;
var redirectUrl = oauth2.redirectUrl;
var isValid, qp, arr;
if (/code|token|error/.test(window.location.hash)) {
qp = window.location.hash.substring(1);
} else {
qp = location.search.substring(1);
}
arr = qp.split("&");
arr.forEach(function (v,i,_arr) { _arr[i] = '"' + v.replace('=', '":"') + '"';});
qp = qp ? JSON.parse('{' + arr.join() + '}',
function (key, value) {
return key === "" ? value : decodeURIComponent(value);
}
) : {};
isValid = qp.state === sentState;
if ((
oauth2.auth.schema.get("flow") === "accessCode" ||
oauth2.auth.schema.get("flow") === "authorizationCode" ||
oauth2.auth.schema.get("flow") === "authorization_code"
) && !oauth2.auth.code) {
if (!isValid) {
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "warning",
message: "Authorization may be unsafe, passed state was changed in server Passed state wasn't returned from auth server"
});
}
if (qp.code) {
delete oauth2.state;
oauth2.auth.code = qp.code;
oauth2.callback({auth: oauth2.auth, redirectUrl: redirectUrl});
} else {
let oauthErrorMsg;
if (qp.error) {
oauthErrorMsg = "["+qp.error+"]: " +
(qp.error_description ? qp.error_description+ ". " : "no accessCode received from the server. ") +
(qp.error_uri ? "More info: "+qp.error_uri : "");
}
oauth2.errCb({
authId: oauth2.auth.name,
source: "auth",
level: "error",
message: oauthErrorMsg || "[Authorization failed]: no accessCode received from the server"
});
}
} else {
oauth2.callback({auth: oauth2.auth, token: qp, isValid: isValid, redirectUrl: redirectUrl});
}
window.close();
}
if (document.readyState !== 'loading') {
run();
} else {
document.addEventListener('DOMContentLoaded', function () {
run();
});
}
</script>
</body>
</html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

4
swagger/swagger-ui.css Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

2
swagger/swagger-ui.js Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long