simplify path reservation

This commit is contained in:
mr
2024-10-02 10:45:52 +02:00
parent f9bfafa004
commit 3c1a84011e
9 changed files with 31 additions and 32 deletions

View File

@@ -51,7 +51,7 @@ func (ao *Peer) IsMySelf() bool {
}
// LaunchPeerExecution launches an execution on a peer
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt tools.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
p.UUID = peerID
return cache.LaunchPeerExecution(peerID, dataID, dt, method, body, caller) // Launch the execution on the peer through the cache
}

View File

@@ -7,7 +7,6 @@ import (
"regexp"
"strings"
"cloud.o-forge.io/core/oc-lib/models/utils"
"cloud.o-forge.io/core/oc-lib/tools"
)
@@ -30,7 +29,7 @@ type PeerCache struct {
}
// urlFormat formats the URL of the peer with the data type API function
func (p *PeerCache) urlFormat(url string, dt utils.DataType) string {
func (p *PeerCache) urlFormat(url string, dt tools.DataType) string {
// localhost is replaced by the local peer URL
// because localhost must collide on a web request security protocol
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
@@ -53,7 +52,7 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools
if code != 200 { // no peer no party
return nil, false
}
methods := caller.URLS[utils.PEER.String()] // Get the methods url of the peer
methods := caller.URLS[tools.PEER] // Get the methods url of the peer
if methods == nil {
return res.(*Peer), false
}
@@ -61,7 +60,7 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools
if meth == "" {
return res.(*Peer), false
}
url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER) // Format the URL
url := p.urlFormat(res.(*Peer).Url+meth, tools.PEER) // Format the URL
state, services := api.CheckRemotePeer(url) // Check the status of the peer
res.(*Peer).Services = services // Update the services states of the peer
access.UpdateOne(res, peerID) // Update the peer in the db
@@ -70,8 +69,8 @@ func (p *PeerCache) checkPeerStatus(peerID string, appName string, caller *tools
// LaunchPeerExecution launches an execution on a peer
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string,
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
methods := caller.URLS[dt.String()] // Get the methods url of the data type
dt tools.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
methods := caller.URLS[dt] // Get the methods url of the data type
if _, ok := methods[method]; !ok {
return nil, errors.New("no path found")
}

View File

@@ -1,82 +0,0 @@
package utils
type DataType int
// DataType - Enum for the different types of resources in db accessible from the outside
const (
INVALID DataType = iota
DATA_RESOURCE
PROCESSING_RESOURCE
STORAGE_RESOURCE
DATACENTER_RESOURCE
WORKFLOW_RESOURCE
WORKFLOW
WORKFLOW_EXECUTION
WORKSPACE
RESOURCE_MODEL
PEER
COLLABORATIVE_AREA
RULE
BOOKING
)
var NOAPI = ""
var CATALOGAPI = "oc-catalog"
var SHAREDAPI = "oc-shared"
var WORKFLOWAPI = "oc-workflow"
var WORKSPACEAPI = "oc-workspace"
var PEERSAPI = "oc-peers"
var DATACENTERAPI = "oc-datacenter"
// Bind the standard API name to the data type
var DefaultAPI = [...]string{
NOAPI,
CATALOGAPI,
CATALOGAPI,
CATALOGAPI,
CATALOGAPI,
CATALOGAPI,
WORKFLOWAPI,
NOAPI,
WORKSPACEAPI,
NOAPI,
PEERSAPI,
SHAREDAPI,
SHAREDAPI,
DATACENTERAPI,
}
// Bind the standard data name to the data type
var Str = [...]string{
"invalid",
"data_resource",
"processing_resource",
"storage_resource",
"datacenter_resource",
"workflow_resource",
"workflow",
"workflow_execution",
"workspace",
"resource_model",
"peer",
"shared_workspace",
"rule",
"booking",
}
func FromInt(i int) string {
return Str[i]
}
func (d DataType) API() string { // API - Returns the API name of the data type
return DefaultAPI[d]
}
func (d DataType) String() string { // String - Returns the string name of the data type
return Str[d]
}
// EnumIndex - Creating common behavior-give the type a EnumIndex functio
func (d DataType) EnumIndex() int {
return int(d)
}

View File

@@ -69,7 +69,7 @@ func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) {
if peerID == "" {
return false, errors.New("no peer id")
} // no peer id no booking, we need to know where to book
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, dc_id, utils.BOOKING, tools.GET, nil, caller)
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, dc_id, tools.BOOKING, tools.GET, nil, caller)
if err != nil {
return false, err
}

View File

@@ -108,10 +108,10 @@ func (wfa *workflowMongoAccessor) DeleteOne(id string) (utils.DBObject, int, err
* it returns an error if the booking fails
*/
func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*workflow_execution.WorkflowExecution) error {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.BOOKING.String()] == nil {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.BOOKING] == nil {
return errors.New("no caller defined")
}
methods := wfa.Caller.URLS[utils.BOOKING.String()]
methods := wfa.Caller.URLS[tools.BOOKING]
if _, ok := methods[tools.POST]; !ok {
return errors.New("no path found")
}
@@ -138,7 +138,7 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
continue
}
// BOOKING ON PEER
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, "", utils.BOOKING, tools.POST,
_, err := (&peer.Peer{}).LaunchPeerExecution(peerID, "", tools.BOOKING, tools.POST,
(&workflow_execution.WorkflowExecutions{ // it's the standard model for booking see OC-PEER
WorkflowID: id, // set the workflow id "WHO"
ResourceID: dc_id, // set the datacenter id "WHERE"
@@ -175,9 +175,9 @@ func (wfa *workflowMongoAccessor) share(realData *Workflow, delete bool, caller
continue
}
if delete { // if the workflow is deleted, share the deletion
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.DELETE, map[string]interface{}{}, caller)
_, err = paccess.LaunchPeerExecution(p, res.GetID(), tools.WORKFLOW, tools.DELETE, map[string]interface{}{}, caller)
} else { // if the workflow is updated, share the update
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKFLOW, tools.PUT, res.Serialize(), caller)
_, err = paccess.LaunchPeerExecution(p, res.GetID(), tools.WORKFLOW, tools.PUT, res.Serialize(), caller)
}
}
if err != nil {

View File

@@ -47,12 +47,12 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkspace(shared *Collaborative
if eld.Workspaces != nil { // update all your workspaces in the eldest by replacing shared ref by an empty string
for _, v := range eld.Workspaces {
accessor.UpdateOne(&workspace.Workspace{Shared: ""}, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil {
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.WORKSPACE] == nil {
continue
}
paccess := (&peer.Peer{}) // send to all peers
for _, p := range shared.Peers { // delete the collaborative area on the peer
b, err := paccess.LaunchPeerExecution(p, v, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller)
b, err := paccess.LaunchPeerExecution(p, v, tools.WORKSPACE, tools.DELETE, nil, wfa.Caller)
if err != nil && b == nil {
wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error())
}
@@ -63,7 +63,7 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkspace(shared *Collaborative
if shared.Workspaces != nil {
for _, v := range shared.Workspaces { // update all the collaborative areas
workspace, code, _ := accessor.UpdateOne(&workspace.Workspace{Shared: shared.UUID}, v) // add the shared ref to workspace
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKSPACE.String()] == nil {
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.WORKSPACE] == nil {
continue
}
for _, p := range shared.Peers {
@@ -73,7 +73,7 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkspace(shared *Collaborative
paccess := (&peer.Peer{}) // send to all peers, add the collaborative area on the peer
s := workspace.Serialize()
s["name"] = fmt.Sprintf("%v", s["name"]) + "_" + p
b, err := paccess.LaunchPeerExecution(p, v, utils.WORKSPACE, tools.POST, s, wfa.Caller)
b, err := paccess.LaunchPeerExecution(p, v, tools.WORKSPACE, tools.POST, s, wfa.Caller)
if err != nil && b == nil {
wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error())
}
@@ -104,12 +104,12 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkflow(shared *CollaborativeA
n := &w.Workflow{}
n.Shared = new
accessor.UpdateOne(n, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil {
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.WORKFLOW] == nil {
continue
}
paccess := (&peer.Peer{}) // send to all peers
for _, p := range shared.Peers { // delete the shared workflow on the peer
b, err := paccess.LaunchPeerExecution(p, v, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller)
b, err := paccess.LaunchPeerExecution(p, v, tools.WORKFLOW, tools.DELETE, nil, wfa.Caller)
if err != nil && b == nil {
wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error())
}
@@ -126,7 +126,7 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkflow(shared *CollaborativeA
if !slices.Contains(s.Shared, id) {
s.Shared = append(s.Shared, id)
workflow, code, _ := accessor.UpdateOne(s, v)
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.WORKFLOW.String()] == nil {
if wfa.Caller != nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.WORKFLOW] == nil {
continue
}
paccess := (&peer.Peer{})
@@ -134,7 +134,7 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkflow(shared *CollaborativeA
if code == 200 {
s := workflow.Serialize() // add the shared workflow on the peer
s["name"] = fmt.Sprintf("%v", s["name"]) + "_" + p
b, err := paccess.LaunchPeerExecution(p, shared.UUID, utils.WORKFLOW, tools.POST, s, wfa.Caller)
b, err := paccess.LaunchPeerExecution(p, shared.UUID, tools.WORKFLOW, tools.POST, s, wfa.Caller)
if err != nil && b == nil {
wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error())
}
@@ -150,7 +150,7 @@ func (wfa *collaborativeAreaMongoAccessor) sharedWorkflow(shared *CollaborativeA
// sharedWorkspace is a function that shares the collaborative area to the peers
func (wfa *collaborativeAreaMongoAccessor) deleteToPeer(shared *CollaborativeArea) {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.COLLABORATIVE_AREA.String()] == nil {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.COLLABORATIVE_AREA] == nil {
return
}
paccess := (&peer.Peer{})
@@ -158,7 +158,7 @@ func (wfa *collaborativeAreaMongoAccessor) deleteToPeer(shared *CollaborativeAre
if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() {
continue
}
b, err := paccess.LaunchPeerExecution(v, shared.UUID, utils.COLLABORATIVE_AREA, tools.DELETE, nil, wfa.Caller)
b, err := paccess.LaunchPeerExecution(v, shared.UUID, tools.COLLABORATIVE_AREA, tools.DELETE, nil, wfa.Caller)
if err != nil && b == nil {
wfa.Logger.Error().Msg("Could not send to peer " + v + ". Error: " + err.Error())
}
@@ -167,7 +167,7 @@ func (wfa *collaborativeAreaMongoAccessor) deleteToPeer(shared *CollaborativeAre
// sharedWorkspace is a function that shares the collaborative area to the peers
func (wfa *collaborativeAreaMongoAccessor) sendToPeer(shared *CollaborativeArea) {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[utils.COLLABORATIVE_AREA.String()] == nil {
if wfa.Caller == nil || wfa.Caller.URLS == nil || wfa.Caller.URLS[tools.COLLABORATIVE_AREA] == nil {
return
}
@@ -177,7 +177,7 @@ func (wfa *collaborativeAreaMongoAccessor) sendToPeer(shared *CollaborativeArea)
continue
}
shared.IsSent = true
b, err := paccess.LaunchPeerExecution(v, v, utils.COLLABORATIVE_AREA, tools.POST, shared.Serialize(), wfa.Caller)
b, err := paccess.LaunchPeerExecution(v, v, tools.COLLABORATIVE_AREA, tools.POST, shared.Serialize(), wfa.Caller)
if err != nil && b == nil {
wfa.Logger.Error().Msg("Could not send to peer " + v + ". Error: " + err.Error())
}

View File

@@ -221,9 +221,9 @@ func (wfa *workspaceMongoAccessor) share(realData *Workspace, delete bool, calle
continue
}
if delete { // If the workspace is deleted, share the deletion
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.DELETE, map[string]interface{}{}, caller)
_, err = paccess.LaunchPeerExecution(p, res.GetID(), tools.WORKSPACE, tools.DELETE, map[string]interface{}{}, caller)
} else { // If the workspace is updated, share the update
_, err = paccess.LaunchPeerExecution(p, res.GetID(), utils.WORKSPACE, tools.PUT, res.Serialize(), caller)
_, err = paccess.LaunchPeerExecution(p, res.GetID(), tools.WORKSPACE, tools.PUT, res.Serialize(), caller)
}
}
if err != nil {