package tools import ( "encoding/json" "errors" "strings" "sync" "cloud.o-forge.io/core/oc-lib/logs" "github.com/nats-io/nats.go" ) type NATSMethod int const ( REMOVE NATSMethod = iota CREATE DISCOVERY ) func NameToMethod(name string) NATSMethod { for _, v := range [...]NATSMethod{REMOVE, CREATE} { if strings.Contains(strings.ToLower(v.String()), strings.ToLower(name)) { return v } } return -1 } func (d NATSMethod) GenerateKey(name string) string { return name + "_" + d.String() } func (d NATSMethod) String() string { return [...]string{"remove", "create", "discovery"}[d] } type natsCaller struct{} func NewNATSCaller() *natsCaller { return &natsCaller{} } func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interface{}) string { if GetConfig().NATSUrl == "" { return " -> NATS_SERVER is not set" } nc, err := nats.Connect(GetConfig().NATSUrl) if err != nil { return " -> Could not reach NATS server : " + err.Error() } defer nc.Close() js, err := json.Marshal(data) if err != nil { return " -> " + err.Error() } err = nc.Publish(method.GenerateKey(dataName), js) if err != nil { return " -> " + err.Error() } return "" } type NATSObjectInterface interface { Serialize() map[string]interface{} } func (o *natsCaller) DiscoveryNATS(name string, model NATSObjectInterface) error { if GetConfig().NATSUrl == "" { return errors.New("NATS_SERVER is not set") } nc, err := nats.Connect(GetConfig().NATSUrl) if err != nil { return errors.New("Could not connect to NATS") } defer nc.Close() wg := &sync.WaitGroup{} wg.Add(2) go o.listenForChange(nc, model, DISCOVERY.GenerateKey("ask"), wg) go o.listenForChange(nc, model, DISCOVERY.GenerateKey(name), wg) wg.Wait() return nil } func (o *natsCaller) listenForChange(nc *nats.Conn, model NATSObjectInterface, chanName string, wg *sync.WaitGroup) { defer wg.Done() api := API{} logger := logs.GetLogger() ch := make(chan *nats.Msg, 64) subs, err := nc.ChanSubscribe(chanName, ch) if err != nil { logger.Error().Msg("Error listening to NATS : " + err.Error()) return } defer subs.Unsubscribe() for msg := range ch { logger.Info().Msg("Received message from NATS : " + string(msg.Data)) m := model.Serialize() s, _, _ := api.GetState() m["state"] = s o.SetNATSPub("answer", DISCOVERY, m) } }