default http behavior on is my self
This commit is contained in:
parent
c6ea2195ed
commit
e31815f576
@ -21,7 +21,8 @@ func (ao *Peer) IsMySelf() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
func (p *Peer) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
||||||
return (&PeerCache{}).LaunchPeerExecution(peerID, dataID, url, dt, method, body, caller)
|
p.UUID = peerID
|
||||||
|
return (&PeerCache{}).LaunchPeerExecution(peerID, p.IsMySelf(), dataID, url, dt, method, body, caller)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ao *Peer) GetID() string {
|
func (ao *Peer) GetID() string {
|
||||||
|
@ -21,6 +21,7 @@ type PeerExecution struct {
|
|||||||
Method tools.METHOD
|
Method tools.METHOD
|
||||||
Url string
|
Url string
|
||||||
Body map[string]interface{}
|
Body map[string]interface{}
|
||||||
|
IsMySelf bool
|
||||||
Caller tools.HTTPCaller
|
Caller tools.HTTPCaller
|
||||||
PeerID string
|
PeerID string
|
||||||
DataType utils.DataType
|
DataType utils.DataType
|
||||||
@ -41,7 +42,15 @@ func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor {
|
|||||||
return data
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID string, url string,
|
||||||
|
dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) {
|
||||||
|
if strings.Contains(url, "localhost") || strings.Contains(url, "127.0.0.1") {
|
||||||
|
if isMySelf {
|
||||||
|
url = "http://" + dt.API() // default behavior on basic API container name
|
||||||
|
} else {
|
||||||
|
return nil, errors.New("Peer " + peerID + " is not reachable")
|
||||||
|
}
|
||||||
|
}
|
||||||
var err error
|
var err error
|
||||||
b := []byte{}
|
b := []byte{}
|
||||||
methods := caller.URLS[dt.String()]
|
methods := caller.URLS[dt.String()]
|
||||||
@ -67,6 +76,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, url string
|
|||||||
Method: method,
|
Method: method,
|
||||||
Url: url + methods[method],
|
Url: url + methods[method],
|
||||||
Body: body,
|
Body: body,
|
||||||
|
IsMySelf: isMySelf,
|
||||||
Caller: *caller,
|
Caller: *caller,
|
||||||
PeerID: peerID,
|
PeerID: peerID,
|
||||||
DataType: dt,
|
DataType: dt,
|
||||||
@ -88,7 +98,8 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, url string
|
|||||||
func (p *PeerCache) retryPeerExecution() {
|
func (p *PeerCache) retryPeerExecution() {
|
||||||
execs := []*PeerExecution{}
|
execs := []*PeerExecution{}
|
||||||
for _, v := range singleton.Executions {
|
for _, v := range singleton.Executions {
|
||||||
d, err := p.LaunchPeerExecution(v.PeerID, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller)
|
|
||||||
|
d, err := p.LaunchPeerExecution(v.PeerID, v.IsMySelf, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
execs = append(execs, d)
|
execs = append(execs, d)
|
||||||
} else {
|
} else {
|
||||||
|
@ -19,6 +19,23 @@ const (
|
|||||||
BOOKING
|
BOOKING
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var defaultAPI = [...]string{
|
||||||
|
"",
|
||||||
|
"oc-catalog",
|
||||||
|
"oc-catalog",
|
||||||
|
"oc-catalog",
|
||||||
|
"oc-catalog",
|
||||||
|
"oc-catalog",
|
||||||
|
"oc-workflow",
|
||||||
|
"",
|
||||||
|
"oc-workspace",
|
||||||
|
"",
|
||||||
|
"oc-peer",
|
||||||
|
"oc-shared",
|
||||||
|
"oc-shared",
|
||||||
|
"oc-datacenter",
|
||||||
|
}
|
||||||
|
|
||||||
var Str = [...]string{
|
var Str = [...]string{
|
||||||
"invalid",
|
"invalid",
|
||||||
"data_resource",
|
"data_resource",
|
||||||
@ -40,6 +57,10 @@ func FromInt(i int) string {
|
|||||||
return Str[i]
|
return Str[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d DataType) API() string {
|
||||||
|
return defaultAPI[d]
|
||||||
|
}
|
||||||
|
|
||||||
func (d DataType) String() string {
|
func (d DataType) String() string {
|
||||||
return Str[d]
|
return Str[d]
|
||||||
}
|
}
|
||||||
|
@ -118,21 +118,16 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w
|
|||||||
}
|
}
|
||||||
paccess := (&peer.Peer{})
|
paccess := (&peer.Peer{})
|
||||||
p, code, _ := paccess.GetAccessor(nil).LoadOne(peerID)
|
p, code, _ := paccess.GetAccessor(nil).LoadOne(peerID)
|
||||||
fmt.Println("PEER", p, code)
|
|
||||||
if code != 200 {
|
if code != 200 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
url := p.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p.GetID(), "", p.(*peer.Peer).Url, utils.BOOKING, tools.POST,
|
||||||
if p.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.BOOKING.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p.GetID(), "", url, utils.BOOKING, tools.POST,
|
|
||||||
(&workflow_execution.WorkflowExecutions{
|
(&workflow_execution.WorkflowExecutions{
|
||||||
WorkflowID: id,
|
WorkflowID: id,
|
||||||
ResourceID: dc_id,
|
ResourceID: dc_id,
|
||||||
Executions: execs,
|
Executions: execs,
|
||||||
}).Serialize(), wfa.Caller)
|
}).Serialize(), wfa.Caller)
|
||||||
fmt.Println("LaunchPeerExecution RES", b, err)
|
fmt.Println("LaunchPeerExecutio RES", b, err)
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -45,13 +45,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace
|
|||||||
for _, p := range shared.Peers {
|
for _, p := range shared.Peers {
|
||||||
pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
|
pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
|
||||||
if code == 200 {
|
if code == 200 {
|
||||||
url := pp.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller)
|
||||||
if pp.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.WORKSPACE.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p, v, url, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller)
|
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
wfa.Logger.Error().Msg("Could not send to peer " + url + ". Error: " + err.Error())
|
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -71,13 +67,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace
|
|||||||
paccess := (&peer.Peer{})
|
paccess := (&peer.Peer{})
|
||||||
pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
|
pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
|
||||||
if code == 200 {
|
if code == 200 {
|
||||||
url := pp.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.POST, workspace.Serialize(), wfa.Caller)
|
||||||
if pp.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.WORKSPACE.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p, v, url, utils.WORKSPACE, tools.POST, workspace.Serialize(), wfa.Caller)
|
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
wfa.Logger.Error().Msg("Could not send to peer " + url + ". Error: " + err.Error())
|
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -113,13 +105,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace,
|
|||||||
if code != 200 {
|
if code != 200 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
url := pp.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller)
|
||||||
if pp.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.WORKFLOW.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p, v, url, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller)
|
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
wfa.Logger.Error().Msg("Could not send to peer " + url + ". Error: " + err.Error())
|
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -144,13 +132,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace,
|
|||||||
}
|
}
|
||||||
pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
|
pp, code, _ := paccess.GetAccessor(nil).LoadOne(p)
|
||||||
if code == 200 {
|
if code == 200 {
|
||||||
url := pp.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p, shared.UUID, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.POST, workflow.Serialize(), wfa.Caller)
|
||||||
if pp.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.WORKFLOW.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p, shared.UUID, url, utils.WORKFLOW, tools.POST, workflow.Serialize(), wfa.Caller)
|
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
wfa.Logger.Error().Msg("Could not send to peer " + url + ". Error: " + err.Error())
|
wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -173,13 +157,9 @@ func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) {
|
|||||||
if code != 200 {
|
if code != 200 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
url := p.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p.GetID(), shared.UUID, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.DELETE, nil, wfa.Caller)
|
||||||
if p.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.SHARED_WORKSPACE.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p.GetID(), shared.UUID, url, utils.SHARED_WORKSPACE, tools.DELETE, nil, wfa.Caller)
|
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
wfa.Logger.Error().Msg("Could not send to peer " + url + ". Error: " + err.Error())
|
wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -197,13 +177,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) {
|
|||||||
if code != 200 {
|
if code != 200 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
url := p.(*peer.Peer).Url
|
b, err := paccess.LaunchPeerExecution(p.GetID(), v, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.POST, shared.Serialize(), wfa.Caller)
|
||||||
if p.(*peer.Peer).IsMySelf() {
|
|
||||||
url = "http://" + utils.SHARED_WORKSPACE.String()
|
|
||||||
}
|
|
||||||
b, err := paccess.LaunchPeerExecution(p.GetID(), v, url, utils.SHARED_WORKSPACE, tools.POST, shared.Serialize(), wfa.Caller)
|
|
||||||
if err != nil && b == nil {
|
if err != nil && b == nil {
|
||||||
wfa.Logger.Error().Msg("Could not send to peer " + url + ". Error: " + err.Error())
|
wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user