Adapted some of the steps of the executeInside()'s method to work with the updated Admiralty environment, using execution id as namespace, serviceAccount naming convention and adding the serviceAccount in the workflow's YAML. Logging not working yet.
This commit is contained in:
parent
9aefa18ea8
commit
e2ceb6e58d
12
README.md
12
README.md
@ -48,11 +48,17 @@ In rules add a new entry :
|
|||||||
This command **must return "yes"**
|
This command **must return "yes"**
|
||||||
|
|
||||||
|
|
||||||
|
# Notes features/admiralty-docker
|
||||||
|
|
||||||
|
- When executing monitord as a container we need to change any url with "localhost" to the container's host IP.
|
||||||
|
|
||||||
|
We can :
|
||||||
|
- declare a new parameter 'HOST_IP'
|
||||||
|
- decide that no peer can have "http://localhost" as its url and use an attribute from the peer object or isMyself() from oc-lib if a peer is the current host.
|
||||||
|
|
||||||
|
|
||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
- [ ] Logs the output of each pods :
|
|
||||||
- logsPods() function already exists
|
|
||||||
- need to implement the logic to create each pod's logger and start the monitoring routing
|
|
||||||
- [ ] Allow the front to known on which IP the service are reachable
|
- [ ] Allow the front to known on which IP the service are reachable
|
||||||
- currently doing it by using `kubectl get nodes -o wide`
|
- currently doing it by using `kubectl get nodes -o wide`
|
||||||
|
|
||||||
|
@ -18,6 +18,7 @@ type Config struct {
|
|||||||
KubeCA string
|
KubeCA string
|
||||||
KubeCert string
|
KubeCert string
|
||||||
KubeData string
|
KubeData string
|
||||||
|
ArgoHost string // when executed in a container will replace addresses with "localhost" in their url
|
||||||
}
|
}
|
||||||
|
|
||||||
var instance *Config
|
var instance *Config
|
||||||
|
13
main.go
13
main.go
@ -96,12 +96,12 @@ func main() {
|
|||||||
logger.Error().Msg(err.Error())
|
logger.Error().Msg(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
argo_file_path, err := builder.CompleteBuild(exec.ExecutionsID)
|
argoFilePath, err := builder.CompleteBuild(exec.ExecutionsID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error().Msg(err.Error())
|
logger.Error().Msg(err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
workflowName = getContainerName(argo_file_path)
|
workflowName = getContainerName(argoFilePath)
|
||||||
|
|
||||||
wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
|
wf_logger = logger.With().Str("argo_name", workflowName).Str("workflow_id", conf.GetConfig().WorkflowID).Str("workflow_execution_id", conf.GetConfig().ExecutionID).Logger()
|
||||||
wf_logger.Debug().Msg("Testing argo name")
|
wf_logger.Debug().Msg("Testing argo name")
|
||||||
@ -110,11 +110,12 @@ func main() {
|
|||||||
if conf.GetConfig().KubeHost == "" {
|
if conf.GetConfig().KubeHost == "" {
|
||||||
// Not in a k8s environment, get conf from parameters
|
// Not in a k8s environment, get conf from parameters
|
||||||
fmt.Println("Executes outside of k8s")
|
fmt.Println("Executes outside of k8s")
|
||||||
executeOutside(argo_file_path, stepMax, builder.Workflow)
|
executeOutside(argoFilePath, stepMax, builder.Workflow)
|
||||||
} else {
|
} else {
|
||||||
// Executed in a k8s environment
|
// Executed in a k8s environment
|
||||||
fmt.Println("Executes inside a k8s")
|
fmt.Println("Executes inside a k8s")
|
||||||
executeInside(exec.GetID(), "argo", argo_file_path, stepMax)
|
// executeInside(exec.GetID(), "argo", argo_file_path, stepMax) // commenting to use conf.ExecutionID instead of exec.GetID()
|
||||||
|
executeInside(conf.GetConfig().ExecutionID, conf.GetConfig().ExecutionID, argoFilePath, stepMax)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -294,6 +295,8 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
|
|||||||
host := parser.String("H", "host", &argparse.Options{Required: false, Default: "", Help: "Host for the Kubernetes cluster"})
|
host := parser.String("H", "host", &argparse.Options{Required: false, Default: "", Help: "Host for the Kubernetes cluster"})
|
||||||
port := parser.String("P", "port", &argparse.Options{Required: false, Default: "6443", Help: "Port for the Kubernetes cluster"})
|
port := parser.String("P", "port", &argparse.Options{Required: false, Default: "6443", Help: "Port for the Kubernetes cluster"})
|
||||||
|
|
||||||
|
// argoHost := parser.String("h", "argoHost", &argparse.Options{Required: false, Default: "", Help: "Host where Argo is running from"}) // can't use -h because its reserved to help
|
||||||
|
|
||||||
err := parser.Parse(os.Args)
|
err := parser.Parse(os.Args)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(parser.Usage(err))
|
fmt.Println(parser.Usage(err))
|
||||||
@ -311,6 +314,8 @@ func setConf(is_k8s bool, o *onion.Onion, parser *argparse.Parser) {
|
|||||||
conf.GetConfig().KubeHost = *host
|
conf.GetConfig().KubeHost = *host
|
||||||
conf.GetConfig().KubePort = *port
|
conf.GetConfig().KubePort = *port
|
||||||
|
|
||||||
|
// conf.GetConfig().ArgoHost = *argoHost
|
||||||
|
|
||||||
decoded, err := base64.StdEncoding.DecodeString(*ca)
|
decoded, err := base64.StdEncoding.DecodeString(*ca)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
conf.GetConfig().KubeCA = string(decoded)
|
conf.GetConfig().KubeCA = string(decoded)
|
||||||
|
@ -52,6 +52,7 @@ func NewKubernetesTool() (Tool, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.New("Error creating Kubernetes versionned client: " + err.Error())
|
return nil, errors.New("Error creating Kubernetes versionned client: " + err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return &KubernetesTools{
|
return &KubernetesTools{
|
||||||
Set: clientset,
|
Set: clientset,
|
||||||
VersionedSet: clientset2,
|
VersionedSet: clientset2,
|
||||||
@ -149,7 +150,7 @@ func (k *KubernetesTools) CreateArgoWorkflow(path string, ns string) (string, er
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errors.New("failed to create workflow: " + err.Error())
|
return "", errors.New("failed to create workflow: " + err.Error())
|
||||||
}
|
}
|
||||||
fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, "argo")
|
fmt.Printf("workflow %s created in namespace %s\n", createdWf.Name, ns)
|
||||||
return createdWf.Name, nil
|
return createdWf.Name, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,6 +53,7 @@ func (b *Workflow) getDag() *Dag {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Spec struct {
|
type Spec struct {
|
||||||
|
ServiceAccountName string `yaml:"serviceAccountName"`
|
||||||
Entrypoint string `yaml:"entrypoint"`
|
Entrypoint string `yaml:"entrypoint"`
|
||||||
Arguments []Parameter `yaml:"arguments,omitempty"`
|
Arguments []Parameter `yaml:"arguments,omitempty"`
|
||||||
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
Volumes []VolumeClaimTemplate `yaml:"volumeClaimTemplates,omitempty"`
|
||||||
@ -72,6 +73,7 @@ func (b *ArgoBuilder) CreateDAG(namespace string, write bool) ( int, []string, [
|
|||||||
if b.Timeout > 0 {
|
if b.Timeout > 0 {
|
||||||
b.Workflow.Spec.Timeout = b.Timeout
|
b.Workflow.Spec.Timeout = b.Timeout
|
||||||
}
|
}
|
||||||
|
b.Workflow.Spec.ServiceAccountName = "sa-"+namespace
|
||||||
b.Workflow.Spec.Entrypoint = "dag"
|
b.Workflow.Spec.Entrypoint = "dag"
|
||||||
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
|
b.Workflow.ApiVersion = "argoproj.io/v1alpha1"
|
||||||
b.Workflow.Kind = "Workflow"
|
b.Workflow.Kind = "Workflow"
|
||||||
|
Loading…
Reference in New Issue
Block a user