Compare commits
78 Commits
execute-mo
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| c1609ea9d9 | |||
| 3d8e36436c | |||
| df7ecacb75 | |||
| ce3425e9eb | |||
| dbc41f0326 | |||
| 142a81197b | |||
| 41b92cebec | |||
| 016f5017f1 | |||
| 2a76f2b9fb | |||
| 07ca18e347 | |||
| c3f769ccd5 | |||
| 26e5db4572 | |||
| 937c811e8d | |||
| a9c7bb66ee | |||
| 87951c8a77 | |||
| 1fda2b1334 | |||
| 01098842dd | |||
| d267c88fb5 | |||
| 8bbbd1dcfb | |||
| 7717a02f7f | |||
|
|
568d5bac30 | ||
|
|
666ffe10f5 | ||
| 817f3eb468 | |||
| 937116e9c5 | |||
| bd58016d4b | |||
| 0bfa8fdad0 | |||
|
|
54610f2dcc | ||
|
|
defdcf4264 | ||
|
|
bcc024caef | ||
|
|
6fce8f3aac | ||
| b6dea94196 | |||
| fba84ac612 | |||
|
|
1b21c142f1 | ||
|
|
6c3a20999b | ||
|
|
90fa0b8edd | ||
|
|
b43cb6d758 | ||
|
|
d94f9603e8 | ||
|
|
494ba2f361 | ||
|
|
0f6213cd14 | ||
|
|
012c8a83cb | ||
|
|
a59a48cc66 | ||
|
|
e106d7e243 | ||
|
|
139b249a7c | ||
| 7a7364fb45 | |||
| b4d57a8f2f | |||
| 907880bfd6 | |||
| 09b8046073 | |||
| 90af391a0d | |||
| 88b2edee2f | |||
| 88610c3dba | |||
| b753523c35 | |||
| 7246dea2b2 | |||
| 6621d14d74 | |||
| e4305bdbd1 | |||
| 0de2f9842b | |||
| 8ddb119899 | |||
| 7d78920304 | |||
| b4cef41db2 | |||
| 8eeba712e7 | |||
| 9a676297f6 | |||
| 066f6b54e0 | |||
| 3e36ed0ecf | |||
| 2589451b07 | |||
| c3d553068c | |||
| 04a157ba64 | |||
| 6ab737c915 | |||
| 47570d9423 | |||
| 2bc6e4327e | |||
| a69ecc4ab5 | |||
| 7206de35a8 | |||
| 20b5955ba9 | |||
| 826650487b | |||
| c5d15d32da | |||
| 825c18b6d6 | |||
| e5cfd6f4fb | |||
| c710469881 | |||
| 41f93a292c | |||
| 5b626dcb21 |
2
.gitignore
vendored
2
.gitignore
vendored
@@ -8,7 +8,7 @@
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
env.env
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
|
||||
|
||||
18
.vscode/launch.json
vendored
Normal file
18
.vscode/launch.json
vendored
Normal file
@@ -0,0 +1,18 @@
|
||||
{
|
||||
// Use IntelliSense to learn about possible attributes.
|
||||
// Hover to view descriptions of existing attributes.
|
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
{
|
||||
"name": "Launch Package",
|
||||
"type": "go",
|
||||
"request": "launch",
|
||||
"mode": "auto",
|
||||
"program": "${fileDirname}",
|
||||
"env": {
|
||||
"MONITOR_METHOD" : "local"
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
42
Dockerfile
42
Dockerfile
@@ -1,18 +1,44 @@
|
||||
FROM golang:alpine AS builder
|
||||
LABEL maintainer="IRT PFN"
|
||||
ENV DOCKER_ENVIRONMENT=true
|
||||
ARG KUBERNETES_HOST=${KUBERNETES_HOST:-"127.0.0.1"}
|
||||
FROM golang:alpine AS deps
|
||||
|
||||
ARG MONITORD_IMAGE
|
||||
|
||||
WORKDIR /app
|
||||
COPY go.mod go.sum ./
|
||||
RUN sed -i '/replace/d' go.mod
|
||||
RUN go mod download -x
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
|
||||
FROM golang:alpine AS builder
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=deps /go/pkg /go/pkg
|
||||
COPY --from=deps /app/go.mod /app/go.sum ./
|
||||
|
||||
COPY . .
|
||||
COPY conf/docker_scheduler.json /etc/oc/scheduler.json
|
||||
|
||||
RUN go build .
|
||||
RUN go build
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
FROM ${MONITORD_IMAGE:-oc-monitord}:latest AS monitord
|
||||
|
||||
FROM golang:alpine
|
||||
|
||||
ENV KUBERNETES_SERVICE_HOST=$KUBERNETES_HOST
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY --from=builder /app/oc-scheduler .
|
||||
COPY conf/docker_scheduler.json /etc/oc/scheduler.json
|
||||
COPY docker_schedulerd.json /etc/oc/schedulerd.json
|
||||
|
||||
ENTRYPOINT ["/app/oc-scheduler"]
|
||||
COPY --from=monitord /app/oc-monitord /usr/bin/oc-monitord
|
||||
COPY --from=builder /app/oc-schedulerd /usr/bin/oc-schedulerd
|
||||
|
||||
COPY docker_schedulerd.json /etc/oc/schedulerd.json
|
||||
|
||||
# COPY argo_workflows .
|
||||
|
||||
EXPOSE 8080
|
||||
|
||||
ENTRYPOINT ["oc-schedulerd"]
|
||||
|
||||
36
Makefile
Normal file
36
Makefile
Normal file
@@ -0,0 +1,36 @@
|
||||
.DEFAULT_GOAL := all
|
||||
|
||||
build: clean
|
||||
go build .
|
||||
|
||||
build-monitord:
|
||||
go build -o ../oc-monitord ../oc-monitord
|
||||
|
||||
run:
|
||||
./oc-schedulerd
|
||||
|
||||
clean:
|
||||
rm -rf oc-schedulerd
|
||||
|
||||
docker:
|
||||
DOCKER_BUILDKIT=1 docker build -t oc-schedulerd --build-arg MONITORD_IMAGE=oc-monitord -f Dockerfile . --build-arg=HOST=$(HOST) --build-arg=KUBERNETES_HOST=$(KUBERNETES_HOST) --build-arg=KUBERNETES_SERVICE_PORT=$(KUBERNETES_SERVICE_PORT) --build-arg=KUBE_CA=$(KUBE_CA) --build-arg=KUBE_CERT=$(KUBE_CERT) --build-arg=KUBE_DATA=$(KUBE_DATA)
|
||||
docker tag oc-schedulerd opencloudregistry/oc-schedulerd:latest
|
||||
|
||||
publish-kind:
|
||||
kind load docker-image opencloudregistry/oc-schedulerd:latest --name $(CLUSTER_NAME) | true
|
||||
|
||||
publish-registry:
|
||||
docker push opencloudregistry/oc-schedulerd:latest
|
||||
|
||||
docker-deploy:
|
||||
docker compose up -d
|
||||
|
||||
run-docker: docker publish-kind publish-registry docker-deploy
|
||||
|
||||
all: docker publish-kind
|
||||
|
||||
ci: docker publish-registry
|
||||
|
||||
dev: build-monitord build run
|
||||
|
||||
.PHONY: build run clean docker publish-kind publish-registry
|
||||
38
README.md
38
README.md
@@ -1,18 +1,34 @@
|
||||
# oc-scheduler
|
||||
|
||||
OC-Scheduler retrieves the content of submitted workflows and prepare them to be executed.
|
||||
oc-schedulerd is a daemon performing to actions at the same time :
|
||||
- subscribing to the local NATS instance' custom channels for message commanding either the scheduling or the removing of an execution.
|
||||
- polling oc-catalog for scheduled executions
|
||||
|
||||
## Parsing
|
||||
Depending on the environment it is running in, oc-schedulerd will either :
|
||||
- execute the oc-monitord binary
|
||||
- run an oc-monitord container
|
||||
|
||||
From a workflow's name we retrieve the xml graph associated and parse it in order to create the object representing each componant.
|
||||
Each object is linked to another, represented by a links object with the two object IDs has attributes.
|
||||
## Parameters
|
||||
|
||||
TODO :
|
||||
- [x] Retrieve the user input's for each component.
|
||||
oc-schedulerd uses json files to load its configuration. The template for this configuration file is below
|
||||
|
||||
## Organising
|
||||
```json
|
||||
{
|
||||
"LOKI_URL" : "http://[IP/URL]:3100",
|
||||
"MONGO_URL":"mongodb://[IP/URL]:27017/",
|
||||
"NATS_URL":"nats://[IP/URL]:4222",
|
||||
"MONGO_DATABASE":"",
|
||||
"MONITORD_PATH": "",
|
||||
"KUBERNETES_SERVICE_HOST" : "[IP/URL]",
|
||||
"MONITOR_MODE": "",
|
||||
"KUBE_CA": "",
|
||||
"KUBE_CERT": "",
|
||||
"KUBE_DATA": ""
|
||||
}
|
||||
```
|
||||
|
||||
TODO :
|
||||
- [ ] create an argo file from the graph/worfklow
|
||||
- [ ] Create a different entry for each component
|
||||
- [ ] execute each element in the right order
|
||||
**monitor_mode** : should be either "local","container", ""
|
||||
|
||||
## TODO
|
||||
|
||||
- [ ] Implement the discovery of current mode : local, local in container, as a container
|
||||
53
conf/conf.go
53
conf/conf.go
@@ -8,55 +8,46 @@ import (
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
OcCatalogUrl string
|
||||
MongoUrl string
|
||||
DBName string
|
||||
Logs string
|
||||
LokiUrl string
|
||||
NatsUrl string
|
||||
MonitorPath string
|
||||
Logs string
|
||||
KubeHost string
|
||||
KubePort string
|
||||
KubeCA string
|
||||
KubeCert string
|
||||
KubeData string
|
||||
KubeNamespace string
|
||||
KubeImage string
|
||||
}
|
||||
|
||||
var instance *Config
|
||||
var once sync.Once
|
||||
|
||||
const defaultConfigFile = "/etc/oc/scheduler.json"
|
||||
const localConfigFile = "./conf/local_scheduler.json"
|
||||
|
||||
|
||||
func init(){
|
||||
const defaultConfigFile = "/etc/oc/schedulerd.json"
|
||||
|
||||
func init() {
|
||||
configFile := ""
|
||||
var o *onion.Onion
|
||||
|
||||
l3 := onion.NewEnvLayerPrefix("_", "OCSCHEDULER_")
|
||||
l3 := onion.NewEnvLayerPrefix("_", "OCSCHEDULERD_")
|
||||
l2, err := onion.NewFileLayer(defaultConfigFile, nil)
|
||||
if err == nil {
|
||||
logs.Info("Config file found : " + defaultConfigFile)
|
||||
configFile = defaultConfigFile
|
||||
}
|
||||
l1, err := onion.NewFileLayer(localConfigFile, nil)
|
||||
if err == nil {
|
||||
logs.Info("Local config file found " + localConfigFile + ", overriding default file")
|
||||
configFile = localConfigFile
|
||||
}
|
||||
if configFile == "" {
|
||||
if configFile == "" || l2 == nil {
|
||||
logs.Info("No config file found, using env")
|
||||
o = onion.New(l3)
|
||||
} else if l1 == nil && l2 == nil {
|
||||
o = onion.New(l1, l2, l3)
|
||||
} else if l1 == nil {
|
||||
} else {
|
||||
o = onion.New(l2, l3)
|
||||
} else if l2 == nil {
|
||||
o = onion.New(l1, l3)
|
||||
}
|
||||
|
||||
GetConfig().OcCatalogUrl = o.GetStringDefault("oc-catalog", "https://localhost:49618")
|
||||
GetConfig().Logs = o.GetStringDefault("loglevel", "info")
|
||||
GetConfig().LokiUrl = o.GetStringDefault("loki_url","http://127.0.0.1:3100")
|
||||
GetConfig().NatsUrl = o.GetStringDefault("nats_url","http://127.0.0.1:4222")
|
||||
GetConfig().MongoUrl = o.GetStringDefault("mongo_url","mongodb://127.0.0.1:27017")
|
||||
GetConfig().DBName = o.GetStringDefault("database_name","DC_myDC")
|
||||
|
||||
GetConfig().MonitorPath = o.GetStringDefault("MONITORD_PATH", "../oc-monitord/oc-monitord")
|
||||
GetConfig().KubeHost = o.GetStringDefault("KUBE_HOST", "")
|
||||
GetConfig().KubePort = o.GetStringDefault("KUBE_PORT", "6443")
|
||||
GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", "")
|
||||
GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", "")
|
||||
GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", "")
|
||||
GetConfig().KubeNamespace = o.GetStringDefault("KUBE_NAMESPACE", "default")
|
||||
GetConfig().KubeImage = o.GetStringDefault("KUBE_IMAGE", "oc-monitord")
|
||||
}
|
||||
|
||||
func GetConfig() *Config {
|
||||
|
||||
@@ -1,4 +0,0 @@
|
||||
{
|
||||
"oc-catalog" : "http://oc-catalog:49618/",
|
||||
"loki_url" : "http://192.168.1.18:3100"
|
||||
}
|
||||
8
conf/grafana_data_source.yml
Normal file
8
conf/grafana_data_source.yml
Normal file
@@ -0,0 +1,8 @@
|
||||
datasources:
|
||||
- name: Loki
|
||||
type: loki
|
||||
access: proxy
|
||||
url: http://loki:3100
|
||||
isDefault: true
|
||||
jsonData:
|
||||
httpMethod: POST
|
||||
@@ -1,5 +0,0 @@
|
||||
{
|
||||
"oc-catalog" : "http://localhost:49618/",
|
||||
"logs" : "",
|
||||
"mongo_url": "mongodb://127.0.0.1:27017"
|
||||
}
|
||||
143
daemons/execute_monitor_container.go
Normal file
143
daemons/execute_monitor_container.go
Normal file
@@ -0,0 +1,143 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"oc-schedulerd/conf"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"github.com/rs/zerolog"
|
||||
batchv1 "k8s.io/api/batch/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
type ContainerMonitor struct {
|
||||
Monitor LocalMonitor
|
||||
KubeCA string
|
||||
KubeCert string
|
||||
KubeData string
|
||||
KubeHost string
|
||||
KubePort string
|
||||
KubeNamespace string
|
||||
KubeImage string
|
||||
}
|
||||
|
||||
func NewContainerMonitor(UUID string, peerId string, duration int) Executor {
|
||||
return &ContainerMonitor{
|
||||
Monitor: LocalMonitor{
|
||||
ExecutionID: UUID,
|
||||
PeerID: peerId,
|
||||
Duration: duration,
|
||||
LokiUrl: oclib.GetConfig().LokiUrl,
|
||||
MongoUrl: oclib.GetConfig().MongoUrl,
|
||||
DBName: oclib.GetConfig().MongoDatabase,
|
||||
},
|
||||
KubeCA: conf.GetConfig().KubeCA,
|
||||
KubeCert: conf.GetConfig().KubeCert,
|
||||
KubeData: conf.GetConfig().KubeData,
|
||||
KubeHost: conf.GetConfig().KubeHost,
|
||||
KubePort: conf.GetConfig().KubePort,
|
||||
KubeNamespace: conf.GetConfig().KubeNamespace,
|
||||
KubeImage: conf.GetConfig().KubeImage,
|
||||
}
|
||||
}
|
||||
|
||||
func (cm *ContainerMonitor) PrepareMonitorExec() []string {
|
||||
|
||||
args := []string{
|
||||
"-e", cm.Monitor.ExecutionID,
|
||||
"-p", cm.Monitor.PeerID,
|
||||
"-u", cm.Monitor.LokiUrl,
|
||||
"-m", cm.Monitor.MongoUrl,
|
||||
"-d", cm.Monitor.DBName,
|
||||
"-M", "kubernetes",
|
||||
"-H", cm.KubeHost,
|
||||
"-P", cm.KubePort,
|
||||
"-C", cm.KubeCert,
|
||||
"-D", cm.KubeData,
|
||||
"-c", cm.KubeCA,
|
||||
}
|
||||
|
||||
if cm.Monitor.Duration > 0 {
|
||||
args = append(args, "-t", fmt.Sprintf("%d", cm.Monitor.Duration))
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (cm *ContainerMonitor) failExec(execID string, l zerolog.Logger, msg string) {
|
||||
l.Error().Msg(msg)
|
||||
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
|
||||
"state": enum.FAILURE.EnumIndex(),
|
||||
}, execID)
|
||||
}
|
||||
|
||||
func (cm *ContainerMonitor) LaunchMonitor(args []string, execID string, l zerolog.Logger) {
|
||||
|
||||
ca, err := base64.StdEncoding.DecodeString(cm.KubeCA)
|
||||
if err != nil {
|
||||
cm.failExec(execID, l, "Failed to decode KubeCA: "+err.Error())
|
||||
return
|
||||
}
|
||||
cert, err := base64.StdEncoding.DecodeString(cm.KubeCert)
|
||||
if err != nil {
|
||||
cm.failExec(execID, l, "Failed to decode KubeCert: "+err.Error())
|
||||
return
|
||||
}
|
||||
key, err := base64.StdEncoding.DecodeString(cm.KubeData)
|
||||
if err != nil {
|
||||
cm.failExec(execID, l, "Failed to decode KubeData: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
cfg := &rest.Config{
|
||||
Host: "https://" + cm.KubeHost + ":" + cm.KubePort,
|
||||
TLSClientConfig: rest.TLSClientConfig{
|
||||
CAData: ca,
|
||||
CertData: cert,
|
||||
KeyData: key,
|
||||
},
|
||||
}
|
||||
|
||||
clientset, err := kubernetes.NewForConfig(cfg)
|
||||
if err != nil {
|
||||
cm.failExec(execID, l, "Failed to build Kubernetes client: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
backoffLimit := int32(0)
|
||||
job := &batchv1.Job{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "oc-monitord-" + execID,
|
||||
Namespace: cm.KubeNamespace,
|
||||
},
|
||||
Spec: batchv1.JobSpec{
|
||||
BackoffLimit: &backoffLimit,
|
||||
Template: corev1.PodTemplateSpec{
|
||||
Spec: corev1.PodSpec{
|
||||
RestartPolicy: corev1.RestartPolicyNever,
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: "oc-monitord",
|
||||
Image: cm.KubeImage,
|
||||
Args: args,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
_, err = clientset.BatchV1().Jobs(cm.KubeNamespace).Create(context.Background(), job, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
cm.failExec(execID, l, "Failed to create Kubernetes Job: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
l.Info().Msg("Started Kubernetes Job oc-monitord-" + execID)
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
package daemons
|
||||
|
||||
// type manifestValues struct{
|
||||
// ARGO_FILE string
|
||||
// LOKI_URL string
|
||||
// CONTAINER_NAME string
|
||||
// }
|
||||
|
||||
// manifest, err := em.CreateManifest(booking, argo_file_path)
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not create manifest " + err.Error())
|
||||
// }
|
||||
|
||||
// // launch a pod that contains oc-monitor, give it the name of the workflow
|
||||
// cmd := exec.Command("kubectl","apply","-f", "manifests/"+manifest)
|
||||
// output , err := cmd.CombinedOutput()
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("failed to create new pod for " + booking.Workflow + " :" + err.Error())
|
||||
// return
|
||||
// }
|
||||
|
||||
// func (*ExecutionManager) CreateManifest(booking models.Booking, argo_file string) (manifest_filepath string, err error) {
|
||||
|
||||
// var filled_template bytes.Buffer
|
||||
|
||||
// manifest_file, err := os.ReadFile("conf/monitor_pod_template.yml")
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not open the k8s template file for " + booking.Workflow)
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// container_name := getContainerName(argo_file)
|
||||
// tmpl, err := template.New("manifest_template").Parse(string(manifest_file))
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg(err.Error())
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// manifest_data := manifestValues{
|
||||
// ARGO_FILE: argo_file,
|
||||
// LOKI_URL: conf.GetConfig().Logs,
|
||||
// CONTAINER_NAME: container_name,
|
||||
// }
|
||||
|
||||
// err = tmpl.Execute(&filled_template, manifest_data)
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not complete manifest template for " + booking.Workflow)
|
||||
// return "", err }
|
||||
|
||||
// manifest_filepath = booking.Workflow + "_manifest.yaml"
|
||||
|
||||
// err = os.WriteFile("manifests/" + manifest_filepath, filled_template.Bytes(),0644)
|
||||
// if err != nil {
|
||||
// logger.Logger.Error().Msg("Could not write the YAML file for " + booking.Workflow + "'s manifest")
|
||||
// return "", err
|
||||
// }
|
||||
|
||||
// return manifest_filepath, nil
|
||||
// }
|
||||
|
||||
// func getContainerName(argo_file string) string {
|
||||
// regex := "([a-zA-Z]+-[a-zA-Z]+)"
|
||||
// re := regexp.MustCompile(regex)
|
||||
|
||||
// container_name := re.FindString(argo_file)
|
||||
// return container_name
|
||||
// }
|
||||
@@ -1,47 +1,79 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"oc-scheduler/conf"
|
||||
"oc-scheduler/logger"
|
||||
"fmt"
|
||||
"oc-schedulerd/conf"
|
||||
"os/exec"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type LocalMonitor struct{
|
||||
LokiURL string
|
||||
KubeURL string
|
||||
WorkflowName string
|
||||
type LocalMonitor struct {
|
||||
ExecutionID string
|
||||
PeerID string
|
||||
Duration int
|
||||
LokiUrl string
|
||||
MongoUrl string
|
||||
DBName string
|
||||
}
|
||||
|
||||
func (lm *LocalMonitor) LaunchLocalMonitor (){
|
||||
if (lm.LokiURL == "" || lm.KubeURL == "" || lm.WorkflowName == ""){
|
||||
logger.Logger.Error().Msg("Missing parameter in LocalMonitor")
|
||||
}
|
||||
|
||||
// For dev purposes, in prod KubeURL must be a kube API's URL
|
||||
if(lm.KubeURL == "localhost"){
|
||||
lm.execLocalKube()
|
||||
} else{
|
||||
lm.execRemoteKube()
|
||||
func NewLocalMonitor(UUID string, peerId string, duration int) Executor {
|
||||
return &LocalMonitor{
|
||||
ExecutionID: UUID,
|
||||
PeerID: peerId,
|
||||
Duration: duration,
|
||||
LokiUrl: oclib.GetConfig().LokiUrl,
|
||||
MongoUrl: oclib.GetConfig().MongoUrl,
|
||||
DBName: oclib.GetConfig().MongoDatabase,
|
||||
}
|
||||
}
|
||||
|
||||
func (lm *LocalMonitor) execLocalKube (){
|
||||
// kube_url := ""
|
||||
cmd := exec.Command("../oc-monitor/oc-monitor", "-w",lm.WorkflowName, "-u", lm.LokiURL, "-m", conf.GetConfig().MongoUrl,"-d", conf.GetConfig().DBName)
|
||||
// cmd_ls := exec.Command("ls", "../oc-monitor")
|
||||
err := cmd.Start()
|
||||
// output, err := cmd_ls.CombinedOutput()
|
||||
if err !=nil {
|
||||
logger.Logger.Error().Msg("Could not start oc-monitor for " + lm.WorkflowName + " : " + err.Error())
|
||||
// func (lm *LocalMonitor) LaunchLocalMonitor() {
|
||||
// if lm.ExecutionID == "" {
|
||||
// lm.Logger.Error().Msg("Missing parameter in LocalMonitor")
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
func (lm *LocalMonitor) PrepareMonitorExec() []string {
|
||||
|
||||
args := []string{
|
||||
"-e", lm.ExecutionID,
|
||||
"-p", lm.PeerID,
|
||||
"-u", lm.LokiUrl,
|
||||
"-m", lm.MongoUrl,
|
||||
"-d", lm.DBName,
|
||||
}
|
||||
|
||||
if lm.Duration > 0 {
|
||||
args = append(args, "-t", fmt.Sprintf("%d", lm.Duration))
|
||||
}
|
||||
|
||||
return args
|
||||
}
|
||||
|
||||
func (lm *LocalMonitor) LaunchMonitor(args []string, execID string, l zerolog.Logger) {
|
||||
cmd := exec.Command(conf.GetConfig().MonitorPath, args...)
|
||||
fmt.Printf("Command : %v\n", cmd)
|
||||
|
||||
func (lm *LocalMonitor) execRemoteKube (){
|
||||
|
||||
}
|
||||
|
||||
func (lm *LocalMonitor) todo (){
|
||||
stdoutMonitord, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
l.Error().Msg("Could not retrieve stdoutpipe for execution of oc-monitord" + err.Error())
|
||||
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
|
||||
"state": enum.FAILURE.EnumIndex(),
|
||||
}, execID)
|
||||
return
|
||||
}
|
||||
|
||||
err = cmd.Start()
|
||||
if err != nil {
|
||||
l.Error().Msg("Could not start oc-monitor for " + lm.ExecutionID + " : " + err.Error())
|
||||
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
|
||||
"state": enum.FAILURE.EnumIndex(),
|
||||
}, execID)
|
||||
return
|
||||
}
|
||||
logExecution(stdoutMonitord, l)
|
||||
}
|
||||
@@ -1,72 +1,93 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"oc-scheduler/conf"
|
||||
"oc-scheduler/logger"
|
||||
"oc-scheduler/models"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
workflow_execution "cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
type ExecutionManager struct {
|
||||
bookings *models.ScheduledBooking
|
||||
executions []models.Booking
|
||||
}
|
||||
var Executions = ScheduledExecution{Execs: map[string]workflow_execution.WorkflowExecution{}}
|
||||
|
||||
type ExecutionManager struct{}
|
||||
|
||||
|
||||
func (em *ExecutionManager) SetBookings(b *models.ScheduledBooking){
|
||||
em.bookings = b
|
||||
}
|
||||
|
||||
// Loop every second on the booking's list and move the booking that must start to a new list
|
||||
// Loop every second on the Execution's list and move the Execution that must start to a new list
|
||||
// that will be looped over to start them
|
||||
func (em *ExecutionManager) RetrieveNextExecutions(){
|
||||
func (em *ExecutionManager) RetrieveNextExecutions() {
|
||||
logger := oclib.GetLogger()
|
||||
for {
|
||||
Executions.Mu.Lock()
|
||||
if len(Executions.Execs) > 0 {
|
||||
executions := Executions.Execs
|
||||
orderedExec := map[int]map[string]workflow_execution.WorkflowExecution{}
|
||||
for execId, exec := range executions {
|
||||
if orderedExec[exec.Priority] == nil {
|
||||
orderedExec[exec.Priority] = map[string]workflow_execution.WorkflowExecution{}
|
||||
}
|
||||
orderedExec[exec.Priority][execId] = exec
|
||||
}
|
||||
for i := range []int{7, 6, 5, 4, 3, 2, 1, 0} { // priority in reversed
|
||||
if orderedExec[i] == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
if(em.bookings == nil){
|
||||
logger.Logger.Error().Msg("booking has not been set in the exection manager")
|
||||
return
|
||||
}
|
||||
|
||||
for(true){
|
||||
logger.Logger.Debug().Msg("New loop")
|
||||
em.bookings.Mu.Lock()
|
||||
bookings := em.bookings.Bookings
|
||||
if (len(bookings) > 0){
|
||||
for i := len( bookings) - 1 ; i >= 0 ; i--{
|
||||
logger.Logger.Debug().Msg("It should start at " + bookings[i].Start.String() + " and it is now " + time.Now().UTC() .String())
|
||||
if (bookings[i].Start.Before(time.Now().UTC())){
|
||||
logger.Logger.Info().Msg("Will execute " + bookings[i].Workflow + " soon")
|
||||
go em.executeBooking(bookings[i])
|
||||
bookings = append(bookings[:i], bookings[i+1:]...)
|
||||
em.bookings.Bookings = bookings
|
||||
for execId, exec := range orderedExec[i] {
|
||||
if i == 0 && em.isAStartingExecutionBeforeEnd(&exec) { // BEST EFFORT exception
|
||||
continue
|
||||
}
|
||||
if exec.ExecDate.Before(time.Now().UTC()) {
|
||||
logger.Info().Msg("Will execute " + execId + " soon")
|
||||
go em.executeExecution(&exec)
|
||||
delete(executions, execId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
em.bookings.Mu.Unlock()
|
||||
Executions.Mu.Unlock()
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
|
||||
func (em *ExecutionManager) executeBooking(booking models.Booking){
|
||||
|
||||
|
||||
// start execution
|
||||
// create the yaml that describes the pod : filename, path/url to Loki
|
||||
|
||||
|
||||
exec_method := os.Getenv("MONITOR_METHOD")
|
||||
|
||||
if exec_method == "local"{
|
||||
logger.Logger.Debug().Msg("Executing oc-monitor localy")
|
||||
monitor := LocalMonitor{LokiURL: conf.GetConfig().LokiUrl,KubeURL: "localhost",WorkflowName: booking.Workflow,}
|
||||
monitor.LaunchLocalMonitor()
|
||||
}else{
|
||||
logger.Logger.Error().Msg("TODO : executing oc-monitor in a k8s")
|
||||
func (em *ExecutionManager) isAStartingExecutionBeforeEnd(execution *workflow_execution.WorkflowExecution) bool {
|
||||
access := workflow_execution.NewAccessor(nil)
|
||||
l, _, err := access.Search(&dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"execution_date": {{Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(*execution.EndDate)}},
|
||||
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
|
||||
}, // TODO later should refine on each endpoint
|
||||
}, "", false)
|
||||
if err != nil && len(l) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (em *ExecutionManager) executeExecution(execution *workflow_execution.WorkflowExecution) {
|
||||
// start execution
|
||||
// create the yaml that describes the pod : filename, path/url to Loki
|
||||
var executor Executor
|
||||
// exec_method := os.Getenv("MONITOR_METHOD")
|
||||
logger := oclib.GetLogger()
|
||||
|
||||
duration := 0
|
||||
if execution.EndDate != nil {
|
||||
duration = int(execution.EndDate.Sub(execution.ExecDate).Seconds())
|
||||
}
|
||||
|
||||
executor = NewContainerMonitor(execution.UUID, execution.CreatorID, duration)
|
||||
|
||||
if executor == nil {
|
||||
logger.Fatal().Msg("Could not create executor")
|
||||
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
|
||||
"state": enum.FAILURE.EnumIndex(),
|
||||
}, execution.GetID())
|
||||
return
|
||||
}
|
||||
|
||||
args := executor.PrepareMonitorExec()
|
||||
executor.LaunchMonitor(args, execution.GetID(), logger)
|
||||
}
|
||||
|
||||
21
daemons/interface.go
Normal file
21
daemons/interface.go
Normal file
@@ -0,0 +1,21 @@
|
||||
package daemons
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
type Executor interface {
|
||||
PrepareMonitorExec() []string
|
||||
LaunchMonitor(args []string, execID string, l zerolog.Logger)
|
||||
}
|
||||
|
||||
func logExecution(reader io.ReadCloser, l zerolog.Logger) {
|
||||
scanner := bufio.NewScanner(reader)
|
||||
for scanner.Scan() {
|
||||
output := scanner.Text()
|
||||
l.Debug().Msg(output)
|
||||
}
|
||||
}
|
||||
@@ -3,152 +3,135 @@ package daemons
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"oc-scheduler/logger"
|
||||
"oc-scheduler/models"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/dbs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"cloud.o-forge.io/core/oc-lib/models/execution_verification"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"github.com/nats-io/nats.go"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/zerolog"
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
)
|
||||
|
||||
type ScheduledExecution struct {
|
||||
Execs map[string]workflow_execution.WorkflowExecution
|
||||
Mu sync.Mutex
|
||||
}
|
||||
|
||||
func (sb *ScheduledExecution) DeleteSchedules(resp tools.NATSResponse) {
|
||||
var m map[string]string
|
||||
json.Unmarshal(resp.Payload, &m)
|
||||
Executions.Mu.Lock()
|
||||
defer Executions.Mu.Unlock()
|
||||
delete(sb.Execs, m["id"])
|
||||
}
|
||||
|
||||
func (sb *ScheduledExecution) AddSchedules(new_executions []*workflow_execution.WorkflowExecution, logger zerolog.Logger) {
|
||||
Executions.Mu.Lock()
|
||||
defer Executions.Mu.Unlock()
|
||||
for _, exec := range new_executions {
|
||||
fmt.Println("Adding "+exec.UUID, !sb.execIsSet(exec))
|
||||
if !sb.execIsSet(exec) {
|
||||
sb.Execs[exec.UUID] = *exec
|
||||
oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).UpdateOne(map[string]interface{}{
|
||||
"state": enum.SCHEDULED.EnumIndex(),
|
||||
}, exec.GetID())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *ScheduledExecution) execIsSet(exec *workflow_execution.WorkflowExecution) bool {
|
||||
if _, ok := sb.Execs[exec.UUID]; ok {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
|
||||
}
|
||||
|
||||
// NATS daemon listens to subject " workflowsUpdate "
|
||||
// workflowsUpdate messages must be formatted following this pattern '{"workflow" : "", "start_date" : "", "stop_date" : "" }'
|
||||
|
||||
|
||||
type ScheduleManager struct {
|
||||
Api_url string
|
||||
bookings *models.ScheduledBooking
|
||||
ws models.HttpQuery
|
||||
|
||||
}
|
||||
|
||||
func (s *ScheduleManager) SetBookings(b *models.ScheduledBooking){
|
||||
s.bookings = b
|
||||
}
|
||||
|
||||
// Goroutine listening to a NATS server for updates
|
||||
// on workflows' scheduling. Messages must contain
|
||||
// workflow execution ID, to allow retrieval of execution infos
|
||||
func (s *ScheduleManager) ListenForWorkflowSubmissions(){
|
||||
|
||||
if(s.bookings == nil){
|
||||
logger.Logger.Error().Msg("booking has not been set in the schedule manager")
|
||||
}
|
||||
|
||||
nc, err := nats.Connect(nats.DefaultURL)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not connect to NATS")
|
||||
return
|
||||
}
|
||||
|
||||
defer nc.Close()
|
||||
|
||||
ch := make(chan *nats.Msg, 64)
|
||||
|
||||
subs , err := nc.ChanSubscribe("workflowsUpdate", ch)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Error listening to NATS")
|
||||
}
|
||||
defer subs.Unsubscribe()
|
||||
|
||||
for msg := range(ch){
|
||||
fmt.Println("Waiting...")
|
||||
|
||||
map_mess := retrieveMapFromSub(msg.Data)
|
||||
|
||||
s.bookings.Mu.Lock()
|
||||
|
||||
wf_exec := getWorkflowExecution(map_mess["workflow"])
|
||||
|
||||
s.bookings.AddSchedule(models.Booking{Workflow: map_mess["workflow"], Start: *wf_exec.ExecDate, Stop: *wf_exec.EndDate })
|
||||
s.bookings.Mu.Unlock()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func getWorkflowExecution(exec_id string) *workflow_execution.WorkflowExecution {
|
||||
|
||||
res := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec_id)
|
||||
|
||||
if res.Code != 200 {
|
||||
logger.Logger.Error().Msg("Could not retrieve workflow ID from execution ID " + exec_id)
|
||||
return nil
|
||||
}
|
||||
|
||||
wf_exec := res.ToWorkflowExecution()
|
||||
|
||||
return wf_exec
|
||||
}
|
||||
|
||||
// At the moment very simplistic, but could be useful if we send bigger messages
|
||||
func retrieveMapFromSub(message []byte) (result_map map[string]string) {
|
||||
json.Unmarshal(message, &result_map)
|
||||
return
|
||||
Logger zerolog.Logger
|
||||
}
|
||||
|
||||
// Used at launch of the component to retrieve the next scheduled workflows
|
||||
// and then every X minutes in case some workflows were scheduled before launch
|
||||
func (s *ScheduleManager) SchedulePolling (){
|
||||
var sleep_time float64 = 1
|
||||
for(true){
|
||||
s.getNextScheduledWorkflows(3)
|
||||
|
||||
logger.Logger.Info().Msg("Current list of schedules")
|
||||
fmt.Println(s.bookings.Bookings)
|
||||
|
||||
time.Sleep(time.Minute * time.Duration(sleep_time))
|
||||
func (s *ScheduleManager) SchedulePolling() {
|
||||
var sleep_time float64 = 20
|
||||
for {
|
||||
s.GetNextScheduledWorkflows(tools.NATSResponse{})
|
||||
s.Logger.Info().Msg("Current list of schedules -------> " + fmt.Sprintf("%v", len(Executions.Execs)))
|
||||
time.Sleep(time.Second * time.Duration(sleep_time))
|
||||
}
|
||||
}
|
||||
func (s *ScheduleManager) getWorfklowExecution(from time.Time, to time.Time) (exec_list []workflow_execution.WorkflowExecution, err error) {
|
||||
|
||||
func (s *ScheduleManager) getExecution(from time.Time, to time.Time) (exec_list []*workflow_execution.WorkflowExecution, err error) {
|
||||
fmt.Printf("Getting workflows execution from %s to %s \n", from.String(), to.String())
|
||||
f := dbs.Filters{
|
||||
And: map[string][]dbs.Filter{
|
||||
"execution_date" : {{Operator : dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}},
|
||||
"state": {{Operator: dbs.EQUAL.String(), Value: 1}},
|
||||
"execution_date": {{Operator: dbs.GTE.String(), Value: primitive.NewDateTimeFromTime(from)}, {Operator: dbs.LTE.String(), Value: primitive.NewDateTimeFromTime(to)}},
|
||||
"state": {{Operator: dbs.EQUAL.String(), Value: enum.SCHEDULED}},
|
||||
},
|
||||
}
|
||||
res := oclib.Search(&f,"",oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION))
|
||||
res := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), "", "", []string{}, nil).Search(&f, "", false)
|
||||
if res.Code != 200 {
|
||||
logger.Logger.Error().Msg("Error loading")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
for _, exec := range(res.Data){
|
||||
lib_data := oclib.LoadOne(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION),exec.GetID())
|
||||
exec_obj := lib_data.ToWorkflowExecution()
|
||||
exec_list = append(exec_list, *exec_obj)
|
||||
}
|
||||
return exec_list, nil
|
||||
}
|
||||
|
||||
// TODO : refactor to implement oclib.Search
|
||||
func (s *ScheduleManager) getNextScheduledWorkflows(minutes float64) {
|
||||
start := time.Now().UTC()
|
||||
end := start.Add(time.Minute * time.Duration(minutes)).UTC()
|
||||
|
||||
fmt.Printf("Getting workflows execution from %s to %s \n", start.String(), end.String())
|
||||
|
||||
next_wf_exec, err := s.getWorfklowExecution(start,end)
|
||||
if err != nil {
|
||||
logger.Logger.Error().Msg("Could not retrieve next schedules")
|
||||
s.Logger.Error().Msg("Error loading " + res.Err)
|
||||
return
|
||||
}
|
||||
for _, exec := range res.Data {
|
||||
exec_list = append(exec_list, exec.(*workflow_execution.WorkflowExecution))
|
||||
}
|
||||
fmt.Println("Found "+fmt.Sprintf("%v", len(exec_list))+" workflows", res)
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
s.bookings.Mu.Lock()
|
||||
defer s.bookings.Mu.Unlock()
|
||||
|
||||
for _, exec := range(next_wf_exec){
|
||||
exec_start := exec.ExecDate
|
||||
exec_stop := exec.EndDate
|
||||
|
||||
s.bookings.AddSchedule(models.Booking{Workflow: exec.UUID, Start: *exec_start, Stop: *exec_stop})
|
||||
func (s *ScheduleManager) ExecuteWorkflow(resp tools.NATSResponse) {
|
||||
var event native_tools.WorkflowEventParams
|
||||
json.Unmarshal(resp.Payload, &event)
|
||||
if event.ManualCheck {
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.EXECUTION_VERIFICATION), nil)
|
||||
exec := &execution_verification.ExecutionVerification{
|
||||
WorkflowID: event.WorkflowResourceID,
|
||||
Payload: event.Payload,
|
||||
}
|
||||
access.StoreOne(exec.Serialize(exec))
|
||||
} else {
|
||||
access := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW), nil)
|
||||
if d := access.LoadOne(fmt.Sprintf("%v", event.WorkflowResourceID)); d.Err == "" {
|
||||
eventExec := &workflow_execution.WorkflowExecution{
|
||||
WorkflowID: d.Data.GetID(),
|
||||
ExecDate: time.Now(),
|
||||
ExecutionsID: uuid.New().String(),
|
||||
State: enum.SCHEDULED,
|
||||
}
|
||||
exec := oclib.NewRequestAdmin(oclib.LibDataEnum(oclib.WORKFLOW_EXECUTION), nil).StoreOne(eventExec.Serialize(eventExec))
|
||||
if execc := exec.ToWorkflowExecution(); execc != nil {
|
||||
Executions.AddSchedules([]*workflow_execution.WorkflowExecution{execc}, s.Logger)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *ScheduleManager) GetNextScheduledWorkflows(_ tools.NATSResponse) {
|
||||
start := time.Now().UTC()
|
||||
fmt.Println(s.getExecution(
|
||||
start.Add(time.Second*time.Duration(-1)).UTC(),
|
||||
start.Add(time.Minute*time.Duration(1)).UTC(),
|
||||
))
|
||||
if next_wf_exec, err := s.getExecution(
|
||||
start.Add(time.Second*time.Duration(-1)).UTC(),
|
||||
start.Add(time.Minute*time.Duration(1)).UTC(),
|
||||
); err != nil {
|
||||
s.Logger.Error().Msg("Could not retrieve next schedules")
|
||||
} else {
|
||||
Executions.AddSchedules(next_wf_exec, s.Logger)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,23 +1,21 @@
|
||||
version: '3.4'
|
||||
|
||||
services:
|
||||
nats:
|
||||
image: 'nats:latest'
|
||||
container_name: nats
|
||||
oc-schedulerd:
|
||||
env_file:
|
||||
- path: ./env.env
|
||||
required: false
|
||||
environment:
|
||||
- MONGO_DATABASE=DC_myDC
|
||||
- KUBE_CA=${KUBE_CA:-LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJWUxWNkFPQkdrU1F3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOekl6TVRFeU1ETTJNQjRYRFRJME1EZ3dPREV3TVRNMU5sb1hEVEkxTURndwpPREV3TVRNMU5sb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJGQ2Q1MFdPeWdlQ2syQzcKV2FrOWY4MVAvSkJieVRIajRWOXBsTEo0ck5HeHFtSjJOb2xROFYxdUx5RjBtOTQ2Nkc0RmRDQ2dqaXFVSk92Swp3NVRPNnd5alNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFJkOFI5cXVWK2pjeUVmL0ovT1hQSzMyS09XekFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQTArbThqTDBJVldvUTZ0dnB4cFo4NVlMalF1SmpwdXM0aDdnSXRxS3NmUVVDSUI2M2ZNdzFBMm5OVWU1TgpIUGZOcEQwSEtwcVN0Wnk4djIyVzliYlJUNklZCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTWpNeE1USXdNell3SGhjTk1qUXdPREE0TVRBeE16VTJXaGNOTXpRd09EQTJNVEF4TXpVMgpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTWpNeE1USXdNell3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFRc3hXWk9pbnIrcVp4TmFEQjVGMGsvTDF5cE01VHAxOFRaeU92ektJazQKRTFsZWVqUm9STW0zNmhPeVljbnN3d3JoNnhSUnBpMW5RdGhyMzg0S0Z6MlBvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBYZkVmYXJsZm8zTWhIL3lmemx6Cnl0OWlqbHN3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUxJL2dNYnNMT3MvUUpJa3U2WHVpRVMwTEE2cEJHMXgKcnBlTnpGdlZOekZsQWlFQW1wdjBubjZqN3M0MVI0QzFNMEpSL0djNE53MHdldlFmZWdEVGF1R2p3cFk9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K}
|
||||
- KUBE_TOKEN=${KUBE_TOKEN:-LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSU5ZS1BFb1dhd1NKUzJlRW5oWmlYMk5VZlY1ZlhKV2krSVNnV09TNFE5VTlvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFVUozblJZN0tCNEtUWUx0WnFUMS96VS84a0Z2Sk1lUGhYMm1Vc25pczBiR3FZblkyaVZEeApYVzR2SVhTYjNqcm9iZ1YwSUtDT0twUWs2OHJEbE03ckRBPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo=}
|
||||
image: 'oc-schedulerd:latest'
|
||||
ports:
|
||||
- 4222:4222
|
||||
command:
|
||||
- "--debug"
|
||||
- 9001:8080
|
||||
container_name: oc-schedulerd
|
||||
networks:
|
||||
- scheduler
|
||||
loki:
|
||||
image: 'grafana/loki'
|
||||
container_name: loki
|
||||
ports :
|
||||
- "3100:3100"
|
||||
networks:
|
||||
- scheduler
|
||||
- oc
|
||||
|
||||
networks:
|
||||
scheduler:
|
||||
oc:
|
||||
external: true
|
||||
11
docker_schedulerd.json
Normal file
11
docker_schedulerd.json
Normal file
@@ -0,0 +1,11 @@
|
||||
{
|
||||
"LOKI_URL" : "http://loki:3100",
|
||||
"MONGO_URL":"mongodb://mongo:27017/",
|
||||
"NATS_URL":"nats://nats:4222",
|
||||
"MONGO_DATABASE":"DC_myDC",
|
||||
"MONITORD_PATH": "oc-monitord",
|
||||
"MODE": "kubernetes",
|
||||
"KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUL1NDWEMycjFTWGdza0FvTGJKSEtIem4zQXYva2t0ZElpSk42WlBsWVEKY3p0dXV5K3JBMHJ5VUlkZnIyK3VCRS9VN0NjSlhPL004QVdyODFwVklzVmdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVFHOVBQQ0g0c1lMbFkvQk5CdnN5CklEam1PK0l3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUtJeFc4NERQTW1URXVVN0Z3ek44SFB6ZHdldWh6U20KVzNYMU9tczFSQVNRQWlFQXI4UTJZSGtNQndSOThhcWtTa2JqU1dhejg0OEY2VkZLWjFacXpNbDFZaTg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
|
||||
"KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJZWFxQUp2bHhmYzh3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOelF6TmpjME56ZzVNQjRYRFRJMU1EUXdNekV3TURZeU9Wb1hEVEkyTURRdwpNekV3TURZeU9Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJJelpGSlJUVHJmYXlNNFoKTjlRclN4MC9wbDdoZGdvWFM5bGEydmFFRkhlYVFaalRML2NZd1dMUnhoOWVOa01SRDZjTk4reWZkSXE2aWo1SQo5RTlENGdLalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFFzUkZXUlNweDV0RGZnZDh1UTdweUw0ZERMVEFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQStXZTlBVXJRUm5pWjVCUERELzJwWjA3TzFQWWFIc01ycTZZcVB4VlV5cGdDSUhrRE8rcVlMYUhkUEhXZgpWUGszNXJmejM0Qk4xN2VyaEVxRjF0U0c1MWFqCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUNDF1NXIzM0JyenZ3ZXZaWHM2TEg3T1k4NGhOOGRrODdnTlhaUndBdWkKdXJBaU45TFdYcmYxeFoyaXp5d0FiVGk1ZVc2Q1hIMjhDdEVSWUlrcjNoTXdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBMRVJWa1VxY2ViUTM0SGZMa082CmNpK0hReTB3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUpLWGZLdXBzdklONEtQVW50c1lPNXhiaGhSQmhSYlIKN3JyeWs2VHpZMU5JQWlBVktKWis3UUxzeGFyQktORnI3eTVYYlNGanI3Y1gyQmhOYy9wdnFLcWtFUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
|
||||
"KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVJd01wVjdzMHc2S0VTQ2FBWDhvSVZPUHloa2U0Q3duNWZQZnhOaUYyM3JvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFak5rVWxGTk90OXJJemhrMzFDdExIVCttWHVGMkNoZEwyVnJhOW9RVWQ1cEJtTk12OXhqQgpZdEhHSDE0MlF4RVBwdzAzN0o5MGlycUtQa2owVDBQaUFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
|
||||
}
|
||||
@@ -3,7 +3,7 @@ package "main" {
|
||||
class Graph {
|
||||
[]DataModel Datas
|
||||
[]ComputingModel Computings
|
||||
[]DatacenterModel Datacenters
|
||||
[]ComputeModel Computes
|
||||
[]StorageModel Storages
|
||||
map[string, Link] Links
|
||||
HttpQuery ws
|
||||
@@ -13,7 +13,7 @@ package "main" {
|
||||
GetWorkflowComponents(workflow string)
|
||||
GetLinks(workflow string)
|
||||
AddDataModel(id string, user_input gjson.Result, wf_id string) error
|
||||
AddDatacenterModel(id string, user_input gjson.Result, wf_id string) error
|
||||
AddComputeModel(id string, user_input gjson.Result, wf_id string) error
|
||||
AddComputingModel(id string, user_input gjson.Result, wf_id string) error
|
||||
AddStorageModel(id string, user_input gjson.Result, wf_id string) error
|
||||
ExportToArgo(id string) error
|
||||
|
||||
@@ -4,14 +4,14 @@ package "main" {
|
||||
[]Link Links
|
||||
[]DataModel Datas
|
||||
[]ComputingModel Computings
|
||||
[]DatacenterModel Datacenters
|
||||
[]ComputeModel Computes
|
||||
[]StorageModel Storages
|
||||
HttpQuery ws
|
||||
|
||||
GetGraphList(apiurl string) (map[string]string, error)
|
||||
LoadFrom(workspace string) error
|
||||
AddDataModel(id string) error
|
||||
AddDatacenterModel(id string) error
|
||||
AddComputeModel(id string) error
|
||||
AddComputingModel(id string) error
|
||||
AddStorageModel(id string) error
|
||||
ExportToArgo(id string) error
|
||||
|
||||
@@ -1,14 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"oc-scheduler/daemons"
|
||||
"oc-scheduler/models"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCreateManifest(t *testing.T){
|
||||
em := daemons.ExecutionManager{}
|
||||
em.CreateManifest(models.Booking{},"fessity-chlics_23_07_2024_154326")
|
||||
|
||||
|
||||
}
|
||||
126
go.mod
126
go.mod
@@ -1,100 +1,84 @@
|
||||
module oc-scheduler
|
||||
module oc-schedulerd
|
||||
|
||||
go 1.22.0
|
||||
|
||||
toolchain go1.22.5
|
||||
go 1.25.0
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20260312104524-e28b79ac0d62
|
||||
github.com/beego/beego v1.12.12
|
||||
github.com/beego/beego/v2 v2.2.2
|
||||
github.com/google/uuid v1.6.0
|
||||
github.com/goraz/onion v0.1.3
|
||||
github.com/nats-io/nats.go v1.9.1
|
||||
github.com/nwtgck/go-fakelish v0.1.3
|
||||
github.com/rs/zerolog v1.33.0
|
||||
github.com/tidwall/gjson v1.17.1
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
k8s.io/client-go v0.30.3
|
||||
github.com/rs/zerolog v1.34.0
|
||||
go.mongodb.org/mongo-driver v1.17.4
|
||||
k8s.io/api v0.35.1
|
||||
k8s.io/apimachinery v0.35.1
|
||||
k8s.io/client-go v0.35.1
|
||||
)
|
||||
|
||||
require (
|
||||
cloud.o-forge.io/core/oc-lib v0.0.0-20240812075555-6e3069068ce4 // indirect
|
||||
github.com/Klathmon/StructToMap v0.0.0-20140724123129-3d0229e2dce7 // indirect
|
||||
github.com/antihax/optional v1.0.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.36.29 // indirect
|
||||
github.com/beego/beego/v2 v2.3.8 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/biter777/countries v1.7.5 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.5 // indirect
|
||||
github.com/go-logr/logr v1.4.1 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
|
||||
github.com/gabriel-vasile/mimetype v1.4.9 // indirect
|
||||
github.com/go-logr/logr v1.4.3 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.2 // indirect
|
||||
github.com/go-openapi/swag v0.22.3 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
github.com/go-playground/locales v0.14.1 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.1 // indirect
|
||||
github.com/go-playground/validator/v10 v10.22.0 // indirect
|
||||
github.com/go-stack/stack v1.8.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/google/gnostic-models v0.6.8 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/imdario/mergo v0.3.8 // indirect
|
||||
github.com/jmespath/go-jmespath v0.4.0 // indirect
|
||||
github.com/go-playground/validator/v10 v10.27.0 // indirect
|
||||
github.com/golang/snappy v1.0.0 // indirect
|
||||
github.com/google/gnostic-models v0.7.0 // indirect
|
||||
github.com/hashicorp/golang-lru v1.0.2 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/klauspost/compress v1.17.9 // indirect
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/leodido/go-urn v1.4.0 // indirect
|
||||
github.com/libp2p/go-libp2p/core v0.43.0-rc2 // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
|
||||
github.com/montanaflynn/stats v0.7.1 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/nats-io/jwt v0.3.2 // indirect
|
||||
github.com/nats-io/nkeys v0.1.3 // indirect
|
||||
github.com/nats-io/nats.go v1.44.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/prometheus/client_golang v1.19.0 // indirect
|
||||
github.com/prometheus/client_model v0.5.0 // indirect
|
||||
github.com/prometheus/common v0.48.0 // indirect
|
||||
github.com/prometheus/procfs v0.12.0 // indirect
|
||||
github.com/robfig/cron/v3 v3.0.1 // indirect
|
||||
github.com/shiena/ansicolor v0.0.0-20200904210342-c7312218db18 // indirect
|
||||
github.com/spf13/pflag v1.0.5 // indirect
|
||||
github.com/tidwall/match v1.1.1 // indirect
|
||||
github.com/tidwall/pretty v1.2.0 // indirect
|
||||
github.com/ugorji/go/codec v1.1.7 // indirect
|
||||
github.com/vk496/cron v1.2.0 // indirect
|
||||
github.com/prometheus/client_golang v1.23.0 // indirect
|
||||
github.com/prometheus/client_model v0.6.2 // indirect
|
||||
github.com/prometheus/common v0.65.0 // indirect
|
||||
github.com/prometheus/procfs v0.17.0 // indirect
|
||||
github.com/shiena/ansicolor v0.0.0-20230509054315-a9deabde6e02 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
|
||||
github.com/xdg-go/scram v1.1.2 // indirect
|
||||
github.com/xdg-go/stringprep v1.0.4 // indirect
|
||||
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
|
||||
github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc // indirect
|
||||
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
|
||||
go.mongodb.org/mongo-driver v1.16.1 // indirect
|
||||
golang.org/x/crypto v0.26.0 // indirect
|
||||
golang.org/x/net v0.28.0 // indirect
|
||||
golang.org/x/oauth2 v0.16.0 // indirect
|
||||
golang.org/x/sync v0.8.0 // indirect
|
||||
golang.org/x/sys v0.24.0 // indirect
|
||||
golang.org/x/term v0.23.0 // indirect
|
||||
golang.org/x/text v0.17.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/protobuf v1.34.1 // indirect
|
||||
go.yaml.in/yaml/v2 v2.4.3 // indirect
|
||||
go.yaml.in/yaml/v3 v3.0.4 // indirect
|
||||
golang.org/x/crypto v0.44.0 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/oauth2 v0.30.0 // indirect
|
||||
golang.org/x/sync v0.18.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/term v0.37.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
golang.org/x/time v0.9.0 // indirect
|
||||
google.golang.org/protobuf v1.36.8 // indirect
|
||||
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
k8s.io/api v0.30.3 // indirect
|
||||
k8s.io/apimachinery v0.30.3 // indirect
|
||||
k8s.io/klog/v2 v2.120.1 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
|
||||
k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
k8s.io/klog/v2 v2.130.1 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20250910181357-589584f1c912 // indirect
|
||||
k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20250730193827-2d320260d730 // indirect
|
||||
sigs.k8s.io/randfill v1.0.0 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v6 v6.3.0 // indirect
|
||||
sigs.k8s.io/yaml v1.6.0 // indirect
|
||||
)
|
||||
|
||||
@@ -1,59 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Note: the example only works with the code within the same release/branch.
|
||||
package k8s
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"path/filepath"
|
||||
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/client-go/util/homedir"
|
||||
//
|
||||
// Uncomment to load all auth plugins
|
||||
// _ "k8s.io/client-go/plugin/pkg/client/auth"
|
||||
//
|
||||
// Or uncomment to load specific auth plugins
|
||||
// _ "k8s.io/client-go/plugin/pkg/client/auth/azure"
|
||||
// _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||
// _ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
|
||||
)
|
||||
|
||||
func NewK8SClient() *kubernetes.Clientset {
|
||||
var kubeconfig *string
|
||||
if home := homedir.HomeDir(); home != "" {
|
||||
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
|
||||
} else {
|
||||
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
|
||||
}
|
||||
flag.Parse()
|
||||
|
||||
// use the current context in kubeconfig
|
||||
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
// create the clientset
|
||||
clientset, err := kubernetes.NewForConfig(config)
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
||||
return clientset
|
||||
}
|
||||
@@ -1,13 +0,0 @@
|
||||
package logger
|
||||
|
||||
import (
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
var Logger zerolog.Logger
|
||||
|
||||
func init() {
|
||||
logs.SetAppName("oc-scheduler")
|
||||
Logger = logs.CreateLogger("","")
|
||||
}
|
||||
73
main.go
73
main.go
@@ -1,56 +1,53 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
conf "oc-scheduler/conf"
|
||||
"oc-scheduler/models"
|
||||
|
||||
"oc-scheduler/daemons"
|
||||
"oc-schedulerd/conf"
|
||||
"oc-schedulerd/daemons"
|
||||
"os"
|
||||
|
||||
oclib "cloud.o-forge.io/core/oc-lib"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
)
|
||||
|
||||
// var log zerolog.Logger
|
||||
var appname = "oc-schedulerd"
|
||||
|
||||
func main() {
|
||||
var bookings models.ScheduledBooking
|
||||
oclib.InitDaemon(appname)
|
||||
l := oclib.GetLogger()
|
||||
o := oclib.GetConfLoader(appname)
|
||||
|
||||
oclib.SetConfig(conf.GetConfig().MongoUrl,"DC_myDC")
|
||||
oclib.Init("oc-scheduler")
|
||||
conf.GetConfig().KubeHost = o.GetStringDefault("KUBERNETES_SERVICE_HOST", os.Getenv("KUBERNETES_SERVICE_HOST"))
|
||||
conf.GetConfig().KubePort = o.GetStringDefault("KUBERNETES_SERVICE_PORT", "6443")
|
||||
|
||||
app_conf := conf.GetConfig()
|
||||
apiurl := app_conf.OcCatalogUrl
|
||||
conf.GetConfig().KubeCA = o.GetStringDefault("KUBE_CA", os.Getenv("KUBE_CA"))
|
||||
conf.GetConfig().KubeCert = o.GetStringDefault("KUBE_CERT", os.Getenv("KUBE_CERT"))
|
||||
conf.GetConfig().KubeData = o.GetStringDefault("KUBE_DATA", os.Getenv("KUBE_DATA"))
|
||||
|
||||
sch_mngr := daemons.ScheduleManager{Api_url: apiurl}
|
||||
sch_mngr.SetBookings(&bookings)
|
||||
// Test if oc-monitor binary is reachable
|
||||
// For local executions
|
||||
if _, err := os.Stat("../oc-monitord/oc-monitord"); err == nil {
|
||||
conf.GetConfig().MonitorPath = "../oc-monitord/oc-monitord"
|
||||
}
|
||||
// For container executions
|
||||
if _, err := os.Stat("/usr/bin/oc-monitord"); conf.GetConfig().MonitorPath == "" && err == nil {
|
||||
conf.GetConfig().MonitorPath = "/usr/bin/oc-monitord"
|
||||
}
|
||||
|
||||
if conf.GetConfig().MonitorPath == "" {
|
||||
l.Fatal().Msg("Could not find oc-monitord binary")
|
||||
}
|
||||
|
||||
l.Info().Msg("oc-monitord binary at " + conf.GetConfig().MonitorPath)
|
||||
|
||||
sch_mngr := daemons.ScheduleManager{Logger: oclib.GetLogger()}
|
||||
exe_mngr := daemons.ExecutionManager{}
|
||||
exe_mngr.SetBookings(&bookings)
|
||||
|
||||
go sch_mngr.ListenForWorkflowSubmissions()
|
||||
|
||||
go tools.NewNATSCaller().ListenNats(map[tools.NATSMethod]func(tools.NATSResponse){
|
||||
tools.CREATE_EXECUTION: sch_mngr.GetNextScheduledWorkflows, // TODO: unused for now...
|
||||
tools.WORKFLOW_EVENT: sch_mngr.ExecuteWorkflow,
|
||||
tools.REMOVE_EXECUTION: daemons.Executions.DeleteSchedules, // TODO: unused for now...
|
||||
})
|
||||
go sch_mngr.SchedulePolling()
|
||||
exe_mngr.RetrieveNextExecutions()
|
||||
|
||||
|
||||
// method in Schedule manager that checks the first Schedule object for its start date and exe
|
||||
|
||||
// var g Graph
|
||||
|
||||
// list, err := g.GetGraphList(apiurl)
|
||||
// if err != nil {
|
||||
// log.Fatal().Msg("Failed to get the workspaces list, check api url and that api server is up : " + apiurl)
|
||||
// }
|
||||
|
||||
// println("Available workspaces :")
|
||||
// for workspace, _ := range list {
|
||||
// println(workspace)
|
||||
// }
|
||||
|
||||
|
||||
// g.LoadFrom(list["test-alpr"])
|
||||
// g.ExportToArgo("test-alpr")
|
||||
|
||||
fmt.Print("stop")
|
||||
|
||||
}
|
||||
|
||||
@@ -4,12 +4,12 @@ metadata:
|
||||
name: test-monitor
|
||||
spec:
|
||||
containers:
|
||||
- name: "oc-monitor-quity-anetran"
|
||||
- name: "oc-workflow-prous-skintris"
|
||||
image: docker.io/library/oc-monitor # Currently uses the local contenaird
|
||||
imagePullPolicy: IfNotPresent # This should be removed once a registry has been set up
|
||||
env:
|
||||
- name: "OCMONITOR_ARGOFILE"
|
||||
value: "quity-anetran_29_07_2024_144136.yml"
|
||||
value: "prous-skintris_29_07_2024_164008.yml"
|
||||
- name: "OCMONITOR_LOKIURL"
|
||||
value: "info" # !!!! In dev this must be replaced with the address of one of your interface (wifi, ethernet..)
|
||||
restartPolicy: OnFailure
|
||||
|
||||
@@ -1,53 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/cookiejar"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type HttpQuery struct {
|
||||
baseurl string
|
||||
jar http.CookieJar
|
||||
Cookies map[string]string
|
||||
}
|
||||
|
||||
func (h *HttpQuery) Init(url string) {
|
||||
h.baseurl = url
|
||||
h.jar, _ = cookiejar.New(nil)
|
||||
h.Cookies = make(map[string]string)
|
||||
}
|
||||
|
||||
func (h *HttpQuery) Get(url string) ([]byte, error) {
|
||||
client := &http.Client{Jar: h.jar}
|
||||
resp, err := client.Get(h.baseurl + url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// store received cookies
|
||||
for _, cookie := range h.jar.Cookies(resp.Request.URL) {
|
||||
h.Cookies[cookie.Name] = cookie.Value
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func (h *HttpQuery) Post(url string, data url.Values) (*http.Response, error) {
|
||||
client := &http.Client{Jar: h.jar}
|
||||
resp, err := client.PostForm(h.baseurl+url, data)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// store received cookies
|
||||
for _, cookie := range h.jar.Cookies(resp.Request.URL) {
|
||||
h.Cookies[cookie.Name] = cookie.Value
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
@@ -1,71 +0,0 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"oc-scheduler/logger"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Is duration really important ?
|
||||
|
||||
type Booking struct {
|
||||
Start time.Time
|
||||
Stop time.Time
|
||||
Duration uint
|
||||
Workflow string
|
||||
}
|
||||
|
||||
type ScheduledBooking struct {
|
||||
Bookings []Booking
|
||||
Mu sync.Mutex
|
||||
}
|
||||
|
||||
func (s Booking) Equals(other Booking) bool {
|
||||
return s.Workflow == other.Workflow && s.Start == other.Start && s.Stop == other.Stop
|
||||
}
|
||||
|
||||
func (sb *ScheduledBooking) AddSchedule(new_booking Booking){
|
||||
if(!sb.scheduleAlreadyExists(new_booking)){
|
||||
sb.Bookings = append(sb.Bookings,new_booking)
|
||||
logger.Logger.Info().Msg("Updated list schedules : \n " + sb.String())
|
||||
} else {
|
||||
// Debug condition : delete once this feature is ready to be implemented
|
||||
logger.Logger.Debug().Msg("Workflow received not added")
|
||||
logger.Logger.Debug().Msg("current schedule contains")
|
||||
for _, booking := range(sb.Bookings){
|
||||
logger.Logger.Debug().Msg(booking.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *ScheduledBooking) GetListNames()(list_names []string ){
|
||||
for _, schedule := range(sb.Bookings){
|
||||
list_names = append(list_names, schedule.Workflow)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (sb *ScheduledBooking) scheduleAlreadyExists(new_booking Booking) bool {
|
||||
for _, booking := range(sb.Bookings){
|
||||
if booking.Equals(new_booking){
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (b *Booking) String() string {
|
||||
return fmt.Sprintf("{workflow : %s , start_date : %s , stop_date : %s }", b.Workflow, b.Start.Format(time.RFC3339), b.Stop.Format(time.RFC3339))
|
||||
|
||||
}
|
||||
|
||||
func (sb *ScheduledBooking) String() string {
|
||||
var str string
|
||||
for _, booking := range(sb.Bookings){
|
||||
str += fmt.Sprintf("%s\n", booking.String())
|
||||
}
|
||||
|
||||
return str
|
||||
}
|
||||
@@ -1,40 +0,0 @@
|
||||
package models
|
||||
|
||||
type Parameter struct {
|
||||
Name string `yaml:"name,omitempty"`
|
||||
Value string `yaml:"value,omitempty"`
|
||||
}
|
||||
|
||||
type Container struct {
|
||||
Image string `yaml:"image"`
|
||||
Command []string `yaml:"command,omitempty,flow"`
|
||||
Args []string `yaml:"args,omitempty,flow"`
|
||||
VolumeMounts []VolumeMount `yaml:"volumeMounts,omitempty"`
|
||||
}
|
||||
|
||||
type VolumeMount struct {
|
||||
Name string `yaml:"name"`
|
||||
MountPath string `yaml:"mountPath"`
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Name string `yaml:"name"`
|
||||
Template string `yaml:"template"`
|
||||
Dependencies []string `yaml:"dependencies,omitempty"`
|
||||
Arguments struct {
|
||||
Parameters []Parameter `yaml:"parameters,omitempty"`
|
||||
} `yaml:"arguments,omitempty"`
|
||||
}
|
||||
|
||||
type Dag struct {
|
||||
Tasks []Task `yaml:"tasks,omitempty"`
|
||||
}
|
||||
|
||||
type Template struct {
|
||||
Name string `yaml:"name"`
|
||||
Inputs struct {
|
||||
Parameters []Parameter `yaml:"parameters"`
|
||||
} `yaml:"inputs,omitempty"`
|
||||
Container Container `yaml:"container,omitempty"`
|
||||
Dag Dag `yaml:"dag,omitempty"`
|
||||
}
|
||||
@@ -1,19 +0,0 @@
|
||||
package models
|
||||
|
||||
|
||||
|
||||
type VolumeClaimTemplate struct {
|
||||
Metadata struct {
|
||||
Name string `yaml:"name"`
|
||||
} `yaml:"metadata"`
|
||||
Spec VolumeSpec `yaml:"spec"`
|
||||
}
|
||||
|
||||
type VolumeSpec struct {
|
||||
AccessModes []string `yaml:"accessModes,flow"`
|
||||
Resources struct {
|
||||
Requests struct {
|
||||
Storage string `yaml:"storage"`
|
||||
} `yaml:"requests"`
|
||||
} `yaml:"resources"`
|
||||
}
|
||||
BIN
oc-scheduler
BIN
oc-scheduler
Binary file not shown.
12
schedulerd.json
Normal file
12
schedulerd.json
Normal file
@@ -0,0 +1,12 @@
|
||||
{
|
||||
"LOKI_URL" : "http://172.16.0.181:3100",
|
||||
"MONGO_URL":"mongodb://172.16.0.181:27017/",
|
||||
"NATS_URL":"nats://172.16.0.181:4222",
|
||||
"MONGO_DATABASE":"DC_myDC",
|
||||
"MONITORD_PATH": "../oc-monitord/oc-monitord",
|
||||
"KUBERNETES_SERVICE_HOST" : "172.16.0.181",
|
||||
"MODE": "container",
|
||||
"KUBE_CA": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJlRENDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdGMyVnkKZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRjMlZ5ZG1WeUxXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUL1NDWEMycjFTWGdza0FvTGJKSEtIem4zQXYva2t0ZElpSk42WlBsWVEKY3p0dXV5K3JBMHJ5VUlkZnIyK3VCRS9VN0NjSlhPL004QVdyODFwVklzVmdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVVFHOVBQQ0g0c1lMbFkvQk5CdnN5CklEam1PK0l3Q2dZSUtvWkl6ajBFQXdJRFNRQXdSZ0loQUtJeFc4NERQTW1URXVVN0Z3ek44SFB6ZHdldWh6U20KVzNYMU9tczFSQVNRQWlFQXI4UTJZSGtNQndSOThhcWtTa2JqU1dhejg0OEY2VkZLWjFacXpNbDFZaTg9Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
|
||||
"KUBE_CERT": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJrVENDQVRlZ0F3SUJBZ0lJZWFxQUp2bHhmYzh3Q2dZSUtvWkl6ajBFQXdJd0l6RWhNQjhHQTFVRUF3d1kKYXpOekxXTnNhV1Z1ZEMxallVQXhOelF6TmpjME56ZzVNQjRYRFRJMU1EUXdNekV3TURZeU9Wb1hEVEkyTURRdwpNekV3TURZeU9Wb3dNREVYTUJVR0ExVUVDaE1PYzNsemRHVnRPbTFoYzNSbGNuTXhGVEFUQmdOVkJBTVRESE41CmMzUmxiVHBoWkcxcGJqQlpNQk1HQnlxR1NNNDlBZ0VHQ0NxR1NNNDlBd0VIQTBJQUJJelpGSlJUVHJmYXlNNFoKTjlRclN4MC9wbDdoZGdvWFM5bGEydmFFRkhlYVFaalRML2NZd1dMUnhoOWVOa01SRDZjTk4reWZkSXE2aWo1SQo5RTlENGdLalNEQkdNQTRHQTFVZER3RUIvd1FFQXdJRm9EQVRCZ05WSFNVRUREQUtCZ2dyQmdFRkJRY0RBakFmCkJnTlZIU01FR0RBV2dCVFFzUkZXUlNweDV0RGZnZDh1UTdweUw0ZERMVEFLQmdncWhrak9QUVFEQWdOSUFEQkYKQWlFQStXZTlBVXJRUm5pWjVCUERELzJwWjA3TzFQWWFIc01ycTZZcVB4VlV5cGdDSUhrRE8rcVlMYUhkUEhXZgpWUGszNXJmejM0Qk4xN2VyaEVxRjF0U0c1MWFqCi0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0KLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUJkekNDQVIyZ0F3SUJBZ0lCQURBS0JnZ3Foa2pPUFFRREFqQWpNU0V3SHdZRFZRUUREQmhyTTNNdFkyeHAKWlc1MExXTmhRREUzTkRNMk56UTNPRGt3SGhjTk1qVXdOREF6TVRBd05qSTVXaGNOTXpVd05EQXhNVEF3TmpJNQpXakFqTVNFd0h3WURWUVFEREJock0zTXRZMnhwWlc1MExXTmhRREUzTkRNMk56UTNPRGt3V1RBVEJnY3Foa2pPClBRSUJCZ2dxaGtqT1BRTUJCd05DQUFUNDF1NXIzM0JyenZ3ZXZaWHM2TEg3T1k4NGhOOGRrODdnTlhaUndBdWkKdXJBaU45TFdYcmYxeFoyaXp5d0FiVGk1ZVc2Q1hIMjhDdEVSWUlrcjNoTXdvMEl3UURBT0JnTlZIUThCQWY4RQpCQU1DQXFRd0R3WURWUjBUQVFIL0JBVXdBd0VCL3pBZEJnTlZIUTRFRmdRVTBMRVJWa1VxY2ViUTM0SGZMa082CmNpK0hReTB3Q2dZSUtvWkl6ajBFQXdJRFNBQXdSUUloQUpLWGZLdXBzdklONEtQVW50c1lPNXhiaGhSQmhSYlIKN3JyeWs2VHpZMU5JQWlBVktKWis3UUxzeGFyQktORnI3eTVYYlNGanI3Y1gyQmhOYy9wdnFLcWtFUT09Ci0tLS0tRU5EIENFUlRJRklDQVRFLS0tLS0K",
|
||||
"KUBE_DATA": "LS0tLS1CRUdJTiBFQyBQUklWQVRFIEtFWS0tLS0tCk1IY0NBUUVFSUVJd01wVjdzMHc2S0VTQ2FBWDhvSVZPUHloa2U0Q3duNWZQZnhOaUYyM3JvQW9HQ0NxR1NNNDkKQXdFSG9VUURRZ0FFak5rVWxGTk90OXJJemhrMzFDdExIVCttWHVGMkNoZEwyVnJhOW9RVWQ1cEJtTk12OXhqQgpZdEhHSDE0MlF4RVBwdzAzN0o5MGlycUtQa2owVDBQaUFnPT0KLS0tLS1FTkQgRUMgUFJJVkFURSBLRVktLS0tLQo="
|
||||
}
|
||||
Reference in New Issue
Block a user