diff --git a/tools/api.go b/tools/api.go index 7b09cb0..50623b1 100644 --- a/tools/api.go +++ b/tools/api.go @@ -110,14 +110,14 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) { } } } - nats.SetNATSPub("api", DISCOVERY, discovery) + go nats.SetNATSPub("api", DISCOVERY, discovery) } // CheckRemotePeer checks the state of a remote peer func (a *API) CheckRemotePeer(url string) (State, map[string]int) { // Check if the database is up var resp APIStatusResponse - caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller + caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer if err != nil { return DEAD, map[string]int{} // If the peer is not reachable, return dead diff --git a/tools/nats_caller.go b/tools/nats_caller.go index 7ef63e2..d03e269 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -3,6 +3,7 @@ package tools import ( "encoding/json" "strings" + "time" "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/logs" @@ -53,22 +54,25 @@ func (s *natsCaller) ListenNats(chanName string, exec func(msg map[string]interf log.Error().Msg(" -> NATS_SERVER is not set") return } - nc, err := nats.Connect(config.GetConfig().NATSUrl) - if err != nil { - log.Error().Msg(" -> Could not reach NATS server : " + err.Error()) - return - } - ch := make(chan *nats.Msg, 64) - subs, err := nc.ChanSubscribe(chanName, ch) - if err != nil { - log.Error().Msg("Error listening to NATS : " + err.Error()) - } - defer subs.Unsubscribe() + for { + nc, err := nats.Connect(config.GetConfig().NATSUrl) + if err != nil { + time.Sleep(1 * time.Minute) + continue + } + ch := make(chan *nats.Msg, 64) + subs, err := nc.ChanSubscribe(chanName, ch) + if err != nil { + log.Error().Msg("Error listening to NATS : " + err.Error()) + } + defer subs.Unsubscribe() - for msg := range ch { - map_mess := map[string]interface{}{} - json.Unmarshal(msg.Data, &map_mess) - exec(map_mess) + for msg := range ch { + map_mess := map[string]interface{}{} + json.Unmarshal(msg.Data, &map_mess) + exec(map_mess) + } + break } } @@ -77,18 +81,23 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa if config.GetConfig().NATSUrl == "" { return " -> NATS_SERVER is not set" } - nc, err := nats.Connect(config.GetConfig().NATSUrl) - if err != nil { - return " -> Could not reach NATS server : " + err.Error() - } - defer nc.Close() - js, err := json.Marshal(data) - if err != nil { - return " -> " + err.Error() - } - err = nc.Publish(method.GenerateKey(dataName), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method - if err != nil { - return " -> " + err.Error() // Return an error if the message could not be published + for { + nc, err := nats.Connect(config.GetConfig().NATSUrl) + if err != nil { + time.Sleep(1 * time.Minute) + continue + } + defer nc.Close() + js, err := json.Marshal(data) + if err != nil { + return " -> " + err.Error() + } + err = nc.Publish(method.GenerateKey(dataName), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method + if err != nil { + time.Sleep(1 * time.Minute) + continue + } + break } return "" }