change for the win
This commit is contained in:
		
							
								
								
									
										12
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										12
									
								
								main.go
									
									
									
									
									
								
							@@ -62,11 +62,7 @@ func main() {
 | 
			
		||||
		monitorLocal = true
 | 
			
		||||
		loadConfig(true, nil)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger = logs.CreateLogger("oc-monitord")
 | 
			
		||||
 | 
			
		||||
	logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
 | 
			
		||||
	logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
 | 
			
		||||
	oclib.Init("oc-monitord")
 | 
			
		||||
 | 
			
		||||
	oclib.SetConfig(
 | 
			
		||||
		conf.GetConfig().MongoURL,
 | 
			
		||||
@@ -75,7 +71,11 @@ func main() {
 | 
			
		||||
		conf.GetConfig().LokiURL,
 | 
			
		||||
		conf.GetConfig().Logs,
 | 
			
		||||
	)
 | 
			
		||||
	oclib.Init("oc-monitord")
 | 
			
		||||
 | 
			
		||||
	logger = logs.CreateLogger("oc-monitord")
 | 
			
		||||
 | 
			
		||||
	logger.Debug().Msg("Loki URL : " + conf.GetConfig().LokiURL)
 | 
			
		||||
	logger.Debug().Msg("Workflow executed : " + conf.GetConfig().ExecutionID)
 | 
			
		||||
 | 
			
		||||
	wf_id := getWorkflowId(conf.GetConfig().ExecutionID)
 | 
			
		||||
	conf.GetConfig().WorkflowID = wf_id
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										
											BIN
										
									
								
								oc-monitord
									
									
									
									
									
								
							
							
						
						
									
										
											BIN
										
									
								
								oc-monitord
									
									
									
									
									
								
							
										
											Binary file not shown.
										
									
								
							@@ -5,6 +5,7 @@
 | 
			
		||||
package workflow_builder
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	. "oc-monitord/models"
 | 
			
		||||
	"os"
 | 
			
		||||
	"slices"
 | 
			
		||||
@@ -23,10 +24,10 @@ import (
 | 
			
		||||
var logger zerolog.Logger
 | 
			
		||||
 | 
			
		||||
type ArgoBuilder struct {
 | 
			
		||||
	OriginWorkflow 	w.Workflow
 | 
			
		||||
	Workflow       	Workflow
 | 
			
		||||
	Services		*Service
 | 
			
		||||
	Timeout        	int
 | 
			
		||||
	OriginWorkflow w.Workflow
 | 
			
		||||
	Workflow       Workflow
 | 
			
		||||
	Services       *Service
 | 
			
		||||
	Timeout        int
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Workflow struct {
 | 
			
		||||
@@ -47,17 +48,15 @@ type Spec struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) CreateDAG() (string, error) {
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	// handle services by checking if there is only one processing with hostname and port
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	b.createNginxVolumes()
 | 
			
		||||
	
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
	b.createTemplates()
 | 
			
		||||
	b.createDAGstep()
 | 
			
		||||
	b.createVolumes()
 | 
			
		||||
 | 
			
		||||
	
 | 
			
		||||
	if b.Timeout > 0 {
 | 
			
		||||
		b.Workflow.Spec.Timeout = b.Timeout
 | 
			
		||||
	}
 | 
			
		||||
@@ -113,19 +112,19 @@ func (b *ArgoBuilder) createTemplates() {
 | 
			
		||||
		argo_name := getArgoName(comp_res.GetName(), comp.ID)
 | 
			
		||||
		new_temp := Template{Name: argo_name, Container: temp_container}
 | 
			
		||||
		new_temp.Inputs.Parameters = inputs_container
 | 
			
		||||
		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"}) // TODO : replace this with a search of the storage / data source name
 | 
			
		||||
		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx 
 | 
			
		||||
		
 | 
			
		||||
		if (b.isService(comp.ID)){
 | 
			
		||||
		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "workdir", MountPath: "/mnt/vol"})            // TODO : replace this with a search of the storage / data source name
 | 
			
		||||
		new_temp.Container.VolumeMounts = append(new_temp.Container.VolumeMounts, VolumeMount{Name: "nginx-demo", MountPath: "/usr/share/nginx"}) // Used for processing services' demo with nginx
 | 
			
		||||
 | 
			
		||||
		if b.isService(comp.ID) {
 | 
			
		||||
			serv := b.CreateService(comp)
 | 
			
		||||
			b.createService(serv, argo_name, comp.ID)
 | 
			
		||||
			new_temp.Metadata.Labels = make(map[string]string)
 | 
			
		||||
			new_temp.Metadata.Labels["app"] = "oc-service"		// Construct the template for the k8s service and add a link in graph between k8s service and processing
 | 
			
		||||
			new_temp.Metadata.Labels["app"] = "oc-service" // Construct the template for the k8s service and add a link in graph between k8s service and processing
 | 
			
		||||
			// if err != nil {
 | 
			
		||||
			// 	// TODO
 | 
			
		||||
			// }
 | 
			
		||||
		}
 | 
			
		||||
		
 | 
			
		||||
 | 
			
		||||
		b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, new_temp)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -154,11 +153,11 @@ func (b *ArgoBuilder) createDAGstep() {
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if b.Services != nil {
 | 
			
		||||
		new_dag.Tasks = append(new_dag.Tasks, Task{Name:"workflow-service-pod", Template: "workflow-service-pod"})
 | 
			
		||||
		new_dag.Tasks = append(new_dag.Tasks, Task{Name: "workflow-service-pod", Template: "workflow-service-pod"})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	b.Workflow.Spec.Templates = append(b.Workflow.Spec.Templates, Template{Name: "dag", Dag: new_dag})
 | 
			
		||||
	
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) createVolumes() {
 | 
			
		||||
@@ -180,13 +179,13 @@ func (b *ArgoBuilder) createNginxVolumes() {
 | 
			
		||||
	b.Workflow.Spec.Volumes = append(b.Workflow.Spec.Volumes, new_volume)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) getDependency(current_computing_id string) (dependencies []string) {
 | 
			
		||||
	for _, link := range b.OriginWorkflow.Graph.Links {
 | 
			
		||||
		if b.OriginWorkflow.Graph.Items[link.Source.ID].Processing == nil {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		source := b.OriginWorkflow.Graph.Items[link.Source.ID].Processing
 | 
			
		||||
		fmt.Println("source", source, current_computing_id, link.Destination.ID)
 | 
			
		||||
		if current_computing_id == link.Destination.ID && source != nil {
 | 
			
		||||
			dependency_name := getArgoName(source.GetName(), link.Source.ID)
 | 
			
		||||
			dependencies = append(dependencies, dependency_name)
 | 
			
		||||
@@ -209,7 +208,7 @@ func getComputingArgs(user_input string, command string) (list_args []string) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	args := strings.Split(user_input," ")
 | 
			
		||||
	args := strings.Split(user_input, " ")
 | 
			
		||||
 | 
			
		||||
	// quickfix that might need improvement
 | 
			
		||||
	if strings.Contains(command, "sh -c") {
 | 
			
		||||
@@ -295,7 +294,7 @@ func (b *ArgoBuilder) getProcessings() (list_computings []graph.GraphItem) {
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Pass a GraphItem's UUID and not the ID  
 | 
			
		||||
// Pass a GraphItem's UUID and not the ID
 | 
			
		||||
func (b *ArgoBuilder) IsProcessing(component_uuid string) bool {
 | 
			
		||||
	return slices.Contains(b.OriginWorkflow.Processings, component_uuid)
 | 
			
		||||
}
 | 
			
		||||
@@ -307,8 +306,8 @@ func getStringValue(comp resource_model.AbstractResource, key string) string {
 | 
			
		||||
	return ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) isService(id string) bool{
 | 
			
		||||
	
 | 
			
		||||
func (b *ArgoBuilder) isService(id string) bool {
 | 
			
		||||
 | 
			
		||||
	comp := b.OriginWorkflow.Graph.Items[id]
 | 
			
		||||
 | 
			
		||||
	if comp.Processing == nil {
 | 
			
		||||
@@ -319,14 +318,12 @@ func (b *ArgoBuilder) isService(id string) bool{
 | 
			
		||||
	return is_exposed
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
func (b *ArgoBuilder) addLabel(name string, id string) {
 | 
			
		||||
	argo_name := getArgoName(name,id)
 | 
			
		||||
	for _, template := range b.Workflow.Spec.Templates{
 | 
			
		||||
		if template.Name == argo_name{
 | 
			
		||||
			template.Metadata.Labels["app"] = "service-workflow"			
 | 
			
		||||
	argo_name := getArgoName(name, id)
 | 
			
		||||
	for _, template := range b.Workflow.Spec.Templates {
 | 
			
		||||
		if template.Name == argo_name {
 | 
			
		||||
			template.Metadata.Labels["app"] = "service-workflow"
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user