diff --git a/models/peer/peer.go b/models/peer/peer.go index afd69a4..4c90e99 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -2,6 +2,7 @@ package peer import ( "encoding/json" + "fmt" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/static" @@ -11,9 +12,33 @@ import ( type Peer struct { utils.AbstractObject - Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` - PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` - Services map[string]int `json:"services,omitempty" bson:"services,omitempty"` + Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` + PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` + Services map[string]int `json:"services,omitempty" bson:"services,omitempty"` + FailedExecution []PeerExecution `json:"failed_execution,omitempty" bson:"failed_execution,omitempty"` +} + +func (ao *Peer) AddExecution(exec PeerExecution) { + found := false + for _, v := range ao.FailedExecution { + if v.Url == exec.Url && v.Method == exec.Method && fmt.Sprint(v.Body) == fmt.Sprint(exec.Body) { + found = true + break + } + } + if !found { + ao.FailedExecution = append(ao.FailedExecution, exec) + } +} + +func (ao *Peer) RemoveExecution(exec PeerExecution) { + new := []PeerExecution{} + for i, v := range ao.FailedExecution { + if !(v.Url == exec.Url && v.Method == exec.Method && fmt.Sprint(v.Body) == fmt.Sprint(exec.Body)) { + new = append(new, ao.FailedExecution[i]) + } + } + ao.FailedExecution = new } func (ao *Peer) IsMySelf() bool { @@ -21,9 +46,9 @@ func (ao *Peer) IsMySelf() bool { return ao.UUID == id } -func (p *Peer) LaunchPeerExecution(peerID string, dataID string, url string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { +func (p *Peer) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { p.UUID = peerID - return (&PeerCache{}).LaunchPeerExecution(peerID, p.IsMySelf(), dataID, url, dt, method, body, caller) + return (&PeerCache{}).LaunchPeerExecution(peerID, dataID, dt, method, body, caller) } func (ao *Peer) GetID() string { diff --git a/models/peer/peer_cache.go b/models/peer/peer_cache.go index 4ee666a..b990a98 100644 --- a/models/peer/peer_cache.go +++ b/models/peer/peer_cache.go @@ -6,27 +6,17 @@ import ( "fmt" "regexp" "strings" - "time" - "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/utils" "cloud.o-forge.io/core/oc-lib/tools" ) -var currentRountine = 0 -var singleton = &PeerCache{ - Executions: []*PeerExecution{}, -} - type PeerExecution struct { - Method tools.METHOD - Url string - Body map[string]interface{} - IsMySelf bool - Caller tools.HTTPCaller - PeerID string - DataType utils.DataType - DataID string + Method string `json:"method" bson:"method"` + Url string `json:"url" bson:"url"` + Body map[string]interface{} `json:"body" bson:"body"` + DataType int `json:"data_type" bson:"data_type"` + DataID string `json:"data_id" bson:"data_id"` } type PeerCache struct { @@ -47,22 +37,22 @@ func (p *PeerCache) urlFormat(url string, dt utils.DataType) string { return url } -func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) bool { +func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) (*Peer, bool) { api := tools.API{} access := (&Peer{}).GetAccessor(nil) res, code, _ := access.LoadOne(peerID) if code != 200 { - return false + return nil, false } methods := caller.URLS[utils.PEER.String()] fmt.Println("PEER AFT 3", methods) if methods == nil { - return false + return res.(*Peer), false } meth := methods[tools.POST] fmt.Println("PEER AFT 4", meth) if meth == "" { - return false + return res.(*Peer), false } url := p.urlFormat(res.(*Peer).Url+meth, utils.PEER) fmt.Println("PEER AFT 5", url) @@ -70,7 +60,7 @@ func (p *PeerCache) checkPeerStatus(peerID string, caller *tools.HTTPCaller) boo res.(*Peer).Services = services access.UpdateOne(res, peerID) fmt.Printf("Peer %v is %v\n", peerID, state) - return state != tools.DEAD + return res.(*Peer), state != tools.DEAD } func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { @@ -79,7 +69,7 @@ func (p *PeerCache) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { return data } -func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID string, url string, +func (p *PeerCache) LaunchPeerExecution(peerID string, dataID string, dt utils.DataType, method tools.METHOD, body map[string]interface{}, caller *tools.HTTPCaller) (*PeerExecution, error) { var err error b := []byte{} @@ -93,10 +83,28 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID str } else { meth = strings.ReplaceAll(meth, ":id", dataID) } - url = p.urlFormat(url+meth, dt) - fmt.Println("LaunchPeerExecution AFT 3", url) - if !p.checkPeerStatus(peerID, caller) { + url := "" + pexec := &PeerExecution{ + Method: method.String(), + Url: url + methods[method], + Body: body, + DataType: dt.EnumIndex(), + DataID: dataID, + } + if mypeer, ok := p.checkPeerStatus(peerID, caller); !ok { + mypeer.AddExecution(*pexec) return nil, errors.New("peer is not reachable") + } else { + url = p.urlFormat((mypeer.Url)+meth, dt) + fmt.Println("LaunchPeerExecution AFT 3", url) + tmp := []PeerExecution{} + for _, v := range mypeer.FailedExecution { + tmp = append(tmp, v) + } + mypeer.FailedExecution = []PeerExecution{} + for _, v := range tmp { + go p.LaunchPeerExecution(peerID, v.DataID, utils.DataType(v.DataType), tools.ToMethod(v.Method), v.Body, caller) + } } if method == tools.POST { b, err = caller.CallPost(url, "", body) @@ -110,22 +118,7 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID str var m map[string]interface{} json.Unmarshal(b, &m) if err != nil { - pexec := &PeerExecution{ - Method: method, - Url: url + methods[method], - Body: body, - IsMySelf: isMySelf, - Caller: *caller, - PeerID: peerID, - DataType: dt, - DataID: dataID, - } - singleton.Executions = append(singleton.Executions, pexec) - /*if currentRountine == 0 { - currentRountine++ - go p.retryPeerExecution() - }*/ - return pexec, err + return nil, err } fmt.Println("LaunchPeerExecution AFT 3", m, url) @@ -134,23 +127,3 @@ func (p *PeerCache) LaunchPeerExecution(peerID string, isMySelf bool, dataID str } return nil, err } - -func (p *PeerCache) retryPeerExecution() { - execs := []*PeerExecution{} - for _, v := range singleton.Executions { - - d, err := p.LaunchPeerExecution(v.PeerID, v.IsMySelf, v.DataID, v.Url, v.DataType, v.Method, v.Body, &v.Caller) - if err == nil { - execs = append(execs, d) - } else { - logs.GetLogger().With().Err(err) - } - } - singleton.Executions = execs - if len(singleton.Executions) > 0 { - time.Sleep(60 * time.Second) - p.retryPeerExecution() - } else { - currentRountine = 0 - } -} diff --git a/models/workflow/workflow.go b/models/workflow/workflow.go index 47603e5..a592af3 100644 --- a/models/workflow/workflow.go +++ b/models/workflow/workflow.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "time" "cloud.o-forge.io/core/oc-lib/models/peer" @@ -38,16 +37,11 @@ type Workflow struct { AbstractWorkflow } -func (wfa *Workflow) CheckBooking(subPath string, caller *tools.HTTPCaller) (bool, error) { +func (wfa *Workflow) CheckBooking(caller *tools.HTTPCaller) (bool, error) { // check if - if wfa.Schedule == nil || wfa.Schedule.Start == nil || wfa.Graph == nil { + if wfa.Graph == nil { return false, nil } - if wfa.Schedule.End == nil { - // if no end... then Book like a savage - return true, nil - } - e := *wfa.Schedule.End accessor := (&datacenter.DatacenterResource{}).GetAccessor(nil) for _, link := range wfa.Graph.Links { if ok, dc_id := wfa.isDCLink(link); ok { @@ -60,14 +54,7 @@ func (wfa *Workflow) CheckBooking(subPath string, caller *tools.HTTPCaller) (boo if peerID == "" { return false, errors.New("no peer id") } - p, code, err := (&peer.Peer{}).GetAccessor(nil).LoadOne(peerID) - if code != 200 { - return false, err - } - subPath = strings.ReplaceAll(subPath, ":datacenter_id", fmt.Sprintf("%v", dc_id)) - subPath = strings.ReplaceAll(subPath, ":start_date", wfa.getFormat(wfa.Schedule.Start)) - subPath = strings.ReplaceAll(subPath, ":end_date", wfa.getFormat(&e)) - _, err = p.(*peer.Peer).LaunchPeerExecution(peerID, "", p.(*peer.Peer).Url+subPath, utils.BOOKING, tools.GET, nil, caller) + _, err := (&peer.Peer{}).LaunchPeerExecution(peerID, dc_id, utils.BOOKING, tools.GET, nil, caller) if err != nil { return false, err } diff --git a/models/workflow/workflow_mongo_accessor.go b/models/workflow/workflow_mongo_accessor.go index 3f0ac38..61e97ea 100644 --- a/models/workflow/workflow_mongo_accessor.go +++ b/models/workflow/workflow_mongo_accessor.go @@ -112,16 +112,10 @@ func (wfa *workflowMongoAccessor) book(id string, realData *Workflow, execs []*w } // CHECK BOOKING peerID := dc.(*datacenter.DatacenterResource).PeerID - fmt.Println("PEER ID", peerID) if peerID == "" { continue } - paccess := (&peer.Peer{}) - p, code, _ := paccess.GetAccessor(nil).LoadOne(peerID) - if code != 200 { - continue - } - b, err := paccess.LaunchPeerExecution(p.GetID(), "", p.(*peer.Peer).Url, utils.BOOKING, tools.POST, + b, err := (&peer.Peer{}).LaunchPeerExecution(peerID, "", utils.BOOKING, tools.POST, (&workflow_execution.WorkflowExecutions{ WorkflowID: id, ResourceID: dc_id, diff --git a/models/workspace/shared/shared_workspace_mongo_accessor.go b/models/workspace/shared/shared_workspace_mongo_accessor.go index 3560a46..df86b9a 100644 --- a/models/workspace/shared/shared_workspace_mongo_accessor.go +++ b/models/workspace/shared/shared_workspace_mongo_accessor.go @@ -43,12 +43,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace } paccess := (&peer.Peer{}) for _, p := range shared.Peers { - pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) - if code == 200 { - b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller) - if err != nil && b == nil { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) - } + b, err := paccess.LaunchPeerExecution(p, v, utils.WORKSPACE, tools.DELETE, nil, wfa.Caller) + if err != nil && b == nil { + wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error()) } } } @@ -65,12 +62,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkspace(shared *SharedWorkspace continue } paccess := (&peer.Peer{}) - pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) - if code == 200 { - b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKSPACE, tools.POST, workspace.Serialize(), wfa.Caller) - if err != nil && b == nil { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) - } + b, err := paccess.LaunchPeerExecution(p, v, utils.WORKSPACE, tools.POST, workspace.Serialize(), wfa.Caller) + if err != nil && b == nil { + wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error()) } } } @@ -101,13 +95,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, } paccess := (&peer.Peer{}) for _, p := range shared.Peers { - pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) - if code != 200 { - continue - } - b, err := paccess.LaunchPeerExecution(p, v, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller) + b, err := paccess.LaunchPeerExecution(p, v, utils.WORKFLOW, tools.DELETE, nil, wfa.Caller) if err != nil && b == nil { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) + wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error()) } } } @@ -127,14 +117,10 @@ func (wfa *sharedWorkspaceMongoAccessor) sharedWorkflow(shared *SharedWorkspace, } paccess := (&peer.Peer{}) for _, p := range shared.Peers { - if code != 200 { - continue - } - pp, code, _ := paccess.GetAccessor(nil).LoadOne(p) if code == 200 { - b, err := paccess.LaunchPeerExecution(p, shared.UUID, pp.(*peer.Peer).Url, utils.WORKFLOW, tools.POST, workflow.Serialize(), wfa.Caller) + b, err := paccess.LaunchPeerExecution(p, shared.UUID, utils.WORKFLOW, tools.POST, workflow.Serialize(), wfa.Caller) if err != nil && b == nil { - wfa.Logger.Error().Msg("Could not send to peer " + pp.(*peer.Peer).Url + ". Error: " + err.Error()) + wfa.Logger.Error().Msg("Could not send to peer " + p + ". Error: " + err.Error()) } } } @@ -153,13 +139,9 @@ func (wfa *sharedWorkspaceMongoAccessor) deleteToPeer(shared *SharedWorkspace) { if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() { continue } - p, code, _ := paccess.GetAccessor(nil).LoadOne(v) - if code != 200 { - continue - } - b, err := paccess.LaunchPeerExecution(p.GetID(), shared.UUID, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.DELETE, nil, wfa.Caller) + b, err := paccess.LaunchPeerExecution(v, shared.UUID, utils.SHARED_WORKSPACE, tools.DELETE, nil, wfa.Caller) if err != nil && b == nil { - wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error()) + wfa.Logger.Error().Msg("Could not send to peer " + v + ". Error: " + err.Error()) } } } @@ -173,13 +155,9 @@ func (wfa *sharedWorkspaceMongoAccessor) sendToPeer(shared *SharedWorkspace) { if (&peer.Peer{AbstractObject: utils.AbstractObject{UUID: v}}).IsMySelf() { continue } - p, code, _ := paccess.GetAccessor(nil).LoadOne(v) - if code != 200 { - continue - } - b, err := paccess.LaunchPeerExecution(p.GetID(), v, p.(*peer.Peer).Url, utils.SHARED_WORKSPACE, tools.POST, shared.Serialize(), wfa.Caller) + b, err := paccess.LaunchPeerExecution(v, v, utils.SHARED_WORKSPACE, tools.POST, shared.Serialize(), wfa.Caller) if err != nil && b == nil { - wfa.Logger.Error().Msg("Could not send to peer " + p.(*peer.Peer).Url + ". Error: " + err.Error()) + wfa.Logger.Error().Msg("Could not send to peer " + v + ". Error: " + err.Error()) } } } diff --git a/tools/remote_caller.go b/tools/remote_caller.go index 320d690..a72c4c9 100644 --- a/tools/remote_caller.go +++ b/tools/remote_caller.go @@ -16,6 +16,23 @@ const ( DELETE ) +func (m METHOD) String() string { + return [...]string{"GET", "PUT", "POST", "DELETE"}[m] +} + +func (m METHOD) EnumIndex() int { + return int(m) +} + +func ToMethod(str string) METHOD { + for _, s := range []METHOD{GET, PUT, POST, DELETE} { + if s.String() == str { + return s + } + } + return GET +} + var HTTPCallerInstance = &HTTPCaller{} type HTTPCaller struct {