diff --git a/main.go b/main.go index 3ef3269..5100059 100644 --- a/main.go +++ b/main.go @@ -47,8 +47,6 @@ const localConfigFile = "./conf/local_ocmonitord_conf.json" func main() { - os.Setenv("test_service","true") // Only for service demo, delete before merging on main - monitorLocal = false // Test if monitor is launched outside (with parameters) or in a k8s environment (env variables sets) if os.Getenv("KUBERNETES_SERVICE_HOST") == "" { diff --git a/models/services.go b/models/services.go index ba0e10a..e525545 100644 --- a/models/services.go +++ b/models/services.go @@ -17,7 +17,7 @@ type Service struct { } type Metadata struct { - Name string `yaml:"name"` + Name string `yaml:"generateName"` } diff --git a/workflow_builder/argo_builder.go b/workflow_builder/argo_builder.go index 0976e5d..6156b42 100644 --- a/workflow_builder/argo_builder.go +++ b/workflow_builder/argo_builder.go @@ -5,9 +5,11 @@ package workflow_builder import ( + "oc-monitord/models" . "oc-monitord/models" "os" "slices" + "strconv" "strings" "time" @@ -25,7 +27,7 @@ var logger zerolog.Logger type ArgoBuilder struct { OriginWorkflow w.Workflow Workflow Workflow - Services *Service + Services []Service Timeout int } @@ -49,10 +51,7 @@ type Spec struct { func (b *ArgoBuilder) CreateDAG() (string, error) { // handle services by checking if there is only one processing with hostname and port - - b.createNginxVolumes() - - + b.createTemplates() b.createDAGstep() b.createVolumes() @@ -91,6 +90,7 @@ func (b *ArgoBuilder) createTemplates() { var command string var args string var env string + var serv models.Service comp_res := comp.Processing @@ -114,24 +114,22 @@ func (b *ArgoBuilder) createTemplates() { new_temp := Template{Name: argo_name, Container: temp_container} new_temp.Inputs.Parameters = inputs_container new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name - new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx if (b.isService(comp.ID)){ - serv := b.CreateService(comp) - b.createService(serv, argo_name, comp.ID) + serv = b.CreateService(comp) + b.addServiceToWorkflow(serv, argo_name, comp.ID) new_temp.Metadata.Labels = make(map[string]string) - new_temp.Metadata.Labels["app"] = "oc-service" // Construct the template for the k8s service and add a link in graph between k8s service and processing + new_temp.Metadata.Labels["app"] = serv.Spec.Selector["app"] // Construct the template for the k8s service and add a link in graph between k8s service and processing + b.addServiceToArgo(serv) // if err != nil { // // TODO // } } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp) + } - if b.Services != nil { - b.addServiceToArgo() - } } @@ -153,8 +151,9 @@ func (b *ArgoBuilder) createDAGstep() { new_dag.Tasks = append(new_dag.Tasks, step) } - if b.Services != nil { - new_dag.Tasks = append(new_dag.Tasks, Task{Name:"workflow-service-pod", Template: "workflow-service-pod"}) + for i, _ := range b.Services { + name := "workflow-service-pod-"+strconv.Itoa(i + 1) + new_dag.Tasks = append(new_dag.Tasks, Task{Name: name , Template: name}) } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag}) @@ -170,15 +169,6 @@ func (b *ArgoBuilder) createVolumes() { b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) } -// For demo purposes, until we implement the use of storage ressources -func (b *ArgoBuilder) createNginxVolumes() { - new_volume := VolumeClaimTemplate{} - new_volume.Metadata.Name = "nginx-demo" - new_volume.Spec.AccessModes = []string{"ReadWriteOnce"} - new_volume.Spec.Resources.Requests.Storage = "1Gi" - - b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume) -} func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) { @@ -320,13 +310,4 @@ func (b *ArgoBuilder) isService(id string) bool{ } -func (b *ArgoBuilder) addLabel(name string, id string) { - argo_name := getArgoName(name,id) - for _, template := range b.Workflow.Spec.Templates{ - if template.Name == argo_name{ - template.Metadata.Labels["app"] = "service-workflow" - return - } - } -} diff --git a/workflow_builder/argo_services.go b/workflow_builder/argo_services.go index a990deb..40bd122 100644 --- a/workflow_builder/argo_services.go +++ b/workflow_builder/argo_services.go @@ -7,6 +7,7 @@ import ( "cloud.o-forge.io/core/oc-lib/models/resource_model" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" + "github.com/nwtgck/go-fakelish" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/yaml.v3" @@ -60,10 +61,10 @@ func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ new_service := models.Service{APIVersion: "v1", Kind: "Service", Metadata: models.Metadata{ - Name: "workflow-service" , + Name: "workflow-service-" , }, Spec: models.ServiceSpec{ - Selector: map[string]string{"app": "oc-service"}, + Selector: map[string]string{"app": "service-" + fakelish.GenerateFakeWord(5, 8)}, Ports: []models.ServicePort{ }, Type: "NodePort", @@ -115,25 +116,28 @@ func completeServicePorts(service *models.Service, processing graph.GraphItem) { return } -func (b *ArgoBuilder) createService(service models.Service, processing_name string, processing_id string) { - if b.Services != nil{ - b.Services.Spec.Ports = append(b.Services.Spec.Ports, service.Spec.Ports...) - }else { - b.Services = &service - } - - b.addLabel(processing_name,processing_id) +// The k8s service passed as the parameter only expose one port because it is the result of CreateService() +// we check if this port is already exposed by a service in the workflow and we proceed to the creation of a new service OR +// add the port to the list of port exposed by an existing if portAlreadyExposed is false +func (b *ArgoBuilder) addServiceToWorkflow(service models.Service, processing_name string, processing_id string) (label string) { + if exposed, service_available_port := b.portAlreadyExposed(service.Spec.Ports[0].TargetPort); exposed && service_available_port != nil{ + // The port you want to expose is already exposed by all the existing services + service_available_port.Spec.Ports = append(service_available_port.Spec.Ports, service.Spec.Ports...) + return service_available_port.Spec.Selector["app"] + } + b.Services = append(b.Services, service) + return service.Spec.Selector["app"] } -func (b *ArgoBuilder) addServiceToArgo() error { - service_manifest, err := yaml.Marshal(b.Services) +func (b *ArgoBuilder) addServiceToArgo(service models.Service) error { + service_manifest, err := yaml.Marshal(service) if err != nil { logger.Error().Msg("Could not marshal service manifest") return err } - service_template := models.Template{Name: "workflow-service-pod", + service_template := models.Template{Name: "workflow-service-pod-" + strconv.Itoa(len(b.Services)), Resource: models.ServiceResource{ Action: "create", SuccessCondition: "status.succeeded > 0", @@ -145,4 +149,33 @@ func (b *ArgoBuilder) addServiceToArgo() error { b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) return nil +} + + +func (b *ArgoBuilder) addLabel(name string, id string) { + argo_name := getArgoName(name,id) + for _, template := range b.Workflow.Spec.Templates{ + if template.Name == argo_name{ + template.Metadata.Labels["app"] = "service-workflow" + return + } + } +} + +func (b *ArgoBuilder) portAlreadyExposed(port int64) (exposed bool, service *models.Service ){ + // For all already existing k8s services, test if the port in parameter is already exposed and returns the first service that doesn't yet expose this port + for _, s := range b.Services { + i := 0 + port_exposed := false + for !port_exposed { + if s.Spec.Ports[i].TargetPort == port { + port_exposed = true + } + } + if !port_exposed { + return false, &s + } + } + + return true, nil } \ No newline at end of file