206 lines
8.1 KiB
Go
206 lines
8.1 KiB
Go
// A class that translates the informations held in the graph object
|
|
// via its lists of components into an argo file, using the a list of
|
|
// link ID to build the dag
|
|
|
|
package workflow_builder
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"oc-monitord/conf"
|
|
"oc-monitord/models"
|
|
. "oc-monitord/models"
|
|
"os"
|
|
"time"
|
|
|
|
oclib "cloud.o-forge.io/core/oc-lib"
|
|
"cloud.o-forge.io/core/oc-lib/logs"
|
|
"cloud.o-forge.io/core/oc-lib/models/resources"
|
|
w "cloud.o-forge.io/core/oc-lib/models/workflow"
|
|
"github.com/nwtgck/go-fakelish"
|
|
"github.com/rs/zerolog"
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
var logger zerolog.Logger
|
|
|
|
type ArgoBuilder struct {
|
|
OriginWorkflow *w.Workflow
|
|
Workflow *models.Workflow
|
|
Services []*Service
|
|
Timeout int
|
|
RemotePeers []string
|
|
}
|
|
|
|
// TODO: found on a processing instance linked to storage
|
|
// add s3, gcs, azure, etc if needed on a link between processing and storage
|
|
func (b *ArgoBuilder) CreateDAG(namespace string, write bool) (int, []string, []string, error) {
|
|
logger = logs.GetLogger()
|
|
logger.Info().Msg(fmt.Sprint("Creating DAG ", b.OriginWorkflow.Graph.Items))
|
|
// handle services by checking if there is only one processing with hostname and port
|
|
firstItems, lastItems, volumes := b.createTemplates(namespace)
|
|
b.createVolumes(volumes)
|
|
if b.Timeout > 0 {
|
|
b.Workflow.Spec.Timeout = b.Timeout
|
|
}
|
|
b.Workflow.Spec.ServiceAccountName = "sa-" + namespace
|
|
b.Workflow.Spec.Entrypoint = "dag"
|
|
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
|
|
b.Workflow.Kind = "Workflow"
|
|
return len(b.Workflow.GetDag().Tasks), firstItems, lastItems, nil
|
|
}
|
|
|
|
func (b *ArgoBuilder) createTemplates(namespace string) ([]string, []string, []models.VolumeMount) {
|
|
volumes := []models.VolumeMount{}
|
|
firstItems := []string{}
|
|
lastItems := []string{}
|
|
items := b.OriginWorkflow.GetGraphItems(b.OriginWorkflow.Graph.IsProcessing)
|
|
logger.Info().Msg(fmt.Sprint("Creating templates", len(items)))
|
|
for _, item := range items {
|
|
instance := item.Processing.GetSelectedInstance()
|
|
logger.Info().Msg(fmt.Sprint("Creating template for", item.Processing.GetName(), instance))
|
|
if instance == nil || instance.(*resources.ProcessingInstance).Access == nil && instance.(*resources.ProcessingInstance).Access.Container != nil {
|
|
logger.Error().Msg("Not enough configuration setup, template can't be created : " + item.Processing.GetName())
|
|
return firstItems, lastItems, volumes
|
|
}
|
|
volumes, firstItems, lastItems = b.createArgoTemplates(namespace,
|
|
item.ID, item.Processing, volumes, firstItems, lastItems)
|
|
}
|
|
|
|
wfDeps := models.NewWorkflowDependancies()
|
|
for _, workflowID := range b.OriginWorkflow.Workflows {
|
|
b.createWorkflowArgoTemplate(workflowID, namespace, wfDeps)
|
|
}
|
|
wfDeps.BindRelatedTasks(b.Workflow.GetDag())
|
|
wfDeps.BindFirstTasks(b.OriginWorkflow.GetDependencies, b.Workflow.GetDag())
|
|
|
|
if b.Services != nil {
|
|
dag := b.Workflow.GetDag()
|
|
dag.Tasks = append(dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"})
|
|
b.addServiceToArgo()
|
|
}
|
|
return firstItems, lastItems, volumes
|
|
}
|
|
|
|
func (b *ArgoBuilder) createWorkflowArgoTemplate(
|
|
workflowID string,
|
|
namespace string,
|
|
wfDeps *models.WorkflowsDependancies,
|
|
) {
|
|
realWorkflow, code, err := w.NewAccessor(nil).LoadOne(workflowID)
|
|
if code != 200 {
|
|
logger.Error().Msg("Error loading the workflow : " + err.Error())
|
|
return
|
|
}
|
|
subBuilder := ArgoBuilder{OriginWorkflow: realWorkflow.(*w.Workflow), Workflow: &models.Workflow{}, Timeout: b.Timeout}
|
|
_, fi, li, err := subBuilder.CreateDAG(namespace, false)
|
|
if err != nil {
|
|
logger.Error().Msg("Error creating the subworkflow : " + err.Error())
|
|
return
|
|
}
|
|
wfDeps.FirstWfTasks[workflowID] = fi
|
|
if depsOfIds := subBuilder.OriginWorkflow.IsDependancy(workflowID); len(depsOfIds) > 0 { // IS BEFORE
|
|
wfDeps.LastWfTasks[workflowID] = li
|
|
wfDeps.RelatedWfTasks[workflowID] = models.TransformDepsToArgo(depsOfIds)
|
|
}
|
|
subDag := subBuilder.Workflow.GetDag()
|
|
d := b.Workflow.GetDag()
|
|
d.Tasks = append(d.Tasks, subDag.Tasks...) // add the tasks of the subworkflow to the main workflow
|
|
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, subBuilder.Workflow.Spec.Templates...)
|
|
b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, subBuilder.Workflow.Spec.Volumes...)
|
|
b.Workflow.Spec.Arguments = append(b.Workflow.Spec.Arguments, subBuilder.Workflow.Spec.Arguments...)
|
|
b.Services = append(b.Services, subBuilder.Services...)
|
|
}
|
|
|
|
func (b *ArgoBuilder) createArgoTemplates(
|
|
namespace string,
|
|
id string,
|
|
processing *resources.ProcessingResource,
|
|
volumes []models.VolumeMount,
|
|
firstItems []string,
|
|
lastItems []string,
|
|
) ([]models.VolumeMount, []string, []string) {
|
|
_, firstItems, lastItems = NewTask(processing.Name, id).BindToArgo(b.Workflow.GetDag(), id, b.OriginWorkflow, processing, firstItems, lastItems)
|
|
template := &Template{Name: models.GetArgoName(processing.GetName(), id)}
|
|
logger.Info().Msg(fmt.Sprint("Creating template for", template.Name))
|
|
|
|
template.CreateContainer(processing, b.Workflow.GetDag())
|
|
if err := b.RepartiteProcess(*processing, id, template, namespace); err != nil {
|
|
logger.Error().Msg(fmt.Sprint("Problem to sets up repartition expected %s", err.Error()))
|
|
return volumes, firstItems, lastItems
|
|
}
|
|
// get datacenter from the processing
|
|
if processing.IsService {
|
|
b.CreateService(id, processing)
|
|
template.Metadata.Labels = make(map[string]string)
|
|
template.Metadata.Labels["app"] = "oc-service-" + processing.GetName() // Construct the template for the k8s service and add a link in graph between k8s service and processing
|
|
}
|
|
b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, *template)
|
|
return volumes, firstItems, lastItems
|
|
}
|
|
|
|
func (b *ArgoBuilder) createVolumes(volumes []models.VolumeMount) { // TODO : one think about remote volume but TG
|
|
for _, volume := range volumes {
|
|
volume.BindToArgo(b.Workflow)
|
|
}
|
|
}
|
|
|
|
// Verify if a processing resource is attached to another Compute than the one hosting
|
|
// the current Open Cloud instance. If true return the peer ID to contact
|
|
func (b *ArgoBuilder) RepartiteProcess(processing resources.ProcessingResource, graphID string, template *models.Template, namespace string) error {
|
|
computeAttached := b.OriginWorkflow.GetByRelatedProcessing(processing.GetID(), b.OriginWorkflow.Graph.IsCompute)
|
|
if len(computeAttached) == 0 {
|
|
return errors.New("No compute was found attached to processing " + processing.Name + " : " + processing.UUID)
|
|
}
|
|
// Creates an accessor srtictly for Peer Collection
|
|
for _, related := range computeAttached {
|
|
res := oclib.NewRequest(oclib.LibDataEnum(oclib.PEER), "", "", nil, nil).LoadOne(related.Node.GetCreatorID())
|
|
if res.Err != "" {
|
|
return errors.New(res.Err)
|
|
}
|
|
|
|
peer := *res.ToPeer()
|
|
|
|
isNotReparted := peer.State == 1
|
|
logger.Info().Msg(fmt.Sprint("Result IsMySelf for ", peer.UUID, " : ", isNotReparted))
|
|
if !(isNotReparted) {
|
|
logger.Debug().Msg("Reparted processing, on " + peer.UUID)
|
|
b.RemotePeers = append(b.RemotePeers, peer.UUID)
|
|
template.AddAdmiraltyAnnotations(peer.UUID, namespace)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Execute the last actions once the YAML file for the Argo Workflow is created
|
|
func (b *ArgoBuilder) CompleteBuild(namespace string) (string, error) {
|
|
logger.Info().Msg("DEV :: Completing build")
|
|
setter := AdmiraltySetter{Id: namespace}
|
|
// Setup admiralty for each node
|
|
for _, peer := range b.RemotePeers {
|
|
logger.Info().Msg(fmt.Sprint("DEV :: Launching Admiralty Setup for ", peer))
|
|
setter.InitializeAdmiralty(conf.GetConfig().PeerID, peer)
|
|
}
|
|
// Generate the YAML file
|
|
random_name := fakelish.GenerateFakeWord(5, 8) + "-" + fakelish.GenerateFakeWord(5, 8)
|
|
b.Workflow.Metadata.Name = "oc-monitor-" + random_name
|
|
logger = oclib.GetLogger()
|
|
yamlified, err := yaml.Marshal(b.Workflow)
|
|
if err != nil {
|
|
logger.Error().Msg("Could not transform object to yaml file")
|
|
return "", err
|
|
}
|
|
// Give a unique name to each argo file with its timestamp DD:MM:YYYY_hhmmss
|
|
current_timestamp := time.Now().Format("02_01_2006_150405")
|
|
file_name := random_name + "_" + current_timestamp + ".yml"
|
|
workflows_dir := "./argo_workflows/"
|
|
err = os.WriteFile(workflows_dir+file_name, []byte(yamlified), 0660)
|
|
|
|
if err != nil {
|
|
logger.Error().Msg("Could not write the yaml file")
|
|
return "", err
|
|
}
|
|
|
|
return workflows_dir + file_name, nil
|
|
}
|