This commit is contained in:
mr
2024-11-29 10:25:42 +01:00
parent 8d701e67d8
commit b591533ba4
15 changed files with 320 additions and 387 deletions

View File

@@ -13,12 +13,11 @@ import (
"cloud.o-forge.io/core/oc-lib/models/collaborative_area/shallow_collaborative_area"
"cloud.o-forge.io/core/oc-lib/models/peer"
"cloud.o-forge.io/core/oc-lib/models/resources"
"cloud.o-forge.io/core/oc-lib/models/resources/compute"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/models/workflow_execution"
"cloud.o-forge.io/core/oc-lib/models/workspace"
"cloud.o-forge.io/core/oc-lib/tools"
cron "github.com/robfig/cron/v3"
cron "github.com/robfig/cron"
)
type workflowMongoAccessor struct {
@@ -33,7 +32,7 @@ type workflowMongoAccessor struct {
// New creates a new instance of the workflowMongoAccessor
func New(t tools.DataType, peerID string, groups []string, caller *tools.HTTPCaller) *workflowMongoAccessor {
return &workflowMongoAccessor{
computeResourceAccessor: (&compute.ComputeResource{}).GetAccessor(peerID, groups, nil),
computeResourceAccessor: (&resources.ComputeResource{}).GetAccessor(peerID, groups, nil),
collaborativeAreaAccessor: (&shallow_collaborative_area.ShallowCollaborativeArea{}).GetAccessor(peerID, groups, nil),
executionAccessor: (&workflow_execution.WorkflowExecution{}).GetAccessor(peerID, groups, nil),
workspaceAccessor: (&workspace.Workspace{}).GetAccessor(peerID, groups, nil),
@@ -55,7 +54,7 @@ func New(t tools.DataType, peerID string, groups []string, caller *tools.HTTPCal
* getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution.WorkflowExecution
*/
func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*workflow_execution.WorkflowExecution, error) {
func (a *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*workflow_execution.WorkflowExecution, error) {
workflows_execution := []*workflow_execution.WorkflowExecution{}
if data.Schedule != nil { // only set execution on a scheduled workflow
if data.Schedule.Start == nil { // if no start date, return an error
@@ -110,14 +109,14 @@ func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*w
}
// DeleteOne deletes a workflow from the database, delete depending executions and bookings
func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
wfa.execution(id, &Workflow{
func (a *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
a.execution(id, &Workflow{
AbstractWorkflow: AbstractWorkflow{ScheduleActive: false},
}, true) // delete the executions
res, code, err := wfa.GenericDeleteOne(id, wfa)
res, code, err := utils.GenericDeleteOne(id, a)
if res != nil && code == 200 {
wfa.execute(res.(*Workflow), true, false) // up to date the workspace for the workflow
wfa.share(res.(*Workflow), true, wfa.Caller)
a.execute(res.(*Workflow), true, false) // up to date the workspace for the workflow
a.share(res.(*Workflow), true, a.Caller)
}
return res, code, err
}
@@ -127,15 +126,15 @@ func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err
* it takes the workflow id, the real data and the executions
* it returns an error if the booking fails
*/
func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) error {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.BOOKING] == nil {
func (a *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) error {
if a.Caller == nil || a.Caller.URLS == nil || a.Caller.URLS[tools.BOOKING] == nil {
return errors.New("no caller defined")
}
methods := wfa.Caller.URLS[tools.BOOKING]
methods := a.Caller.URLS[tools.BOOKING]
if _, ok := methods[tools.POST]; !ok {
return errors.New("no path found")
}
res, code, _ := wfa.LoadOne(id)
res, code, _ := a.LoadOne(id)
if code != 200 {
return errors.New("could not load workflow")
}
@@ -152,7 +151,7 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
continue
} // if the compute is already found, skip it
isDCFound = append(isDCFound, dc_id)
dc, code, _ := wfa.computeResourceAccessor.LoadOne(dc_id)
dc, code, _ := a.computeResourceAccessor.LoadOne(dc_id)
if code != 200 {
continue
}
@@ -167,7 +166,7 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
WorkflowID: id, // set the workflow id "WHO"
ResourceID: dc_id, // set the compute id "WHERE"
Executions: execs, // set the executions to book "WHAT"
}).Serialize(), wfa.Caller)
}).Serialize(), a.Caller)
if err != nil {
fmt.Println("BOOKING", err)
return err
@@ -181,12 +180,12 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
/*
* share is a function that shares a workflow to the peers if the workflow is shared
*/
func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) {
func (a *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) {
if realData == nil || realData.Shared == nil || len(realData.Shared) == 0 || caller == nil || caller.Disabled { // no shared no sharing
return
}
for _, sharedID := range realData.Shared { // loop through the shared ids
res, code, _ := wfa.collaborativeAreaAccessor.LoadOne(sharedID)
res, code, _ := a.collaborativeAreaAccessor.LoadOne(sharedID)
if code != 200 {
continue
}
@@ -207,7 +206,7 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
}
}
if err != nil {
wfa.Logger.Error().Msg(err.Error())
a.Logger.Error().Msg(err.Error())
}
}
}
@@ -215,34 +214,34 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
/*
* execution is a create or delete function for the workflow executions depending on the schedule of the workflow
*/
func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
func (a *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
nats := tools.NewNATSCaller() // create a new nats caller because executions are sent to the nats for daemons
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
"state": 1, // only delete the scheduled executions only scheduled if executions are in progress or ended, they should not be deleted for registration
"workflow_id": id,
}, tools.WORKFLOW_EXECUTION.String())
err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) // delete the booking of the workflow on the peers
err := a.book(id, realData, []*workflow_execution.WorkflowExecution{}) // delete the booking of the workflow on the peers
fmt.Println("DELETE BOOKING", err)
nats.SetNATSPub(tools.WORKFLOW.String(), tools.REMOVE, realData) // send the deletion to the nats
if err != nil {
return 409, err
}
execs, err := wfa.getExecutions(id, realData) // get the executions of the workflow
execs, err := a.getExecutions(id, realData) // get the executions of the workflow
if err != nil {
return 422, err
}
if !realData.ScheduleActive || delete { // if the schedule is not active, delete the executions
execs = []*workflow_execution.WorkflowExecution{}
}
err = wfa.book(id, realData, execs) // book the workflow on the peers
err = a.book(id, realData, execs) // book the workflow on the peers
fmt.Println("BOOKING", err)
if err != nil {
return 409, err // if the booking fails, return an error for integrity between peers
}
fmt.Println("BOOKING", delete)
for _, obj := range execs {
_, code, err := wfa.executionAccessor.StoreOne(obj)
_, code, err := a.executionAccessor.StoreOne(obj)
fmt.Println("EXEC", code, err)
if code != 200 {
return code, err
@@ -253,21 +252,21 @@ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delet
}
// UpdateOne updates a workflow in the database
func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
res, code, err := wfa.LoadOne(id)
func (a *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
res, code, err := a.LoadOne(id)
if code != 200 {
return nil, 409, err
}
// avoid the update if the schedule is the same
avoid := set.(*Workflow).Schedule == nil || (res.(*Workflow).Schedule != nil && res.(*Workflow).ScheduleActive == set.(*Workflow).ScheduleActive && res.(*Workflow).Schedule.Start == set.(*Workflow).Schedule.Start && res.(*Workflow).Schedule.End == set.(*Workflow).Schedule.End && res.(*Workflow).Schedule.Cron == set.(*Workflow).Schedule.Cron)
res, code, err = wfa.GenericUpdateOne(set, id, wfa, &Workflow{})
res, code, err = utils.GenericUpdateOne(set, id, a, &Workflow{})
if code != 200 {
return nil, code, err
}
workflow := res.(*Workflow)
if !avoid { // if the schedule is not avoided, update the executions
if code, err := wfa.execution(id, workflow, false); code != 200 {
if code, err := a.execution(id, workflow, false); code != 200 {
return nil, code, errors.New("could not update the executions : " + err.Error())
}
}
@@ -276,16 +275,16 @@ func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (util
now := time.Now().UTC()
if (workflow.Schedule.End != nil && now.After(*workflow.Schedule.End)) || (workflow.Schedule.End == nil && workflow.Schedule.Start != nil && now.After(*workflow.Schedule.Start)) { // if the start date is passed, then you can book
workflow.ScheduleActive = false
wfa.GenericRawUpdateOne(workflow, id, wfa)
utils.GenericRawUpdateOne(workflow, id, a)
} // if the start date is passed, update the executions
}
wfa.execute(workflow, false, false) // update the workspace for the workflow
wfa.share(workflow, false, wfa.Caller) // share the update to the peers
a.execute(workflow, false, false) // update the workspace for the workflow
a.share(workflow, false, a.Caller) // share the update to the peers
return res, code, nil
}
// StoreOne stores a workflow in the database
func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
func (a *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
d := data.(*Workflow)
if d.ScheduleActive && d.Schedule != nil { // if the workflow is scheduled, update the executions
now := time.Now().UTC()
@@ -293,44 +292,44 @@ func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject,
d.ScheduleActive = false
} // if the start date is passed, update the executions
}
res, code, err := wfa.GenericStoreOne(d, wfa)
res, code, err := utils.GenericStoreOne(d, a)
if err != nil || code != 200 {
return nil, code, err
}
workflow := res.(*Workflow)
wfa.share(workflow, false, wfa.Caller) // share the creation to the peers
a.share(workflow, false, a.Caller) // share the creation to the peers
//store the executions
if code, err := wfa.execution(res.GetID(), workflow, false); err != nil {
if code, err := a.execution(res.GetID(), workflow, false); err != nil {
return nil, code, err
}
wfa.execute(workflow, false, false) // store the workspace for the workflow
a.execute(workflow, false, false) // store the workspace for the workflow
return res, code, nil
}
// CopyOne copies a workflow in the database
func (wfa *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return wfa.GenericStoreOne(data, wfa)
func (a *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return utils.GenericStoreOne(data, a)
}
// execute is a function that executes a workflow
// it stores the workflow resources in a specific workspace to never have a conflict in UI and logic
func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active bool) {
func (a *workflowMongoAccessor) execute(workflow *Workflow, delete bool, active bool) {
filters := &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by standard workspace name attached to a workflow
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: workflow.Name + "_workspace"}},
},
}
resource, _, err := wfa.workspaceAccessor.Search(filters, "")
resource, _, err := a.workspaceAccessor.Search(filters, "")
if delete { // if delete is set to true, delete the workspace
for _, r := range resource {
wfa.workspaceAccessor.DeleteOne(r.GetID())
a.workspaceAccessor.DeleteOne(r.GetID())
}
return
}
if err == nil && len(resource) > 0 { // if the workspace already exists, update it
wfa.workspaceAccessor.UpdateOne(&workspace.Workspace{
a.workspaceAccessor.UpdateOne(&workspace.Workspace{
Active: active,
ResourceSet: resources.ResourceSet{
Datas: workflow.Datas,
@@ -341,7 +340,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool, activ
},
}, resource[0].GetID())
} else { // if the workspace does not exist, create it
wfa.workspaceAccessor.StoreOne(&workspace.Workspace{
a.workspaceAccessor.StoreOne(&workspace.Workspace{
Active: active,
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
ResourceSet: resources.ResourceSet{
@@ -362,7 +361,7 @@ func (a *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error)
now := time.Now().UTC()
if (w.Schedule.End != nil && now.After(*w.Schedule.End)) || (w.Schedule.End == nil && w.Schedule.Start != nil && now.After(*w.Schedule.Start)) { // if the start date is passed, then you can book
w.ScheduleActive = false
a.GenericRawUpdateOne(d, id, a)
utils.GenericRawUpdateOne(d, id, a)
} // if the start date is passed, update the executions
}
a.execute(w, false, true) // if no workspace is attached to the workflow, create it