package workflow import ( "errors" "fmt" "time" "cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area" "cloud.o-forge.io/core/oc-lib/models/common/pricing" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resources" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/models/workflow/graph" "cloud.o-forge.io/core/oc-lib/tools" ) /* * AbstractWorkflow is a struct that represents a workflow for resource or native workflow * Warning: there is 2 types of workflows, the resource workflow and the native workflow * native workflow is the one that you create to schedule an execution * resource workflow is the one that is created to set our native workflow in catalog */ type AbstractWorkflow struct { resources.ResourceSet Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set // Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow } func (d *Workflow) GetAccessor(request *tools.APIRequest) utils.Accessor { return NewAccessor(request) // Create a new instance of the accessor } func (w *AbstractWorkflow) GetGraphItems(f func(item graph.GraphItem) bool) (list_datas []graph.GraphItem) { for _, item := range w.Graph.Items { if f(item) { list_datas = append(list_datas, item) } } return } func (w *AbstractWorkflow) GetResources(f func(item graph.GraphItem) bool) map[string]resources.ResourceInterface { list_datas := map[string]resources.ResourceInterface{} for _, item := range w.Graph.Items { if f(item) { _, res := item.GetResource() list_datas[res.GetID()] = res } } return list_datas } func (w *AbstractWorkflow) GetPricedItem(f func(item graph.GraphItem) bool, request *tools.APIRequest) map[string]pricing.PricedItemITF { list_datas := map[string]pricing.PricedItemITF{} for _, item := range w.Graph.Items { if f(item) { dt, res := item.GetResource() ord := res.ConvertToPricedResource(dt, request) list_datas[res.GetID()] = ord } } return list_datas } func (w *AbstractWorkflow) GetByRelatedProcessing(processingID string, g func(item graph.GraphItem) bool) []resources.ResourceInterface { storages := []resources.ResourceInterface{} for _, link := range w.Graph.Links { nodeID := link.Destination.ID var node resources.ResourceInterface if g(w.Graph.Items[link.Source.ID]) { item := w.Graph.Items[link.Source.ID] _, node = item.GetResource() } if node == nil && g(w.Graph.Items[link.Destination.ID]) { // if the source is not a storage, we consider that the destination is the storage nodeID = link.Source.ID item := w.Graph.Items[link.Destination.ID] // and the processing is the source _, node = item.GetResource() // we are looking for the storage as destination } if processingID == nodeID && node != nil { // if the storage is linked to the processing storages = append(storages, node) } } return storages } func (wf *AbstractWorkflow) IsProcessing(item graph.GraphItem) bool { return item.Processing != nil } func (wf *AbstractWorkflow) IsCompute(item graph.GraphItem) bool { return item.Compute != nil } func (wf *AbstractWorkflow) IsData(item graph.GraphItem) bool { return item.Data != nil } func (wf *AbstractWorkflow) IsStorage(item graph.GraphItem) bool { return item.Storage != nil } func (wf *AbstractWorkflow) IsWorkflow(item graph.GraphItem) bool { return item.Workflow != nil } /* * Workflow is a struct that represents a workflow * it defines the native workflow */ type Workflow struct { utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) AbstractWorkflow // AbstractWorkflow contains the basic fields of a workflow } func (w *Workflow) getPricedItem(item graph.GraphItem, request *tools.APIRequest) pricing.PricedItemITF { dt, res := item.GetResource() if dt == tools.INVALID { return nil } return res.ConvertToPricedResource(dt, request) } func (ao *Workflow) VerifyAuth(request *tools.APIRequest) bool { isAuthorized := false if len(ao.Shared) > 0 { for _, shared := range ao.Shared { shared, code, _ := shallow_collaborative_area.NewAccessor(request).LoadOne(shared) if code != 200 || shared == nil { isAuthorized = false } isAuthorized = shared.VerifyAuth(request) } } return ao.AbstractObject.VerifyAuth(request) || isAuthorized } /* * CheckBooking is a function that checks the booking of the workflow on peers (even ourselves) */ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) { // check if if wfa.Graph == nil { // no graph no booking return false, nil } accessor := (&resources.ComputeResource{}).GetAccessor(&tools.APIRequest{Caller: caller}) for _, link := range wfa.Graph.Links { if ok, compute_id := link.IsComputeLink(*wfa.Graph); ok { // check if the link is a link between a compute and a resource compute, code, _ := accessor.LoadOne(compute_id) if code != 200 { continue } // CHECK BOOKING ON PEER, compute could be a remote one peerID := compute.(*resources.ComputeResource).CreatorID if peerID == "" { return false, errors.New("no peer id") } // no peer id no booking, we need to know where to book _, err := (&peer.Peer{}).LaunchPeerExecution(peerID, compute_id, tools.BOOKING, tools.GET, nil, caller) if err != nil { return false, err } } } return true, nil } func (wf *Workflow) Planify(start time.Time, end *time.Time, request *tools.APIRequest) (float64, map[tools.DataType][]pricing.PricedItemITF, *Workflow, error) { processings := []*resources.ProcessingResource{} priceds := map[tools.DataType][]pricing.PricedItemITF{} priceds[tools.PROCESSING_RESOURCE] = []pricing.PricedItemITF{} for _, item := range wf.GetGraphItems(wf.IsProcessing) { dt, realItem := item.GetResource() if realItem == nil { return 0, priceds, nil, errors.New("could not load the processing resource") } priced := realItem.ConvertToPricedResource(dt, request) timeFromStartS := wf.Graph.GetAverageTimeProcessingBeforeStart(0, realItem.GetID(), request) started := start.Add(time.Duration(timeFromStartS) * time.Second) priced.SetLocationStart(started) priced.SetLocationEnd(started.Add(time.Duration(priced.GetExplicitDurationInS()))) processings = append(processings, realItem.(*resources.ProcessingResource)) priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced) } priceds[tools.DATA_RESOURCE] = []pricing.PricedItemITF{} for _, item := range wf.GetGraphItems(wf.IsData) { dt, realItem := item.GetResource() if realItem == nil { continue } priced := realItem.ConvertToPricedResource(dt, request) priced.SetLocationStart(start) priced.SetLocationEnd(*end) priceds[tools.PROCESSING_RESOURCE] = append(priceds[tools.PROCESSING_RESOURCE], priced) } for _, f := range []func(graph.GraphItem) bool{wf.IsStorage, wf.IsCompute} { for _, item := range wf.GetGraphItems(f) { dt, r := item.GetResource() if r == nil { continue } if priceds[dt] == nil { priceds[dt] = []pricing.PricedItemITF{} } priced := r.ConvertToPricedResource(dt, request) nearestStart, longestDuration := wf.Graph.GetAverageTimeRelatedToProcessingActivity(start, processings, r, func(i graph.GraphItem) resources.ResourceInterface { if f(i) { _, r := i.GetResource() return r } else { return nil } }, request) started := start.Add(time.Duration(nearestStart) * time.Second) priced.SetLocationStart(started) if longestDuration >= 0 { priced.SetLocationEnd(started.Add(time.Duration(longestDuration))) } priceds[dt] = append(priceds[dt], priced) } } longest := wf.getLongestTime(end, priceds, request) priceds[tools.WORKFLOW_RESOURCE] = []pricing.PricedItemITF{} for _, item := range wf.GetGraphItems(wf.IsWorkflow) { access := NewAccessor(nil) _, r := item.GetResource() if r == nil { return 0, priceds, nil, errors.New("could not load the workflow") } priced := r.ConvertToPricedResource(tools.WORKFLOW_RESOURCE, request) res, code, err := access.LoadOne(r.GetID()) if code != 200 || err != nil { return 0, priceds, nil, errors.New("could not load the workflow with id: " + fmt.Sprintf("%v", err.Error())) } neoLongest := float64(0) innerWF := res.(*Workflow) neoLongest, _, innerWF, err = innerWF.Planify(start, end, request) if neoLongest > longest { longest = neoLongest } started := start.Add(time.Duration(wf.getNearestStart(start, priceds, request)) * time.Second) priced.SetLocationStart(started) durationE := time.Duration(longest) if durationE < 0 { continue } ended := start.Add(durationE * time.Second) priced.SetLocationEnd(ended) priceds[tools.WORKFLOW_RESOURCE] = append(priceds[tools.WORKFLOW_RESOURCE], priced) } return longest, priceds, wf, nil } func (wf *Workflow) getNearestStart(start time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { near := float64(10000000000) for _, items := range priceds { for _, priced := range items { if priced.GetLocationStart() == nil { continue } newS := priced.GetLocationStart() if newS.Sub(start).Seconds() < near { near = newS.Sub(start).Seconds() } } // get the nearest start from start var } return near } func (wf *Workflow) getLongestTime(end *time.Time, priceds map[tools.DataType][]pricing.PricedItemITF, request *tools.APIRequest) float64 { if end == nil { return -1 } longestTime := float64(0) for _, priced := range priceds[tools.PROCESSING_RESOURCE] { if priced.GetLocationEnd() == nil { continue } newS := priced.GetLocationEnd() if longestTime < newS.Sub(*end).Seconds() { longestTime = newS.Sub(*end).Seconds() } // get the nearest start from start var } return longestTime }