Workin' Argo struct

This commit is contained in:
mr
2025-02-05 08:36:26 +01:00
parent a1d28f2563
commit a0b1117075
24 changed files with 459 additions and 119 deletions

View File

@@ -8,14 +8,13 @@ import (
"fmt"
. "oc-monitord/models"
"os"
"regexp"
"strings"
"time"
oclib "cloud.o-forge.io/core/oc-lib"
"cloud.o-forge.io/core/oc-lib/models/common/enum"
"cloud.o-forge.io/core/oc-lib/models/resources"
w "cloud.o-forge.io/core/oc-lib/models/workflow"
"cloud.o-forge.io/core/oc-lib/models/workflow/graph"
"github.com/nwtgck/go-fakelish"
"github.com/rs/zerolog"
"gopkg.in/yaml.v3"
@@ -39,14 +38,6 @@ type Workflow struct {
Spec Spec `yaml:"spec,omitempty"`
}
func (b *Workflow) setDag(dag *Dag) {
for _, t := range b.Spec.Templates {
if t.Name == "dag" {
t.Dag = dag
}
}
}
func (b *Workflow) getDag() *Dag {
for _, t := range b.Spec.Templates {
if t.Name == "dag" {
@@ -65,8 +56,10 @@ type Spec struct {
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
}
// TODO: found on a processing instance linked to storage
// add s3, gcs, azure, etc if needed on a link between processing and storage
func (b *ArgoBuilder) CreateDAG(write bool) (string, int, []string, []string, error) {
fmt.Println("Creating DAG", b.OriginWorkflow.Graph.Items)
// handle services by checking if there is only one processing with hostname and port
firstItems, lastItems, volumes := b.createTemplates()
b.createVolumes(volumes)
@@ -104,14 +97,17 @@ func (b *ArgoBuilder) createTemplates() ([]string, []string, []VolumeMount) {
volumes := []VolumeMount{}
firstItems := []string{}
lastItems := []string{}
for _, comp := range b.OriginWorkflow.ProcessingResources {
if comp.Container != nil {
volumes, firstItems, lastItems = b.createArgoTemplates(
comp.UUID, comp, volumes, firstItems, lastItems)
} else {
logger.Error().Msg("Not enough configuration setup, template can't be created : " + comp.GetName())
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
fmt.Println("Creating templates", len(items))
for _, item := range b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing) {
instance := item.Processing.GetSelectedInstance()
fmt.Println("Creating template for", item.Processing.GetName(), instance)
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
return firstItems, lastItems, volumes
}
volumes, firstItems, lastItems = b.createArgoTemplates(
item.ID, item.Processing, volumes, firstItems, lastItems)
}
firstWfTasks := map[string][]string{}
latestWfTasks := map[string][]string{}
@@ -187,6 +183,7 @@ func (b *ArgoBuilder) createArgoTemplates(id string,
lastItems []string) ([]VolumeMount, []string, []string) {
_, firstItems, lastItems = b.addTaskToArgo(b.Workflow.getDag(), id, processing, firstItems, lastItems)
template := &Template{Name: getArgoName(processing.GetName(), id)}
fmt.Println("Creating template for", template.Name)
template.CreateContainer(processing, b.Workflow.getDag())
// get datacenter from the processing
if processing.IsService {
@@ -194,17 +191,39 @@ func (b *ArgoBuilder) createArgoTemplates(id string,
template.Metadata.Labels = make(map[string]string)
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
}
storages := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.IsStorage)
for _, storage := range storages {
if storage.(*resources.StorageResource).SelectedInstanceIndex < 0 {
continue
related := b.OriginWorkflow.GetByRelatedProcessing(id, b.OriginWorkflow.Graph.IsStorage)
for _, r := range related {
storage := r.Node.(*resources.StorageResource)
for _, linkToStorage := range r.Links {
for _, rw := range linkToStorage.StorageLinkInfos {
art := Artifact{Path: template.ReplacePerEnv(rw.Source, linkToStorage.Env)}
if rw.Write {
art.Name = storage.GetName() + "-" + rw.Destination + "-input-write"
} else {
art.Name = storage.GetName() + "-" + rw.Destination + "-input-read"
}
if storage.StorageType == enum.S3 {
art.S3 = &Key{
Key: template.ReplacePerEnv(rw.Destination+"/"+rw.FileName, linkToStorage.Env),
}
}
if rw.Write {
template.Outputs.Artifacts = append(template.Inputs.Artifacts, art)
} else {
template.Inputs.Artifacts = append(template.Outputs.Artifacts, art)
}
}
}
s := storage.(*resources.StorageResource).Instances[storage.(*resources.StorageResource).SelectedInstanceIndex]
index := 0
if storage.SelectedInstanceIndex != nil && (*storage.SelectedInstanceIndex) >= 0 {
index = *storage.SelectedInstanceIndex
}
s := storage.Instances[index]
if s.Local {
volumes = template.Container.AddVolumeMount(VolumeMount{
Name: strings.ReplaceAll(strings.ToLower(storage.GetName()), " ", "-"),
MountPath: s.Source,
Storage: storage.(*resources.StorageResource),
Storage: storage,
}, volumes)
}
}
@@ -215,11 +234,24 @@ func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *re
firstItems []string, lastItems []string) (*Dag, []string, []string) {
unique_name := getArgoName(processing.GetName(), graphItemID)
step := Task{Name: unique_name, Template: unique_name}
if processing.Container != nil {
for name, value := range processing.Container.Env {
instance := processing.GetSelectedInstance()
if instance != nil {
for _, value := range instance.(*resources.ProcessingInstance).Env {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
Name: name,
Value: b.affectVariableEnv(value, b.OriginWorkflow.Graph),
Name: value.Name,
Value: value.Value,
})
}
for _, value := range instance.(*resources.ProcessingInstance).Inputs {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
Name: value.Name,
Value: value.Value,
})
}
for _, value := range instance.(*resources.ProcessingInstance).Outputs {
step.Arguments.Parameters = append(step.Arguments.Parameters, Parameter{
Name: value.Name,
Value: value.Value,
})
}
}
@@ -241,32 +273,13 @@ func (b *ArgoBuilder) addTaskToArgo(dag *Dag, graphItemID string, processing *re
return dag, firstItems, lastItems
}
func (b *ArgoBuilder) affectVariableEnv(envVar string, graph *graph.Graph) string {
var myExp = regexp.MustCompile(`(\{\{.*\}\})`) // regex to find all the variables in the command
matches := myExp.FindAllString(envVar, -1) // find all the variables in the command
for _, match := range matches { // for each variable in the command
splitted := strings.Split( // split the variable to get the inout and the vars only
strings.ReplaceAll(strings.ReplaceAll(strings.ReplaceAll(match, "{{", ""), "}}", ""), " ", ""), "_")
if len(splitted) < 3 { // if the variable is not well formatted, we skip it
logger.Error().Msgf("The variable %v is not well formatted", match)
continue
}
graphItemID := splitted[1] // graphitemid is the id of the object
vars := splitted[2] // vars is the name of the variable of the object
_, obj := graph.GetResource(graphItemID)
if obj != nil {
envVar = strings.ReplaceAll(envVar, match, fmt.Sprintf("%v", obj.Serialize(obj)[vars]))
}
}
return envVar
}
func (b *ArgoBuilder) createVolumes(volumes []VolumeMount) { // TODO : one think about remote volume but TG
for _, volume := range volumes {
if volume.Storage.SelectedInstanceIndex < 0 {
continue
index := 0
if volume.Storage.SelectedInstanceIndex != nil && (*volume.Storage.SelectedInstanceIndex) >= 0 {
index = *volume.Storage.SelectedInstanceIndex
}
storage := volume.Storage.Instances[volume.Storage.SelectedInstanceIndex]
storage := volume.Storage.Instances[index]
new_volume := VolumeClaimTemplate{}
new_volume.Metadata.Name = strings.ReplaceAll(strings.ToLower(volume.Name), " ", "-")
new_volume.Spec.AccessModes = []string{"ReadWriteOnce"}
@@ -295,6 +308,10 @@ func (b *ArgoBuilder) isArgoDependancy(id string) (bool, []string) {
func (b *ArgoBuilder) getArgoDependencies(id string) (dependencies []string) {
for _, link := range b.OriginWorkflow.Graph.Links {
if _, ok := b.OriginWorkflow.Graph.Items[link.Source.ID]; !ok {
fmt.Println("Could not find the source of the link", link.Source.ID)
continue
}
source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
if id == link.Destination.ID && source != nil {
dependency_name := getArgoName(source.GetName(), link.Source.ID)

View File

@@ -29,15 +29,18 @@ func (b *ArgoBuilder) CreateService(id string, processing *resources.ProcessingR
}
func (b *ArgoBuilder) completeServicePorts(service *models.Service, id string, processing *resources.ProcessingResource) {
for _, execute := range processing.Container.Exposes {
if execute.PAT != 0 {
new_port_translation := models.ServicePort{
Name: strings.ToLower(processing.Name) + id,
Port: execute.Port,
TargetPort: execute.PAT,
Protocol: "TCP",
instance := processing.GetSelectedInstance()
if instance != nil && instance.(*resources.ProcessingInstance).Access != nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
for _, execute := range instance.(*resources.ProcessingInstance).Access.Container.Exposes {
if execute.PAT != 0 {
new_port_translation := models.ServicePort{
Name: strings.ToLower(processing.Name) + id,
Port: execute.Port,
TargetPort: execute.PAT,
Protocol: "TCP",
}
service.Spec.Ports = append(service.Spec.Ports, new_port_translation)
}
service.Spec.Ports = append(service.Spec.Ports, new_port_translation)
}
}
}
@@ -46,7 +49,6 @@ func (b *ArgoBuilder) addServiceToArgo() error {
for _, service := range b.Services {
service_manifest, err := yaml.Marshal(service)
if err != nil {
logger.Error().Msg("Could not marshal service manifest : " + err.Error())
return err
}
service_template := models.Template{Name: "workflow-service-pod",

View File

@@ -13,20 +13,21 @@ type WorflowDB struct {
}
// Create the obj!ects from the mxgraphxml stored in the workflow given as a parameter
func (w *WorflowDB) LoadFrom(workflow_id string) error {
func (w *WorflowDB) LoadFrom(workflow_id string, peerID string) error {
fmt.Println("Loading workflow from " + workflow_id)
var err error
if w.Workflow, err = w.getWorkflow(workflow_id); err != nil {
if w.Workflow, err = w.getWorkflow(workflow_id, peerID); err != nil {
return err
}
return nil
}
// Use oclib to retrieve the graph contained in the workflow referenced
func (w *WorflowDB) getWorkflow(workflow_id string) (workflow *workflow.Workflow, err error) {
func (w *WorflowDB) getWorkflow(workflow_id string, peerID string) (workflow *workflow.Workflow, err error) {
logger := oclib.GetLogger()
lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", "", []string{}, nil).LoadOne(workflow_id)
fmt.Println(lib_data.Code, lib_data.Err)
lib_data := oclib.NewRequest(oclib.LibDataEnum(oclib.WORKFLOW), "", peerID, []string{}, nil).LoadOne(workflow_id)
fmt.Println("ERR", lib_data.Code, lib_data.Err)
if lib_data.Code != 200 {
logger.Error().Msg("Error loading the graph")
return workflow, errors.New(lib_data.Err)
@@ -42,7 +43,7 @@ func (w *WorflowDB) getWorkflow(workflow_id string) (workflow *workflow.Workflow
func (w *WorflowDB) ExportToArgo(timeout int) (string, int, error) {
logger := oclib.GetLogger()
fmt.Println("Exporting to Argo", w.Workflow)
if len(w.Workflow.Name) == 0 || w.Workflow.Graph == nil {
return "", 0, fmt.Errorf("can't export a graph that has not been loaded yet")
}

View File

@@ -4,7 +4,7 @@ import (
"testing"
)
func TestGetGraph(t *testing.T){
func TestGetGraph(t *testing.T) {
w := WorflowDB{}
w.LoadFrom("test-log")
w.LoadFrom("test-log", "")
}