package workflow

import (
	"encoding/json"
	"errors"

	"cloud.o-forge.io/core/oc-lib/models/peer"
	"cloud.o-forge.io/core/oc-lib/models/resources"
	"cloud.o-forge.io/core/oc-lib/models/resources/datacenter"
	"cloud.o-forge.io/core/oc-lib/models/resources/workflow/graph"
	"cloud.o-forge.io/core/oc-lib/models/utils"
	"cloud.o-forge.io/core/oc-lib/tools"
)

// AbstractWorkflow holds the fields shared by resource workflows and native
// workflows.
//
// Warning: there are two kinds of workflows:
//   - the native workflow is the one you create to schedule an execution;
//   - the resource workflow is the one created to publish a native workflow
//     in the catalog.
type AbstractWorkflow struct {
	resources.ResourceSet
	Graph          *graph.Graph      `bson:"graph,omitempty" json:"graph,omitempty"`       // Graph UI & logic representation of the workflow
	ScheduleActive bool              `json:"schedule_active" bson:"schedule_active"`       // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set
	Schedule       *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
	Shared         []string          `json:"shared,omitempty" bson:"shared,omitempty"`     // Shared is the ID of the shared workflow
}

// isADependancy reports whether the graph item identified by id is the source
// of at least one link whose destination item is a processing or a workflow.
// The second result lists the IDs of those destination items, one entry per
// matching link; it is empty (never nil) when there is no match or no graph.
func (w AbstractWorkflow) isADependancy(id string) (bool, []string) {
	dependancyOfIDs := []string{}
	if w.Graph == nil {
		return false, dependancyOfIDs
	}
	for _, link := range w.Graph.Links {
		if id != link.Source.ID {
			continue
		}
		// A single condition records each link once, even if the destination
		// item happens to carry both a processing and a workflow.
		destination := w.Graph.Items[link.Destination.ID]
		if destination.Processing != nil || destination.Workflow != nil {
			dependancyOfIDs = append(dependancyOfIDs, link.Destination.ID)
		}
	}
	return len(dependancyOfIDs) > 0, dependancyOfIDs
}

// GetStoragesByRelatedProcessing collects the storage nodes linked to the
// processing identified by processingID.
//
// The first result groups the storages under the key "in" (the storage is the
// source of the link) or "out" (the storage is the destination of the link).
//
// When ignoreRelation is false, the second result maps each matching storage
// ID to the data nodes linked to the same processing (as returned by
// GetDatasByRelatedProcessing), and the storages are filtered: with
// relatedToData set only storages for which that map is non-empty are kept,
// otherwise only storages for which it is empty are kept. When ignoreRelation
// is true every matching storage is returned and the second result stays
// empty.
func (w *AbstractWorkflow) GetStoragesByRelatedProcessing(processingID string, relatedToData bool, ignoreRelation bool) (map[string][]utils.DBObject, map[string]map[string][]utils.DBObject) {
	storages := make(map[string][]utils.DBObject)
	datasRelatedToStorage := make(map[string]map[string][]utils.DBObject)
	if w.Graph == nil {
		return storages, datasRelatedToStorage
	}
	for _, link := range w.Graph.Links {
		// First assume the storage is the link source and the processing its
		// destination (input storage) ...
		direction, storageID, nodeID := "in", link.Source.ID, link.Destination.ID
		node := w.Graph.Items[link.Source.ID].Storage
		if node == nil {
			// ... otherwise look for the storage on the destination side
			// (output storage), the processing being the source.
			direction, storageID, nodeID = "out", link.Destination.ID, link.Source.ID
			node = w.Graph.Items[link.Destination.ID].Storage
		}
		if node == nil || processingID != nodeID {
			continue // this link does not join a storage to the processing
		}
		if storages[direction] == nil {
			storages[direction] = []utils.DBObject{}
		}
		if ignoreRelation {
			storages[direction] = append(storages[direction], node)
			continue
		}
		datasRelatedToStorage[storageID], _ = w.GetDatasByRelatedProcessing(processingID, false, true)
		if relatedToData == (len(datasRelatedToStorage[storageID]) > 0) {
			storages[direction] = append(storages[direction], node)
		}
	}
	return storages, datasRelatedToStorage
}

// GetDatasByRelatedProcessing collects the data nodes linked to a processing.
//
// dataID is, despite its name, the ID of the processing node the data must be
// linked to; the name is kept for compatibility with the existing signature.
//
// The first result groups the data nodes under the key "in" (the data is the
// source of the link) or "out" (the data is the destination of the link).
//
// When ignoreRelation is false, the second result maps each matching data
// node ID to the storages linked to the same processing (as returned by
// GetStoragesByRelatedProcessing), and the data nodes are filtered: with
// relatedToStorage set only data for which that map is non-empty are kept,
// otherwise only data for which it is empty are kept. When ignoreRelation is
// true every matching data node is returned and the second result stays empty.
func (w *AbstractWorkflow) GetDatasByRelatedProcessing(dataID string, relatedToStorage bool, ignoreRelation bool) (map[string][]utils.DBObject, map[string]map[string][]utils.DBObject) {
	datas := make(map[string][]utils.DBObject)
	datasRelatedToData := make(map[string]map[string][]utils.DBObject)
	if w.Graph == nil {
		return datas, datasRelatedToData
	}
	for _, link := range w.Graph.Links {
		// First assume the data is the link source and the processing its
		// destination (input data) ...
		// The link-local ID gets its own name so it cannot shadow the dataID
		// parameter, which is what the processing side must be compared to.
		direction, dataNodeID, nodeID := "in", link.Source.ID, link.Destination.ID
		node := w.Graph.Items[link.Source.ID].Data
		if node == nil {
			// ... otherwise look for the data on the destination side
			// (output data), the processing being the source.
			direction, dataNodeID, nodeID = "out", link.Destination.ID, link.Source.ID
			node = w.Graph.Items[link.Destination.ID].Data
		}
		if node == nil || dataID != nodeID {
			continue // this link does not join a data node to the processing
		}
		if datas[direction] == nil {
			datas[direction] = []utils.DBObject{}
		}
		if ignoreRelation {
			datas[direction] = append(datas[direction], node)
			continue
		}
		datasRelatedToData[dataNodeID], _ = w.GetStoragesByRelatedProcessing(dataID, false, true)
		if relatedToStorage == (len(datasRelatedToData[dataNodeID]) > 0) {
			datas[direction] = append(datas[direction], node)
		}
	}
	return datas, datasRelatedToData
}

// getProcessingsByRelatedProcessing returns every graph item that carries a
// processing. The result is nil when the workflow has no graph or no
// processing item.
func (w *AbstractWorkflow) getProcessingsByRelatedProcessing() []graph.GraphItem {
	if w.Graph == nil {
		return nil
	}
	var processings []graph.GraphItem
	for _, item := range w.Graph.Items {
		if item.Processing != nil {
			processings = append(processings, item)
		}
	}
	return processings
}

// isDCLink reports whether one end of link is a datacenter item and, if so,
// returns the UUID of that datacenter. The source end is checked before the
// destination end.
func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) {
	if w.Graph == nil || w.Graph.Items == nil {
		return false, ""
	}
	for _, itemID := range []string{link.Source.ID, link.Destination.ID} {
		if item, found := w.Graph.Items[itemID]; found && item.Datacenter != nil {
			return true, item.Datacenter.UUID
		}
	}
	return false, ""
}

// Workflow is the native workflow: the workflow created to schedule an
// execution.
type Workflow struct {
	utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
	AbstractWorkflow     // AbstractWorkflow contains the basic fields of a workflow
}

// CheckBooking checks the booking of the workflow on peers (even ourselves).
//
// For every link that touches a datacenter, the datacenter is loaded and a
// BOOKING GET is launched on the peer that owns it. It returns (false, nil)
// when the workflow has no graph, (false, err) as soon as a datacenter has no
// peer ID, cannot be read as a datacenter resource, or the peer call fails,
// and (true, nil) otherwise. Datacenters whose load does not answer with code
// 200 are skipped.
func (wf *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
	if wf.Graph == nil { // no graph no booking
		return false, nil
	}
	accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil)
	for _, link := range wf.Graph.Links {
		isDC, dcID := wf.isDCLink(link)
		if !isDC {
			continue // only links between a datacenter and a resource matter
		}
		// The load error is not inspected: the status code decides whether
		// the datacenter is usable.
		loaded, code, _ := accessor.LoadOne(dcID)
		if code != 200 {
			continue
		}
		// Checked assertion: an unexpected or nil object must not panic.
		dc, ok := loaded.(*datacenter.DatacenterResource)
		if !ok || dc == nil {
			return false, errors.New("loaded resource is not a datacenter")
		}
		// The datacenter could be a remote one: without a peer ID we do not
		// know where to book.
		if dc.PeerID == "" {
			return false, errors.New("no peer id")
		}
		if _, err := (&peer.Peer{}).LaunchPeerExecution(dc.PeerID, dcID, tools.BOOKING, tools.GET, nil, caller); err != nil {
			return false, err
		}
	}
	return true, nil
}

// GetName returns the Name field of the workflow.
func (wf *Workflow) GetName() string {
	return wf.Name
}

// GetAccessor returns a new accessor initialised for the WORKFLOW model type
// with the given caller.
func (wf *Workflow) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
	accessor := New()
	accessor.Init(tools.WORKFLOW, caller)
	return accessor
}

// Deserialize fills the workflow from j through a JSON round trip and returns
// the workflow itself. It returns nil when j cannot be marshalled to JSON.
func (wf *Workflow) Deserialize(j map[string]interface{}) utils.DBObject {
	raw, err := json.Marshal(j)
	if err != nil {
		return nil
	}
	// NOTE(review): the decode error is discarded, so a partially decoded
	// workflow may be returned — confirm this best-effort behaviour is wanted.
	_ = json.Unmarshal(raw, wf)
	return wf
}

// Serialize converts the workflow to a generic map through a JSON round trip.
// It returns nil when the workflow cannot be marshalled to JSON.
func (wf *Workflow) Serialize() map[string]interface{} {
	raw, err := json.Marshal(wf)
	if err != nil {
		return nil
	}
	var fields map[string]interface{}
	// NOTE(review): the decode error is discarded; on failure the map is
	// returned as decoded so far (possibly nil) — confirm this is wanted.
	_ = json.Unmarshal(raw, &fields)
	return fields
}