From e4ee33dcbe75b37e0bdc8410b36caed2824775f7 Mon Sep 17 00:00:00 2001 From: mr Date: Fri, 23 Aug 2024 09:01:28 +0200 Subject: [PATCH] No discovery needed --- entrypoint.go | 27 --------------------------- models/peer/peer.go | 6 +++--- models/utils/enums.go | 3 --- tools/api.go | 35 ++++++++++++++++++++++++----------- tools/nats_caller.go | 39 --------------------------------------- 5 files changed, 27 insertions(+), 83 deletions(-) diff --git a/entrypoint.go b/entrypoint.go index 4a2c682..9fadc78 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -10,7 +10,6 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models" - "cloud.o-forge.io/core/oc-lib/models/discovery" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resource_model" "cloud.o-forge.io/core/oc-lib/models/resources/data" @@ -46,7 +45,6 @@ const ( SHARED_WORKSPACE = utils.SHARED_WORKSPACE RULE = utils.RULE BOOKING = utils.BOOKING - DISCOVERY = utils.DISCOVERY ) func (d LibDataEnum) API() string { @@ -96,7 +94,6 @@ func Init(appName string, hostname string, port string) { }() logs.SetAppName(appName) logs.SetLogger(logs.CreateLogger("main", "")) - logger := logs.GetLogger() mongo.MONGOService.Init(models.GetModelsNames(), tools.GetConfig()) accessor := (&resource_model.ResourceModel{}).GetAccessor(nil) for _, model := range []string{utils.DATA_RESOURCE.String(), utils.PROCESSING_RESOURCE.String(), utils.STORAGE_RESOURCE.String(), utils.DATACENTER_RESOURCE.String(), utils.WORKFLOW_RESOURCE.String()} { @@ -123,23 +120,6 @@ func Init(appName string, hostname string, port string) { }) } } - discoveryAccess := (&discovery.Discovery{}).GetAccessor(nil) - res, code, _ := discoveryAccess.Search(nil, appName) - initial := &discovery.Discovery{ - AbstractObject: utils.AbstractObject{ - Name: appName, - }, - Host: hostname, - Port: port, - State: 1, - } - if code == 200 && len(res) == 0 { - discoveryAccess.StoreOne(initial) - } - err := tools.NewNATSCaller().DiscoveryNATS(appName, initial) - if err != nil { - logger.Error().Msg(err.Error()) - } } func GetLogger() zerolog.Logger { @@ -362,10 +342,3 @@ func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution { } return nil } - -func (l *LibData) ToDiscovery() *discovery.Discovery { - if l.Data.GetAccessor(nil).GetType() == utils.DISCOVERY.String() { - return l.Data.(*discovery.Discovery) - } - return nil -} diff --git a/models/peer/peer.go b/models/peer/peer.go index 086a132..afd69a4 100644 --- a/models/peer/peer.go +++ b/models/peer/peer.go @@ -11,9 +11,9 @@ import ( type Peer struct { utils.AbstractObject - Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` - PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` - State tools.State `json:"state,omitempty" bson:"state,omitempty"` + Url string `json:"url,omitempty" bson:"url,omitempty" validate:"required,base64url"` + PublicKey string `json:"public_key,omitempty" bson:"public_key,omitempty"` + Services map[string]int `json:"services,omitempty" bson:"services,omitempty"` } func (ao *Peer) IsMySelf() bool { diff --git a/models/utils/enums.go b/models/utils/enums.go index e5a1853..c0c537f 100644 --- a/models/utils/enums.go +++ b/models/utils/enums.go @@ -17,7 +17,6 @@ const ( SHARED_WORKSPACE RULE BOOKING - DISCOVERY ) var DefaultAPI = [...]string{ @@ -35,7 +34,6 @@ var DefaultAPI = [...]string{ "oc-shared", "oc-shared", "oc-datacenter", - "oc-peers", } var Str = [...]string{ @@ -53,7 +51,6 @@ var Str = [...]string{ "shared_workspace", "rule", "booking", - "discovery", } func FromInt(i int) string { diff --git a/tools/api.go b/tools/api.go index 7df8a3f..7f0fb9a 100644 --- a/tools/api.go +++ b/tools/api.go @@ -21,6 +21,10 @@ const ( WAITING ) +func (s State) EnumIndex() int { + return int(s) +} + func ToState(str string) State { for _, s := range []State{ALIVE, REDUCED_SERVICE, UNPROCESSABLE_ENTITY, DB_FALLOUT, TEAPOT, DEAD, WAITING} { if s.String() == str { @@ -57,39 +61,47 @@ func (a *API) GetState() (State, int, error) { return ALIVE, 200, nil } -func (a *API) CheckRemotePeer(url string) State { +func (a *API) CheckRemotePeer(url string) (State, map[string]int) { // Check if the database is up caller := NewHTTPCaller(map[string]map[METHOD]string{}) var resp APIStatusResponse b, err := caller.CallPost(url, "/status", []string{}) if err != nil { - return DEAD + return DEAD, map[string]int{} } json.Unmarshal(b, &resp) if resp.Data == nil { - return DEAD + return DEAD, map[string]int{} } - return ToState(resp.Data.State) + new := map[string]int{} + for k, v := range resp.Data.Services { + new[k] = ToState(v).EnumIndex() + } + return ToState(resp.Data.State), new } -func (a *API) CheckRemoteAPIs(urls []string) (State, int, error) { +func (a *API) CheckRemoteAPIs(urls []string) (State, map[string]int, error) { // Check if the database is up + new := map[string]int{} caller := NewHTTPCaller(map[string]map[METHOD]string{}) for _, url := range urls { var resp APIStatusResponse b, err := caller.CallGet(url, "/version/status") if err != nil { - return REDUCED_SERVICE, 200, err + return REDUCED_SERVICE, new, err } json.Unmarshal(b, &resp) if resp.Data == nil { - return DEAD, 200, errors.New(url + " -> is DEAD") + return DEAD, new, errors.New(url + " -> is DEAD") + } + for k, v := range resp.Data.Services { + new[k] = ToState(v).EnumIndex() } if resp.Data.Code != 0 { - return REDUCED_SERVICE, 200, errors.New(url + " -> " + resp.Error) + return REDUCED_SERVICE, new, errors.New(url + " -> " + resp.Error) } } - return ALIVE, 200, nil + return ALIVE, new, nil } type APIStatusResponse struct { @@ -98,6 +110,7 @@ type APIStatusResponse struct { } type APIStatus struct { - Code int `json:"code"` - State string `json:"state"` + Code int `json:"code"` + State string `json:"state"` + Services map[string]string `json:"services"` } diff --git a/tools/nats_caller.go b/tools/nats_caller.go index c53767e..0c2eda5 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -2,10 +2,8 @@ package tools import ( "encoding/json" - "errors" "strings" - "cloud.o-forge.io/core/oc-lib/logs" "github.com/nats-io/nats.go" ) @@ -14,7 +12,6 @@ type NATSMethod int const ( REMOVE NATSMethod = iota CREATE - DISCOVERY ) func NameToMethod(name string) NATSMethod { @@ -59,39 +56,3 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa } return "" } - -type NATSObjectInterface interface { - Serialize() map[string]interface{} -} - -func (o *natsCaller) DiscoveryNATS(name string, model NATSObjectInterface) error { - if GetConfig().NATSUrl == "" { - return errors.New("NATS_SERVER is not set") - } - nc, err := nats.Connect(GetConfig().NATSUrl) - if err != nil { - return errors.New("Could not connect to NATS") - } - go o.listenForChange(nc, model, DISCOVERY.GenerateKey("ask")) - go o.listenForChange(nc, model, DISCOVERY.GenerateKey(name)) - return nil -} - -func (o *natsCaller) listenForChange(nc *nats.Conn, model NATSObjectInterface, chanName string) { - api := API{} - logger := logs.GetLogger() - ch := make(chan *nats.Msg, 64) - subs, err := nc.ChanSubscribe(chanName, ch) - if err != nil { - logger.Error().Msg("Error listening to NATS : " + err.Error()) - return - } - defer subs.Unsubscribe() - for msg := range ch { - logger.Info().Msg("Received message from NATS : " + string(msg.Data)) - m := model.Serialize() - s, _, _ := api.GetState() - m["state"] = s - o.SetNATSPub("answer", DISCOVERY, m) - } -}