wait for NATS
This commit is contained in:
parent
e84d262f38
commit
2c8dcbe93d
@ -110,14 +110,14 @@ func (a *API) SubscribeRouter(infos []*beego.ControllerInfo) {
|
||||
}
|
||||
}
|
||||
}
|
||||
nats.SetNATSPub("api", DISCOVERY, discovery)
|
||||
go nats.SetNATSPub("api", DISCOVERY, discovery)
|
||||
}
|
||||
|
||||
// CheckRemotePeer checks the state of a remote peer
|
||||
func (a *API) CheckRemotePeer(url string) (State, map[string]int) {
|
||||
// Check if the database is up
|
||||
var resp APIStatusResponse
|
||||
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
|
||||
caller := NewHTTPCaller(map[DataType]map[METHOD]string{}) // Create a new http caller
|
||||
b, err := caller.CallPost(url, "", map[string]interface{}{}) // Call the status endpoint of the peer
|
||||
if err != nil {
|
||||
return DEAD, map[string]int{} // If the peer is not reachable, return dead
|
||||
|
@ -3,6 +3,7 @@ package tools
|
||||
import (
|
||||
"encoding/json"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"cloud.o-forge.io/core/oc-lib/config"
|
||||
"cloud.o-forge.io/core/oc-lib/logs"
|
||||
@ -53,22 +54,25 @@ func (s *natsCaller) ListenNats(chanName string, exec func(msg map[string]interf
|
||||
log.Error().Msg(" -> NATS_SERVER is not set")
|
||||
return
|
||||
}
|
||||
nc, err := nats.Connect(config.GetConfig().NATSUrl)
|
||||
if err != nil {
|
||||
log.Error().Msg(" -> Could not reach NATS server : " + err.Error())
|
||||
return
|
||||
}
|
||||
ch := make(chan *nats.Msg, 64)
|
||||
subs, err := nc.ChanSubscribe(chanName, ch)
|
||||
if err != nil {
|
||||
log.Error().Msg("Error listening to NATS : " + err.Error())
|
||||
}
|
||||
defer subs.Unsubscribe()
|
||||
for {
|
||||
nc, err := nats.Connect(config.GetConfig().NATSUrl)
|
||||
if err != nil {
|
||||
time.Sleep(1 * time.Minute)
|
||||
continue
|
||||
}
|
||||
ch := make(chan *nats.Msg, 64)
|
||||
subs, err := nc.ChanSubscribe(chanName, ch)
|
||||
if err != nil {
|
||||
log.Error().Msg("Error listening to NATS : " + err.Error())
|
||||
}
|
||||
defer subs.Unsubscribe()
|
||||
|
||||
for msg := range ch {
|
||||
map_mess := map[string]interface{}{}
|
||||
json.Unmarshal(msg.Data, &map_mess)
|
||||
exec(map_mess)
|
||||
for msg := range ch {
|
||||
map_mess := map[string]interface{}{}
|
||||
json.Unmarshal(msg.Data, &map_mess)
|
||||
exec(map_mess)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
@ -77,18 +81,23 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa
|
||||
if config.GetConfig().NATSUrl == "" {
|
||||
return " -> NATS_SERVER is not set"
|
||||
}
|
||||
nc, err := nats.Connect(config.GetConfig().NATSUrl)
|
||||
if err != nil {
|
||||
return " -> Could not reach NATS server : " + err.Error()
|
||||
}
|
||||
defer nc.Close()
|
||||
js, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return " -> " + err.Error()
|
||||
}
|
||||
err = nc.Publish(method.GenerateKey(dataName), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method
|
||||
if err != nil {
|
||||
return " -> " + err.Error() // Return an error if the message could not be published
|
||||
for {
|
||||
nc, err := nats.Connect(config.GetConfig().NATSUrl)
|
||||
if err != nil {
|
||||
time.Sleep(1 * time.Minute)
|
||||
continue
|
||||
}
|
||||
defer nc.Close()
|
||||
js, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return " -> " + err.Error()
|
||||
}
|
||||
err = nc.Publish(method.GenerateKey(dataName), js) // Publish the message on the NATS server with a channel name based on the data name (or whatever start) and the method
|
||||
if err != nil {
|
||||
time.Sleep(1 * time.Minute)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user