diff --git a/tools/api.go b/tools/api.go index a8408ee..f54fa61 100644 --- a/tools/api.go +++ b/tools/api.go @@ -6,6 +6,7 @@ import ( "cloud.o-forge.io/core/oc-lib/config" "cloud.o-forge.io/core/oc-lib/dbs/mongo" + beego "github.com/beego/beego/v2/server/web" ) /* @@ -70,6 +71,24 @@ func (a *API) GetState() (State, int, error) { return ALIVE, 200, nil // If everything is up, return alive } +func (a *API) ListenRouter(exec func(msg map[string]interface{})) { + nats := NewNATSCaller() + go nats.ListenNats(DISCOVERY.GenerateKey("api"), exec) +} + +func (a *API) SubscribeRouter() { + nats := NewNATSCaller() + discovery := map[string]interface{}{} + for _, info := range beego.BeeApp.Handlers.GetAllControllerInfo() { + methods := []string{} + for k := range info.GetMethod() { + methods = append(methods, k) + } + discovery[info.GetPattern()] = methods + } + nats.SetNATSPub("api", DISCOVERY, discovery) +} + // CheckRemotePeer checks the state of a remote peer func (a *API) CheckRemotePeer(url string) (State, map[string]int) { // Check if the database is up diff --git a/tools/nats_caller.go b/tools/nats_caller.go index d4aa7fc..7ef63e2 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -5,6 +5,7 @@ import ( "strings" "cloud.o-forge.io/core/oc-lib/config" + "cloud.o-forge.io/core/oc-lib/logs" "github.com/nats-io/nats.go" ) @@ -14,6 +15,7 @@ type NATSMethod int const ( REMOVE NATSMethod = iota CREATE + DISCOVERY ) // NameToMethod returns the NATSMethod enum value from a string @@ -43,6 +45,33 @@ func NewNATSCaller() *natsCaller { return &natsCaller{} } +// on workflows' scheduling. Messages must contain +// workflow execution ID, to allow retrieval of execution infos +func (s *natsCaller) ListenNats(chanName string, exec func(msg map[string]interface{})) { + log := logs.GetLogger() + if config.GetConfig().NATSUrl == "" { + log.Error().Msg(" -> NATS_SERVER is not set") + return + } + nc, err := nats.Connect(config.GetConfig().NATSUrl) + if err != nil { + log.Error().Msg(" -> Could not reach NATS server : " + err.Error()) + return + } + ch := make(chan *nats.Msg, 64) + subs, err := nc.ChanSubscribe(chanName, ch) + if err != nil { + log.Error().Msg("Error listening to NATS : " + err.Error()) + } + defer subs.Unsubscribe() + + for msg := range ch { + map_mess := map[string]interface{}{} + json.Unmarshal(msg.Data, &map_mess) + exec(map_mess) + } +} + // SetNATSPub sets a message to the NATS server func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interface{}) string { if config.GetConfig().NATSUrl == "" {