package workflow_builder import ( "oc-monitord/models" "strconv" "strings" "cloud.o-forge.io/core/oc-lib/models/resource_model" "cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph" "github.com/nwtgck/go-fakelish" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/bson/primitive" "gopkg.in/yaml.v3" ) // TODO : refactor this method or the deserialization process in oc-lib to get rid of the mongo code func getExposeContract(expose resource_model.Model) map[string]map[string]string { contract := make(map[string]map[string]string,0) mapped_info := bson.M{} // var contract PortTranslation _ , byt, _ := bson.MarshalValue(expose.Value) bson.Unmarshal(byt,&mapped_info) for _,v := range mapped_info { port := v.(primitive.M)["Key"].(string) // exposed_port := map[string]interface{}{data["Key"] : ""} port_translation := v.(primitive.M)["Value"] contract[port] = map[string]string{} for _,v2 := range port_translation.(primitive.A) { if v2.(primitive.M)["Key"] == "reverse" { contract[port]["reverse"] = v2.(primitive.M)["Value"].(string) } if v2.(primitive.M)["Key"] == "PAT" { contract[port]["PAT"] = v2.(primitive.M)["Value"].(string) } } } return contract } func (b *ArgoBuilder) CreateService(processing graph.GraphItem) models.Service{ // model { // Type : "dict", // Value : { // "80" : { // "reverse" : "", // "PAT" : "34000" // }, // "344" : { // "reverse" : "", // "PAT" : "34400" // } // } // } new_service := models.Service{APIVersion: "v1", Kind: "Service", Metadata: models.Metadata{ Name: "workflow-service-" , }, Spec: models.ServiceSpec{ Selector: map[string]string{"app": "service-" + fakelish.GenerateFakeWord(5, 8)}, Ports: []models.ServicePort{ }, Type: "NodePort", }, } completeServicePorts(&new_service, processing) yamlified, _ := yaml.Marshal(new_service) x := string(yamlified) _ = x return new_service } func completeServicePorts(service *models.Service, processing graph.GraphItem) { contract := getExposeContract(processing.Processing.ResourceModel.Model["expose"]) for str_port,translation_dict := range contract{ port, err := strconv.ParseInt(str_port, 10, 64) if err != nil { logger.Error().Msg("Could not convert " + str_port + "to an int") return } if _, ok := translation_dict["PAT"]; ok{ port_translation, err := strconv.ParseInt(translation_dict["PAT"], 10, 64) if err != nil { logger.Error().Msg("Could not convert " + translation_dict["PAT"] + "to an int") return } new_port_translation := models.ServicePort{ Name: strings.ToLower(processing.Processing.Name) + processing.ID, Port: port_translation-30000, TargetPort: port, NodePort: port_translation, Protocol: "TCP", } service.Spec.Ports = append(service.Spec.Ports, new_port_translation) } } return } // The k8s service passed as the parameter only expose one port because it is the result of CreateService() // we check if this port is already exposed by a service in the workflow and we proceed to the creation of a new service OR // add the port to the list of port exposed by an existing if portAlreadyExposed is false func (b *ArgoBuilder) addServiceToWorkflow(service models.Service, processing_name string, processing_id string) (label string) { if exposed, service_available_port := b.portAlreadyExposed(service.Spec.Ports[0].TargetPort); exposed && service_available_port != nil{ // The port you want to expose is already exposed by all the existing services service_available_port.Spec.Ports = append(service_available_port.Spec.Ports, service.Spec.Ports...) return service_available_port.Spec.Selector["app"] } b.Services = append(b.Services, service) return service.Spec.Selector["app"] } func (b *ArgoBuilder) addServiceToArgo(service models.Service) error { service_manifest, err := yaml.Marshal(service) if err != nil { logger.Error().Msg("Could not marshal service manifest") return err } service_template := models.Template{Name: "workflow-service-pod-" + strconv.Itoa(len(b.Services)), Resource: models.ServiceResource{ Action: "create", SuccessCondition: "status.succeeded > 0", FailureCondition: "status.failed > 3", SetOwnerReference: true, Manifest: string(service_manifest), }, } b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, service_template) return nil } func (b *ArgoBuilder) addLabel(name string, id string) { argo_name := getArgoName(name,id) for _, template := range b.Workflow.Spec.Templates{ if template.Name == argo_name{ template.Metadata.Labels["app"] = "service-workflow" return } } } func (b *ArgoBuilder) portAlreadyExposed(port int64) (exposed bool, service *models.Service ){ // For all already existing k8s services, test if the port in parameter is already exposed and returns the first service that doesn't yet expose this port for _, s := range b.Services { i := 0 port_exposed := false for !port_exposed { if s.Spec.Ports[i].TargetPort == port { port_exposed = true } } if !port_exposed { return false, &s } } return true, nil }