If you have a question, refer to the comments! And if not, oopsy

This commit is contained in:
mr
2024-08-30 14:50:48 +02:00
parent db78c70dc3
commit 8180fe5e99
39 changed files with 737 additions and 404 deletions


@@ -12,14 +12,21 @@ import (
"cloud.o-forge.io/core/oc-lib/tools"
)
/*
* AbstractWorkflow is a struct that represents either a resource workflow or a native workflow
* Warning: there are 2 types of workflows, the resource workflow and the native workflow
* the native workflow is the one you create to schedule an execution
* the resource workflow is the one created to publish a native workflow in the catalog
*/
type AbstractWorkflow struct {
resources.ResourceSet
Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph is the UI & logic representation of the workflow
ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` // ScheduleActive indicates whether the schedule is active; if not, the workflow is not scheduled and no execution or booking will be created
Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow
Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared holds the IDs of the shared workspaces this workflow belongs to
}
// isDCLink is a helper that checks whether a link connects a datacenter to a resource
func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) {
if w.Graph == nil || w.Graph.Items == nil {
return false, ""
@@ -33,28 +40,35 @@ func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) {
return false, ""
}
/*
* Workflow is the struct that defines a native workflow
*/
type Workflow struct {
utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name)
AbstractWorkflow // AbstractWorkflow contains the basic fields of a workflow
}
/*
* CheckBooking checks the bookings of the workflow on peers (including ourselves)
*/
func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
if wfa.Graph == nil { // no graph, no booking
return false, nil
}
accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil)
for _, link := range wfa.Graph.Links {
if ok, dc_id := wfa.isDCLink(link); ok { // only datacenter links are relevant for booking
dc, code, _ := accessor.LoadOne(dc_id)
if code != 200 {
continue
}
// CHECK BOOKING ON PEER, the datacenter could be a remote one
peerID := dc.(*datacenter.DatacenterResource).PeerID
if peerID == "" {
return false, errors.New("no peer id")
} // no peer id, no booking: we need to know where to book
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, dc_id, utils.BOOKING, tools.GET, nil, caller)
if err != nil {
return false, err
@@ -69,8 +83,8 @@ func (d *Workflow) GetName() string {
}
func (d *Workflow) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
data := New() // create a new instance of the accessor
data.Init(utils.WORKFLOW, caller) // initialize the accessor with the WORKFLOW model type
return data
}
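
For orientation, a hedged usage sketch of the pieces above, assumed to live inside this package; the workflow id is illustrative, and utils.Accessor is assumed to expose LoadOne as the mongo accessor below implements it:

// checkWorkflowBooking loads a workflow by id and verifies its bookings on the
// relevant peers, mirroring the CheckBooking flow above.
func checkWorkflowBooking(id string, caller *tools.HTTPCaller) (bool, error) {
	accessor := (&Workflow{}).GetAccessor(caller) // accessor bound to the WORKFLOW model
	res, code, err := accessor.LoadOne(id)        // load the workflow from the database
	if err != nil || code != 200 {
		return false, err
	}
	return res.(*Workflow).CheckBooking(caller) // verify the booking on every datacenter link
}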


@@ -2,7 +2,6 @@ package workflow
import (
"errors"
"fmt"
"strings"
"cloud.o-forge.io/core/oc-lib/dbs"
@@ -19,52 +18,62 @@ import (
)
type workflowMongoAccessor struct {
utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller)
}
// New creates a new instance of the workflowMongoAccessor
func New() *workflowMongoAccessor {
return &workflowMongoAccessor{}
}
/*
* THERE IS A LOT GOING ON IN THIS FILE: READ THE COMMENTS CAREFULLY
*/
/*
* getExecutions is a function that returns the executions of a workflow
* it returns an array of workflow_execution.WorkflowExecution
*/
func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*workflow_execution.WorkflowExecution, error) {
workflows_execution := []*workflow_execution.WorkflowExecution{}
if data.Schedule != nil { // executions are only generated for a scheduled workflow
if data.Schedule.Start == nil { // a start date is mandatory
return workflows_execution, errors.New("the schedule should have a start date")
}
if data.Schedule.End != nil && data.Schedule.End.IsZero() { // if the end date is zero, set it to nil
data.Schedule.End = nil
}
if len(data.Schedule.Cron) > 0 { // a cron expression requires an end date
if data.Schedule.End == nil {
return workflows_execution, errors.New("a cron task should have an end date.")
}
cronStr := strings.Split(data.Schedule.Cron, " ") // split the cron string into its fields
if len(cronStr) < 6 { // the expected format is ss mm hh dd MM dw (at least 6 fields)
return nil, errors.New("Bad cron message: " + data.Schedule.Cron + ". Should be at least ss mm hh dd MM dw")
}
subCron := strings.Join(cronStr[:6], " ")
specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser
sched, err := specParser.Parse(subCron) // parse the cron string
if err != nil {
return workflows_execution, errors.New("Bad cron message: " + err.Error())
}
// loop through the cron schedule to set the executions
for s := sched.Next(*data.Schedule.Start); !s.IsZero() && s.Before(*data.Schedule.End); s = sched.Next(s) {
obj := &workflow_execution.WorkflowExecution{
AbstractObject: utils.AbstractObject{
Name: data.Schedule.Name, // set the name of the execution
},
ExecDate: &s, // the execution date
EndDate: data.Schedule.End, // the end date
State: 1, // state 1 means scheduled
WorkflowID: id, // the workflow this execution depends on
}
workflows_execution = append(workflows_execution, obj) // append the execution to the array
}
} else { // if no cron is set, schedule a single execution at the start date
obj := &workflow_execution.WorkflowExecution{ // create a new execution
AbstractObject: utils.AbstractObject{
Name: data.Schedule.Name,
},
@@ -73,24 +82,30 @@ func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*w
State: 1,
WorkflowID: id,
}
workflows_execution = append(workflows_execution, obj) // append the execution to the array
}
}
return workflows_execution, nil
}
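
The cron handling above leans on robfig/cron's seconds-aware parser. As a standalone illustration (not oc-lib code; the expression and dates are made up), the same date-generation loop looks like this:

package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// ss mm hh dd MM dw: the 6-field, seconds-first format expected by getExecutions
	parser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	sched, err := parser.Parse("0 30 9 * * *") // every day at 09:30:00
	if err != nil {
		panic(err)
	}
	start := time.Now()
	end := start.Add(72 * time.Hour)
	// enumerate every firing time between start and end, exactly like the loop above
	for s := sched.Next(start); !s.IsZero() && s.Before(end); s = sched.Next(s) {
		fmt.Println("execution at", s)
	}
}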
// DeleteOne deletes a workflow from the database along with its pending executions and bookings
func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) {
wfa.execution(id, &Workflow{
AbstractWorkflow: AbstractWorkflow{ScheduleActive: false},
}, true) // delete the executions
res, code, err := wfa.GenericDeleteOne(id, wfa)
if res != nil && code == 200 {
wfa.execute(res.(*Workflow), false) // update the workspace attached to the workflow
}
wfa.share(res.(*Workflow), true, wfa.Caller) // propagate the deletion to the peers the workflow is shared with
return res, code, err
}
/*
* book is a function that books a workflow on the peers
* it takes the workflow id, the real data and the executions
* it returns an error if the booking fails
*/
func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) error {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.BOOKING.String()] == nil {
return errors.New("no caller defined")
@@ -105,27 +120,28 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
}
r := res.(*Workflow)
g := r.Graph
if realData.Graph != nil { // if the incoming data carries a graph, prefer it over the stored one
g = realData.Graph
}
if g != nil && g.Links != nil && len(g.Links) > 0 { // if the graph has links, book the workflow (even on ourselves)
accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil)
for _, link := range g.Links {
if ok, dc_id := realData.isDCLink(link); ok { // booking only applies to links that involve a datacenter
dc, code, _ := accessor.LoadOne(dc_id)
if code != 200 {
continue
}
// CHECK BOOKING
peerID := dc.(*datacenter.DatacenterResource).PeerID
if peerID == "" { // no peer id, no booking
continue
}
// BOOKING ON PEER
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, "", utils.BOOKING, tools.POST,
(&workflow_execution.WorkflowExecutions{ // the standard booking model, see OC-PEER
WorkflowID: id, // "WHO": the workflow being booked
ResourceID: dc_id, // "WHERE": the datacenter that will run it
Executions: execs, // "WHAT": the executions to book
}).Serialize(), wfa.Caller)
if err != nil {
return err
@@ -136,11 +152,14 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
return nil
}
/*
* share propagates a workflow to the peers of the workspaces it is shared with
*/
func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) {
if realData.Shared == nil || len(realData.Shared) == 0 { // nothing shared, nothing to propagate
return
}
for _, sharedID := range realData.Shared { // loop through the shared workspace ids
access := (&shallow_shared_workspace.ShallowSharedWorkspace{}).GetAccessor(nil)
res, code, _ := access.LoadOne(sharedID)
if code != 200 {
@@ -150,12 +169,12 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
paccess := &peer.Peer{}
for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers {
paccess.UUID = p
if paccess.IsMySelf() { // never share with the current peer, it would create a loop
continue
}
if delete { // if the workflow is deleted, propagate the deletion
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.DELETE, map[string]interface{}{}, caller)
} else { // otherwise propagate the update
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.PUT, res.Serialize(), caller)
}
}
@@ -165,108 +184,112 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
}
}
/*
* execution creates or deletes the workflow executions depending on the workflow's schedule
*/
func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) {
var err error
nats := tools.NewNATSCaller() // executions are announced to the daemons over NATS
if !realData.ScheduleActive { // if the schedule is not active, delete the executions
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
"state": 1,
"state": 1, // only delete the scheduled executions only scheduled if executions are in progress or ended, they should not be deleted for registration
"workflow_id": id,
}, utils.WORKFLOW_EXECUTION.String())
err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) // clear the workflow's bookings on the peers
nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData) // announce the removal over NATS
return 200, err
}
accessor := (&workflow_execution.WorkflowExecution{}).GetAccessor(nil)
execs, err := wfa.getExecutions(id, realData) // get the executions of the workflow
if err != nil {
return 422, err
}
err = wfa.book(id, realData, execs) // book the workflow on the peers
if err != nil {
return 409, err // booking failed: return an error to preserve integrity between peers
}
if delete { // if delete is set, remove the executions
mongo.MONGOService.DeleteMultiple(map[string]interface{}{
"workflow_id": id,
"state": 1,
}, utils.WORKFLOW_EXECUTION.String())
wfa.book(id, realData, []*workflow_execution.WorkflowExecution{})
nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData)
return 200, nil
}
if len(execs) > 0 { // if executions were generated, store them
for _, obj := range execs {
_, code, err := accessor.StoreOne(obj)
if code != 200 {
return code, err
}
}
nats.SetNATSPub(utils.WORKFLOW.String(), tools.CREATE, realData) // announce the creation over NATS
} else {
return 422, err
}
return 200, nil
}
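
On the consumer side, the NATS publications above are meant for daemons. A hedged sketch of such a subscriber using the standard nats.go client; the subject name is an assumption, since the one built by tools.NewNATSCaller is not shown in this diff:

package main

import (
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	// "workflow.create" is illustrative; use whatever subject the NATS caller actually publishes on
	_, err = nc.Subscribe("workflow.create", func(m *nats.Msg) {
		log.Printf("received workflow payload: %s", string(m.Data))
	})
	if err != nil {
		log.Fatal(err)
	}
	select {} // keep the subscriber alive
}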
// UpdateOne updates a workflow in the database
func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) {
res, code, err := wfa.LoadOne(id)
if code != 200 {
return nil, 409, err
}
//new := set.(*Workflow)
// avoid the update if the schedule is the same
avoid := set.(*Workflow).Schedule == nil || (res.(*Workflow).Schedule != nil && res.(*Workflow).ScheduleActive == set.(*Workflow).ScheduleActive && res.(*Workflow).Schedule.Start == set.(*Workflow).Schedule.Start && res.(*Workflow).Schedule.End == set.(*Workflow).Schedule.End && res.(*Workflow).Schedule.Cron == set.(*Workflow).Schedule.Cron)
/*for _, i := range new.Graph.Items {
if i.Datacenter == nil && i.Processing == nil && i.Storage == nil && i.Workflow == nil && i.Data == nil {
return nil, 422, errors.New("graph item should have at least one resource data is corrupted")
}
}*/
res, code, err = wfa.GenericUpdateOne(set, id, wfa, &Workflow{})
if code != 200 {
return nil, code, err
}
if !avoid { // if the schedule changed, regenerate the executions
if code, err := wfa.execution(id, res.(*Workflow), true); err != nil {
return nil, code, err
}
}
wfa.execute(res.(*Workflow), false) // update the workspace attached to the workflow
wfa.share(res.(*Workflow), false, wfa.Caller) // propagate the update to the peers
return res, code, err
}
// StoreOne stores a workflow in the database
func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) {
res, code, err := wfa.GenericStoreOne(data, wfa)
if err != nil {
return nil, code, err
}
// store the executions
if code, err := wfa.execution(res.GetID(), res.(*Workflow), false); err != nil {
return nil, code, err
}
wfa.execute(res.(*Workflow), false) // create the workspace attached to the workflow
return res, code, err
}
// CopyOne copies a workflow in the database
func (wfa *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) {
return wfa.GenericStoreOne(data, wfa)
}
// execute maintains the workspace attached to a workflow
// it stores the workflow's resources in a dedicated workspace so that UI and logic never conflict
func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool) {
accessor := (&workspace.Workspace{}).GetAccessor(nil)
filters := &dbs.Filters{
Or: map[string][]dbs.Filter{ // filter by the standard workspace name attached to a workflow
"abstractobject.name": {{dbs.LIKE.String(), workflow.Name + "_workspace"}},
},
}
resource, _, err := accessor.Search(filters, "")
if delete { // if delete is set, remove the workspace
for _, r := range resource {
accessor.DeleteOne(r.GetID())
}
return
}
if err == nil && len(resource) > 0 { // if the workspace already exists, update it
accessor.UpdateOne(&workspace.Workspace{
Active: true,
ResourceSet: resources.ResourceSet{
@@ -277,7 +300,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool) {
Datacenters: workflow.Datacenters,
},
}, resource[0].GetID())
} else { // if the workspace does not exist, create it
accessor.StoreOne(&workspace.Workspace{
Active: true,
AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"},
@@ -292,6 +315,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool) {
}
}
// LoadOne loads a workflow from the database
func (wfa *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) {
var workflow Workflow
res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType())
@@ -300,11 +324,12 @@ func (wfa *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error
return nil, code, err
}
res_mongo.Decode(&workflow)
wfa.execute(&workflow, false) // create the attached workspace if none exists yet
return &workflow, 200, nil
}
// LoadAll loads all the workflows from the database
func (wfa workflowMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) {
objs := []utils.ShallowDBObject{}
res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType())
@@ -317,7 +342,7 @@ func (wfa workflowMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error)
return nil, 404, err
}
for _, r := range results {
objs = append(objs, &r.AbstractObject) // only the AbstractObject fields are returned
}
return objs, 200, nil
}
@@ -326,7 +351,7 @@ func (wfa *workflowMongoAccessor) Search(filters *dbs.Filters, search string) ([
objs := []utils.ShallowDBObject{}
if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" {
filters = &dbs.Filters{
Or: map[string][]dbs.Filter{ // search by name when no explicit filters are provided
"abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}},
},
}


@@ -10,15 +10,14 @@ const (
SERVICE
)
/*
* WorkflowSchedule is a struct that contains the scheduling information of a workflow
* It holds the schedule mode (Task or Service), its name, its start and end times, and the cron expression
*/
type WorkflowSchedule struct {
Mode int64 `json:"mode" bson:"mode" validate:"required"` // Mode is the mode of the schedule (Task or Service)
Name string `json:"name" bson:"name" validate:"required"` // Name is the name of the schedule
Start *time.Time `json:"start" bson:"start" validate:"required,ltfield=End"` // Start is the start time; required and must be before End
End *time.Time `json:"end,omitempty" bson:"end,omitempty"` // End is the end time of the schedule
Cron string `json:"cron,omitempty" bson:"cron,omitempty"` // Cron expression; format: ss mm hh dd MM dw (seconds-first, at least 6 fields)
}
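
An illustrative schedule literal, assumed to sit inside this package; the mode constant's exact name is not visible in this hunk, so a plain 0 is used and labeled accordingly:

func exampleSchedule() *WorkflowSchedule {
	start := time.Now().Add(1 * time.Hour)
	end := start.Add(30 * 24 * time.Hour)
	return &WorkflowSchedule{
		Mode:  0,             // presumably the Task mode; the constant is defined above this hunk
		Name:  "nightly-run",
		Start: &start,
		End:   &end,
		Cron:  "0 0 2 * * *", // ss mm hh dd MM dw: every day at 02:00:00
	}
}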