1 Commits

Author SHA1 Message Date
mr
d529c2f72c CLUSTER NAME in Makefile 2026-01-20 11:17:13 +01:00
8 changed files with 102 additions and 177 deletions

View File

@@ -16,7 +16,7 @@ docker:
docker tag oc-monitord oc/oc-monitord:0.0.1 docker tag oc-monitord oc/oc-monitord:0.0.1
publish-kind: publish-kind:
kind load docker-image oc/oc-monitord:0.0.1 --name opencloud | true kind load docker-image oc/oc-monitord:0.0.1 --name $(CLUSTER_NAME) | true
publish-registry: publish-registry:
@echo "TODO" @echo "TODO"

2
go.mod
View File

@@ -5,7 +5,7 @@ go 1.23.1
toolchain go1.23.3 toolchain go1.23.3
require ( require (
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de
github.com/akamensky/argparse v1.4.0 github.com/akamensky/argparse v1.4.0
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/goraz/onion v0.1.3 github.com/goraz/onion v0.1.3

4
go.sum
View File

@@ -14,10 +14,6 @@ cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5 h1:bmEG0M99WXWCH
cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= cloud.o-forge.io/core/oc-lib v0.0.0-20250808140536-e7a71188a3b5/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de h1:s47eEnWRCjBMOxbec5ROHztuwu0Zo7MuXgqWizgkiXU= cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de h1:s47eEnWRCjBMOxbec5ROHztuwu0Zo7MuXgqWizgkiXU=
cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI= cloud.o-forge.io/core/oc-lib v0.0.0-20250808141553-f4b0cf5683de/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51 h1:jlSEprNaUBe628uP9a9TrJ16Q5Ej6OxHlAKNtrHrN2o=
cloud.o-forge.io/core/oc-lib v0.0.0-20260113155325-5cdfc28d2f51/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d h1:6oGSN4Fb+H7LNVbUEN7vaDtWBHZTdd2Y1BkBdZ7MLXE=
cloud.o-forge.io/core/oc-lib v0.0.0-20260114125749-fa5b7543332d/go.mod h1:vHWauJsS6ryf7UDqq8hRXoYD5RsONxcFTxeZPOztEuI=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc= github.com/akamensky/argparse v1.4.0 h1:YGzvsTqCvbEZhL8zZu2AiA5nq805NZh75JNj4ajn1xc=
github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA= github.com/akamensky/argparse v1.4.0/go.mod h1:S5kwC7IuDcEr5VeXtGPRVZ5o/FdhcMlQz4IZQuw64xA=

22
main.go
View File

@@ -89,7 +89,7 @@ func main() {
logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API") logger.Error().Msg("Could not retrieve workflow " + conf.GetConfig().WorkflowID + " from oc-catalog API")
} }
builder, _, err := new_wf.ExportToArgo(exec, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore builder, _, err := new_wf.ExportToArgo(exec.ExecutionsID, conf.GetConfig().Timeout) // Removed stepMax so far, I don't know if we need it anymore
if err != nil { if err != nil {
logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID) logger.Error().Msg("Could not create the Argo file for " + conf.GetConfig().WorkflowID)
logger.Error().Msg(err.Error()) logger.Error().Msg(err.Error())
@@ -110,20 +110,20 @@ func main() {
// Executed in a k8s environment // Executed in a k8s environment
logger.Info().Msg("Executes inside a k8s") logger.Info().Msg("Executes inside a k8s")
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID() // executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
executeInside(exec.ExecutionsID, argoFilePath) executeInside(conf.GetConfig().ExecutionID, exec.ExecutionsID, argoFilePath)
} }
} }
// So far we only log the output from // So far we only log the output from
func executeInside(ns string, argo_file_path string) { func executeInside(execID string, ns string, argo_file_path string) {
t, err := tools2.NewService(conf.GetConfig().Mode) t, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil { if err != nil {
logger.Error().Msg("Could not create KubernetesTool") logger.Error().Msg("Could not create KubernetesTool")
return return
} }
name, err := t.CreateArgoWorkflow(argo_file_path, ns) name, err := t.CreateArgoWorkflow(argo_file_path, ns)
// _ = name // _ = name
if err != nil { if err != nil {
logger.Error().Msg("Could not create argo workflow : " + err.Error()) logger.Error().Msg("Could not create argo workflow : " + err.Error())
logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA)) logger.Info().Msg(fmt.Sprint("CA :" + conf.GetConfig().KubeCA))
@@ -152,20 +152,20 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
var wg sync.WaitGroup var wg sync.WaitGroup
var err error var err error
logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID) logger.Debug().Msg("executing :" + "argo submit --watch " + argo_file_path + " --serviceaccount sa-" + conf.GetConfig().ExecutionID + " -n " + conf.GetConfig().ExecutionID )
cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID) cmdSubmit := exec.Command("argo", "submit", "--watch", argo_file_path, "--serviceaccount", "sa-"+conf.GetConfig().ExecutionID, "-n", conf.GetConfig().ExecutionID)
if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil { if stdoutSubmit, err = cmdSubmit.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error()) wf_logger.Error().Msg("Could not retrieve stdoutpipe " + err.Error())
return return
} }
cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow", "--no-color") cmdLogs := exec.Command("argo", "logs", "oc-monitor-"+workflowName, "-n", conf.GetConfig().ExecutionID, "--follow","--no-color")
if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil { if stdoutLogs, err = cmdLogs.StdoutPipe(); err != nil {
wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error()) wf_logger.Error().Msg("Could not retrieve stdoutpipe for 'argo logs'" + err.Error())
return return
} }
var steps []string var steps []string
for _, template := range workflow.Spec.Templates { for _, template := range workflow.Spec.Templates {
steps = append(steps, template.Name) steps = append(steps, template.Name)
@@ -186,7 +186,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
logger.Info().Msg("Running argo logs") logger.Info().Msg("Running argo logs")
if err := cmdLogs.Run(); err != nil { if err := cmdLogs.Run(); err != nil {
wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'") wf_logger.Error().Msg("Could not run '" + strings.Join(cmdLogs.Args, " ") + "'")
wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text()) wf_logger.Fatal().Msg(err.Error() + bufio.NewScanner(stderrLogs).Text())
} }
@@ -201,6 +201,7 @@ func executeOutside(argo_file_path string, workflow workflow_builder.Workflow) {
wg.Wait() wg.Wait()
} }
func loadConfig(is_k8s bool, parser *argparse.Parser) { func loadConfig(is_k8s bool, parser *argparse.Parser) {
var o *onion.Onion var o *onion.Onion
o = initOnion(o) o = initOnion(o)
@@ -304,6 +305,7 @@ func getContainerName(argo_file string) string {
return container_name return container_name
} }
func updateStatus(status string, log string) { func updateStatus(status string, log string) {
exec_id := conf.GetConfig().ExecutionID exec_id := conf.GetConfig().ExecutionID

View File

@@ -1,16 +1,10 @@
package models package models
import ( import (
"encoding/json"
"fmt"
"oc-monitord/conf"
"strings" "strings"
"cloud.o-forge.io/core/oc-lib/models/common/models" "cloud.o-forge.io/core/oc-lib/models/common/models"
"cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
) )
type Parameter struct { type Parameter struct {
@@ -102,34 +96,8 @@ type Template struct {
Resource ServiceResource `yaml:"resource,omitempty"` Resource ServiceResource `yaml:"resource,omitempty"`
} }
func (template *Template) CreateEventContainer(exec *workflow_execution.WorkflowExecution, nt *resources.NativeTool, dag *Dag) { func (template *Template) CreateContainer(processing *resources.ProcessingResource, dag *Dag) {
container := Container{Image: "natsio/nats-box"} instance := processing.GetSelectedInstance()
container.Command = []string{"sh", "-c"} // all is bash
var event *native_tools.WorkflowEventParams
b, err := json.Marshal(nt.Params)
if err != nil {
fmt.Println(err)
return
}
err = json.Unmarshal(b, event)
if err != nil {
fmt.Println(err)
return
}
if event != nil {
container.Args = append(container.Args, "nats pub --server "+conf.GetConfig().NatsURL+":4222 "+tools.WORKFLOW_EVENT.GenerateKey(tools.WORKFLOW_EVENT.String())+" '{\"workflow_id\":\""+event.WorkflowResourceID+"\"}'")
container.Args = []string{strings.Join(container.Args, " ")}
template.Container = container
}
}
func (template *Template) CreateContainer(exec *workflow_execution.WorkflowExecution, processing *resources.ProcessingResource, dag *Dag) {
index := 0
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
index = d
}
instance := processing.GetSelectedInstance(&index)
if instance == nil { if instance == nil {
return return
} }

View File

@@ -18,11 +18,8 @@ import (
"cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/logs"
"cloud.o-forge.io/core/oc-lib/models/common/enum" "cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
w "cloud.o-forge.io/core/oc-lib/models/workflow" w "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph" "cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/tools"
"github.com/nwtgck/go-fakelish" "github.com/nwtgck/go-fakelish"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
@@ -58,22 +55,22 @@ func (b *Workflow) getDag() *Dag {
} }
type Spec struct { type Spec struct {
ServiceAccountName string `yaml:"serviceAccountName,omitempty"` ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
Entrypoint string `yaml:"entrypoint"` Entrypoint string `yaml:"entrypoint"`
Arguments []Parameter `yaml:"arguments,omitempty"` Arguments []Parameter `yaml:"arguments,omitempty"`
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"` Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
Templates []Template `yaml:"templates"` Templates []Template `yaml:"templates"`
Timeout int `yaml:"activeDeadlineSeconds,omitempty"` Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
} }
// TODO: found on a processing instance linked to storage // TODO: found on a processing instance linked to storage
// add s3, gcs, azure, etc if needed on a link between processing and storage // add s3, gcs, azure, etc if needed on a link between processing and storage
func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, namespace string, write bool) (int, []string, []string, error) { func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
logger = logs.GetLogger() logger = logs.GetLogger()
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items)) logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
// handle services by checking if there is only one processing with hostname and port // handle services by checking if there is only one processing with hostname and port
firstItems, lastItems, volumes := b.createTemplates(exec, namespace) firstItems, lastItems, volumes := b.createTemplates(namespace)
b.createVolumes(exec, volumes) b.createVolumes(volumes)
if b.Timeout > 0 { if b.Timeout > 0 {
b.Workflow.Spec.Timeout = b.Timeout b.Workflow.Spec.Timeout = b.Timeout
@@ -85,44 +82,27 @@ func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, name
if !write { if !write {
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
} }
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
} }
func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution, namespace string) ([]string, []string, []VolumeMount) { func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
volumes := []VolumeMount{} volumes := []VolumeMount{}
firstItems := []string{} firstItems := []string{}
lastItems := []string{} lastItems := []string{}
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) { for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
index := 0 instance := item.Processing.GetSelectedInstance()
_, res := item.GetResource()
if d, ok := exec.SelectedInstances[res.GetID()]; ok {
index = d
}
instance := item.Processing.GetSelectedInstance(&index)
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance)) logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil { if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName()) logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
return firstItems, lastItems, volumes return firstItems, lastItems, volumes
} }
volumes, firstItems, lastItems = b.createArgoTemplates(exec, volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
namespace,
item.ID, item.Processing, volumes, firstItems, lastItems) item.ID, item.Processing, volumes, firstItems, lastItems)
} }
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) {
if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) {
continue
}
index := 0
_, res := item.GetResource()
if d, ok := exec.SelectedInstances[res.GetID()]; ok {
index = d
}
instance := item.NativeTool.GetSelectedInstance(&index)
logger.Info().Msg(fmt.Sprint("Creating template for", item.NativeTool.GetName(), instance))
volumes, firstItems, lastItems = b.createArgoTemplates(exec,
namespace, item.ID, item.NativeTool, volumes, firstItems, lastItems)
}
firstWfTasks := map[string][]string{} firstWfTasks := map[string][]string{}
latestWfTasks := map[string][]string{} latestWfTasks := map[string][]string{}
relatedWfTasks := map[string][]string{} relatedWfTasks := map[string][]string{}
@@ -133,7 +113,7 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution
continue continue
} }
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout} subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
_, fi, li, err := subBuilder.CreateDAG(exec, namespace, false) _, fi, li, err := subBuilder.CreateDAG(namespace, false)
if err != nil { if err != nil {
logger.Error().Msg("Error creating the subworkflow : " + err.Error()) logger.Error().Msg("Error creating the subworkflow : " + err.Error())
continue continue
@@ -190,42 +170,36 @@ func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution
return firstItems, lastItems, volumes return firstItems, lastItems, volumes
} }
func (b *ArgoBuilder) createArgoTemplates( func (b *ArgoBuilder) createArgoTemplates(namespace string,
exec *workflow_execution.WorkflowExecution,
namespace string,
id string, id string,
processing resources.ResourceInterface, processing *resources.ProcessingResource,
volumes []VolumeMount, volumes []VolumeMount,
firstItems []string, firstItems []string,
lastItems []string) ([]VolumeMount, []string, []string) { lastItems []string) ([]VolumeMount, []string, []string) {
_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, processing, firstItems, lastItems) _, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
template := &Template{Name: getArgoName(processing.GetName(), id)} template := &Template{Name: getArgoName(processing.GetName(), id)}
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name)) logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
isReparted, peerId := b.isProcessingReparted(processing, id) isReparted, peerId := b.isProcessingReparted(*processing, id)
if processing.GetType() == tools.PROCESSING_RESOURCE.String() { template.CreateContainer(processing, b.Workflow.getDag())
template.CreateContainer(exec, processing.(*resources.ProcessingResource), b.Workflow.getDag())
} else if processing.GetType() == tools.NATIVE_TOOL.String() {
template.CreateEventContainer(exec, processing.(*resources.NativeTool), b.Workflow.getDag())
}
if isReparted { if isReparted {
logger.Debug().Msg("Reparted processing, on " + peerId) logger.Debug().Msg("Reparted processing, on " + peerId)
b.RemotePeers = append(b.RemotePeers, peerId) b.RemotePeers = append(b.RemotePeers, peerId)
template.AddAdmiraltyAnnotations(peerId) template.AddAdmiraltyAnnotations(peerId)
} }
// get datacenter from the processing // get datacenter from the processing
if processing.GetType() == tools.PROCESSING_RESOURCE.String() && processing.(*resources.ProcessingResource).IsService { if processing.IsService {
b.CreateService(exec, id, processing) b.CreateService(id, processing)
template.Metadata.Labels = make(map[string]string) template.Metadata.Labels = make(map[string]string)
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
} }
volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes) volumes = b.addStorageAnnotations(id, template, namespace, volumes)
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template) b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
return volumes, firstItems, lastItems return volumes, firstItems, lastItems
} }
func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExecution, id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount { func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage node linked to the processing for which we create the template related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage node linked to the processing for which we create the template
for _, r := range related { for _, r := range related {
@@ -244,7 +218,7 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe
if storage.StorageType == enum.S3 { if storage.StorageType == enum.S3 {
b.addS3annotations(exec, &art, template, rw, linkToStorage, storage, namespace) b.addS3annotations(&art, template, rw, linkToStorage, storage, namespace)
} }
if rw.Write { if rw.Write {
@@ -255,8 +229,8 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe
} }
} }
index := 0 index := 0
if s, ok := exec.SelectedInstances[storage.GetID()]; ok { if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
index = s index = *storage.SelectedInstanceIndex
} }
s := storage.Instances[index] s := storage.Instances[index]
if s.Local { if s.Local {
@@ -268,10 +242,10 @@ func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExe
} }
} }
return volumes return volumes
} }
func (b *ArgoBuilder) addS3annotations(exec *workflow_execution.WorkflowExecution, art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
art.S3 = &Key{ art.S3 = &Key{
// Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env), // Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
Insecure: true, // temporary Insecure: true, // temporary
@@ -279,65 +253,58 @@ func (b *ArgoBuilder) addS3annotations(exec *workflow_execution.WorkflowExecutio
if rw.Write { if rw.Write {
art.S3.Key = rw.Destination + "/" + rw.FileName art.S3.Key = rw.Destination + "/" + rw.FileName
} else { } else {
art.S3.Key = rw.Source art.S3.Key = rw.Source
} }
index := 0 sel := storage.GetSelectedInstance()
if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
index = d
}
sel := storage.GetSelectedInstance(&index)
// v0.1 : add the storage.Source to the s3 object // v0.1 : add the storage.Source to the s3 object
// v0.2 : test if the storage.Source exists in the configMap and quit if not // v0.2 : test if the storage.Source exists in the configMap and quit if not
// v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call // v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call
if sel != nil { if sel != nil {
b.addAuthInformation(exec, storage, namespace, art) b.addAuthInformation(storage, namespace, art)
art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
} }
} }
func (b *ArgoBuilder) addAuthInformation(exec *workflow_execution.WorkflowExecution, storage *resources.StorageResource, namespace string, art *Artifact) {
index := 0
if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
index = d
}
sel := storage.GetSelectedInstance(&index)
func (b *ArgoBuilder) addAuthInformation(storage *resources.StorageResource, namespace string, art *Artifact) {
sel := storage.GetSelectedInstance()
tool, err := tools2.NewService(conf.GetConfig().Mode) tool, err := tools2.NewService(conf.GetConfig().Mode)
if err != nil || tool == nil { if err != nil || tool == nil {
logger.Fatal().Msg("Could not create the access secret :" + err.Error()) logger.Fatal().Msg("Could not create the access secret :" + err.Error())
} }
secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
if err == nil { if err == nil {
art.S3.AccessKeySecret = &Secret{ art.S3.AccessKeySecret = &Secret{
Name: secretName, Name: secretName,
Key: "access-key", Key: "access-key",
} }
art.S3.SecretKeySecret = &Secret{ art.S3.SecretKeySecret = &Secret{
Name: secretName, Name: secretName,
Key: "secret-key", Key: "secret-key",
} }
} }
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "") art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "") art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
splits := strings.Split(art.S3.EndPoint, "/") splits := strings.Split(art.S3.EndPoint, "/")
if len(splits) > 1 { if len(splits) > 1 {
art.S3.Bucket = splits[0] art.S3.Bucket = splits[0]
art.S3.EndPoint = strings.Join(splits[1:], "/") art.S3.EndPoint = strings.Join(splits[1:], "/")
} else { } else {
art.S3.Bucket = splits[0] art.S3.Bucket = splits[0]
} }
} }
func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) { func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) {
s := tool.GetS3Secret(storage.UUID, namespace) s := tool.GetS3Secret(storage.UUID, namespace)
// var s *v1.Secret // var s *v1.Secret
accessKey, secretKey := retrieveMinioCredential("peer", namespace) accessKey, secretKey := retrieveMinioCredential("peer",namespace)
if s == nil { if s == nil {
id, err := tool.CreateAccessSecret( id, err := tool.CreateAccessSecret(
accessKey, accessKey,
@@ -352,24 +319,22 @@ func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, nam
return id, nil return id, nil
} }
return s.Name, nil
// s.Name = "toto"
return s.Name, nil
} }
// This method needs to evolve to an API call to the peer passed as a parameter // This method needs to evolve to an API call to the peer passed as a parameter
func retrieveMinioCredential(peer string, namespace string) (string, string) { func retrieveMinioCredential(peer string, namespace string) (string,string) {
return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW" return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW"
} }
func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, dag *Dag, graphItemID string, processing resources.ResourceInterface, func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource,
firstItems []string, lastItems []string) (*Dag, []string, []string) { firstItems []string, lastItems []string) (*Dag, []string, []string) {
unique_name := getArgoName(processing.GetName(), graphItemID) unique_name := getArgoName(processing.GetName(), graphItemID)
step := Task{Name: unique_name, Template: unique_name} step := Task{Name: unique_name, Template: unique_name}
index := 0 instance := processing.GetSelectedInstance()
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
index = d
}
instance := processing.GetSelectedInstance(&index)
if instance != nil { if instance != nil {
for _, value := range instance.(*resources.ProcessingInstance).Env { for _, value := range instance.(*resources.ProcessingInstance).Env {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{ step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
@@ -408,11 +373,11 @@ func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution,
return dag, firstItems, lastItems return dag, firstItems, lastItems
} }
func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { // TODO : one think about remote volume but TG func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
for _, volume := range volumes { for _, volume := range volumes {
index := 0 index := 0
if s, ok := exec.SelectedInstances[volume.Storage.GetID()]; ok { if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
index = s index = *volume.Storage.SelectedInstanceIndex
} }
storage := volume.Storage.Instances[index] storage := volume.Storage.Instances[index]
new_volume := VolumeClaimTemplate{} new_volume := VolumeClaimTemplate{}
@@ -470,10 +435,10 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
// Verify if a processing resource is attached to another Compute than the one hosting // Verify if a processing resource is attached to another Compute than the one hosting
// the current Open Cloud instance. If true return the peer ID to contact // the current Open Cloud instance. If true return the peer ID to contact
func (b *ArgoBuilder) isProcessingReparted(processing resources.ResourceInterface, graphID string) (bool, string) { func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) {
computeAttached := b.retrieveProcessingCompute(graphID) computeAttached := b.retrieveProcessingCompute(graphID)
if computeAttached == nil { if computeAttached == nil {
logger.Error().Msg("No compute was found attached to processing " + processing.GetName() + " : " + processing.GetID()) logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
panic(0) panic(0)
} }
@@ -508,7 +473,7 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu
} else if link.Destination.ID == graphID { } else if link.Destination.ID == graphID {
oppositeId = link.Source.ID oppositeId = link.Source.ID
} }
if oppositeId != "" { if oppositeId != "" {
dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId) dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
if dt == oclib.COMPUTE_RESOURCE { if dt == oclib.COMPUTE_RESOURCE {
@@ -530,10 +495,10 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
// Setup admiralty for each node // Setup admiralty for each node
for _, peer := range b.RemotePeers { for _, peer := range b.RemotePeers {
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer)) logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer) setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
} }
// Update the name of the admiralty node to use // Update the name of the admiralty node to use
for _, template := range b.Workflow.Spec.Templates { for _, template := range b.Workflow.Spec.Templates {
if len(template.Metadata.Annotations) > 0 { if len(template.Metadata.Annotations) > 0 {
if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok { if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
@@ -549,14 +514,14 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
yamlified, err := yaml.Marshal(b.Workflow) yamlified, err := yaml.Marshal(b.Workflow)
if err != nil { if err != nil {
logger.Error().Msg("Could not transform object to yaml file") logger.Error().Msg("Could not transform object to yaml file")
return "", err return "", err
} }
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss // Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
current_timestamp := time.Now().Format("02_01_2006_150405") current_timestamp := time.Now().Format("02_01_2006_150405")
file_name := random_name + "_" + current_timestamp + ".yml" file_name := random_name + "_" + current_timestamp + ".yml"
workflows_dir := "./argo_workflows/" workflows_dir := "./argo_workflows/"
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660) err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
if err != nil { if err != nil {
logger.Error().Msg("Could not write the yaml file") logger.Error().Msg("Could not write the yaml file")
return "", err return "", err

View File

@@ -5,11 +5,10 @@ import (
"strings" "strings"
"cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
func (b *ArgoBuilder) CreateService(exec *workflow_execution.WorkflowExecution, id string, processing resources.ResourceInterface) { func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingResource) {
new_service := models.Service{ new_service := models.Service{
APIVersion: "v1", APIVersion: "v1",
Kind: "Service", Kind: "Service",
@@ -25,21 +24,17 @@ func (b *ArgoBuilder) CreateService(exec *workflow_execution.WorkflowExecution,
if processing == nil { if processing == nil {
return return
} }
b.completeServicePorts(exec, &new_service, id, processing) b.completeServicePorts(&new_service, id, processing)
b.Services = append(b.Services, &new_service) b.Services = append(b.Services, &new_service)
} }
func (b *ArgoBuilder) completeServicePorts(exec *workflow_execution.WorkflowExecution, service *models.Service, id string, processing resources.ResourceInterface) { func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, processing *resources.ProcessingResource) {
index := 0 instance := processing.GetSelectedInstance()
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
index = d
}
instance := processing.GetSelectedInstance(&index)
if instance != nil && instance.(*resources.ProcessingInstance).Access != nil && instance.(*resources.ProcessingInstance).Access.Container != nil { if instance != nil && instance.(*resources.ProcessingInstance).Access != nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
for _, execute := range instance.(*resources.ProcessingInstance).Access.Container.Exposes { for _, execute := range instance.(*resources.ProcessingInstance).Access.Container.Exposes {
if execute.PAT != 0 { if execute.PAT != 0 {
new_port_translation := models.ServicePort{ new_port_translation := models.ServicePort{
Name: strings.ToLower(processing.GetName()) + id, Name: strings.ToLower(processing.Name) + id,
Port: execute.Port, Port: execute.Port,
TargetPort: execute.PAT, TargetPort: execute.PAT,
Protocol: "TCP", Protocol: "TCP",

View File

@@ -6,7 +6,6 @@ import (
oclib "cloud.o-forge.io/core/oc-lib" oclib "cloud.o-forge.io/core/oc-lib"
workflow "cloud.o-forge.io/core/oc-lib/models/workflow" workflow "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
) )
type WorflowDB struct { type WorflowDB struct {
@@ -42,7 +41,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *wo
return new_wf, nil return new_wf, nil
} }
func (w *WorflowDB) ExportToArgo(exec *workflow_execution.WorkflowExecution, timeout int) (*ArgoBuilder, int, error) { func (w *WorflowDB) ExportToArgo(namespace string, timeout int) (*ArgoBuilder, int, error) {
logger := oclib.GetLogger() logger := oclib.GetLogger()
logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow)) logger.Info().Msg(fmt.Sprint("Exporting to Argo", w.Workflow))
if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil { if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
@@ -50,7 +49,7 @@ func (w *WorflowDB) ExportToArgo(exec *workflow_execution.WorkflowExecution, tim
} }
argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout} argoBuilder := ArgoBuilder{OriginWorkflow: w.Workflow, Timeout: timeout}
stepMax, _, _, err := argoBuilder.CreateDAG(exec, exec.ExecutionsID, true) stepMax, _, _, err := argoBuilder.CreateDAG(namespace, true)
if err != nil { if err != nil {
logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name) logger.Error().Msg("Could not create the argo file for " + w.Workflow.Name)
return nil, 0, err return nil, 0, err