Event Argo Building
This commit is contained in:
@@ -18,8 +18,11 @@ import (
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
"cloud.o-forge.io/core/oc-lib/models/common/enum"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources"
|
||||
"cloud.o-forge.io/core/oc-lib/models/resources/native_tools"
|
||||
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
|
||||
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
|
||||
"cloud.o-forge.io/core/oc-lib/tools"
|
||||
"github.com/nwtgck/go-fakelish"
|
||||
"github.com/rs/zerolog"
|
||||
"gopkg.in/yaml.v3"
|
||||
@@ -55,22 +58,22 @@ func (b *Workflow) getDag() *Dag {
|
||||
}
|
||||
|
||||
type Spec struct {
|
||||
ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
|
||||
Entrypoint string `yaml:"entrypoint"`
|
||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||
Templates []Template `yaml:"templates"`
|
||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||
ServiceAccountName string `yaml:"serviceAccountName,omitempty"`
|
||||
Entrypoint string `yaml:"entrypoint"`
|
||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||
Templates []Template `yaml:"templates"`
|
||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||
}
|
||||
|
||||
// TODO: found on a processing instance linked to storage
|
||||
// add s3, gcs, azure, etc if needed on a link between processing and storage
|
||||
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, []string, error) {
|
||||
func (b *ArgoBuilder) CreateDAG(exec *workflow_execution.WorkflowExecution, namespace string, write bool) (int, []string, []string, error) {
|
||||
logger = logs.GetLogger()
|
||||
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
|
||||
// handle services by checking if there is only one processing with hostname and port
|
||||
firstItems, lastItems, volumes := b.createTemplates(namespace)
|
||||
b.createVolumes(volumes)
|
||||
firstItems, lastItems, volumes := b.createTemplates(exec, namespace)
|
||||
b.createVolumes(exec, volumes)
|
||||
|
||||
if b.Timeout > 0 {
|
||||
b.Workflow.Spec.Timeout = b.Timeout
|
||||
@@ -82,27 +85,44 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
|
||||
if !write {
|
||||
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
|
||||
|
||||
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
|
||||
return len(b.Workflow.getDag().Tasks), firstItems, lastItems, nil
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []VolumeMount) {
|
||||
|
||||
func (b *ArgoBuilder) createTemplates(exec *workflow_execution.WorkflowExecution, namespace string) ([]string, []string, []VolumeMount) {
|
||||
volumes := []VolumeMount{}
|
||||
firstItems := []string{}
|
||||
lastItems := []string{}
|
||||
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
|
||||
logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
|
||||
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
|
||||
instance := item.Processing.GetSelectedInstance()
|
||||
index := 0
|
||||
_, res := item.GetResource()
|
||||
if d, ok := exec.SelectedInstances[res.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := item.Processing.GetSelectedInstance(&index)
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
|
||||
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
|
||||
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(exec,
|
||||
namespace,
|
||||
item.ID, item.Processing, volumes, firstItems, lastItems)
|
||||
}
|
||||
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsNativeTool) {
|
||||
if item.NativeTool.Kind != int(native_tools.WORKFLOW_EVENT) {
|
||||
continue
|
||||
}
|
||||
index := 0
|
||||
_, res := item.GetResource()
|
||||
if d, ok := exec.SelectedInstances[res.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := item.NativeTool.GetSelectedInstance(&index)
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", item.NativeTool.GetName(), instance))
|
||||
volumes, firstItems, lastItems = b.createArgoTemplates(exec,
|
||||
namespace, item.ID, item.NativeTool, volumes, firstItems, lastItems)
|
||||
}
|
||||
firstWfTasks := map[string][]string{}
|
||||
latestWfTasks := map[string][]string{}
|
||||
relatedWfTasks := map[string][]string{}
|
||||
@@ -113,7 +133,7 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
|
||||
continue
|
||||
}
|
||||
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Timeout: b.Timeout}
|
||||
_, fi, li, err := subBuilder.CreateDAG(namespace, false)
|
||||
_, fi, li, err := subBuilder.CreateDAG(exec, namespace, false)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
|
||||
continue
|
||||
@@ -170,36 +190,42 @@ func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []V
|
||||
return firstItems, lastItems, volumes
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createArgoTemplates(namespace string,
|
||||
func (b *ArgoBuilder) createArgoTemplates(
|
||||
exec *workflow_execution.WorkflowExecution,
|
||||
namespace string,
|
||||
id string,
|
||||
processing *resources.ProcessingResource,
|
||||
processing resources.ResourceInterface,
|
||||
volumes []VolumeMount,
|
||||
firstItems []string,
|
||||
lastItems []string) ([]VolumeMount, []string, []string) {
|
||||
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
|
||||
_, firstItems, lastItems = b.addTaskToArgo(exec, b.Workflow.getDag(), id, processing, firstItems, lastItems)
|
||||
template := &Template{Name: getArgoName(processing.GetName(), id)}
|
||||
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
|
||||
isReparted, peerId := b.isProcessingReparted(*processing, id)
|
||||
template.CreateContainer(processing, b.Workflow.getDag())
|
||||
|
||||
isReparted, peerId := b.isProcessingReparted(processing, id)
|
||||
if processing.GetType() == tools.PROCESSING_RESOURCE.String() {
|
||||
template.CreateContainer(exec, processing.(*resources.ProcessingResource), b.Workflow.getDag())
|
||||
} else if processing.GetType() == tools.NATIVE_TOOL.String() {
|
||||
template.CreateEventContainer(exec, processing.(*resources.NativeTool), b.Workflow.getDag())
|
||||
}
|
||||
|
||||
if isReparted {
|
||||
logger.Debug().Msg("Reparted processing, on " + peerId)
|
||||
b.RemotePeers = append(b.RemotePeers, peerId)
|
||||
template.AddAdmiraltyAnnotations(peerId)
|
||||
}
|
||||
// get datacenter from the processing
|
||||
if processing.IsService {
|
||||
b.CreateService(id, processing)
|
||||
if processing.GetType() == tools.PROCESSING_RESOURCE.String() && processing.(*resources.ProcessingResource).IsService {
|
||||
b.CreateService(exec, id, processing)
|
||||
template.Metadata.Labels = make(map[string]string)
|
||||
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
|
||||
}
|
||||
|
||||
volumes = b.addStorageAnnotations(id, template, namespace, volumes)
|
||||
volumes = b.addStorageAnnotations(exec, id, template, namespace, volumes)
|
||||
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
|
||||
return volumes, firstItems, lastItems
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
|
||||
func (b *ArgoBuilder) addStorageAnnotations(exec *workflow_execution.WorkflowExecution, id string, template *Template, namespace string, volumes []VolumeMount) []VolumeMount {
|
||||
related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage) // Retrieve all of the storage node linked to the processing for which we create the template
|
||||
|
||||
for _, r := range related {
|
||||
@@ -218,7 +244,7 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
|
||||
|
||||
if storage.StorageType == enum.S3 {
|
||||
|
||||
b.addS3annotations(&art, template, rw, linkToStorage, storage, namespace)
|
||||
b.addS3annotations(exec, &art, template, rw, linkToStorage, storage, namespace)
|
||||
}
|
||||
|
||||
if rw.Write {
|
||||
@@ -229,8 +255,8 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
|
||||
}
|
||||
}
|
||||
index := 0
|
||||
if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *storage.SelectedInstanceIndex
|
||||
if s, ok := exec.SelectedInstances[storage.GetID()]; ok {
|
||||
index = s
|
||||
}
|
||||
s := storage.Instances[index]
|
||||
if s.Local {
|
||||
@@ -242,10 +268,10 @@ func (b *ArgoBuilder) addStorageAnnotations(id string, template *Template, names
|
||||
}
|
||||
}
|
||||
return volumes
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addS3annotations(exec *workflow_execution.WorkflowExecution, art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
|
||||
|
||||
func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw graph.StorageProcessingGraphLink, linkToStorage graph.GraphLink, storage *resources.StorageResource, namespace string) {
|
||||
|
||||
art.S3 = &Key{
|
||||
// Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
|
||||
Insecure: true, // temporary
|
||||
@@ -253,58 +279,65 @@ func (b *ArgoBuilder) addS3annotations(art *Artifact, template *Template, rw gra
|
||||
if rw.Write {
|
||||
art.S3.Key = rw.Destination + "/" + rw.FileName
|
||||
} else {
|
||||
art.S3.Key = rw.Source
|
||||
art.S3.Key = rw.Source
|
||||
}
|
||||
sel := storage.GetSelectedInstance()
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
sel := storage.GetSelectedInstance(&index)
|
||||
// v0.1 : add the storage.Source to the s3 object
|
||||
// v0.2 : test if the storage.Source exists in the configMap and quit if not
|
||||
// v1 : v0.2 + if doesn't exist edit/create the configMap with the response from API call
|
||||
if sel != nil {
|
||||
b.addAuthInformation(storage, namespace, art)
|
||||
art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
|
||||
art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
|
||||
b.addAuthInformation(exec, storage, namespace, art)
|
||||
art.S3.Bucket = namespace // DEFAULT : will need to update this to create an unique
|
||||
art.S3.EndPoint = sel.(*resources.StorageResourceInstance).Source
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addAuthInformation(exec *workflow_execution.WorkflowExecution, storage *resources.StorageResource, namespace string, art *Artifact) {
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[storage.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
|
||||
sel := storage.GetSelectedInstance(&index)
|
||||
|
||||
func (b *ArgoBuilder) addAuthInformation(storage *resources.StorageResource, namespace string, art *Artifact) {
|
||||
|
||||
sel := storage.GetSelectedInstance()
|
||||
|
||||
tool, err := tools2.NewService(conf.GetConfig().Mode)
|
||||
if err != nil || tool == nil {
|
||||
logger.Fatal().Msg("Could not create the access secret :" + err.Error())
|
||||
}
|
||||
|
||||
secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
|
||||
|
||||
if err == nil {
|
||||
art.S3.AccessKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "access-key",
|
||||
}
|
||||
art.S3.SecretKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "secret-key",
|
||||
}
|
||||
|
||||
secretName, err := b.SetupS3Credentials(storage, namespace, tool) // this method return should be updated once we have decided how to retrieve credentials
|
||||
|
||||
if err == nil {
|
||||
art.S3.AccessKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "access-key",
|
||||
}
|
||||
art.S3.SecretKeySecret = &Secret{
|
||||
Name: secretName,
|
||||
Key: "secret-key",
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source+"/", "")
|
||||
art.S3.Key = strings.ReplaceAll(art.S3.Key, sel.(*resources.StorageResourceInstance).Source, "")
|
||||
splits := strings.Split(art.S3.EndPoint, "/")
|
||||
if len(splits) > 1 {
|
||||
art.S3.Bucket = splits[0]
|
||||
art.S3.EndPoint = strings.Join(splits[1:], "/")
|
||||
} else {
|
||||
art.S3.Bucket = splits[0]
|
||||
} else {
|
||||
art.S3.Bucket = splits[0]
|
||||
}
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, namespace string, tool tools2.Tool) (string, error) {
|
||||
s := tool.GetS3Secret(storage.UUID, namespace)
|
||||
// var s *v1.Secret
|
||||
accessKey, secretKey := retrieveMinioCredential("peer",namespace)
|
||||
|
||||
accessKey, secretKey := retrieveMinioCredential("peer", namespace)
|
||||
|
||||
if s == nil {
|
||||
id, err := tool.CreateAccessSecret(
|
||||
accessKey,
|
||||
@@ -319,22 +352,24 @@ func (b *ArgoBuilder) SetupS3Credentials(storage *resources.StorageResource, nam
|
||||
|
||||
return id, nil
|
||||
}
|
||||
|
||||
// s.Name = "toto"
|
||||
return s.Name, nil
|
||||
|
||||
|
||||
}
|
||||
|
||||
// This method needs to evolve to an API call to the peer passed as a parameter
|
||||
func retrieveMinioCredential(peer string, namespace string) (string,string) {
|
||||
func retrieveMinioCredential(peer string, namespace string) (string, string) {
|
||||
return "hF9wRGog75JuMdshWeEZ", "OwXXJkVQyb5l1aVPdOegKOtDJGoP1dJYeo8O7mDW"
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *resources.ProcessingResource,
|
||||
func (b *ArgoBuilder) addTaskToArgo(exec *workflow_execution.WorkflowExecution, dag *Dag, graphItemID string, processing resources.ResourceInterface,
|
||||
firstItems []string, lastItems []string) (*Dag, []string, []string) {
|
||||
unique_name := getArgoName(processing.GetName(), graphItemID)
|
||||
step := Task{Name: unique_name, Template: unique_name}
|
||||
instance := processing.GetSelectedInstance()
|
||||
unique_name := getArgoName(processing.GetName(), graphItemID)
|
||||
step := Task{Name: unique_name, Template: unique_name}
|
||||
index := 0
|
||||
if d, ok := exec.SelectedInstances[processing.GetID()]; ok {
|
||||
index = d
|
||||
}
|
||||
instance := processing.GetSelectedInstance(&index)
|
||||
if instance != nil {
|
||||
for _, value := range instance.(*resources.ProcessingInstance).Env {
|
||||
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
|
||||
@@ -373,11 +408,11 @@ func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *re
|
||||
return dag, firstItems, lastItems
|
||||
}
|
||||
|
||||
func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
|
||||
func (b *ArgoBuilder) createVolumes(exec *workflow_execution.WorkflowExecution, volumes []VolumeMount) { // TODO : one think about remote volume but TG
|
||||
for _, volume := range volumes {
|
||||
index := 0
|
||||
if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
|
||||
index = *volume.Storage.SelectedInstanceIndex
|
||||
if s, ok := exec.SelectedInstances[volume.Storage.GetID()]; ok {
|
||||
index = s
|
||||
}
|
||||
storage := volume.Storage.Instances[index]
|
||||
new_volume := VolumeClaimTemplate{}
|
||||
@@ -435,10 +470,10 @@ func getArgoName(raw_name string, component_id string) (formatedName string) {
|
||||
|
||||
// Verify if a processing resource is attached to another Compute than the one hosting
|
||||
// the current Open Cloud instance. If true return the peer ID to contact
|
||||
func (b *ArgoBuilder) isProcessingReparted(processing resources.ProcessingResource, graphID string) (bool, string) {
|
||||
func (b *ArgoBuilder) isProcessingReparted(processing resources.ResourceInterface, graphID string) (bool, string) {
|
||||
computeAttached := b.retrieveProcessingCompute(graphID)
|
||||
if computeAttached == nil {
|
||||
logger.Error().Msg("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
|
||||
logger.Error().Msg("No compute was found attached to processing " + processing.GetName() + " : " + processing.GetID())
|
||||
panic(0)
|
||||
}
|
||||
|
||||
@@ -473,7 +508,7 @@ func (b *ArgoBuilder) retrieveProcessingCompute(graphID string) *resources.Compu
|
||||
} else if link.Destination.ID == graphID {
|
||||
oppositeId = link.Source.ID
|
||||
}
|
||||
|
||||
|
||||
if oppositeId != "" {
|
||||
dt, res := b.OriginWorkflow.Graph.GetResource(oppositeId)
|
||||
if dt == oclib.COMPUTE_RESOURCE {
|
||||
@@ -495,10 +530,10 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
|
||||
// Setup admiralty for each node
|
||||
for _, peer := range b.RemotePeers {
|
||||
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
|
||||
setter.InitializeAdmiralty(conf.GetConfig().PeerID,peer)
|
||||
setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer)
|
||||
}
|
||||
|
||||
// Update the name of the admiralty node to use
|
||||
// Update the name of the admiralty node to use
|
||||
for _, template := range b.Workflow.Spec.Templates {
|
||||
if len(template.Metadata.Annotations) > 0 {
|
||||
if peerId, ok := template.Metadata.Annotations["multicluster.admiralty.io/clustername"]; ok {
|
||||
@@ -514,14 +549,14 @@ func (b *ArgoBuilder) CompleteBuild(executionsId string) (string, error) {
|
||||
yamlified, err := yaml.Marshal(b.Workflow)
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not transform object to yaml file")
|
||||
return "", err
|
||||
return "", err
|
||||
}
|
||||
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
|
||||
current_timestamp := time.Now().Format("02_01_2006_150405")
|
||||
file_name := random_name + "_" + current_timestamp + ".yml"
|
||||
workflows_dir := "./argo_workflows/"
|
||||
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
|
||||
|
||||
|
||||
if err != nil {
|
||||
logger.Error().Msg("Could not write the yaml file")
|
||||
return "", err
|
||||
|
||||
Reference in New Issue
Block a user