Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
411effb000 |
@ -125,7 +125,7 @@ func LogKubernetesArgo(wfName string, namespace string, watcher watch.Interface)
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if len(wf.Status.Nodes) == 0 {
|
if len(wf.Status.Nodes) == 0 {
|
||||||
wfl.Info().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it
|
wfl.Debug().Msg("No node status yet") // The first output of the channel doesn't contain Nodes so we skip it
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
7
main.go
7
main.go
@ -67,14 +67,14 @@ func main() {
|
|||||||
logger = u.GetLogger()
|
logger = u.GetLogger()
|
||||||
|
|
||||||
logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
|
logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
|
||||||
logger.Info().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
|
logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
|
||||||
exec := u.GetExecution(conf.GetConfig().ExecutionID)
|
exec := u.GetExecution(conf.GetConfig().ExecutionID)
|
||||||
if exec == nil {
|
if exec == nil {
|
||||||
logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
|
logger.Fatal().Msg("Could not retrieve workflow ID from execution ID " + conf.GetConfig().ExecutionID + " on peer " + conf.GetConfig().PeerID)
|
||||||
}
|
}
|
||||||
conf.GetConfig().WorkflowID = exec.WorkflowID
|
conf.GetConfig().WorkflowID = exec.WorkflowID
|
||||||
|
|
||||||
logger.Info().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
|
logger.Debug().Msg("Starting construction of yaml argo for workflow :" + exec.WorkflowID)
|
||||||
|
|
||||||
if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) {
|
if _, err := os.Stat("./argo_workflows/"); os.IsNotExist(err) {
|
||||||
os.Mkdir("./argo_workflows/", 0755)
|
os.Mkdir("./argo_workflows/", 0755)
|
||||||
@ -102,6 +102,9 @@ func main() {
|
|||||||
|
|
||||||
workflowName = getContainerName(argoFilePath)
|
workflowName = getContainerName(argoFilePath)
|
||||||
|
|
||||||
|
wf_logger := u.GetWFLogger(workflowName)
|
||||||
|
wf_logger.Debug().Msg("Testing argo name")
|
||||||
|
|
||||||
if conf.GetConfig().KubeHost == "" {
|
if conf.GetConfig().KubeHost == "" {
|
||||||
// Not in a k8s environment, get conf from parameters
|
// Not in a k8s environment, get conf from parameters
|
||||||
logger.Info().Msg("Executes outside of k8s")
|
logger.Info().Msg("Executes outside of k8s")
|
||||||
|
@ -116,6 +116,8 @@ func (k *KubernetesTools) CreateAccessSecret(ns string, login string, password s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
|
func (k *KubernetesTools) GetArgoWatch(executionId string, wfName string) (watch.Interface, error){
|
||||||
|
wfl := utils.GetWFLogger("")
|
||||||
|
wfl.Debug().Msg("Starting argo watch with argo lib")
|
||||||
options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
|
options := metav1.ListOptions{FieldSelector: "metadata.name=oc-monitor-"+wfName}
|
||||||
|
|
||||||
watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options)
|
watcher, err := k.VersionedSet.ArgoprojV1alpha1().Workflows(executionId).Watch(context.Background(), options)
|
||||||
|
@ -59,6 +59,9 @@ type Spec struct {
|
|||||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||||
Templates []Template `yaml:"templates"`
|
Templates []Template `yaml:"templates"`
|
||||||
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
Timeout int `yaml:"activeDeadlineSeconds,omitempty"`
|
||||||
|
NodeSelector struct{
|
||||||
|
NodeRole string `yaml:"node-role"`
|
||||||
|
} `yaml:"nodeSelector"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: found on a processing instance linked to storage
|
// TODO: found on a processing instance linked to storage
|
||||||
@ -73,6 +76,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
|
|||||||
if b.Timeout > 0 {
|
if b.Timeout > 0 {
|
||||||
b.Workflow.Spec.Timeout = b.Timeout
|
b.Workflow.Spec.Timeout = b.Timeout
|
||||||
}
|
}
|
||||||
|
b.Workflow.Spec.NodeSelector.NodeRole = "worker"
|
||||||
b.Workflow.Spec.ServiceAccountName = "sa-"+namespace
|
b.Workflow.Spec.ServiceAccountName = "sa-"+namespace
|
||||||
b.Workflow.Spec.Entrypoint = "dag"
|
b.Workflow.Spec.Entrypoint = "dag"
|
||||||
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
|
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
|
||||||
|
Loading…
Reference in New Issue
Block a user