Merge branch 'main' of https://cloud.o-forge.io/core/oc-monitord into feature/admiralty
This commit is contained in:
@@ -6,7 +6,9 @@ package workflow_builder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"oc-monitord/conf"
|
||||
. "oc-monitord/models"
|
||||
tools2 "oc-monitord/tools"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -59,10 +61,10 @@ type Spec struct {
|
||||
|
||||
// TODO: found on a processing instance linked to storage
|
||||
// add s3, gcs, azure, etc if needed on a link between processing and storage
|
||||
func (b *ArgoBuilder) CreateDAG(write bool) (string, int, []string, []string, error) {
|
||||
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (string, int, []string, []string, error) {
|
||||
fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
|
||||
// handle services by checking if there is only one processing with hostname and port
|
||||
firstItems, lastItems, volumes := b.createTemplates()
|
||||
firstItems, lastItems, volumes := b.createTemplates(namespace)
|
||||
b.createVolumes(volumes)
|
||||
|
||||
if b.Timeout > 0 {
|
||||
@@ -91,10 +93,10 @@ func (b *ArgoBuilder) CreateDAG(write bool) (string, int, []string, []string, er
|
||||
logger.Error().Msg("Could not write the yaml file")
|
||||
return "", 0, firstItems, lastItems, err
|
||||
}
|
||||
return file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
return workflows_dir + file_name, len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
|
||||
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
|
||||
volumes := []VolumeMount{}
|
||||
firstItems := []string{}
|
||||
lastItems := []string{}
|
||||
@@ -107,7 +109,7 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
|
||||
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
|
||||
item.ID, item.Processing, volumes, firstItems, lastItems)
|
||||
}
|
||||
firstWfTasks := map[string][]string{}
|
||||
@@ -120,7 +122,7 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
|
||||
continue
|
||||
}
|
||||
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
|
||||
_, _, fi, li, err := subBuilder.CreateDAG(false)
|
||||
_, _, fi, li, err := subBuilder.CreateDAG(namespace, false)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
|
||||
continue
|
||||
@@ -177,7 +179,8 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createArgoTemplates(id string,
|
||||
func (b *ArgoBuilder) createArgoTemplates(namespace string,
|
||||
id string,
|
||||
processing *resources.ProcessingResource,
|
||||
volumes []VolumeMount,
|
||||
firstItems []string,
|
||||
@@ -206,7 +209,40 @@ func (b *ArgoBuilder) createArgoTemplates(id string,
|
||||
}
|
||||
if storage.StorageType == enum.S3 {
|
||||
art.S3 = &Key{
|
||||
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
|
||||
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
|
||||
Insecure: true, // temporary
|
||||
}
|
||||
sel := storage.GetSelectedInstance()
|
||||
if sel != nil {
|
||||
if sel.(*resources.StorageResourceInstance).Credentials != nil {
|
||||
tool, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil || tool == nil {
|
||||
logger.Error().Msg("Could not create the access secret")
|
||||
} else {
|
||||
id, err := tool.CreateAccessSecret(namespace,
|
||||
sel.(*resources.StorageResourceInstance).Credentials.Login,
|
||||
sel.(*resources.StorageResourceInstance).Credentials.Pass)
|
||||
if err == nil {
|
||||
art.S3.AccessKeySecret = &Secret{
|
||||
Name: id,
|
||||
Key: "access-key",
|
||||
}
|
||||
art.S3.SecretKeySecret = &Secret{
|
||||
Name: id,
|
||||
Key: "secret-key",
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
|
||||
splits := strings.Split(art.S3.EndPoint, "/")
|
||||
if len(splits) > 1 {
|
||||
art.S3.Bucket = splits[0]
|
||||
art.S3.EndPoint = strings.Join(splits[1:], "/")
|
||||
} else {
|
||||
art.S3.Bucket = splits[0]
|
||||
}
|
||||
}
|
||||
}
|
||||
if rw.Write {
|
||||
|
||||
Reference in New Issue
Block a user