From 0bfe87793bcc67539956613f1e4256ecc8287e42 Mon Sep 17 00:00:00 2001 From: mr Date: Thu, 22 Aug 2024 13:11:21 +0200 Subject: [PATCH] Discovery Nats automate --- entrypoint.go | 29 ++++++- ..._accessor.go => booking_mongo_accessor.go} | 20 ++--- models/discovery/discovery.go | 54 ++++++++++++ models/discovery/discovery_mongo_accessor.go | 83 +++++++++++++++++++ models/models.go | 2 + models/utils/enums.go | 3 + tools/api.go | 3 +- tools/nats_caller.go | 46 ++++++++++ 8 files changed, 227 insertions(+), 13 deletions(-) rename models/booking/{booking_execution_mongo_accessor.go => booking_mongo_accessor.go} (68%) create mode 100644 models/discovery/discovery.go create mode 100644 models/discovery/discovery_mongo_accessor.go diff --git a/entrypoint.go b/entrypoint.go index b1b54ac..ad876e0 100644 --- a/entrypoint.go +++ b/entrypoint.go @@ -8,6 +8,7 @@ import ( "cloud.o-forge.io/core/oc-lib/dbs/mongo" "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models" + "cloud.o-forge.io/core/oc-lib/models/discovery" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resource_model" "cloud.o-forge.io/core/oc-lib/models/resources/data" @@ -43,6 +44,7 @@ const ( SHARED_WORKSPACE = utils.SHARED_WORKSPACE RULE = utils.RULE BOOKING = utils.BOOKING + DISCOVERY = utils.DISCOVERY ) func (d LibDataEnum) API() string { @@ -83,7 +85,7 @@ func AddPath(collection LibDataEnum, path string) { paths[collection] = path } -func Init(appName string) { +func Init(appName string, hostname string, port string) { defer func() { if r := recover(); r != nil { tools.UncatchedError = append(tools.UncatchedError, errors.New("Panic recovered in Init : "+fmt.Sprintf("%v", r))) @@ -91,6 +93,24 @@ func Init(appName string) { }() logs.SetAppName(appName) logs.SetLogger(logs.CreateLogger("main", "")) + logger := logs.GetLogger() + discoveryAccess := (&discovery.Discovery{}).GetAccessor(nil) + res, code, _ := discoveryAccess.Search(nil, appName) + initial := &discovery.Discovery{ + AbstractObject: utils.AbstractObject{ + Name: appName, + }, + Host: hostname, + Port: port, + State: 1, + } + if code == 200 && len(res) == 0 { + discoveryAccess.StoreOne(initial) + } + err := tools.NewNATSCaller().DiscoveryNATS(appName, initial) + if err != nil { + logger.Error().Msg(err.Error()) + } mongo.MONGOService.Init(models.GetModelsNames(), tools.GetConfig()) accessor := (&resource_model.ResourceModel{}).GetAccessor(nil) for _, model := range []string{utils.DATA_RESOURCE.String(), utils.PROCESSING_RESOURCE.String(), utils.STORAGE_RESOURCE.String(), utils.DATACENTER_RESOURCE.String(), utils.WORKFLOW_RESOURCE.String()} { @@ -339,3 +359,10 @@ func (l *LibData) ToWorkflowExecution() *workflow_execution.WorkflowExecution { } return nil } + +func (l *LibData) ToDiscovery() *discovery.Discovery { + if l.Data.GetAccessor(nil).GetType() == utils.DISCOVERY.String() { + return l.Data.(*discovery.Discovery) + } + return nil +} diff --git a/models/booking/booking_execution_mongo_accessor.go b/models/booking/booking_mongo_accessor.go similarity index 68% rename from models/booking/booking_execution_mongo_accessor.go rename to models/booking/booking_mongo_accessor.go index 891d450..d8d7b72 100644 --- a/models/booking/booking_execution_mongo_accessor.go +++ b/models/booking/booking_mongo_accessor.go @@ -6,31 +6,31 @@ import ( "cloud.o-forge.io/core/oc-lib/models/utils" ) -type bookingExecutionMongoAccessor struct { +type bookingMongoAccessor struct { utils.AbstractAccessor } -func New() *bookingExecutionMongoAccessor { - return &bookingExecutionMongoAccessor{} +func New() *bookingMongoAccessor { + return &bookingMongoAccessor{} } -func (wfa *bookingExecutionMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { +func (wfa *bookingMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { return wfa.GenericDeleteOne(id, wfa) } -func (wfa *bookingExecutionMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { +func (wfa *bookingMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { return wfa.GenericUpdateOne(set, id, wfa, &Booking{}) } -func (wfa *bookingExecutionMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { +func (wfa *bookingMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { return wfa.GenericStoreOne(data, wfa) } -func (wfa *bookingExecutionMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { +func (wfa *bookingMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { return wfa.GenericStoreOne(data, wfa) } -func (wfa *bookingExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { +func (wfa *bookingMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { var workflow Booking res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) if err != nil { @@ -41,7 +41,7 @@ func (wfa *bookingExecutionMongoAccessor) LoadOne(id string) (utils.DBObject, in return &workflow, 200, nil } -func (wfa bookingExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { +func (wfa bookingMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) if err != nil { @@ -58,7 +58,7 @@ func (wfa bookingExecutionMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int return objs, 200, nil } -func (wfa *bookingExecutionMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { +func (wfa *bookingMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { objs := []utils.ShallowDBObject{} if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { filters = &dbs.Filters{ diff --git a/models/discovery/discovery.go b/models/discovery/discovery.go new file mode 100644 index 0000000..4637ba3 --- /dev/null +++ b/models/discovery/discovery.go @@ -0,0 +1,54 @@ +package discovery + +import ( + "encoding/json" + + "cloud.o-forge.io/core/oc-lib/models/utils" + "cloud.o-forge.io/core/oc-lib/tools" + "github.com/google/uuid" +) + +type Discovery struct { + utils.AbstractObject + Host string `json:"host,omitempty" bson:"host,omitempty"` + Port string `json:"port,omitempty" bson:"port,omitempty"` + State int `json:"state,omitempty" bson:"state,omitempty"` +} + +func (ao *Discovery) GetID() string { + return ao.UUID +} + +func (r *Discovery) GenerateID() { + r.UUID = uuid.New().String() +} + +func (d *Discovery) GetName() string { + return d.Name +} + +func (d *Discovery) GetAccessor(caller *tools.HTTPCaller) utils.Accessor { + data := New() + data.Init(utils.BOOKING, caller) + return data +} + +func (dma *Discovery) Deserialize(j map[string]interface{}) utils.DBObject { + d := &Discovery{} + b, err := json.Marshal(j) + if err != nil { + return nil + } + json.Unmarshal(b, d) + return d +} + +func (dma *Discovery) Serialize() map[string]interface{} { + var m map[string]interface{} + b, err := json.Marshal(dma) + if err != nil { + return nil + } + json.Unmarshal(b, &m) + return m +} diff --git a/models/discovery/discovery_mongo_accessor.go b/models/discovery/discovery_mongo_accessor.go new file mode 100644 index 0000000..10c04e2 --- /dev/null +++ b/models/discovery/discovery_mongo_accessor.go @@ -0,0 +1,83 @@ +package discovery + +import ( + "cloud.o-forge.io/core/oc-lib/dbs" + "cloud.o-forge.io/core/oc-lib/dbs/mongo" + "cloud.o-forge.io/core/oc-lib/models/utils" +) + +type discoveryMongoAccessor struct { + utils.AbstractAccessor +} + +func New() *discoveryMongoAccessor { + return &discoveryMongoAccessor{} +} + +func (wfa *discoveryMongoAccessor) DeleteOne(id string) (utils.DBObject, int, error) { + return wfa.GenericDeleteOne(id, wfa) +} + +func (wfa *discoveryMongoAccessor) UpdateOne(set utils.DBObject, id string) (utils.DBObject, int, error) { + return wfa.GenericUpdateOne(set, id, wfa, &Discovery{}) +} + +func (wfa *discoveryMongoAccessor) StoreOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data, wfa) +} + +func (wfa *discoveryMongoAccessor) CopyOne(data utils.DBObject) (utils.DBObject, int, error) { + return wfa.GenericStoreOne(data, wfa) +} + +func (wfa *discoveryMongoAccessor) LoadOne(id string) (utils.DBObject, int, error) { + var workflow Discovery + res_mongo, code, err := mongo.MONGOService.LoadOne(id, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve " + id + " from db. Error: " + err.Error()) + return nil, code, err + } + res_mongo.Decode(&workflow) + return &workflow, 200, nil +} + +func (wfa discoveryMongoAccessor) LoadAll() ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + res_mongo, code, err := mongo.MONGOService.LoadAll(wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not retrieve any from db. Error: " + err.Error()) + return nil, code, err + } + var results []Discovery + if err = res_mongo.All(mongo.MngoCtx, &results); err != nil { + return nil, 404, err + } + for _, r := range results { + objs = append(objs, &r.AbstractObject) + } + return objs, 200, nil +} + +func (wfa *discoveryMongoAccessor) Search(filters *dbs.Filters, search string) ([]utils.ShallowDBObject, int, error) { + objs := []utils.ShallowDBObject{} + if (filters == nil || len(filters.And) == 0 || len(filters.Or) == 0) && search != "" { + filters = &dbs.Filters{ + Or: map[string][]dbs.Filter{ + "abstractobject.name": {{Operator: dbs.LIKE.String(), Value: search}}, + }, + } + } + res_mongo, code, err := mongo.MONGOService.Search(filters, wfa.GetType()) + if err != nil { + wfa.Logger.Error().Msg("Could not store to db. Error: " + err.Error()) + return nil, code, err + } + var results []Discovery + if err = res_mongo.All(mongo.MngoCtx, &results); err != nil { + return nil, 404, err + } + for _, r := range results { + objs = append(objs, &r) + } + return objs, 200, nil +} diff --git a/models/models.go b/models/models.go index 38680a2..23b3e35 100644 --- a/models/models.go +++ b/models/models.go @@ -4,6 +4,7 @@ import ( "cloud.o-forge.io/core/oc-lib/logs" "cloud.o-forge.io/core/oc-lib/models/booking" + "cloud.o-forge.io/core/oc-lib/models/discovery" "cloud.o-forge.io/core/oc-lib/models/peer" "cloud.o-forge.io/core/oc-lib/models/resource_model" d "cloud.o-forge.io/core/oc-lib/models/resources/data" @@ -33,6 +34,7 @@ var models = map[string]func() utils.DBObject{ utils.SHARED_WORKSPACE.String(): func() utils.DBObject { return &shared_workspace.SharedWorkspace{} }, utils.RULE.String(): func() utils.DBObject { return &rule.Rule{} }, utils.BOOKING.String(): func() utils.DBObject { return &booking.Booking{} }, + utils.DISCOVERY.String(): func() utils.DBObject { return &discovery.Discovery{} }, } func Model(model int) utils.DBObject { diff --git a/models/utils/enums.go b/models/utils/enums.go index c0c537f..e5a1853 100644 --- a/models/utils/enums.go +++ b/models/utils/enums.go @@ -17,6 +17,7 @@ const ( SHARED_WORKSPACE RULE BOOKING + DISCOVERY ) var DefaultAPI = [...]string{ @@ -34,6 +35,7 @@ var DefaultAPI = [...]string{ "oc-shared", "oc-shared", "oc-datacenter", + "oc-peers", } var Str = [...]string{ @@ -51,6 +53,7 @@ var Str = [...]string{ "shared_workspace", "rule", "booking", + "discovery", } func FromInt(i int) string { diff --git a/tools/api.go b/tools/api.go index effb463..e1f2f68 100644 --- a/tools/api.go +++ b/tools/api.go @@ -34,8 +34,7 @@ func (s State) String() string { "some things boils in here, i'm probably a teapot", "dead"}[s] } -type API struct { -} +type API struct{} func (a *API) GetState() (State, int, error) { // Check if the database is up diff --git a/tools/nats_caller.go b/tools/nats_caller.go index be32e8d..b3d7204 100644 --- a/tools/nats_caller.go +++ b/tools/nats_caller.go @@ -2,8 +2,11 @@ package tools import ( "encoding/json" + "errors" "strings" + "sync" + "cloud.o-forge.io/core/oc-lib/logs" "github.com/nats-io/nats.go" ) @@ -12,6 +15,7 @@ type NATSMethod int const ( REMOVE NATSMethod = iota CREATE + DISCOVERY ) func NameToMethod(name string) NATSMethod { @@ -56,3 +60,45 @@ func (o *natsCaller) SetNATSPub(dataName string, method NATSMethod, data interfa } return "" } + +type NATSObjectInterface interface { + Serialize() map[string]interface{} +} + +func (o *natsCaller) DiscoveryNATS(name string, model NATSObjectInterface) error { + if GetConfig().NATSUrl == "" { + return errors.New("NATS_SERVER is not set") + } + nc, err := nats.Connect(GetConfig().NATSUrl) + if err != nil { + return errors.New("Could not connect to NATS") + } + defer nc.Close() + wg := &sync.WaitGroup{} + wg.Add(2) + go o.listenForChange(nc, model, DISCOVERY.GenerateKey("ask"), wg) + go o.listenForChange(nc, model, DISCOVERY.GenerateKey(name), wg) + wg.Wait() + return nil +} + +func (o *natsCaller) listenForChange(nc *nats.Conn, model NATSObjectInterface, chanName string, wg *sync.WaitGroup) { + defer wg.Done() + api := API{} + logger := logs.GetLogger() + ch := make(chan *nats.Msg, 64) + subs, err := nc.ChanSubscribe(chanName, ch) + if err != nil { + logger.Error().Msg("Error listening to NATS : " + err.Error()) + return + } + defer subs.Unsubscribe() + + for msg := range ch { + logger.Info().Msg("Received message from NATS : " + string(msg.Data)) + m := model.Serialize() + s, _, _ := api.GetState() + m["state"] = s + o.SetNATSPub("answer", DISCOVERY, m) + } +}