oc-lib/models/workflow/graph/graph.go

128 lines
5.0 KiB
Go
Raw Normal View History

2024-07-18 11:51:12 +02:00
package graph
2024-10-10 08:55:22 +02:00
import (
"time"
2024-10-10 08:55:22 +02:00
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/tools"
)
2024-07-30 12:08:13 +02:00
// Graph is a struct that represents a graph
2024-07-18 11:51:12 +02:00
type Graph struct {
Zoom float64 `bson:"zoom" json:"zoom" default:"1"` // Zoom is the graphical zoom of the graph
Items map[string]GraphItem `bson:"items" json:"items" default:"{}" validate:"required"` // Items is the list of elements in the graph
Links []GraphLink `bson:"links" json:"links" default:"{}" validate:"required"` // Links is the list of links between elements in the graph
2024-07-18 11:51:12 +02:00
}
2025-01-15 10:56:44 +01:00
// IsProcessing reports whether item has its Processing field set.
func (g *Graph) IsProcessing(item GraphItem) bool {
	return item.Processing != nil
}
// IsCompute reports whether item has its Compute field set.
func (g *Graph) IsCompute(item GraphItem) bool {
	return item.Compute != nil
}
// IsData reports whether item has its Data field set.
func (g *Graph) IsData(item GraphItem) bool {
	return item.Data != nil
}
// IsStorage reports whether item has its Storage field set.
func (g *Graph) IsStorage(item GraphItem) bool {
	return item.Storage != nil
}
// IsWorkflow reports whether item has its Workflow field set.
func (g *Graph) IsWorkflow(item GraphItem) bool {
	return item.Workflow != nil
}
2025-01-13 11:24:07 +01:00
func (g *Graph) GetAverageTimeRelatedToProcessingActivity(start time.Time, processings []*resources.ProcessingResource, resource resources.ResourceInterface,
f func(GraphItem) resources.ResourceInterface, request *tools.APIRequest) (float64, float64) {
nearestStart := float64(10000000000)
oneIsInfinite := false
longestDuration := float64(0)
for _, link := range g.Links {
for _, processing := range processings {
var source string // source is the source of the link
if link.Destination.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the destination is the processing and the source is not a compute
source = link.Source.ID
} else if link.Source.ID == processing.GetID() && f(g.Items[link.Source.ID]) != nil && f(g.Items[link.Source.ID]).GetID() == resource.GetID() { // if the source is the processing and the destination is not a compute
source = link.Destination.ID
}
2025-01-13 11:24:07 +01:00
priced := processing.ConvertToPricedResource(tools.PROCESSING_RESOURCE, request)
if source != "" {
2025-01-13 11:24:07 +01:00
if priced.GetLocationStart() != nil {
near := float64(priced.GetLocationStart().Sub(start).Seconds())
if near < nearestStart {
nearestStart = near
}
}
2025-01-13 11:24:07 +01:00
if priced.GetLocationEnd() != nil {
duration := float64(priced.GetLocationEnd().Sub(*priced.GetLocationStart()).Seconds())
if longestDuration < duration {
longestDuration = duration
}
} else {
oneIsInfinite = true
}
}
}
}
if oneIsInfinite {
return nearestStart, -1
}
return nearestStart, longestDuration
}
/*
* GetAverageTimeBeforeStart is a function that returns the average time before the start of a processing
*/
2025-01-13 11:24:07 +01:00
func (g *Graph) GetAverageTimeProcessingBeforeStart(average float64, processingID string, request *tools.APIRequest) float64 {
currents := []float64{} // list of current time
for _, link := range g.Links { // for each link
var source string // source is the source of the link
if link.Destination.ID == processingID && g.Items[link.Source.ID].Processing == nil { // if the destination is the processing and the source is not a compute
source = link.Source.ID
} else if link.Source.ID == processingID && g.Items[link.Source.ID].Processing == nil { // if the source is the processing and the destination is not a compute
source = link.Destination.ID
}
if source == "" { // if source is empty, continue
continue
}
2025-01-13 11:24:07 +01:00
dt, r := g.GetResource(source) // get the resource of the source
if r == nil { // if item is nil, continue
continue
}
priced := r.ConvertToPricedResource(dt, request)
current := priced.GetExplicitDurationInS() // get the explicit duration of the item
if current < 0 { // if current is negative, its means that duration of a before could be infinite continue
return current
}
2025-01-13 11:24:07 +01:00
current += g.GetAverageTimeProcessingBeforeStart(current, source, request) // get the average time before start of the source
currents = append(currents, current) // append the current to the currents
}
var max float64 // get the max time to wait dependancies to finish
for _, current := range currents {
if current > max {
max = current
}
}
return max
}
2025-01-13 11:24:07 +01:00
func (g *Graph) GetResource(id string) (tools.DataType, resources.ResourceInterface) {
2024-10-10 08:55:22 +02:00
if item, ok := g.Items[id]; ok {
if item.Data != nil {
2025-01-13 11:24:07 +01:00
return tools.DATA_RESOURCE, item.Data
2024-11-07 11:05:24 +01:00
} else if item.Compute != nil {
2025-01-13 11:24:07 +01:00
return tools.COMPUTE_RESOURCE, item.Compute
2024-10-10 08:55:22 +02:00
} else if item.Workflow != nil {
2025-01-13 11:24:07 +01:00
return tools.WORKFLOW_RESOURCE, item.Workflow
2024-10-10 08:55:22 +02:00
} else if item.Processing != nil {
2025-01-13 11:24:07 +01:00
return tools.PROCESSING_RESOURCE, item.Processing
2024-10-10 08:55:22 +02:00
} else if item.Storage != nil {
2025-01-13 11:24:07 +01:00
return tools.STORAGE_RESOURCE, item.Storage
2024-10-10 08:55:22 +02:00
}
}
2025-01-13 11:24:07 +01:00
return tools.INVALID, nil
2024-10-10 08:55:22 +02:00
}