From 8180fe5e99caafede8bf1652b761de3f37022cc9 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 30 Aug 2024 14:50:48 +0200 Subject: [PATCH] A question refers to the comment ! And if not Ooopsy --- entrypoint.go | 110 ++++++++++--- models/booking/booking.go | 15 +- models/booking/booking_mongo_accessor.go | 11 +- models/models.go | 6 + models/peer/peer.go | 21 ++- models/peer/peer_cache.go | 74 ++++----- models/peer/peer_mongo_accessor.go | 9 +- models/resource_model/resource_model.go | 49 ++++-- .../resource_model_mongo_accessor.go | 6 +- models/resources/data/data.go | 16 +- models/resources/data/data_mongo_accessor.go | 13 +- models/resources/datacenter/datacenter.go | 11 +- .../datacenter/datacenter_mongo_accessor.go | 13 +- models/resources/processing/processing.go | 22 +-- .../processing/processing_mongo_accessor.go | 14 +- models/resources/storage/storage.go | 28 ++-- .../storage/storage_mongo_accessor.go | 14 +- models/resources/workflow/graph/graph.go | 57 ++++--- models/resources/workflow/workflow.go | 8 +- .../workflow/workflow_mongo_accessor.go | 5 +- .../resources/workflow/workflow_schedule.go | 16 -- models/utils/abstracts.go | 32 ++-- models/utils/enums.go | 7 +- models/utils/interfaces.go | 3 + models/workflow/workflow.go | 38 +++-- models/workflow/workflow_mongo_accessor.go | 153 ++++++++++-------- models/workflow/workflow_schedule.go | 19 ++- .../workflow_execution/workflow_execution.go | 30 +++- .../workflow_execution_mongo_accessor.go | 3 +- models/workspace/shared/rules/rule/rule.go | 11 +- .../shared/rules/rule/rule_mongo_accessor.go | 11 +- models/workspace/shared/shared_workspace.go | 36 +++-- .../shared/shared_workspace_mongo_accessor.go | 90 +++++++---- models/workspace/workspace.go | 17 +- models/workspace/workspace_mongo_accessor.go | 44 +++-- static/peer_static.go | 9 ++ tools/api.go | 94 ++++++----- tools/nats_caller.go | 10 +- tools/remote_caller.go | 16 +- 39 files changed, 737 insertions(+), 404 deletions(-) delete mode 100644 models/resources/workflow/workflow_schedule.go diff --git a/entrypoint.go b/entrypoint.go index 4c45c66..fe938fb 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -31,6 +31,7 @@ type Filters = dbs.Filters type LibDataEnum int +// init accessible constant to retrieve data from the database const ( INVALID LibDataEnum = iota DATA_RESOURCE = utils.DATA_RESOURCE @@ -47,40 +48,49 @@ const ( BOOKING = utils.BOOKING ) +// will turn into standards api hostnames func (d LibDataEnum) API() string { return utils.DefaultAPI[d] } +// will turn into standards name func (d LibDataEnum) String() string { return utils.Str[d] } +// will turn into enum index func (d LibDataEnum) EnumIndex() int { return int(d) } +// model to define the shallow data structure type LibDataShallow struct { Data []utils.ShallowDBObject `bson:"data" json:"data"` Code int `bson:"code" json:"code"` Err string `bson:"error" json:"error"` } +// model to define the data structure type LibData struct { Data utils.DBObject `bson:"data" json:"data"` Code int `bson:"code" json:"code"` Err string `bson:"error" json:"error"` } +// here is the singleton variable to store the paths that api will use var paths map[LibDataEnum]string = map[LibDataEnum]string{} +// to get the paths func GetPaths() map[LibDataEnum]string { return paths } +// to get the path func GetPath(collection LibDataEnum) string { return paths[collection] } +// to add the path func AddPath(collection LibDataEnum, path string) { paths[collection] = path } @@ -92,16 +102,23 @@ func Init(appName string, hostname string, port string) { fmt.Printf("Panic recovered in Init : %v - %v\n", r, string(debug.Stack())) } }() - logs.SetAppName(appName) - logs.SetLogger(logs.CreateLogger("main", "")) - tools.GetConfig().Host = hostname - tools.GetConfig().Port = port - mongo.MONGOService.Init(models.GetModelsNames(), tools.GetConfig()) + logs.SetAppName(appName) // set the app name to the logger to define the main log chan + logs.SetLogger(logs.CreateLogger("main", "")) // create the logger + tools.GetConfig().Host = hostname // set the hostname to the config for inner discovery purpose actually not used + tools.GetConfig().Port = port // set the port to the config for inner discovery purpose actually not used + mongo.MONGOService.Init(models.GetModelsNames(), tools.GetConfig()) // init the mongo service + /* + Here we will check if the resource model is already stored in the database + If not we will store it + Resource model is the model that will define the structure of the resources + */ accessor := (&resource_model.ResourceModel{}).GetAccessor(nil) for _, model := range []string{utils.DATA_RESOURCE.String(), utils.PROCESSING_RESOURCE.String(), utils.STORAGE_RESOURCE.String(), utils.DATACENTER_RESOURCE.String(), utils.WORKFLOW_RESOURCE.String()} { data, code, _ := accessor.Search(nil, model) if code == 404 || len(data) == 0 { m := map[string]resource_model.Model{} + // TODO Specify the model for each resource + // for now only processing is specified here (not an elegant way) if model == utils.PROCESSING_RESOURCE.String() { m["image"] = resource_model.Model{ Type: "string", @@ -115,6 +132,10 @@ func Init(appName string, hostname string, port string) { Type: "string", ReadOnly: false, } + m["execute"] = resource_model.Model{ + Type: "map[int]map[string]string", + ReadOnly: false, + } } accessor.StoreOne(&resource_model.ResourceModel{ ResourceType: model, @@ -124,18 +145,27 @@ func Init(appName string, hostname string, port string) { } } +// GetLogger returns the main logger func GetLogger() zerolog.Logger { return logs.GetLogger() } +/* +* Search will search for the data in the database +* @param filters *dbs.Filters +* @param word string +* @param collection LibDataEnum +* @param c ...*tools.HTTPCaller +* @return data LibDataShallow + */ func Search(filters *dbs.Filters, word string, collection LibDataEnum, c ...*tools.HTTPCaller) (data LibDataShallow) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in Search : "+fmt.Sprintf("%v", r))) data = LibDataShallow{Data: nil, Code: 500, Err: "Panic recovered in LoadAll : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } @@ -148,14 +178,20 @@ func Search(filters *dbs.Filters, word string, collection LibDataEnum, c ...*too return } +/* +* LoadAll will load all the data from the database +* @param collection LibDataEnum +* @param c ...*tools.HTTPCaller +* @return data LibDataShallow + */ func LoadAll(collection LibDataEnum, c ...*tools.HTTPCaller) (data LibDataShallow) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in LoadAll : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack()))) data = LibDataShallow{Data: nil, Code: 500, Err: "Panic recovered in LoadAll : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } @@ -168,14 +204,21 @@ func LoadAll(collection LibDataEnum, c ...*tools.HTTPCaller) (data LibDataShallo return } +/* +* LoadOne will load one data from the database +* @param collection LibDataEnum +* @param id string +* @param c ...*tools.HTTPCaller +* @return data LibData + */ func LoadOne(collection LibDataEnum, id string, c ...*tools.HTTPCaller) (data LibData) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in LoadOne : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack()))) data = LibData{Data: nil, Code: 500, Err: "Panic recovered in LoadOne : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } @@ -188,14 +231,22 @@ func LoadOne(collection LibDataEnum, id string, c ...*tools.HTTPCaller) (data Li return } +/* +* UpdateOne will update one data from the database +* @param collection LibDataEnum +* @param set map[string]interface{} +* @param id string +* @param c ...*tools.HTTPCaller +* @return data LibData + */ func UpdateOne(collection LibDataEnum, set map[string]interface{}, id string, c ...*tools.HTTPCaller) (data LibData) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in UpdateOne : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack()))) data = LibData{Data: nil, Code: 500, Err: "Panic recovered in UpdateOne : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } @@ -209,14 +260,21 @@ func UpdateOne(collection LibDataEnum, set map[string]interface{}, id string, c return } +/* +* DeleteOne will delete one data from the database +* @param collection LibDataEnum +* @param id string +* @param c ...*tools.HTTPCaller +* @return data LibData + */ func DeleteOne(collection LibDataEnum, id string, c ...*tools.HTTPCaller) (data LibData) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in DeleteOne : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack()))) data = LibData{Data: nil, Code: 500, Err: "Panic recovered in DeleteOne : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } @@ -229,14 +287,21 @@ func DeleteOne(collection LibDataEnum, id string, c ...*tools.HTTPCaller) (data return } +/* +* StoreOne will store one data from the database +* @param collection LibDataEnum +* @param object map[string]interface{} +* @param c ...*tools.HTTPCaller +* @return data LibData + */ func StoreOne(collection LibDataEnum, object map[string]interface{}, c ...*tools.HTTPCaller) (data LibData) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in StoreOne : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack()))) data = LibData{Data: nil, Code: 500, Err: "Panic recovered in StoreOne : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } @@ -250,14 +315,21 @@ func StoreOne(collection LibDataEnum, object map[string]interface{}, c ...*tools return } +/* +* CopyOne will copy one data from the database +* @param collection LibDataEnum +* @param object map[string]interface{} +* @param c ...*tools.HTTPCaller +* @return data LibData + */ func CopyOne(collection LibDataEnum, object map[string]interface{}, c ...*tools.HTTPCaller) (data LibData) { - defer func() { + defer func() { // recover the panic if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in CopyOne : "+fmt.Sprintf("%v", r)+" - "+string(debug.Stack()))) data = LibData{Data: nil, Code: 500, Err: "Panic recovered in UpdateOne : " + fmt.Sprintf("%v", r) + " - " + string(debug.Stack())} } }() - var caller *tools.HTTPCaller + var caller *tools.HTTPCaller // define the caller if len(c) > 0 { caller = c[0] } diff --git a/models/booking/booking.go b/models/booking/booking.go index ec3e0dc..a6879af 100644 --- a/models/booking/booking.go +++ b/models/booking/booking.go @@ -12,11 +12,15 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +/* +* Booking is a struct that represents a booking + */ type Booking struct { - workflow_execution.WorkflowExecution - DatacenterResourceID string `json:"datacenter_resource_id,omitempty" bson:"datacenter_resource_id,omitempty" validate:"required"` + workflow_execution.WorkflowExecution // WorkflowExecution contains the workflow execution data + DatacenterResourceID string `json:"datacenter_resource_id,omitempty" bson:"datacenter_resource_id,omitempty" validate:"required"` // DatacenterResourceID is the ID of the datacenter resource specified in the booking } +// CheckBooking checks if a booking is possible on a specific datacenter resource func (wfa *Booking) CheckBooking(id string, start time.Time, end *time.Time) (bool, error) { // check if if end == nil { @@ -26,7 +30,7 @@ func (wfa *Booking) CheckBooking(id string, start time.Time, end *time.Time) (bo e := *end accessor := wfa.GetAccessor(nil) res, code, err := accessor.Search(&dbs.Filters{ - And: map[string][]dbs.Filter{ + And: map[string][]dbs.Filter{ // check if there is a booking on the same datacenter resource by filtering on the datacenter_resource_id, the state and the execution date "datacenter_resource_id": {{Operator: dbs.EQUAL.String(), Value: id}}, "workflowexecution.state": {{Operator: dbs.EQUAL.String(), Value: workflow_execution.SCHEDULED.EnumIndex()}}, "workflowexecution.execution_date": { @@ -41,6 +45,7 @@ func (wfa *Booking) CheckBooking(id string, start time.Time, end *time.Time) (bo return len(res) == 0, nil } +// tool to convert the argo status to a state func (wfa *Booking) ArgoStatusToState(status string) *Booking { wfa.WorkflowExecution.ArgoStatusToState(status) return wfa @@ -59,8 +64,8 @@ func (d *Booking) GetName() string { } func (d *Booking) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.BOOKING, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.BOOKING, caller) // Initialize the accessor with the BOOKING model type return data } diff --git a/models/booking/booking_mongo_accessor.go b/models/booking/booking_mongo_accessor.go index d8d7b72..f994c90 100644 --- a/models/booking/booking_mongo_accessor.go +++ b/models/booking/booking_mongo_accessor.go @@ -7,13 +7,17 @@ import ( ) type bookingMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the bookingMongoAccessor func New() *bookingMongoAccessor { return &bookingMongoAccessor{} } +/* +* Nothing special here, just the basic CRUD operations + */ func (wfa *bookingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return wfa.GenericDeleteOne(id, wfa) } @@ -53,16 +57,17 @@ func (wfa bookingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) return nil, 404, err } for _, r := range results { - objs = append(objs, &r.AbstractObject) + objs = append(objs, &r.AbstractObject) // Warning only AbstractObject is returned } return objs, 200, nil } +// Search is a function that searches for a booking in the database func (wfa *bookingMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by name if no filters are provided "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } diff --git a/models/models.go b/models/models.go index 38680a2..b6cb3b4 100644 --- a/models/models.go +++ b/models/models.go @@ -19,6 +19,10 @@ import ( "cloud.o-forge.io/core/oc-lib/models/workspace/shared/rules/rule" ) +/* +This package contains the models used in the application +It's used to create the models dynamically +*/ var models = map[string]func() utils.DBObject{ utils.WORKFLOW_RESOURCE.String(): func() utils.DBObject { return &w.WorkflowResource{} }, utils.DATA_RESOURCE.String(): func() utils.DBObject { return &d.DataResource{} }, @@ -35,6 +39,7 @@ var models = map[string]func() utils.DBObject{ utils.BOOKING.String(): func() utils.DBObject { return &booking.Booking{} }, } +// Model returns the model object based on the model type func Model(model int) utils.DBObject { log := logs.GetLogger() if _, ok := models[utils.FromInt(model)]; ok { @@ -44,6 +49,7 @@ func Model(model int) utils.DBObject { return nil } +// GetModelsNames returns the names of the models func GetModelsNames() []string { names := []string{} for name := range models { diff --git a/models/peer/peer.go b/models/peer/peer.go index 5ce6095..601f622 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -10,17 +10,19 @@ import ( "github.com/google/uuid" ) +// Peer is a struct that represents a peer type Peer struct { utils.AbstractObject - Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` - PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` - Services map[string]int `json:"services,omitempty" bson:"services,omitempty"` - FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` + Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` // Url is the URL of the peer (base64url) + PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` // PublicKey is the public key of the peer + Services map[string]int `json:"services,omitempty" bson:"services,omitempty"` // Services is the services of the peer + FailedExecution []PeerExecution `json:"failed_execution" bson:"failed_execution"` // FailedExecution is the list of failed executions, to be retried } +// AddExecution adds an execution to the list of failed executions func (ao *Peer) AddExecution(exec PeerExecution) { found := false - for _, v := range ao.FailedExecution { + for _, v := range ao.FailedExecution { // Check if the execution is already in the list if v.Url == exec.Url && v.Method == exec.Method && fmt.Sprint(v.Body) == fmt.Sprint(exec.Body) { found = true break @@ -31,6 +33,7 @@ func (ao *Peer) AddExecution(exec PeerExecution) { } } +// RemoveExecution removes an execution from the list of failed executions func (ao *Peer) RemoveExecution(exec PeerExecution) { new := []PeerExecution{} for i, v := range ao.FailedExecution { @@ -41,14 +44,16 @@ func (ao *Peer) RemoveExecution(exec PeerExecution) { ao.FailedExecution = new } +// IsMySelf checks if the peer is the local peer func (ao *Peer) IsMySelf() bool { id, _ := static.GetMyLocalJsonPeer() return ao.UUID == id } +// LaunchPeerExecution launches an execution on a peer func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { p.UUID = peerID - return (&PeerCache{}).LaunchPeerExecution(peerID, dataID, dt, method, body, caller) + return cache.LaunchPeerExecution(peerID, dataID, dt, method, body, caller) // Launch the execution on the peer through the cache } func (ao *Peer) GetID() string { @@ -64,8 +69,8 @@ func (d *Peer) GetName() string { } func (d *Peer) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.PEER, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.PEER, caller) // Initialize the accessor with the PEER model type return data } diff --git a/models/peer/peer_cache.go b/models/peer/peer_cache.go index de66dc2..6efffb7 100644 --- a/models/peer/peer_cache.go +++ b/models/peer/peer_cache.go @@ -11,6 +11,10 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +/* +* PeerExecution is a struct that represents an execution on a peer +* it defines the execution data + */ type PeerExecution struct { Method string `json:"method" bson:"method"` Url string `json:"url" bson:"url"` @@ -19,11 +23,16 @@ type PeerExecution struct { DataID string `json:"data_id" bson:"data_id"` } +var cache = &PeerCache{} // Singleton instance of the peer cache +// PeerCache is a struct that represents a peer cache type PeerCache struct { Executions []*PeerExecution } +// urlFormat formats the URL of the peer with the data type API function func (p *PeerCache) urlFormat(url string, dt utils.DataType) string { + // localhost is replaced by the local peer URL + // because localhost must collide on a web request security protocol if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") { url = strings.ReplaceAll(url, "localhost", dt.API()) url = strings.ReplaceAll(url, "127.0.0.1", dt.API()) @@ -37,53 +46,46 @@ func (p *PeerCache) urlFormat(url string, dt utils.DataType) string { return url } +// checkPeerStatus checks the status of a peer func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools.HTTPCaller) (*Peer, bool) { api := tools.API{} access := (&Peer{}).GetAccessor(nil) - res, code, _ := access.LoadOne(peerID) - if code != 200 { + res, code, _ := access.LoadOne(peerID) // Load the peer from db + if code != 200 { // no peer no party return nil, false } - methods := caller.URLS[utils.PEER.String()] - fmt.Println("PEER AFT 3", methods) + methods := caller.URLS[utils.PEER.String()] // Get the methods url of the peer if methods == nil { return res.(*Peer), false } - meth := methods[tools.POST] - fmt.Println("PEER AFT 4", meth) + meth := methods[tools.POST] // Get the POST method to check status if meth == "" { return res.(*Peer), false } - url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER) - fmt.Println("PEER AFT 5", url) - state, services := api.CheckRemotePeer(url) - res.(*Peer).Services = services - access.UpdateOne(res, peerID) - fmt.Println("PEER AFT 6", state, services, appName) - return res.(*Peer), state != tools.DEAD && services[appName] == 0 -} - -func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.PEER, caller) - return data + url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER) // Format the URL + state, services := api.CheckRemotePeer(url) // Check the status of the peer + res.(*Peer).Services = services // Update the services states of the peer + access.UpdateOne(res, peerID) // Update the peer in the db + return res.(*Peer), state != tools.DEAD && services[appName] == 0 // Return the peer and its status } +// LaunchPeerExecution launches an execution on a peer func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { - methods := caller.URLS[dt.String()] + methods := caller.URLS[dt.String()] // Get the methods url of the data type if _, ok := methods[method]; !ok { return nil, errors.New("no path found") } - meth := methods[method] + meth := methods[method] // Get the method url to execute if meth == "" { return nil, errors.New("no path found") } else { - meth = strings.ReplaceAll(meth, ":id", dataID) + meth = strings.ReplaceAll(meth, ":id", dataID) // Replace the id in the url in case of a DELETE / UPDATE method (it's a standard naming in OC) } url := "" - + // Check the status of the peer if mypeer, ok := p.checkPeerStatus(peerID, dt.API(), caller); !ok { + // If the peer is not reachable, add the execution to the failed executions list pexec := &PeerExecution{ Method: method.String(), Url: p.urlFormat((mypeer.Url)+meth, dt), @@ -92,31 +94,32 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, DataID: dataID, } mypeer.AddExecution(*pexec) - mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) + mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db return nil, errors.New("peer is not reachable") } else { - url = p.urlFormat((mypeer.Url)+meth, dt) - fmt.Println("LaunchPeerExecution AFT 3", url) - tmp := mypeer.FailedExecution - mypeer.FailedExecution = []PeerExecution{} - mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) - for _, v := range tmp { + // If the peer is reachable, launch the execution + url = p.urlFormat((mypeer.Url)+meth, dt) // Format the URL + tmp := mypeer.FailedExecution // Get the failed executions list + mypeer.FailedExecution = []PeerExecution{} // Reset the failed executions list + mypeer.GetAccessor(nil).UpdateOne(mypeer, peerID) // Update the peer in the db + for _, v := range tmp { // Retry the failed executions go p.exec(v.Url, tools.ToMethod(v.Method), v.Body, caller) } } - return nil, p.exec(url, method, body, caller) + return nil, p.exec(url, method, body, caller) // Execute the method } +// exec executes the method on the peer func (p *PeerCache) exec(url string, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) error { var b []byte var err error - if method == tools.POST { + if method == tools.POST { // Execute the POST method if it's a POST method b, err = caller.CallPost(url, "", body) } - if method == tools.GET { + if method == tools.GET { // Execute the GET method if it's a GET method b, err = caller.CallGet(url, "") } - if method == tools.DELETE { + if method == tools.DELETE { // Execute the DELETE method if it's a DELETE method b, err = caller.CallDelete(url, "") } var m map[string]interface{} @@ -124,9 +127,8 @@ func (p *PeerCache) exec(url string, method tools.METHOD, body map[string]interf if err != nil { return err } - fmt.Println("LaunchPeerExecution AFT 3", m, url) - if e, ok := m["error"]; !ok && e != "" { + if e, ok := m["error"]; !ok && e != "" { // Check if there is an error in the response return errors.New(fmt.Sprintf("%v", m["error"])) } return err diff --git a/models/peer/peer_mongo_accessor.go b/models/peer/peer_mongo_accessor.go index 7aa8e00..2eaa371 100644 --- a/models/peer/peer_mongo_accessor.go +++ b/models/peer/peer_mongo_accessor.go @@ -7,13 +7,18 @@ import ( ) type peerMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the peerMongoAccessor func New() *peerMongoAccessor { return &peerMongoAccessor{} } +/* +* Nothing special here, just the basic CRUD operations + */ + func (wfa *peerMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return wfa.GenericDeleteOne(id, wfa) } @@ -63,7 +68,7 @@ func (wfa *peerMongoAccessor) Search(filters *dbs.Filters, search string) ([]uti objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // search by name if no filters are provided "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } diff --git a/models/resource_model/resource_model.go b/models/resource_model/resource_model.go index f33ab76..7d59f72 100644 --- a/models/resource_model/resource_model.go +++ b/models/resource_model/resource_model.go @@ -8,20 +8,27 @@ import ( "github.com/google/uuid" ) +/* +* AbstractResource is a struct that represents a resource +* it defines the resource data + */ type AbstractResource struct { - utils.AbstractObject - ShortDescription string `json:"short_description,omitempty" bson:"short_description,omitempty" validate:"required"` - Description string `json:"description,omitempty" bson:"description,omitempty"` - Logo string `json:"logo,omitempty" bson:"logo,omitempty" validate:"required"` - Owner string `json:"owner,omitempty" bson:"owner,omitempty" validate:"required"` - OwnerLogo string `json:"owner_logo,omitempty" bson:"owner_logo,omitempty"` - SourceUrl string `json:"source_url,omitempty" bson:"source_url,omitempty" validate:"required"` - PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` - Price string `json:"price,omitempty" bson:"price,omitempty"` - License string `json:"license,omitempty" bson:"license,omitempty"` - ResourceModel *ResourceModel `json:"resource_model,omitempty" bson:"resource_model,omitempty"` + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + ShortDescription string `json:"short_description,omitempty" bson:"short_description,omitempty" validate:"required"` // ShortDescription is the short description of the resource + Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the resource + Logo string `json:"logo,omitempty" bson:"logo,omitempty" validate:"required"` // Logo is the logo of the resource + Owner string `json:"owner,omitempty" bson:"owner,omitempty" validate:"required"` // Owner is the owner of the resource + OwnerLogo string `json:"owner_logo,omitempty" bson:"owner_logo,omitempty"` // OwnerLogo is the owner logo of the resource + SourceUrl string `json:"source_url,omitempty" bson:"source_url,omitempty" validate:"required"` // SourceUrl is the source URL of the resource + PeerID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` // PeerID is the ID of the peer getting this resource + Price string `json:"price,omitempty" bson:"price,omitempty"` // Price is the price of access to the resource + License string `json:"license,omitempty" bson:"license,omitempty"` // License is the license of the resource + ResourceModel *ResourceModel `json:"resource_model,omitempty" bson:"resource_model,omitempty"` // ResourceModel is the model of the resource } +/* +* GetModelValue returns the value of the model key + */ func (abs *AbstractResource) GetModelValue(key string) interface{} { if abs.ResourceModel == nil || abs.ResourceModel.Model == nil { return nil @@ -32,6 +39,9 @@ func (abs *AbstractResource) GetModelValue(key string) interface{} { return abs.ResourceModel.Model[key].Value } +/* +* GetModelType returns the type of the model key + */ func (abs *AbstractResource) GetModelType(key string) interface{} { if abs.ResourceModel == nil || abs.ResourceModel.Model == nil { return nil @@ -42,6 +52,9 @@ func (abs *AbstractResource) GetModelType(key string) interface{} { return abs.ResourceModel.Model[key].Type } +/* +* GetModelKeys returns the keys of the model + */ func (abs *AbstractResource) GetModelKeys() []string { keys := make([]string, 0) for k := range abs.ResourceModel.Model { @@ -50,6 +63,9 @@ func (abs *AbstractResource) GetModelKeys() []string { return keys } +/* +* GetModelReadOnly returns the readonly of the model key + */ func (abs *AbstractResource) GetModelReadOnly(key string) interface{} { if abs.ResourceModel == nil || abs.ResourceModel.Model == nil { return nil @@ -61,11 +77,16 @@ func (abs *AbstractResource) GetModelReadOnly(key string) interface{} { } type Model struct { - Type string `json:"type,omitempty" bson:"type,omitempty"` - Value interface{} `json:"value,omitempty" bson:"value,omitempty"` - ReadOnly bool `json:"readonly,omitempty" bson:"readonly,omitempty"` + Type string `json:"type,omitempty" bson:"type,omitempty"` // Type is the type of the model + Value interface{} `json:"value,omitempty" bson:"value,omitempty"` // Value is the value of the model + ReadOnly bool `json:"readonly,omitempty" bson:"readonly,omitempty"` // ReadOnly is the readonly of the model } +/* +* ResourceModel is a struct that represents a resource model +* it defines the resource metadata and specificity +* Warning: This struct is not user available, it is only used by the system + */ type ResourceModel struct { UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"` ResourceType string `json:"resource_type,omitempty" bson:"resource_type,omitempty" validate:"required"` diff --git a/models/resource_model/resource_model_mongo_accessor.go b/models/resource_model/resource_model_mongo_accessor.go index ebea024..f4fb816 100644 --- a/models/resource_model/resource_model_mongo_accessor.go +++ b/models/resource_model/resource_model_mongo_accessor.go @@ -7,9 +7,13 @@ import ( ) type ResourceModelMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +/* +* Nothing special here, just the basic CRUD operations + */ + func (wfa *ResourceModelMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return wfa.GenericDeleteOne(id, wfa) } diff --git a/models/resources/data/data.go b/models/resources/data/data.go index 958c600..877feec 100644 --- a/models/resources/data/data.go +++ b/models/resources/data/data.go @@ -8,11 +8,15 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +/* +* DataResource is a struct that represents a data resource +* it defines the resource data + */ type DataResource struct { - resource_model.AbstractResource - Protocols []string `json:"protocol,omitempty" bson:"protocol,omitempty"` //TODO Enum type - DataType string `json:"datatype,omitempty" bson:"datatype,omitempty"` - Example string `json:"example,omitempty" bson:"example,omitempty" description:"base64 encoded data"` + resource_model.AbstractResource // AbstractResource contains the basic fields of an object (id, name) + Protocols []string `json:"protocol,omitempty" bson:"protocol,omitempty"` //TODO Enum type + DataType string `json:"datatype,omitempty" bson:"datatype,omitempty"` // DataType is the type of the data + Example string `json:"example,omitempty" bson:"example,omitempty" description:"base64 encoded data"` // Example is an example of the data } func (dma *DataResource) Deserialize(j map[string]interface{}) utils.DBObject { @@ -35,7 +39,7 @@ func (dma *DataResource) Serialize() map[string]interface{} { } func (d *DataResource) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.DATA_RESOURCE, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.DATA_RESOURCE, caller) // Initialize the accessor with the DATA_RESOURCE model type return data } diff --git a/models/resources/data/data_mongo_accessor.go b/models/resources/data/data_mongo_accessor.go index fe80e22..5401e36 100644 --- a/models/resources/data/data_mongo_accessor.go +++ b/models/resources/data/data_mongo_accessor.go @@ -8,13 +8,18 @@ import ( ) type dataMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the dataMongoAccessor func New() *dataMongoAccessor { return &dataMongoAccessor{} } +/* +* Nothing special here, just the basic CRUD operations + */ + func (dma *dataMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return dma.GenericDeleteOne(id, dma) } @@ -66,7 +71,7 @@ func (wfa dataMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } @@ -75,7 +80,7 @@ func (wfa *dataMongoAccessor) Search(filters *dbs.Filters, search string) ([]uti objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided "abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, @@ -99,7 +104,7 @@ func (wfa *dataMongoAccessor) Search(filters *dbs.Filters, search string) ([]uti if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } diff --git a/models/resources/datacenter/datacenter.go b/models/resources/datacenter/datacenter.go index d581a74..73e6f50 100644 --- a/models/resources/datacenter/datacenter.go +++ b/models/resources/datacenter/datacenter.go @@ -8,11 +8,15 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +/* +* DatacenterResource is a struct that represents a datacenter resource +* it defines the resource datacenter + */ type DatacenterResource struct { resource_model.AbstractResource - CPUs []*CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` - RAM *RAM `bson:"ram,omitempty" json:"ram,omitempty"` - GPUs []*GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` + CPUs []*CPU `bson:"cpus,omitempty" json:"cpus,omitempty"` // CPUs is the list of CPUs + RAM *RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM + GPUs []*GPU `bson:"gpus,omitempty" json:"gpus,omitempty"` // GPUs is the list of GPUs } func (dma *DatacenterResource) Deserialize(j map[string]interface{}) utils.DBObject { @@ -40,6 +44,7 @@ func (d *DatacenterResource) GetAccessor(caller *tools.HTTPCaller) utils.Accesso return data } +// CPU is a struct that represents a CPU type CPU struct { Cores uint `bson:"cores,omitempty" json:"cores,omitempty"` //TODO: validate Architecture string `bson:"architecture,omitempty" json:"architecture,omitempty"` //TOOD: enum diff --git a/models/resources/datacenter/datacenter_mongo_accessor.go b/models/resources/datacenter/datacenter_mongo_accessor.go index e352d63..e3c7648 100644 --- a/models/resources/datacenter/datacenter_mongo_accessor.go +++ b/models/resources/datacenter/datacenter_mongo_accessor.go @@ -8,13 +8,18 @@ import ( ) type datacenterMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the datacenterMongoAccessor func New() *datacenterMongoAccessor { return &datacenterMongoAccessor{} } +/* +* Nothing special here, just the basic CRUD operations + */ + func (dca *datacenterMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return dca.GenericDeleteOne(id, dca) } @@ -68,7 +73,7 @@ func (wfa datacenterMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, erro if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } @@ -77,7 +82,7 @@ func (wfa *datacenterMongoAccessor) Search(filters *dbs.Filters, search string) objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided "abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, @@ -101,7 +106,7 @@ func (wfa *datacenterMongoAccessor) Search(filters *dbs.Filters, search string) if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } diff --git a/models/resources/processing/processing.go b/models/resources/processing/processing.go index 870621a..b76b892 100644 --- a/models/resources/processing/processing.go +++ b/models/resources/processing/processing.go @@ -9,15 +9,19 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +/* +* ProcessingResource is a struct that represents a processing resource +* it defines the resource processing + */ type ProcessingResource struct { resource_model.AbstractResource - CPUs []*datacenter.CPU `bson:"cpus,omitempty" json:"cp_us,omitempty"` - GPUs []*datacenter.GPU `bson:"gpus,omitempty" json:"gp_us,omitempty"` - RAM *datacenter.RAM `bson:"ram,omitempty" json:"ram,omitempty"` - Storage uint `bson:"storage,omitempty" json:"storage,omitempty"` - Parallel bool `bson:"parallel,omitempty" json:"parallel,omitempty"` - ScalingModel uint `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` - DiskIO string `bson:"disk_io,omitempty" json:"disk_io,omitempty"` + CPUs []*datacenter.CPU `bson:"cpus,omitempty" json:"cp_us,omitempty"` // CPUs is the list of CPUs + GPUs []*datacenter.GPU `bson:"gpus,omitempty" json:"gp_us,omitempty"` // GPUs is the list of GPUs + RAM *datacenter.RAM `bson:"ram,omitempty" json:"ram,omitempty"` // RAM is the RAM + Storage uint `bson:"storage,omitempty" json:"storage,omitempty"` // Storage is the storage + Parallel bool `bson:"parallel,omitempty" json:"parallel,omitempty"` // Parallel is a flag that indicates if the processing is parallel + ScalingModel uint `bson:"scaling_model,omitempty" json:"scaling_model,omitempty"` // ScalingModel is the scaling model + DiskIO string `bson:"disk_io,omitempty" json:"disk_io,omitempty"` // DiskIO is the disk IO } func (dma *ProcessingResource) Deserialize(j map[string]interface{}) utils.DBObject { @@ -40,7 +44,7 @@ func (dma *ProcessingResource) Serialize() map[string]interface{} { } func (d *ProcessingResource) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.PROCESSING_RESOURCE, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.PROCESSING_RESOURCE, caller) // Initialize the accessor with the PROCESSING_RESOURCE model type return data } diff --git a/models/resources/processing/processing_mongo_accessor.go b/models/resources/processing/processing_mongo_accessor.go index 450b1c7..bc06546 100644 --- a/models/resources/processing/processing_mongo_accessor.go +++ b/models/resources/processing/processing_mongo_accessor.go @@ -8,13 +8,18 @@ import ( ) type processingMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the processingMongoAccessor func New() *processingMongoAccessor { return &processingMongoAccessor{} } +/* +* Nothing special here, just the basic CRUD operations + */ + func (pma *processingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return pma.GenericDeleteOne(id, pma) } @@ -69,16 +74,17 @@ func (wfa processingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, erro if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } +// Search searches for processing resources in the database, given some filters OR a search string func (wfa *processingMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided "abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, @@ -102,7 +108,7 @@ func (wfa *processingMongoAccessor) Search(filters *dbs.Filters, search string) if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } diff --git a/models/resources/storage/storage.go b/models/resources/storage/storage.go index c91355c..52ef7e9 100644 --- a/models/resources/storage/storage.go +++ b/models/resources/storage/storage.go @@ -9,20 +9,24 @@ import ( ) type URL struct { - Protocol string `bson:"protocol,omitempty" json:"protocol,omitempty"` - Path string `bson:"path,omitempty" json:"path,omitempty"` + Protocol string `bson:"protocol,omitempty" json:"protocol,omitempty"` // Protocol is the protocol of the URL + Path string `bson:"path,omitempty" json:"path,omitempty"` // Path is the path of the URL } +/* +* StorageResource is a struct that represents a storage resource +* it defines the resource storage + */ type StorageResource struct { - resource_model.AbstractResource - Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` - Type string `bson:"type,omitempty" json:"type,omitempty"` - Size uint `bson:"size,omitempty" json:"size,omitempty"` - Url *URL `bson:"url,omitempty" json:"url,omitempty"` // Will allow to select between several protocols + resource_model.AbstractResource // AbstractResource contains the basic fields of an object (id, name) + Acronym string `bson:"acronym,omitempty" json:"acronym,omitempty"` // Acronym is the acronym of the storage + Type string `bson:"type,omitempty" json:"type,omitempty"` // Type is the type of the storage + Size uint `bson:"size,omitempty" json:"size,omitempty"` // Size is the size of the storage + Url *URL `bson:"url,omitempty" json:"url,omitempty"` // Will allow to select between several protocols - Encryption bool `bson:"encryption,omitempty" json:"encryption,omitempty"` - Redundancy string `bson:"redundancy,omitempty" json:"redundancy,omitempty"` - Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` + Encryption bool `bson:"encryption,omitempty" json:"encryption,omitempty"` // Encryption is a flag that indicates if the storage is encrypted + Redundancy string `bson:"redundancy,omitempty" json:"redundancy,omitempty"` // Redundancy is the redundancy of the storage + Throughput string `bson:"throughput,omitempty" json:"throughput,omitempty"` // Throughput is the throughput of the storage } func (dma *StorageResource) Deserialize(j map[string]interface{}) utils.DBObject { @@ -45,7 +49,7 @@ func (dma *StorageResource) Serialize() map[string]interface{} { } func (d *StorageResource) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.STORAGE_RESOURCE, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.STORAGE_RESOURCE, caller) // Initialize the accessor with the STORAGE_RESOURCE model type return data } diff --git a/models/resources/storage/storage_mongo_accessor.go b/models/resources/storage/storage_mongo_accessor.go index 5e9bf30..5df158d 100644 --- a/models/resources/storage/storage_mongo_accessor.go +++ b/models/resources/storage/storage_mongo_accessor.go @@ -8,13 +8,18 @@ import ( ) type storageMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the storageMongoAccessor func New() *storageMongoAccessor { return &storageMongoAccessor{} } +/* +* Nothing special here, just the basic CRUD operations + */ + func (sma *storageMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return sma.GenericDeleteOne(id, sma) } @@ -69,16 +74,17 @@ func (wfa storageMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } +// Search searches for storage resources in the database, given some filters OR a search string func (wfa *storageMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided "abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, @@ -102,7 +108,7 @@ func (wfa *storageMongoAccessor) Search(filters *dbs.Filters, search string) ([] if err == nil && len(resources) > 0 { r.ResourceModel = resources[0].(*resource_model.ResourceModel) } - objs = append(objs, &r.AbstractResource) + objs = append(objs, &r.AbstractResource) // only get the abstract resource ! } return objs, 200, nil } diff --git a/models/resources/workflow/graph/graph.go b/models/resources/workflow/graph/graph.go index bf6b565..52d06d6 100644 --- a/models/resources/workflow/graph/graph.go +++ b/models/resources/workflow/graph/graph.go @@ -2,43 +2,48 @@ package graph import "cloud.o-forge.io/core/oc-lib/models/resources" +// Graph is a struct that represents a graph type Graph struct { - Zoom float64 `bson:"zoom" json:"zoom" default:"1"` - Items map[string]GraphItem `bson:"items" json:"items" default:"{}" validate:"required"` - Links []GraphLink `bson:"links" json:"links" default:"{}" validate:"required"` + Zoom float64 `bson:"zoom" json:"zoom" default:"1"` // Zoom is the graphical zoom of the graph + Items map[string]GraphItem `bson:"items" json:"items" default:"{}" validate:"required"` // Items is the list of elements in the graph + Links []GraphLink `bson:"links" json:"links" default:"{}" validate:"required"` // Links is the list of links between elements in the graph } +// GraphItem is a struct that represents an item in a graph type GraphItem struct { - ID string `bson:"id" json:"id" validate:"required"` - Width float64 `bson:"width" json:"width" validate:"required"` - Height float64 `bson:"height" json:"height" validate:"required"` - Position Position `bson:"position" json:"position" validate:"required"` - *resources.ItemResource + ID string `bson:"id" json:"id" validate:"required"` // ID is the unique identifier of the item + Width float64 `bson:"width" json:"width" validate:"required"` // Width is the graphical width of the item + Height float64 `bson:"height" json:"height" validate:"required"` // Height is the graphical height of the item + Position Position `bson:"position" json:"position" validate:"required"` // Position is the graphical position of the item + *resources.ItemResource // ItemResource is the resource of the item affected to the item } +// GraphLink is a struct that represents a link between two items in a graph type GraphLink struct { - Source Position `bson:"source" json:"source" validate:"required"` - Destination Position `bson:"destination" json:"destination" validate:"required"` - Style *GraphLinkStyle `bson:"style,omitempty" json:"style,omitempty"` + Source Position `bson:"source" json:"source" validate:"required"` // Source is the source graphical position of the link + Destination Position `bson:"destination" json:"destination" validate:"required"` // Destination is the destination graphical position of the link + Style *GraphLinkStyle `bson:"style,omitempty" json:"style,omitempty"` // Style is the graphical style of the link } +// GraphLinkStyle is a struct that represents the style of a link in a graph type GraphLinkStyle struct { - Color int64 `bson:"color" json:"color"` - Stroke float64 `bson:"stroke" json:"stroke"` - Tension float64 `bson:"tension" json:"tension"` - HeadRadius float64 `bson:"head_radius" json:"head_radius"` - DashWidth float64 `bson:"dash_width" json:"dash_width"` - DashSpace float64 `bson:"dash_space" json:"dash_space"` - EndArrow Position `bson:"end_arrow" json:"end_arrow"` - StartArrow Position `bson:"start_arrow" json:"start_arrow"` - ArrowStyle int64 `bson:"arrow_style" json:"arrow_style"` - ArrowDirection int64 `bson:"arrow_direction" json:"arrow_direction"` - StartArrowWidth float64 `bson:"start_arrow_width" json:"start_arrow_width"` - EndArrowWidth float64 `bson:"end_arrow_width" json:"end_arrow_width"` + Color int64 `bson:"color" json:"color"` // Color is the graphical color of the link (int description of a color, can be transpose as hex) + Stroke float64 `bson:"stroke" json:"stroke"` // Stroke is the graphical stroke of the link + Tension float64 `bson:"tension" json:"tension"` // Tension is the graphical tension of the link + HeadRadius float64 `bson:"head_radius" json:"head_radius"` // graphical pin radius + DashWidth float64 `bson:"dash_width" json:"dash_width"` // DashWidth is the graphical dash width of the link + DashSpace float64 `bson:"dash_space" json:"dash_space"` // DashSpace is the graphical dash space of the link + EndArrow Position `bson:"end_arrow" json:"end_arrow"` // EndArrow is the graphical end arrow of the link + StartArrow Position `bson:"start_arrow" json:"start_arrow"` // StartArrow is the graphical start arrow of the link + ArrowStyle int64 `bson:"arrow_style" json:"arrow_style"` // ArrowStyle is the graphical arrow style of the link (enum foundable in UI) + ArrowDirection int64 `bson:"arrow_direction" json:"arrow_direction"` // ArrowDirection is the graphical arrow direction of the link (enum foundable in UI) + StartArrowWidth float64 `bson:"start_arrow_width" json:"start_arrow_width"` // StartArrowWidth is the graphical start arrow width of the link + EndArrowWidth float64 `bson:"end_arrow_width" json:"end_arrow_width"` // EndArrowWidth is the graphical end arrow width of the link } +// Position is a struct that represents a graphical position type Position struct { - ID string `json:"id" bson:"id" validate:"required"` - X float64 `json:"x" bson:"x" validate:"required"` - Y float64 `json:"y" bson:"y" validate:"required"` + ID string `json:"id" bson:"id"` // ID reprents ItemID (optionnal), TODO: rename to ItemID + X float64 `json:"x" bson:"x" validate:"required"` // X is the graphical x position + Y float64 `json:"y" bson:"y" validate:"required"` // Y is the graphical y position } diff --git a/models/resources/workflow/workflow.go b/models/resources/workflow/workflow.go index 80cb68e..2a7a900 100644 --- a/models/resources/workflow/workflow.go +++ b/models/resources/workflow/workflow.go @@ -8,14 +8,16 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +// WorkflowResource is a struct that represents a workflow resource +// it defines the resource workflow type WorkflowResource struct { resource_model.AbstractResource - WorkflowID string `bson:"workflow_id,omitempty" json:"workflow_id,omitempty"` + WorkflowID string `bson:"workflow_id,omitempty" json:"workflow_id,omitempty"` // WorkflowID is the ID of the native workflow } func (d *WorkflowResource) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.WORKFLOW_RESOURCE, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.WORKFLOW_RESOURCE, caller) // Initialize the accessor with the WORKFLOW_RESOURCE model type return data } diff --git a/models/resources/workflow/workflow_mongo_accessor.go b/models/resources/workflow/workflow_mongo_accessor.go index f73134a..01c7f9d 100644 --- a/models/resources/workflow/workflow_mongo_accessor.go +++ b/models/resources/workflow/workflow_mongo_accessor.go @@ -8,7 +8,7 @@ import ( ) type workflowResourceMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } func New() *workflowResourceMongoAccessor { @@ -78,11 +78,12 @@ func (wfa workflowResourceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int return objs, 200, nil } +// Search searches for workflow resources in the database, given some filters OR a search string func (wfa *workflowResourceMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by like name, short_description, description, owner, url if no filters are provided "abstractresource.abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.short_description": {{Operator: dbs.LIKE.String(), Value: search}}, "abstractresource.description": {{Operator: dbs.LIKE.String(), Value: search}}, diff --git a/models/resources/workflow/workflow_schedule.go b/models/resources/workflow/workflow_schedule.go deleted file mode 100644 index 036fbac..0000000 --- a/models/resources/workflow/workflow_schedule.go +++ /dev/null @@ -1,16 +0,0 @@ -package oclib - -import "time" - -type WorkflowSchedule struct { - Id string `json:"id" bson:"id"` - Name string `json:"name" bson:"name"` - Start time.Time `json:"start" bson:"start"` - End time.Time `json:"end,omitempty" bson:"end,omitempty"` - Cron string `json:"cron,omitempty" bson:"cron,omitempty"` -} - -func (ws *WorkflowSchedule) GetAllDates() (timetable []time.Time) { - // Return all the execution time generated by the Cron - return -} diff --git a/models/utils/abstracts.go b/models/utils/abstracts.go index b556cae..35bf71c 100644 --- a/models/utils/abstracts.go +++ b/models/utils/abstracts.go @@ -13,21 +13,30 @@ import ( "github.com/rs/zerolog" ) +// single instance of the validator used in every model Struct to validate the fields var validate = validator.New(validator.WithRequiredStructEnabled()) +/* +* AbstractObject is a struct that represents the basic fields of an object +* it defines the object id and name +* every data in base root model should inherit from this struct (only exception is the ResourceModel) + */ type AbstractObject struct { UUID string `json:"id,omitempty" bson:"id,omitempty" validate:"required"` Name string `json:"name,omitempty" bson:"name,omitempty" validate:"required"` } +// GetID returns the id of the object (abstract) func (ao *AbstractObject) GetID() string { return ao.UUID } +// GetName returns the name of the object (abstract) func (ao *AbstractObject) GetName() string { return ao.Name } +// GetAccessor returns the accessor of the object (abstract) func (dma *AbstractObject) GetAccessor(caller *tools.HTTPCaller) Accessor { return nil } @@ -58,9 +67,9 @@ func (r *AbstractObject) GenerateID() { } type AbstractAccessor struct { - Logger zerolog.Logger - Type string - Caller *tools.HTTPCaller + Logger zerolog.Logger // Logger is the logger of the accessor, it's a specilized logger for the accessor + Type string // Type is the data type of the accessor + Caller *tools.HTTPCaller // Caller is the http caller of the accessor (optionnal) only need in a peer connection } func (dma *AbstractAccessor) GetType() string { @@ -71,12 +80,14 @@ func (dma *AbstractAccessor) GetCaller() *tools.HTTPCaller { return dma.Caller } +// Init initializes the accessor with the data type and the http caller func (dma *AbstractAccessor) Init(t DataType, caller *tools.HTTPCaller) { - dma.Logger = logs.CreateLogger(t.String(), "") - dma.Caller = caller - dma.Type = t.String() + dma.Logger = logs.CreateLogger(t.String(), "") // Create a logger with the data type + dma.Caller = caller // Set the caller + dma.Type = t.String() // Set the data type } +// GenericLoadOne loads one object from the database (generic) func (wfa *AbstractAccessor) GenericStoreOne(data DBObject, accessor Accessor) (DBObject, int, error) { data.GenerateID() f := dbs.Filters{ @@ -106,6 +117,7 @@ func (wfa *AbstractAccessor) GenericStoreOne(data DBObject, accessor Accessor) ( return accessor.LoadOne(id) } +// GenericLoadOne loads one object from the database (generic) func (dma *AbstractAccessor) GenericDeleteOne(id string, accessor Accessor) (DBObject, int, error) { res, code, err := accessor.LoadOne(id) if err != nil { @@ -120,15 +132,17 @@ func (dma *AbstractAccessor) GenericDeleteOne(id string, accessor Accessor) (DBO return res, 200, nil } +// GenericLoadOne loads one object from the database (generic) +// json expected in entry is a flatted object no need to respect the inheritance hierarchy func (dma *AbstractAccessor) GenericUpdateOne(set DBObject, id string, accessor Accessor, new DBObject) (DBObject, int, error) { r, c, err := accessor.LoadOne(id) if err != nil { return nil, c, err } - change := set.Serialize() - loaded := r.Serialize() + change := set.Serialize() // get the changes + loaded := r.Serialize() // get the loaded object - for k, v := range change { + for k, v := range change { // apply the changes, with a flatten method loaded[k] = v } id, code, err := mongo.MONGOService.UpdateOne(new.Deserialize(loaded), id, accessor.GetType()) diff --git a/models/utils/enums.go b/models/utils/enums.go index c0c537f..ca16c0e 100644 --- a/models/utils/enums.go +++ b/models/utils/enums.go @@ -2,6 +2,7 @@ package utils type DataType int +// DataType - Enum for the different types of resources in db accessible from the outside const ( INVALID DataType = iota DATA_RESOURCE @@ -19,6 +20,7 @@ const ( BOOKING ) +// Bind the standard API name to the data type var DefaultAPI = [...]string{ "", "oc-catalog", @@ -36,6 +38,7 @@ var DefaultAPI = [...]string{ "oc-datacenter", } +// Bind the standard data name to the data type var Str = [...]string{ "invalid", "data_resource", @@ -57,11 +60,11 @@ func FromInt(i int) string { return Str[i] } -func (d DataType) API() string { +func (d DataType) API() string { // API - Returns the API name of the data type return DefaultAPI[d] } -func (d DataType) String() string { +func (d DataType) String() string { // String - Returns the string name of the data type return Str[d] } diff --git a/models/utils/interfaces.go b/models/utils/interfaces.go index 88c47bc..006a57c 100644 --- a/models/utils/interfaces.go +++ b/models/utils/interfaces.go @@ -5,6 +5,7 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +// ShallowDBObject is an interface that defines the basic methods shallowed version of a DBObject type ShallowDBObject interface { GenerateID() GetID() string @@ -13,6 +14,7 @@ type ShallowDBObject interface { Serialize() map[string]interface{} } +// DBObject is an interface that defines the basic methods for a DBObject type DBObject interface { GenerateID() GetID() string @@ -22,6 +24,7 @@ type DBObject interface { GetAccessor(caller *tools.HTTPCaller) Accessor } +// Accessor is an interface that defines the basic methods for an Accessor type Accessor interface { Init(t DataType, caller *tools.HTTPCaller) GetType() string diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 54c5991..4b5cdec 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -12,14 +12,21 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +/* +* AbstractWorkflow is a struct that represents a workflow for resource or native workflow +* Warning: there is 2 types of workflows, the resource workflow and the native workflow +* native workflow is the one that you create to schedule an execution +* resource workflow is the one that is created to set our native workflow in catalog + */ type AbstractWorkflow struct { resources.ResourceSet - Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` - ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` - Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` - Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` + Graph *graph.Graph `bson:"graph,omitempty" json:"graph,omitempty"` // Graph UI & logic representation of the workflow + ScheduleActive bool `json:"schedule_active" bson:"schedule_active"` // ScheduleActive is a flag that indicates if the schedule is active, if not the workflow is not scheduled and no execution or booking will be set + Schedule *WorkflowSchedule `bson:"schedule,omitempty" json:"schedule,omitempty"` // Schedule is the schedule of the workflow + Shared []string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workflow } +// tool function to check if a link is a link between a datacenter and a resource func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) { if w.Graph == nil || w.Graph.Items == nil { return false, "" @@ -33,28 +40,35 @@ func (w *AbstractWorkflow) isDCLink(link graph.GraphLink) (bool, string) { return false, "" } +/* +* Workflow is a struct that represents a workflow +* it defines the native workflow + */ type Workflow struct { - utils.AbstractObject - AbstractWorkflow + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + AbstractWorkflow // AbstractWorkflow contains the basic fields of a workflow } +/* +* CheckBooking is a function that checks the booking of the workflow on peers (even ourselves) + */ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) { // check if - if wfa.Graph == nil { + if wfa.Graph == nil { // no graph no booking return false, nil } accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil) for _, link := range wfa.Graph.Links { - if ok, dc_id := wfa.isDCLink(link); ok { + if ok, dc_id := wfa.isDCLink(link); ok { // check if the link is a link between a datacenter and a resource dc, code, _ := accessor.LoadOne(dc_id) if code != 200 { continue } - // CHECK BOOKING + // CHECK BOOKING ON PEER, datacenter could be a remote one peerID := dc.(*datacenter.DatacenterResource).PeerID if peerID == "" { return false, errors.New("no peer id") - } + } // no peer id no booking, we need to know where to book _, err := (&peer.Peer{}).LaunchPeerExecution(peerID, dc_id, utils.BOOKING, tools.GET, nil, caller) if err != nil { return false, err @@ -69,8 +83,8 @@ func (d *Workflow) GetName() string { } func (d *Workflow) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.WORKFLOW, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.WORKFLOW, caller) // Initialize the accessor with the WORKFLOW model type return data } diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index a528ad7..ef31949 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -2,7 +2,6 @@ package workflow import ( "errors" - "fmt" "strings" "cloud.o-forge.io/core/oc-lib/dbs" @@ -19,52 +18,62 @@ import ( ) type workflowMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the workflowMongoAccessor func New() *workflowMongoAccessor { return &workflowMongoAccessor{} } +/* +* THERE IS A LOT IN THIS FILE SHOULD BE AWARE OF THE COMMENTS + */ + +/* +* getExecutions is a function that returns the executions of a workflow +* it returns an array of workflow_execution.WorkflowExecution + */ func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*workflow_execution.WorkflowExecution, error) { workflows_execution := []*workflow_execution.WorkflowExecution{} - if data.Schedule != nil { - if data.Schedule.Start == nil { + if data.Schedule != nil { // only set execution on a scheduled workflow + if data.Schedule.Start == nil { // if no start date, return an error return workflows_execution, errors.New("should get a start date on the scheduler.") } - if data.Schedule.End != nil && data.Schedule.End.IsZero() { + if data.Schedule.End != nil && data.Schedule.End.IsZero() { // if end date is zero, set it to nil data.Schedule.End = nil } - if len(data.Schedule.Cron) > 0 { + if len(data.Schedule.Cron) > 0 { // if cron is set then end date should be set if data.Schedule.End == nil { return workflows_execution, errors.New("a cron task should have an end date.") } - cronStr := strings.Split(data.Schedule.Cron, " ") - if len(cronStr) < 6 { + cronStr := strings.Split(data.Schedule.Cron, " ") // split the cron string to treat it + if len(cronStr) < 6 { // if the cron string is less than 6 fields, return an error because format is : ss mm hh dd MM dw (6 fields) return nil, errors.New("Bad cron message: " + data.Schedule.Cron + ". Should be at least ss mm hh dd MM dw") } subCron := strings.Join(cronStr[:6], " ") // cron should be parsed as ss mm hh dd MM dw t (min 6 fields) - specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) - sched, err := specParser.Parse(subCron) + specParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow) // create a new cron parser + sched, err := specParser.Parse(subCron) // parse the cron string if err != nil { return workflows_execution, errors.New("Bad cron message: " + err.Error()) } + // loop through the cron schedule to set the executions for s := sched.Next(*data.Schedule.Start); !s.IsZero() && s.Before(*data.Schedule.End); s = sched.Next(s) { obj := &workflow_execution.WorkflowExecution{ AbstractObject: utils.AbstractObject{ - Name: data.Schedule.Name, + Name: data.Schedule.Name, // set the name of the execution }, - ExecDate: &s, - EndDate: data.Schedule.End, - State: 1, - WorkflowID: id, + ExecDate: &s, // set the execution date + EndDate: data.Schedule.End, // set the end date + State: 1, // set the state to 1 (scheduled) + WorkflowID: id, // set the workflow id dependancy of the execution } - workflows_execution = append(workflows_execution, obj) + workflows_execution = append(workflows_execution, obj) // append the execution to the array } - } else { - obj := &workflow_execution.WorkflowExecution{ + } else { // if no cron, set the execution to the start date + obj := &workflow_execution.WorkflowExecution{ // create a new execution AbstractObject: utils.AbstractObject{ Name: data.Schedule.Name, }, @@ -73,24 +82,30 @@ func (wfa *workflowMongoAccessor) getExecutions(id string, data *Workflow) ([]*w State: 1, WorkflowID: id, } - workflows_execution = append(workflows_execution, obj) + workflows_execution = append(workflows_execution, obj) // append the execution to the array } } return workflows_execution, nil } +// DeleteOne deletes a workflow from the database, delete depending executions and bookings func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { wfa.execution(id, &Workflow{ AbstractWorkflow: AbstractWorkflow{ScheduleActive: false}, - }, true) + }, true) // delete the executions res, code, err := wfa.GenericDeleteOne(id, wfa) if res != nil && code == 200 { - wfa.execute(res.(*Workflow), false) + wfa.execute(res.(*Workflow), false) // up to date the workspace for the workflow } - wfa.share(res.(*Workflow), true, wfa.Caller) + wfa.share(res.(*Workflow), true, wfa.Caller) // send the deletion to the peers where workflow is shared return res, code, err } +/* +* book is a function that books a workflow on the peers +* it takes the workflow id, the real data and the executions +* it returns an error if the booking fails + */ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) error { if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.BOOKING.String()] == nil { return errors.New("no caller defined") @@ -105,27 +120,28 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w } r := res.(*Workflow) g := r.Graph - if realData.Graph != nil { + if realData.Graph != nil { // if the graph is set, set it to the real data g = realData.Graph } - if g != nil && g.Links != nil && len(g.Links) > 0 { + if g != nil && g.Links != nil && len(g.Links) > 0 { // if the graph is set and has links then book the workflow (even on ourselves) accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil) for _, link := range g.Links { - if ok, dc_id := realData.isDCLink(link); ok { + if ok, dc_id := realData.isDCLink(link); ok { // check if the link is a link between a datacenter and a resource booking is only on datacenter dc, code, _ := accessor.LoadOne(dc_id) if code != 200 { continue } // CHECK BOOKING peerID := dc.(*datacenter.DatacenterResource).PeerID - if peerID == "" { + if peerID == "" { // no peer id no booking continue } + // BOOKING ON PEER _, err := (&peer.Peer{}).LaunchPeerExecution(peerID, "", utils.BOOKING, tools.POST, - (&workflow_execution.WorkflowExecutions{ - WorkflowID: id, - ResourceID: dc_id, - Executions: execs, + (&workflow_execution.WorkflowExecutions{ // it's the standard model for booking see OC-PEER + WorkflowID: id, // set the workflow id "WHO" + ResourceID: dc_id, // set the datacenter id "WHERE" + Executions: execs, // set the executions to book "WHAT" }).Serialize(), wfa.Caller) if err != nil { return err @@ -136,11 +152,14 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w return nil } +/* +* share is a function that shares a workflow to the peers if the workflow is shared + */ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller *tools.HTTPCaller) { - if realData.Shared == nil || len(realData.Shared) == 0 { + if realData.Shared == nil || len(realData.Shared) == 0 { // no shared no sharing return } - for _, sharedID := range realData.Shared { + for _, sharedID := range realData.Shared { // loop through the shared ids access := (&shallow_shared_workspace.ShallowSharedWorkspace{}).GetAccessor(nil) res, code, _ := access.LoadOne(sharedID) if code != 200 { @@ -150,12 +169,12 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller paccess := &peer.Peer{} for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers { paccess.UUID = p - if paccess.IsMySelf() { + if paccess.IsMySelf() { // if the peer is the current peer, never share because it will create a loop continue } - if delete { + if delete { // if the workflow is deleted, share the deletion _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.DELETE, map[string]interface{}{}, caller) - } else { + } else { // if the workflow is updated, share the update _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.PUT, res.Serialize(), caller) } } @@ -165,108 +184,112 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller } } +/* +* execution is a create or delete function for the workflow executions depending on the schedule of the workflow + */ func (wfa *workflowMongoAccessor) execution(id string, realData *Workflow, delete bool) (int, error) { var err error - nats := tools.NewNATSCaller() - fmt.Println("EXECUTION", realData.ScheduleActive) - if !realData.ScheduleActive { + nats := tools.NewNATSCaller() // create a new nats caller because executions are sent to the nats for daemons + if !realData.ScheduleActive { // if the schedule is not active, delete the executions mongo.MONGOService.DeleteMultiple(map[string]interface{}{ - "state": 1, + "state": 1, // only delete the scheduled executions only scheduled if executions are in progress or ended, they should not be deleted for registration "workflow_id": id, }, utils.WORKFLOW_EXECUTION.String()) - err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) - nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData) + err := wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) // delete the booking of the workflow on the peers + nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData) // send the deletion to the nats return 200, err } accessor := (&workflow_execution.WorkflowExecution{}).GetAccessor(nil) - execs, err := wfa.getExecutions(id, realData) + execs, err := wfa.getExecutions(id, realData) // get the executions of the workflow if err != nil { return 422, err } - err = wfa.book(id, realData, execs) + err = wfa.book(id, realData, execs) // book the workflow on the peers if err != nil { - return 409, err + return 409, err // if the booking fails, return an error for integrity between peers } - if delete { + if delete { // if delete is set to true, delete the executions mongo.MONGOService.DeleteMultiple(map[string]interface{}{ "workflow_id": id, "state": 1, }, utils.WORKFLOW_EXECUTION.String()) wfa.book(id, realData, []*workflow_execution.WorkflowExecution{}) nats.SetNATSPub(utils.WORKFLOW.String(), tools.REMOVE, realData) + return 200, nil } - if len(execs) > 0 { + if len(execs) > 0 { // if the executions are set, store them for _, obj := range execs { _, code, err := accessor.StoreOne(obj) if code != 200 { return code, err } } - nats.SetNATSPub(utils.WORKFLOW.String(), tools.CREATE, realData) + nats.SetNATSPub(utils.WORKFLOW.String(), tools.CREATE, realData) // send the creation to the nats } else { return 422, err } return 200, nil } +// UpdateOne updates a workflow in the database func (wfa *workflowMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { res, code, err := wfa.LoadOne(id) if code != 200 { return nil, 409, err } - //new := set.(*Workflow) + // avoid the update if the schedule is the same avoid := set.(*Workflow).Schedule == nil || (res.(*Workflow).Schedule != nil && res.(*Workflow).ScheduleActive == set.(*Workflow).ScheduleActive && res.(*Workflow).Schedule.Start == set.(*Workflow).Schedule.Start && res.(*Workflow).Schedule.End == set.(*Workflow).Schedule.End && res.(*Workflow).Schedule.Cron == set.(*Workflow).Schedule.Cron) - /*for _, i := range new.Graph.Items { - if i.Datacenter == nil && i.Processing == nil && i.Storage == nil && i.Workflow == nil && i.Data == nil { - return nil, 422, errors.New("graph item should have at least one resource data is corrupted") - } - }*/ res, code, err = wfa.GenericUpdateOne(set, id, wfa, &Workflow{}) if code != 200 { return nil, code, err } - if !avoid { + if !avoid { // if the schedule is not avoided, update the executions if code, err := wfa.execution(id, res.(*Workflow), true); err != nil { return nil, code, err } } - wfa.execute(res.(*Workflow), false) - wfa.share(res.(*Workflow), false, wfa.Caller) + wfa.execute(res.(*Workflow), false) // update the workspace for the workflow + wfa.share(res.(*Workflow), false, wfa.Caller) // share the update to the peers return res, code, err } +// StoreOne stores a workflow in the database func (wfa *workflowMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { res, code, err := wfa.GenericStoreOne(data, wfa) if err != nil { return nil, code, err } + //store the executions if code, err := wfa.execution(res.GetID(), res.(*Workflow), false); err != nil { return nil, code, err } - wfa.execute(res.(*Workflow), false) + wfa.execute(res.(*Workflow), false) // store the workspace for the workflow return res, code, err } +// CopyOne copies a workflow in the database func (wfa *workflowMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { return wfa.GenericStoreOne(data, wfa) } +// execute is a function that executes a workflow +// it stores the workflow resources in a specific workspace to never have a conflict in UI and logic func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool) { accessor := (&workspace.Workspace{}).GetAccessor(nil) filters := &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by standard workspace name attached to a workflow "abstractobject.name": {{dbs.LIKE.String(), workflow.Name + "_workspace"}}, }, } resource, _, err := accessor.Search(filters, "") - if delete { + if delete { // if delete is set to true, delete the workspace for _, r := range resource { accessor.DeleteOne(r.GetID()) } return } - if err == nil && len(resource) > 0 { + if err == nil && len(resource) > 0 { // if the workspace already exists, update it accessor.UpdateOne(&workspace.Workspace{ Active: true, ResourceSet: resources.ResourceSet{ @@ -277,7 +300,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool) { Datacenters: workflow.Datacenters, }, }, resource[0].GetID()) - } else { + } else { // if the workspace does not exist, create it accessor.StoreOne(&workspace.Workspace{ Active: true, AbstractObject: utils.AbstractObject{Name: workflow.Name + "_workspace"}, @@ -292,6 +315,7 @@ func (wfa *workflowMongoAccessor) execute(workflow *Workflow, delete bool) { } } +// LoadOne loads a workflow from the database func (wfa *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { var workflow Workflow res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) @@ -300,11 +324,12 @@ func (wfa *workflowMongoAccessor) LoadOne(id string) (utils.DBObject, int, error return nil, code, err } res_mongo.Decode(&workflow) - wfa.execute(&workflow, false) + wfa.execute(&workflow, false) // if no workspace is attached to the workflow, create it return &workflow, 200, nil } +// LoadAll loads all the workflows from the database func (wfa workflowMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) @@ -317,7 +342,7 @@ func (wfa workflowMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) return nil, 404, err } for _, r := range results { - objs = append(objs, &r.AbstractObject) + objs = append(objs, &r.AbstractObject) // only AbstractObject fields ! } return objs, 200, nil } @@ -326,7 +351,7 @@ func (wfa *workflowMongoAccessor) Search(filters *dbs.Filters, search string) ([ objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by name if no filters are provided "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } diff --git a/models/workflow/workflow_schedule.go b/models/workflow/workflow_schedule.go index 39861ac..14ef12e 100644 --- a/models/workflow/workflow_schedule.go +++ b/models/workflow/workflow_schedule.go @@ -10,15 +10,14 @@ const ( SERVICE ) +/* +* WorkflowSchedule is a struct that contains the scheduling information of a workflow +* It contains the mode of the schedule (Task or Service), the name of the schedule, the start and end time of the schedule and the cron expression + */ type WorkflowSchedule struct { - Mode int64 `json:"mode" bson:"mode" validate:"required"` - Name string `json:"name" bson:"name" validate:"required"` - Start *time.Time `json:"start" bson:"start" validate:"required,ltfield=End"` - End *time.Time `json:"end,omitempty" bson:"end,omitempty"` - Cron string `json:"cron,omitempty" bson:"cron,omitempty"` // ss mm hh dd MM dw task -} - -func (ws *WorkflowSchedule) GetAllDates() (timetable []time.Time) { - // Return all the execution time generated by the Cron - return + Mode int64 `json:"mode" bson:"mode" validate:"required"` // Mode is the mode of the schedule (Task or Service) + Name string `json:"name" bson:"name" validate:"required"` // Name is the name of the schedule + Start *time.Time `json:"start" bson:"start" validate:"required,ltfield=End"` // Start is the start time of the schedule, is required and must be less than the End time + End *time.Time `json:"end,omitempty" bson:"end,omitempty"` // End is the end time of the schedule + Cron string `json:"cron,omitempty" bson:"cron,omitempty"` // here the cron format : ss mm hh dd MM dw task } diff --git a/models/workflow_execution/workflow_execution.go b/models/workflow_execution/workflow_execution.go index f68cec9..92aea5c 100644 --- a/models/workflow_execution/workflow_execution.go +++ b/models/workflow_execution/workflow_execution.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" ) +// ScheduledType - Enum for the different states of a workflow execution type ScheduledType int const ( @@ -39,12 +40,18 @@ func (d ScheduledType) EnumIndex() int { return int(d) } +/* +* WorkflowExecutions is a struct that represents a list of workflow executions +* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system +* workflows generate their own executions + */ type WorkflowExecutions struct { WorkflowID string `json:"workflow_id" bson:"workflow_id"` ResourceID string `json:"resource_id" bson:"resource_id"` Executions []*WorkflowExecution `json:"executions" bson:"executions"` } +// New - Creates a new instance of the WorkflowExecutions from a map func (dma *WorkflowExecutions) Deserialize(j map[string]interface{}) *WorkflowExecutions { b, err := json.Marshal(j) if err != nil { @@ -54,6 +61,7 @@ func (dma *WorkflowExecutions) Deserialize(j map[string]interface{}) *WorkflowEx return dma } +// Serialize - Returns the WorkflowExecutions as a map func (dma *WorkflowExecutions) Serialize() map[string]interface{} { var m map[string]interface{} b, err := json.Marshal(dma) @@ -64,14 +72,20 @@ func (dma *WorkflowExecutions) Serialize() map[string]interface{} { return m } +/* +* WorkflowExecution is a struct that represents a workflow execution +* Warning: No user can write (del, post, put) a workflow execution, it is only used by the system +* workflows generate their own executions + */ type WorkflowExecution struct { - utils.AbstractObject - ExecDate *time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` - EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` - State int64 `json:"state,omitempty" bson:"state,omitempty"` - WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + ExecDate *time.Time `json:"execution_date,omitempty" bson:"execution_date,omitempty" validate:"required"` // ExecDate is the execution date of the workflow, is required + EndDate *time.Time `json:"end_date,omitempty" bson:"end_date,omitempty"` // EndDate is the end date of the workflow + State int64 `json:"state,omitempty" bson:"state,omitempty"` // State is the state of the workflow + WorkflowID string `json:"workflow_id" bson:"workflow_id,omitempty"` // WorkflowID is the ID of the workflow } +// tool to transform the argo status to a state func (wfa *WorkflowExecution) ArgoStatusToState(status string) *WorkflowExecution { status = strings.ToLower(status) switch status { @@ -100,11 +114,12 @@ func (d *WorkflowExecution) GetName() string { } func (d *WorkflowExecution) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.WORKFLOW_EXECUTION, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.WORKFLOW_EXECUTION, caller) // Initialize the accessor with the WORKFLOW_EXECUTION model type return data } +// New creates a new instance of the WorkflowExecution from a map func (dma *WorkflowExecution) Deserialize(j map[string]interface{}) utils.DBObject { b, err := json.Marshal(j) if err != nil { @@ -114,6 +129,7 @@ func (dma *WorkflowExecution) Deserialize(j map[string]interface{}) utils.DBObje return dma } +// Serialize returns the WorkflowExecution as a map func (dma *WorkflowExecution) Serialize() map[string]interface{} { var m map[string]interface{} b, err := json.Marshal(dma) diff --git a/models/workflow_execution/workflow_execution_mongo_accessor.go b/models/workflow_execution/workflow_execution_mongo_accessor.go index 5d905d7..31503d3 100644 --- a/models/workflow_execution/workflow_execution_mongo_accessor.go +++ b/models/workflow_execution/workflow_execution_mongo_accessor.go @@ -58,11 +58,12 @@ func (wfa workflowExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, in return objs, 200, nil } +// Search searches for workflow executions in the database, given some filters OR a search string func (wfa *workflowExecutionMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by name if no filters are provided "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } diff --git a/models/workspace/shared/rules/rule/rule.go b/models/workspace/shared/rules/rule/rule.go index 41418f6..7559e01 100644 --- a/models/workspace/shared/rules/rule/rule.go +++ b/models/workspace/shared/rules/rule/rule.go @@ -8,11 +8,14 @@ import ( "github.com/google/uuid" ) +/* +* Rule is a struct that represents a rule of a shared workspace + */ type Rule struct { - utils.AbstractObject - Description string `json:"description,omitempty" bson:"description,omitempty"` - Condition string `json:"condition,omitempty" bson:"condition,omitempty"` - Actions []string `json:"actions,omitempty" bson:"actions,omitempty"` + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + Description string `json:"description,omitempty" bson:"description,omitempty"` // Description is the description of the rule + Condition string `json:"condition,omitempty" bson:"condition,omitempty"` // NOT DEFINITIVE TO SPECIFICATION + Actions []string `json:"actions,omitempty" bson:"actions,omitempty"` // NOT DEFINITIVE TO SPECIFICATION } func (ao *Rule) GetID() string { diff --git a/models/workspace/shared/rules/rule/rule_mongo_accessor.go b/models/workspace/shared/rules/rule/rule_mongo_accessor.go index 4b58393..2278252 100644 --- a/models/workspace/shared/rules/rule/rule_mongo_accessor.go +++ b/models/workspace/shared/rules/rule/rule_mongo_accessor.go @@ -1,4 +1,4 @@ -package rule +package rule import ( "cloud.o-forge.io/core/oc-lib/dbs" @@ -10,18 +10,22 @@ type ruleMongoAccessor struct { utils.AbstractAccessor } +// New creates a new instance of the ruleMongoAccessor func New() *ruleMongoAccessor { return &ruleMongoAccessor{} } +// GetType returns the type of the rule func (wfa *ruleMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return wfa.GenericDeleteOne(id, wfa) } +// UpdateOne updates a rule in the database func (wfa *ruleMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { return wfa.GenericUpdateOne(set.(*Rule), id, wfa, &Rule{}) } +// StoreOne stores a rule in the database func (wfa *ruleMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { return wfa.GenericStoreOne(data.(*Rule), wfa) } @@ -30,6 +34,7 @@ func (wfa *ruleMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, return wfa.GenericStoreOne(data, wfa) } +// LoadOne loads a rule from the database func (wfa *ruleMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { var rule Rule res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) @@ -41,6 +46,7 @@ func (wfa *ruleMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { return &rule, 200, nil } +// LoadAll loads all rules from the database func (wfa ruleMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) @@ -58,11 +64,12 @@ func (wfa ruleMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { return objs, 200, nil } +// Search searches for rules in the database, given some filters OR a search string func (wfa *ruleMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by name if no filters are provided "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } diff --git a/models/workspace/shared/shared_workspace.go b/models/workspace/shared/shared_workspace.go index 03af1e3..fc681ac 100644 --- a/models/workspace/shared/shared_workspace.go +++ b/models/workspace/shared/shared_workspace.go @@ -12,22 +12,26 @@ import ( "github.com/google/uuid" ) +// SharedWorkspace is a struct that represents a shared workspace +// WARNING : it got a shallow object version, this one is the full version +// full version is the one used by API +// other one is a shallow version used by the Lib for import cycle problem purposes type SharedWorkspace struct { - utils.AbstractObject - IsSent bool `json:"is_sent" bson:"-"` - CreatorID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` - Version string `json:"version,omitempty" bson:"version,omitempty"` - Description string `json:"description,omitempty" bson:"description,omitempty" validate:"required"` - Attributes map[string]interface{} `json:"attributes,omitempty" bson:"attributes,omitempty"` - Workspaces []string `json:"workspaces,omitempty" bson:"workspaces,omitempty"` - Workflows []string `json:"workflows,omitempty" bson:"workflows,omitempty"` - Peers []string `json:"peers,omitempty" bson:"peers,omitempty"` - Rules []string `json:"rules,omitempty" bson:"rules,omitempty"` + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + IsSent bool `json:"is_sent" bson:"-"` // IsSent is a flag that indicates if the workspace is sent + CreatorID string `json:"peer_id,omitempty" bson:"peer_id,omitempty" validate:"required"` // CreatorID is the ID of the creator + Version string `json:"version,omitempty" bson:"version,omitempty"` // Version is the version of the workspace + Description string `json:"description,omitempty" bson:"description,omitempty" validate:"required"` // Description is the description of the workspace + Attributes map[string]interface{} `json:"attributes,omitempty" bson:"attributes,omitempty"` // Attributes is the attributes of the workspace (TODO) + Workspaces []string `json:"workspaces,omitempty" bson:"workspaces,omitempty"` // Workspaces is the workspaces of the workspace + Workflows []string `json:"workflows,omitempty" bson:"workflows,omitempty"` // Workflows is the workflows of the workspace + Peers []string `json:"peers,omitempty" bson:"peers,omitempty"` // Peers is the peers of the workspace + Rules []string `json:"rules,omitempty" bson:"rules,omitempty"` // Rules is the rules of the workspace - SharedRules []*rule.Rule `json:"shared_rules,omitempty" bson:"-"` - SharedWorkspaces []*workspace.Workspace `json:"shared_workspaces,omitempty" bson:"-"` - SharedWorkflows []*w.Workflow `json:"shared_workflows,omitempty" bson:"-"` - SharedPeers []*peer.Peer `json:"shared_peers,omitempty" bson:"-"` + SharedRules []*rule.Rule `json:"shared_rules,omitempty" bson:"-"` // SharedRules is the shared rules of the workspace + SharedWorkspaces []*workspace.Workspace `json:"shared_workspaces,omitempty" bson:"-"` // SharedWorkspaces is the shared workspaces of the workspace + SharedWorkflows []*w.Workflow `json:"shared_workflows,omitempty" bson:"-"` // SharedWorkflows is the shared workflows of the workspace + SharedPeers []*peer.Peer `json:"shared_peers,omitempty" bson:"-"` // SharedPeers is the shared peers of the workspace } func (ao *SharedWorkspace) GetID() string { @@ -45,8 +49,8 @@ func (d *SharedWorkspace) GetName() string { } func (d *SharedWorkspace) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.SHARED_WORKSPACE, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.SHARED_WORKSPACE, caller) // Initialize the accessor with the SHARED_WORKSPACE model type return data } diff --git a/models/workspace/shared/shared_workspace_mongo_accessor.go b/models/workspace/shared/shared_workspace_mongo_accessor.go index 9e30131..ced0fad 100644 --- a/models/workspace/shared/shared_workspace_mongo_accessor.go +++ b/models/workspace/shared/shared_workspace_mongo_accessor.go @@ -15,37 +15,43 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +// SharedWorkspace is a struct that represents a shared workspace type sharedWorkspaceMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the sharedWorkspaceMongoAccessor func New() *sharedWorkspaceMongoAccessor { return &sharedWorkspaceMongoAccessor{} } +// DeleteOne deletes a shared workspace from the database, given its ID, it automatically share to peers if the workspace is shared func (wfa *sharedWorkspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { set, code, _ := wfa.LoadOne(id) - if code == 200 { + if code == 200 { // always delete on peers than recreate wfa.deleteToPeer(set.(*SharedWorkspace)) } - wfa.sharedWorkflow(&SharedWorkspace{}, id) - wfa.sharedWorkspace(&SharedWorkspace{}, id) - return wfa.GenericDeleteOne(id, wfa) + wfa.sharedWorkflow(&SharedWorkspace{}, id) // create all shared workflows + wfa.sharedWorkspace(&SharedWorkspace{}, id) // create all shared workspaces + return wfa.GenericDeleteOne(id, wfa) // then add on yours } +/* +sharedWorkspace is a function that shares the shared workspace to the peers +*/ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace, id string) { - eldest, code, _ := wfa.LoadOne(id) + eldest, code, _ := wfa.LoadOne(id) // get the eldest accessor := (&workspace.Workspace{}).GetAccessor(nil) if code == 200 { eld := eldest.(*SharedWorkspace) - if eld.Workspaces != nil { + if eld.Workspaces != nil { // update all your workspaces in the eldest by replacing shared ref by an empty string for _, v := range eld.Workspaces { - accessor.UpdateOne(&workspace.Workspace{Shared: shared.UUID}, v) + accessor.UpdateOne(&workspace.Workspace{Shared: ""}, v) if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil { continue } - paccess := (&peer.Peer{}) - for _, p := range shared.Peers { + paccess := (&peer.Peer{}) // send to all peers + for _, p := range shared.Peers { // delete the shared workspace on the peer b, err := paccess.LaunchPeerExecution(p, v, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller) if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error()) @@ -55,8 +61,8 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace } } if shared.Workspaces != nil { - for _, v := range shared.Workspaces { - workspace, code, _ := accessor.UpdateOne(&workspace.Workspace{Shared: shared.UUID}, v) + for _, v := range shared.Workspaces { // update all the shared workspaces + workspace, code, _ := accessor.UpdateOne(&workspace.Workspace{Shared: shared.UUID}, v) // add the shared ref to workspace if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil { continue } @@ -64,7 +70,7 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace if code != 200 { continue } - paccess := (&peer.Peer{}) + paccess := (&peer.Peer{}) // send to all peers, add the shared workspace on the peer s := workspace.Serialize() s["name"] = fmt.Sprintf("%v", s["name"]) + "_" + p b, err := paccess.LaunchPeerExecution(p, v, utils.WORKSPACE, tools.POST, s, wfa.Caller) @@ -74,11 +80,14 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace } } } + // deleting on peers before adding, to avoid conflicts on peers side + // because you have no reference to the remote shared workspace } +// sharedWorkflow is a function that shares the shared workflow to the peers func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, id string) { accessor := (&w.Workflow{}).GetAccessor(nil) - eldest, code, _ := wfa.LoadOne(id) + eldest, code, _ := wfa.LoadOne(id) // get the eldest if code == 200 { eld := eldest.(*SharedWorkspace) if eld.Workflows != nil { @@ -91,15 +100,15 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, if id2 != id { new = append(new, id2) } - } + } // kick the shared reference in your old shared workflow n := &w.Workflow{} n.Shared = new accessor.UpdateOne(n, v) if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil { continue } - paccess := (&peer.Peer{}) - for _, p := range shared.Peers { + paccess := (&peer.Peer{}) // send to all peers + for _, p := range shared.Peers { // delete the shared workflow on the peer b, err := paccess.LaunchPeerExecution(p, v, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller) if err != nil && b == nil { wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error()) @@ -109,7 +118,7 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, } } } - if shared.Workflows != nil { + if shared.Workflows != nil { // update all the shared workflows for _, v := range shared.Workflows { data, code, _ := accessor.LoadOne(v) if code == 200 { @@ -121,9 +130,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, continue } paccess := (&peer.Peer{}) - for _, p := range shared.Peers { + for _, p := range shared.Peers { // send to all peers if code == 200 { - s := workflow.Serialize() + s := workflow.Serialize() // add the shared workflow on the peer s["name"] = fmt.Sprintf("%v", s["name"]) + "_" + p b, err := paccess.LaunchPeerExecution(p, shared.UUID, utils.WORKFLOW, tools.POST, s, wfa.Caller) if err != nil && b == nil { @@ -135,8 +144,11 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, } } } + // deleting on peers before adding, to avoid conflicts on peers side + // because you have no reference to the remote shared workflow } +// sharedWorkspace is a function that shares the shared workspace to the peers func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) { if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil { return @@ -153,6 +165,7 @@ func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) { } } +// sharedWorkspace is a function that shares the shared workspace to the peers func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) { if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.SHARED_WORKSPACE.String()] == nil { return @@ -171,20 +184,22 @@ func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) { } } +// UpdateOne updates a shared workspace in the database, given its ID and the new data, it automatically share to peers if the workspace is shared func (wfa *sharedWorkspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { res, code, err := wfa.GenericUpdateOne(set.(*SharedWorkspace), id, wfa, &SharedWorkspace{}) - fmt.Println("UPDATE SHARED", res.Serialize()) - wfa.deleteToPeer(res.(*SharedWorkspace)) - wfa.sharedWorkflow(res.(*SharedWorkspace), id) - wfa.sharedWorkspace(res.(*SharedWorkspace), id) - wfa.sendToPeer(res.(*SharedWorkspace)) + wfa.deleteToPeer(res.(*SharedWorkspace)) // delete the shared workspace on the peer + wfa.sharedWorkflow(res.(*SharedWorkspace), id) // replace all shared workflows + wfa.sharedWorkspace(res.(*SharedWorkspace), id) // replace all shared workspaces (not shared worspace obj but workspace one) + wfa.sendToPeer(res.(*SharedWorkspace)) // send the shared workspace (shared workspace object) to the peers return res, code, err } +// StoreOne stores a shared workspace in the database, it automatically share to peers if the workspace is shared func (wfa *sharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { - id, _ := static.GetMyLocalJsonPeer() - data.(*SharedWorkspace).CreatorID = id - data.(*SharedWorkspace).Peers = append(data.(*SharedWorkspace).Peers, id) + id, _ := static.GetMyLocalJsonPeer() // get the local peer + data.(*SharedWorkspace).CreatorID = id // set the creator id + data.(*SharedWorkspace).Peers = append(data.(*SharedWorkspace).Peers, id) // add the creator id to the peers + // then reset the shared fields if data.(*SharedWorkspace).Workspaces == nil { data.(*SharedWorkspace).Workspaces = []string{} } @@ -198,17 +213,19 @@ func (wfa *sharedWorkspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DB d, code, err := wfa.GenericStoreOne(data.(*SharedWorkspace), wfa) if code == 200 { - wfa.sharedWorkflow(d.(*SharedWorkspace), d.GetID()) - wfa.sharedWorkspace(d.(*SharedWorkspace), d.GetID()) - wfa.sendToPeer(d.(*SharedWorkspace)) + wfa.sharedWorkflow(d.(*SharedWorkspace), d.GetID()) // create all shared workflows + wfa.sharedWorkspace(d.(*SharedWorkspace), d.GetID()) // create all shared workspaces + wfa.sendToPeer(d.(*SharedWorkspace)) // send the shared workspace (shared workspace object) to the peers } return data, code, err } +// CopyOne copies a shared workspace in the database func (wfa *sharedWorkspaceMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { return wfa.StoreOne(data) } +// enrich is a function that enriches the shared workspace with the shared objects func (wfa *sharedWorkspaceMongoAccessor) enrich(sharedWorkspace *SharedWorkspace) *SharedWorkspace { access := (&workspace.Workspace{}).GetAccessor(nil) res, code, _ := access.Search(&dbs.Filters{ @@ -257,6 +274,7 @@ func (wfa *sharedWorkspaceMongoAccessor) enrich(sharedWorkspace *SharedWorkspace return sharedWorkspace } +// LoadOne loads a shared workspace from the database, given its ID and enrich it func (wfa *sharedWorkspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { var sharedWorkspace SharedWorkspace res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) @@ -265,9 +283,10 @@ func (wfa *sharedWorkspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int return nil, code, err } res_mongo.Decode(&sharedWorkspace) - return wfa.enrich(&sharedWorkspace), 200, nil + return wfa.enrich(&sharedWorkspace), 200, nil // enrich the shared workspace } +// LoadAll loads all the shared workspaces from the database and enrich them func (wfa sharedWorkspaceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) @@ -280,16 +299,17 @@ func (wfa sharedWorkspaceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, return nil, 404, err } for _, r := range results { - objs = append(objs, wfa.enrich(&r)) + objs = append(objs, wfa.enrich(&r)) // enrich the shared workspace } return objs, 200, nil } +// Search searches for shared workspaces in the database, given some filters OR a search string func (wfa *sharedWorkspaceMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // search by name only by default can be override "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } @@ -304,7 +324,7 @@ func (wfa *sharedWorkspaceMongoAccessor) Search(filters *dbs.Filters, search str return nil, 404, err } for _, r := range results { - objs = append(objs, wfa.enrich(&r)) + objs = append(objs, wfa.enrich(&r)) // enrich the shared workspace } return objs, 200, nil } diff --git a/models/workspace/workspace.go b/models/workspace/workspace.go index 7c1c9e4..d5510e6 100644 --- a/models/workspace/workspace.go +++ b/models/workspace/workspace.go @@ -9,12 +9,13 @@ import ( "github.com/google/uuid" ) +// Workspace is a struct that represents a workspace type Workspace struct { - utils.AbstractObject - resources.ResourceSet - IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` - Active bool `json:"active" bson:"active" default:"false"` - Shared string `json:"shared,omitempty" bson:"shared,omitempty"` + utils.AbstractObject // AbstractObject contains the basic fields of an object (id, name) + resources.ResourceSet // ResourceSet contains the resources of the workspace (data, datacenter, processing, storage, workflow) + IsContextual bool `json:"is_contextual" bson:"is_contextual" default:"false"` // IsContextual is a flag that indicates if the workspace is contextual + Active bool `json:"active" bson:"active" default:"false"` // Active is a flag that indicates if the workspace is active + Shared string `json:"shared,omitempty" bson:"shared,omitempty"` // Shared is the ID of the shared workspace } func (ao *Workspace) GetID() string { @@ -32,11 +33,12 @@ func (d *Workspace) GetName() string { } func (d *Workspace) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { - data := New() - data.Init(utils.WORKSPACE, caller) + data := New() // Create a new instance of the accessor + data.Init(utils.WORKSPACE, caller) // Initialize the accessor with the WORKSPACE model type return data } +// New creates a new instance of the workspaceMongoAccessor from a map func (dma *Workspace) Deserialize(j map[string]interface{}) utils.DBObject { b, err := json.Marshal(j) if err != nil { @@ -46,6 +48,7 @@ func (dma *Workspace) Deserialize(j map[string]interface{}) utils.DBObject { return dma } +// Serialize returns the workspaceMongoAccessor as a map func (dma *Workspace) Serialize() map[string]interface{} { var m map[string]interface{} b, err := json.Marshal(dma) diff --git a/models/workspace/workspace_mongo_accessor.go b/models/workspace/workspace_mongo_accessor.go index 251ef18..b92571c 100644 --- a/models/workspace/workspace_mongo_accessor.go +++ b/models/workspace/workspace_mongo_accessor.go @@ -16,28 +16,33 @@ import ( "cloud.o-forge.io/core/oc-lib/tools" ) +// Workspace is a struct that represents a workspace type workspaceMongoAccessor struct { - utils.AbstractAccessor + utils.AbstractAccessor // AbstractAccessor contains the basic fields of an accessor (model, caller) } +// New creates a new instance of the workspaceMongoAccessor func New() *workspaceMongoAccessor { return &workspaceMongoAccessor{} } +// DeleteOne deletes a workspace from the database, given its ID, it automatically share to peers if the workspace is shared +// it checks if a workspace with the same name already exists func (wfa *workspaceMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { res, code, err := wfa.GenericDeleteOne(id, wfa) - wfa.share(res.(*Workspace), true, wfa.Caller) + wfa.share(res.(*Workspace), true, wfa.Caller) // Share the deletion to the peers return res, code, err } +// UpdateOne updates a workspace in the database, given its ID, it automatically share to peers if the workspace is shared func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { - d := set.(*Workspace) - d.DataResources = nil + d := set.(*Workspace) // Get the workspace from the set + d.DataResources = nil // Reset the resources d.DatacenterResources = nil d.StorageResources = nil d.ProcessingResources = nil d.WorkflowResources = nil - if d.Active { + if d.Active { // If the workspace is active, deactivate all the other workspaces res, _, err := wfa.LoadAll() if err == nil { for _, r := range res { @@ -53,16 +58,18 @@ func (wfa *workspaceMongoAccessor) UpdateOne(set utils.DBObject, id string) (uti return res, code, err } +// StoreOne stores a workspace in the database, it checks if a workspace with the same name already exists func (wfa *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { filters := &dbs.Filters{ Or: map[string][]dbs.Filter{ "abstractobject.name": {{dbs.LIKE.String(), data.GetName() + "_workspace"}}, }, } - res, _, err := wfa.Search(filters, "") - if err == nil && len(res) > 0 { + res, _, err := wfa.Search(filters, "") // Search for the workspace + if err == nil && len(res) > 0 { // If the workspace already exists, return an error return nil, 409, errors.New("A workspace with the same name already exists") } + // reset the resources d := data.(*Workspace) d.DataResources = nil d.DatacenterResources = nil @@ -72,11 +79,16 @@ func (wfa *workspaceMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject return wfa.GenericStoreOne(d, wfa) } +// CopyOne copies a workspace in the database func (wfa *workspaceMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { return wfa.GenericStoreOne(data, wfa) } +/* +This function is used to fill the workspace with the resources +*/ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace { + // Fill the workspace with the resources if workflow.Datas != nil && len(workflow.Datas) > 0 { dataAccessor := (&data.DataResource{}).GetAccessor(nil) for _, id := range workflow.Datas { @@ -86,6 +98,7 @@ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace { } } } + // Fill the workspace with the datacenters if workflow.Datacenters != nil && len(workflow.Datacenters) > 0 { dataAccessor := (&datacenter.DatacenterResource{}).GetAccessor(nil) for _, id := range workflow.Datacenters { @@ -95,6 +108,7 @@ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace { } } } + // Fill the workspace with the storages if workflow.Storages != nil && len(workflow.Storages) > 0 { dataAccessor := (&storage.StorageResource{}).GetAccessor(nil) for _, id := range workflow.Storages { @@ -104,6 +118,7 @@ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace { } } } + // Fill the workspace with the processings if workflow.Processings != nil && len(workflow.Processings) > 0 { dataAccessor := (&processing.ProcessingResource{}).GetAccessor(nil) for _, id := range workflow.Processings { @@ -113,6 +128,7 @@ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace { } } } + // Fill the workspace with the workflows if workflow.Workflows != nil && len(workflow.Workflows) > 0 { dataAccessor := (&w.WorkflowResource{}).GetAccessor(nil) for _, id := range workflow.Workflows { @@ -125,6 +141,7 @@ func (wfa *workspaceMongoAccessor) fill(workflow *Workspace) *Workspace { return workflow } +// LoadOne loads a workspace from the database, given its ID func (wfa *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { var workflow Workspace res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) @@ -137,6 +154,7 @@ func (wfa *workspaceMongoAccessor) LoadOne(id string) (utils.DBObject, int, erro return wfa.fill(&workflow), 200, nil } +// LoadAll loads all the workspaces from the database func (wfa workspaceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) @@ -154,11 +172,12 @@ func (wfa workspaceMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error return objs, 200, nil } +// Search searches for workspaces in the database, given some filters OR a search string func (wfa *workspaceMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ - Or: map[string][]dbs.Filter{ + Or: map[string][]dbs.Filter{ // filter by name if no filters are provided "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, }, } @@ -178,6 +197,9 @@ func (wfa *workspaceMongoAccessor) Search(filters *dbs.Filters, search string) ( return objs, 200, nil } +/* +This function is used to share the workspace with the peers +*/ func (wfa *workspaceMongoAccessor) share(realData *Workspace, delete bool, caller *tools.HTTPCaller) { if realData.Shared == "" { return @@ -191,12 +213,12 @@ func (wfa *workspaceMongoAccessor) share(realData *Workspace, delete bool, calle paccess := &peer.Peer{} for _, p := range res.(*shallow_shared_workspace.ShallowSharedWorkspace).Peers { paccess.UUID = p - if paccess.IsMySelf() { + if paccess.IsMySelf() { // If the peer is the current peer, never share because it will create a loop continue } - if delete { + if delete { // If the workspace is deleted, share the deletion _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.DELETE, map[string]interface{}{}, caller) - } else { + } else { // If the workspace is updated, share the update _, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.PUT, res.Serialize(), caller) } } diff --git a/static/peer_static.go b/static/peer_static.go index e50a1d1..9909172 100644 --- a/static/peer_static.go +++ b/static/peer_static.go @@ -1,5 +1,13 @@ package static +/* +This package contains static data for the peer model +It's used to test the peer model +Temporary version, will be removed in the future and replaced with a more dynamic solution +to generate the data +*/ + +// GetMyLocalBsonPeer returns a tuple with the peer ID and the peer data in BSON format func GetMyLocalBsonPeer() (string, map[string]interface{}) { return "6fd0134c-fefc-427e-94c2-e01365fc5fb0", map[string]interface{}{ "abstractobject": map[string]interface{}{ @@ -11,6 +19,7 @@ func GetMyLocalBsonPeer() (string, map[string]interface{}) { } } +// GetMyLocalJsonPeer returns a tuple with the peer ID and the peer data in JSON format func GetMyLocalJsonPeer() (string, map[string]interface{}) { return "6fd0134c-fefc-427e-94c2-e01365fc5fb0", map[string]interface{}{ "id": "6fd0134c-fefc-427e-94c2-e01365fc5fb0", diff --git a/tools/api.go b/tools/api.go index 40a4136..3c0e20c 100644 --- a/tools/api.go +++ b/tools/api.go @@ -3,31 +3,36 @@ package tools import ( "encoding/json" "errors" - "fmt" "cloud.o-forge.io/core/oc-lib/dbs/mongo" ) -var UncatchedError = []error{} +/* +* API is the Health Check API +* it defines the health check methods + */ +var UncatchedError = []error{} // Singleton instance of the api 500 error cache type State int +// State is an enum that defines the state of the API const ( - ALIVE State = iota - REDUCED_SERVICE - UNPROCESSABLE_ENTITY - DB_FALLOUT - TEAPOT - DEAD - WAITING + ALIVE State = iota + REDUCED_SERVICE // occurs when some services are down + UNPROCESSABLE_ENTITY // occurs when the database is up but the collections are not + DB_FALLOUT // occurs when the database is down + TEAPOT // well some things boils in here, i'm probably a teapot, occurs when uncatched errors are present (it's fun) + DEAD // occurs when the peer is dead ) +// EnumIndex returns the index of the enum func (s State) EnumIndex() int { return int(s) } +// ToState returns the state from a string func ToState(str string) State { - for _, s := range []State{ALIVE, REDUCED_SERVICE, UNPROCESSABLE_ENTITY, DB_FALLOUT, TEAPOT, DEAD, WAITING} { + for _, s := range []State{ALIVE, REDUCED_SERVICE, UNPROCESSABLE_ENTITY, DB_FALLOUT, TEAPOT, DEAD} { if s.String() == str { return s } @@ -35,92 +40,105 @@ func ToState(str string) State { return DEAD } +// String returns the string of the enum func (s State) String() string { return [...]string{"alive", "reduced service", "unprocessable entity", "database fallout", - "some things boils in here, i'm probably a teapot", "dead", "waiting"}[s] + "some things boils in here, i'm probably a teapot", "dead"}[s] } type API struct{} +// GetState returns the state of the API func (a *API) GetState() (State, int, error) { // Check if the database is up err := mongo.MONGOService.TestDB(GetConfig()) if err != nil { - return DB_FALLOUT, 200, err + return DB_FALLOUT, 200, err // If the database is not up, return database fallout } - err = mongo.MONGOService.TestCollections(GetConfig(), []string{}) + err = mongo.MONGOService.TestCollections(GetConfig(), []string{}) // Check if the collections are up if err != nil { - return UNPROCESSABLE_ENTITY, 200, err + return UNPROCESSABLE_ENTITY, 200, err // If the collections are not up, return unprocessable entity } - if len(UncatchedError) > 0 { + if len(UncatchedError) > 0 { // If there are uncatched errors, return teapot errStr := "" for _, e := range UncatchedError { errStr += e.Error() + "\n" } return TEAPOT, 200, errors.New(errStr) } - return ALIVE, 200, nil + return ALIVE, 200, nil // If everything is up, return alive } +// CheckRemotePeer checks the state of a remote peer func (a *API) CheckRemotePeer(url string) (State, map[string]int) { // Check if the database is up - caller := NewHTTPCaller(map[string]map[METHOD]string{}) + caller := NewHTTPCaller(map[string]map[METHOD]string{}) // Create a new http caller var resp APIStatusResponse - b, err := caller.CallPost(url, "/status", map[string]interface{}{}) - fmt.Println("CheckRemotePeer", b, url, err) + b, err := caller.CallPost(url, "/status", map[string]interface{}{}) // Call the status endpoint of the peer if err != nil { - return DEAD, map[string]int{} + return DEAD, map[string]int{} // If the peer is not reachable, return dead } json.Unmarshal(b, &resp) - fmt.Println("CheckRemotePeer2", b, err) - if resp.Data == nil { + if resp.Data == nil { // If the response is empty, return dead return DEAD, map[string]int{} } new := map[string]int{} - fmt.Println("CheckRemotePeer", resp.Data.Services) - for k, v := range resp.Data.Services { + for k, v := range resp.Data.Services { // Return the services states of the peer new[k] = ToState(v).EnumIndex() } - return ToState(resp.Data.State), new + return ToState(resp.Data.State), new // Return the state of the peer & its services states } +// CheckRemoteAPIs checks the state of remote APIs from your proper OC func (a *API) CheckRemoteAPIs(urls map[string]string) (State, map[string]string, error) { // Check if the database is up new := map[string]string{} - caller := NewHTTPCaller(map[string]map[METHOD]string{}) + caller := NewHTTPCaller(map[string]map[METHOD]string{}) // Create a new http caller code := 0 - u := "" e := "" - for appName, url := range urls { + state := ALIVE + reachable := false + for appName, url := range urls { // Check the state of each remote API in the list var resp APIStatusResponse - b, err := caller.CallGet(url, "/version/status") + b, err := caller.CallGet(url, "/version/status") // Call the status endpoint of the remote API (standard OC status endpoint) if err != nil { - return REDUCED_SERVICE, new, err + state = REDUCED_SERVICE // If a remote API is not reachable, return reduced service + continue } json.Unmarshal(b, &resp) - if resp.Data == nil { - return DEAD, new, errors.New(url + " -> is DEAD") + if resp.Data == nil { // + state = REDUCED_SERVICE // If the response is empty, return reduced service + continue } new[appName] = resp.Data.State if resp.Data.Code > code { code = resp.Data.Code - u = url e += resp.Error } + reachable = true // If the remote API is reachable, set reachable to true cause we are not dead + } + if !reachable { + state = DEAD // If no remote API is reachable, return dead, nobody is alive } if code > 0 { - return REDUCED_SERVICE, new, errors.New(u + " -> " + e) + state = REDUCED_SERVICE } - return ALIVE, new, nil + return state, new, nil } +/* APIStatusResponse is the response of the API status */ type APIStatusResponse struct { Data *APIStatus `json:"data"` Error string `json:"error"` } +/* +* APIStatus is the status of the API +* it defines the state of the API +* Code is the status code, where 0 is ALIVE, 1 is REDUCED_SERVICE, 2 is UNPROCESSABLE_ENTITY, 3 is DB_FALLOUT, 4 is TEAPOT, 5 is DEAD + */ type APIStatus struct { - Code int `json:"code"` - State string `json:"state"` - Services map[string]string `json:"services"` + Code int `json:"code"` // Code is the status code, where 0 is ALIVE, 1 is REDUCED_SERVICE, 2 is UNPROCESSABLE_ENTITY, 3 is DB_FALLOUT, 4 is TEAPOT, 5 is DEAD + State string `json:"state"` // State is the state of the API (status shows as a string) (alive, reduced service, unprocessable entity, database fallout, some things boils in here, i'm probably a teapot, dead) + Services map[string]string `json:"services"` // Services is the state of the services of the API (status shows as a string) (alive, reduced service, unprocessable entity, database fallout, some things boils in here, i'm probably a teapot, dead) } diff --git a/tools/nats_caller.go b/tools/nats_caller.go index 0c2eda5..16806ef 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -7,6 +7,7 @@ import ( "github.com/nats-io/nats.go" ) +// NATS Method Enum defines the different methods that can be used to interact with the NATS server type NATSMethod int const ( @@ -14,6 +15,7 @@ const ( CREATE ) +// NameToMethod returns the NATSMethod enum value from a string func NameToMethod(name string) NATSMethod { for _, v := range [...]NATSMethod{REMOVE, CREATE} { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { @@ -23,20 +25,24 @@ func NameToMethod(name string) NATSMethod { return -1 } +// GenerateKey generates a key for the NATSMethod usefull for standard key based on data name & method func (d NATSMethod) GenerateKey(name string) string { return name + "_" + d.String() } +// String returns the string of the enum func (d NATSMethod) String() string { return [...]string{"remove", "create", "discovery"}[d] } type natsCaller struct{} +// NewNATSCaller creates a new instance of the NATS Caller func NewNATSCaller() *natsCaller { return &natsCaller{} } +// SetNATSPub sets a message to the NATS server func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interface{}) string { if GetConfig().NATSUrl == "" { return " -> NATS_SERVER is not set" @@ -50,9 +56,9 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa if err != nil { return " -> " + err.Error() } - err = nc.Publish(method.GenerateKey(dataName), js) + err = nc.Publish(method.GenerateKey(dataName), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method if err != nil { - return " -> " + err.Error() + return " -> " + err.Error() // Return an error if the message could not be published } return "" } diff --git a/tools/remote_caller.go b/tools/remote_caller.go index 1d4467d..9ff4920 100644 --- a/tools/remote_caller.go +++ b/tools/remote_caller.go @@ -7,6 +7,7 @@ import ( "net/http" ) +// HTTP Method Enum defines the different methods that can be used to interact with the HTTP server type METHOD int const ( @@ -16,14 +17,17 @@ const ( DELETE ) +// String returns the string of the enum func (m METHOD) String() string { return [...]string{"GET", "PUT", "POST", "DELETE"}[m] } +// EnumIndex returns the index of the enum func (m METHOD) EnumIndex() int { return int(m) } +// ToMethod returns the method from a string func ToMethod(str string) METHOD { for _, s := range []METHOD{GET, PUT, POST, DELETE} { if s.String() == str { @@ -33,18 +37,20 @@ func ToMethod(str string) METHOD { return GET } -var HTTPCallerInstance = &HTTPCaller{} +var HTTPCallerInstance = &HTTPCaller{} // Singleton instance of the HTTPCaller type HTTPCaller struct { - URLS map[string]map[METHOD]string + URLS map[string]map[METHOD]string // Map of the different methods and their urls } +// NewHTTPCaller creates a new instance of the HTTP Caller func NewHTTPCaller(urls map[string]map[METHOD]string) *HTTPCaller { return &HTTPCaller{ - URLS: urls, + URLS: urls, // Set the urls defined in the config & based on the data name type & method } } +// CallGet calls the GET method on the HTTP server func (caller *HTTPCaller) CallGet(url string, subpath string) ([]byte, error) { resp, err := http.Get(url + subpath) if err != nil { @@ -54,6 +60,7 @@ func (caller *HTTPCaller) CallGet(url string, subpath string) ([]byte, error) { return io.ReadAll(resp.Body) } +// CallPut calls the DELETE method on the HTTP server func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) { resp, err := http.NewRequest("DELETE", url+subpath, nil) if err != nil { @@ -63,6 +70,7 @@ func (caller *HTTPCaller) CallDelete(url string, subpath string) ([]byte, error) return io.ReadAll(resp.Body) } +// CallPost calls the POST method on the HTTP server func (caller *HTTPCaller) CallPost(url string, subpath string, body map[string]interface{}) ([]byte, error) { postBody, _ := json.Marshal(body) responseBody := bytes.NewBuffer(postBody) @@ -73,3 +81,5 @@ func (caller *HTTPCaller) CallPost(url string, subpath string, body map[string]i defer resp.Body.Close() return io.ReadAll(resp.Body) } + +// NO PUT IN HERE TO DANGEROUS TO USE ON A REMOTE SERVER, MAYBE NEEDED IN THE FUTURE