argo by kube invocation

This commit is contained in:
mr
2025-02-14 12:00:29 +01:00
parent df6e3d5a46
commit 34547e8b2f
11 changed files with 602 additions and 414 deletions

View File

@@ -6,7 +6,9 @@ package workflow_builder
import (
"fmt"
"oc-monitord/conf"
. "oc-monitord/models"
tools2 "oc-monitord/tools"
"os"
"strings"
"time"
@@ -58,10 +60,10 @@ type Spec struct {
// TODO: found on a processing instance linked to storage
// add s3, gcs, azure, etc if needed on a link between processing and storage
func (b *ArgoBuilder) CreateDAG(write bool) (string, int, []string, []string, error) {
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []string, []string, error) {
fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
// handle services by checking if there is only one processing with hostname and port
firstItems, lastItems, volumes := b.createTemplates()
firstItems, lastItems, volumes := b.createTemplates(namespace)
b.createVolumes(volumes)
if b.Timeout > 0 {
@@ -93,7 +95,7 @@ func (b *ArgoBuilder) CreateDAG(write bool) (string, int, []string, []string, er
return file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
}
func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
volumes := []VolumeMount{}
firstItems := []string{}
lastItems := []string{}
@@ -106,7 +108,7 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
return firstItems, lastItems, volumes
}
volumes, firstItems, lastItems = b.createArgoTemplates(
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
item.ID, item.Processing, volumes, firstItems, lastItems)
}
firstWfTasks := map[string][]string{}
@@ -119,7 +121,7 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
continue
}
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
_, _, fi, li, err := subBuilder.CreateDAG(false)
_, _, fi, li, err := subBuilder.CreateDAG(namespace, false)
if err != nil {
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
continue
@@ -176,7 +178,8 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
return firstItems, lastItems, volumes
}
func (b *ArgoBuilder) createArgoTemplates(id string,
func (b *ArgoBuilder) createArgoTemplates(namespace string,
id string,
processing *resources.ProcessingResource,
volumes []VolumeMount,
firstItems []string,
@@ -184,7 +187,7 @@ func (b *ArgoBuilder) createArgoTemplates(id string,
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
template := &Template{Name: getArgoName(processing.GetName(), id)}
fmt.Println("Creating template for", template.Name)
template.CreateContainer(processing, b.Workflow.getDag())
template.CreateContainer(processing, b.Workflow.getDag(), template.Name)
// get datacenter from the processing
if processing.IsService {
b.CreateService(id, processing)
@@ -204,7 +207,40 @@ func (b *ArgoBuilder) createArgoTemplates(id string,
}
if storage.StorageType == enum.S3 {
art.S3 = &Key{
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
Insecure: true, // temporary
}
sel := storage.GetSelectedInstance()
if sel != nil {
if sel.(*resources.StorageResourceInstance).Credentials != nil {
tool, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil || tool == nil {
logger.Error().Msg("Could not create the access secret")
} else {
id, err := tool.CreateAccessSecret(namespace,
sel.(*resources.StorageResourceInstance).Credentials.Login,
sel.(*resources.StorageResourceInstance).Credentials.Pass)
if err == nil {
art.S3.AccessKeySecret = &Secret{
Name: id,
Key: "access-key",
}
art.S3.SecretKeySecret = &Secret{
Name: id,
Key: "secret-key",
}
}
}
}
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
splits := strings.Split(art.S3.EndPoint, "/")
if len(splits) > 1 {
art.S3.Bucket = splits[0]
art.S3.EndPoint = strings.Join(splits[1:], "/")
} else {
art.S3.Bucket = splits[0]
}
}
}
if rw.Write {

View File

@@ -41,7 +41,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
return new_wf, nil
}
func (w *WorflowDB) ExportToArgo(timeout int) (string, int, error) {
func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (string, int, error) {
logger := oclib.GetLogger()
fmt.Println("Exporting to Argo", w.Workflow)
if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
@@ -49,7 +49,7 @@ func (w *WorflowDB) ExportToArgo(timeout int) (string, int, error) {
}
argo_builder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout}
filename, stepMax, _, _, err := argo_builder.CreateDAG(true)
filename, stepMax, _, _, err := argo_builder.CreateDAG(namespace, true)
if err != nil {
logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)
return "", 0, err