// Package mongo is a thin access layer over the official MongoDB Go driver.
// It keeps one shared client/database handle at package level and exposes
// CRUD helpers that return an HTTP-like status code next to the result.
package mongo

import (
	"context"
	"errors"
	"time"

	"cloud.o-forge.io/core/oc-lib/dbs"
	"cloud.o-forge.io/core/oc-lib/logs"
	"github.com/rs/zerolog"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

const (
	// mongoOpTimeout bounds every single round trip made by this file.
	mongoOpTimeout = 10 * time.Second

	// indexOptionsConflictCode is the server error code tolerated when creating
	// indexes (IndexOptionsConflict in the MongoDB error code table).
	indexOptionsConflictCode = 85
)

var (
	mngoClient *mongo.Client
	mngoDB     *mongo.Database

	// MngoCtx is retained for backward compatibility only. Operations in this
	// file use their own short-lived contexts; sharing a single package-level
	// context/cancel pair between concurrent calls let one call cancel another.
	MngoCtx context.Context = context.Background()
	// cancel is no longer written by this file; it stays declared in case other
	// files of the package still reference it.
	cancel context.CancelFunc

	isConnected         bool
	existingCollections []string
	mngoCollections     []string
	mngoConfig          MongoConf
	ResourceMap         map[string]interface{}
)

// MONGOService is the package-wide service instance.
var MONGOService = MongoDB{}

// MongoConf provides the connection settings needed by MongoDB.
type MongoConf interface {
	GetUrl() string
	GetDatabase() string
}

// MongoDB groups the database operations and the logger they report to.
type MongoDB struct {
	Logger zerolog.Logger
}

// newOpContext returns a fresh context limited to mongoOpTimeout.
func newOpContext() (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.Background(), mongoOpTimeout)
}

// pingMongo checks that client can reach the server within mongoOpTimeout.
func pingMongo(client *mongo.Client) error {
	ctx, cancelOp := newOpContext()
	defer cancelOp()
	return client.Ping(ctx, nil)
}

// mapToBsonD converts a key/value map into a bson.D filter document.
func mapToBsonD(fields map[string]interface{}) bson.D {
	doc := make(bson.D, 0, len(fields))
	for k, v := range fields {
		doc = append(doc, bson.E{Key: k, Value: v})
	}
	return doc
}

// toBsonMap round-trips v through BSON to obtain a generic document map.
func toBsonMap(v interface{}) (map[string]interface{}, error) {
	raw, err := bson.Marshal(v)
	if err != nil {
		return nil, err
	}
	var doc map[string]interface{}
	if err := bson.Unmarshal(raw, &doc); err != nil {
		return nil, err
	}
	if doc == nil {
		doc = map[string]interface{}{}
	}
	return doc, nil
}

// buildSearchFilter turns the Or/And groups of filters into a query document.
// A nil filter set, or one without any condition, matches every document;
// the server rejects an empty "$and"/"$or" array, so none is ever emitted.
func buildSearchFilter(filters *dbs.Filters) bson.D {
	if filters == nil {
		return bson.D{}
	}
	orList := bson.A{}
	andList := bson.A{}
	for k, group := range filters.Or {
		for _, ff := range group {
			orList = append(orList, dbs.StringToOperator(ff.Operator).ToMongoOperator(k, ff.Value))
		}
	}
	for k, group := range filters.And {
		for _, ff := range group {
			andList = append(andList, dbs.StringToOperator(ff.Operator).ToMongoOperator(k, ff.Value))
		}
	}
	switch {
	case len(orList) == 0 && len(andList) == 0:
		return bson.D{}
	case len(andList) == 0:
		return bson.D{{Key: "$or", Value: orList}}
	case len(orList) > 0:
		// Both kinds present: the OR group becomes one more AND condition.
		andList = append(andList, bson.M{"$or": orList})
	}
	return bson.D{{Key: "$and", Value: andList}}
}

// Init resets the connection state, stores the collection list and the
// configuration, and tries to connect. A connection failure is only logged;
// every later operation retries the connection by itself.
func (m *MongoDB) Init(collections []string, config MongoConf) {
	isConnected = false
	m.Logger = logs.GetLogger()
	ResourceMap = make(map[string]interface{})
	if config == nil {
		m.Logger.Error().Msg("Mongodb init: nil configuration")
		return
	}
	// NOTE(review): the URL is logged verbatim; if it can embed credentials,
	// consider redacting it.
	m.Logger.Info().Msg("Connecting to " + config.GetUrl())
	mngoCollections = collections
	mngoConfig = config
	if err := m.createClient(config.GetUrl()); err != nil {
		m.Logger.Error().Msg(err.Error())
	}
}

// disconnect closes client, logging (not returning) any failure.
func (m *MongoDB) disconnect(client *mongo.Client) {
	ctx, cancelOp := newOpContext()
	defer cancelOp()
	if err := client.Disconnect(ctx); err != nil {
		m.Logger.Warn().Msg("Mongodb disconnect: " + err.Error())
	}
}

// releaseClient disconnects the shared client, if any, and marks the service
// as not connected.
func (m *MongoDB) releaseClient() {
	if mngoClient != nil {
		m.disconnect(mngoClient)
	}
	mngoClient = nil
	isConnected = false
}

// createClient makes sure a usable shared client exists for MongoURL.
//
// A client that is already connected and answers a ping is reused as is.
// Otherwise the stale client is disconnected, a new one is connected and
// pinged, and the database handle and collection map are rebuilt through
// prepareDB. It returns an error when the server cannot be reached.
func (m *MongoDB) createClient(MongoURL string) error {
	if mngoClient != nil {
		if isConnected {
			err := pingMongo(mngoClient)
			if err == nil {
				return nil
			}
			m.Logger.Warn().Msg("Mongodb connection lost, reconnecting: " + err.Error())
		}
		m.releaseClient()
	}

	// Allows us to use marshal and unmarshal with results of FindOne() and others.
	bsonOpts := &options.BSONOptions{
		UseJSONStructTags: true,
		NilSliceAsEmpty:   true,
	}
	clientOptions := options.Client().ApplyURI(MongoURL).SetBSONOptions(bsonOpts)

	ctx, cancelOp := newOpContext()
	defer cancelOp()
	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		return errors.New("Mongodb connect " + MongoURL + ":" + err.Error())
	}
	if client == nil {
		return errors.New("Mongodb connect " + MongoURL + ": no client returned")
	}

	// Ping the primary before publishing the client.
	if err = pingMongo(client); err != nil {
		m.disconnect(client)
		return errors.New("Mongodb ping " + MongoURL + ":" + err.Error())
	}

	mngoClient = client
	m.Logger.Info().Msg("Connecting mongo client to db " + mngoConfig.GetDatabase())
	m.prepareDB(mngoCollections, mngoConfig)
	m.Logger.Info().Msg("Database is READY")
	return nil
}

// prepareDB selects the configured database and registers every collection of
// list_collection in CollectionMap. Collections missing on the server go
// through createCollection; existing ones are registered unchanged. Failing
// to list the collections is logged at Fatal level.
func (m *MongoDB) prepareDB(list_collection []string, config MongoConf) {
	mngoDB = mngoClient.Database(config.GetDatabase())

	ctx, cancelOp := newOpContext()
	defer cancelOp()

	var err error
	existingCollections, err = mngoDB.ListCollectionNames(ctx, bson.D{})
	if err != nil {
		m.Logger.Fatal().Msg("Error contacting MongoDB\n" + err.Error())
	}

	known := make(map[string]struct{}, len(existingCollections))
	for _, name := range existingCollections {
		known[name] = struct{}{}
	}

	// Only run the collection definition process for collections that do not
	// exist yet; either way the handle lands in CollectionMap for fast access.
	for _, name := range list_collection {
		coll := mngoDB.Collection(name)
		if _, ok := known[name]; ok {
			CollectionMap[name] = coll
			continue
		}
		m.createCollection(name, coll)
	}
	isConnected = true
}

// createCollection registers new_collection in CollectionMap and creates it
// on the server: with the indexes declared in IndexesMap when there are any,
// otherwise through a plain CreateCollection call. An index creation error
// other than indexOptionsConflictCode is logged at Fatal level.
func (m *MongoDB) createCollection(collection_name string, new_collection *mongo.Collection) {
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	CollectionMap[collection_name] = new_collection

	indexes, hasIndexes := IndexesMap[collection_name]
	if !hasIndexes {
		if err := mngoDB.CreateCollection(ctx, collection_name); err != nil {
			m.Logger.Error().Msg("Error creating " + collection_name + " collection : " + err.Error())
		}
		return
	}

	if _, err := new_collection.Indexes().CreateMany(ctx, indexes); err != nil {
		var cmdErr mongo.CommandError
		switch {
		case !errors.As(err, &cmdErr):
			m.Logger.Fatal().Msg("Unexpected error: " + err.Error())
			panic(err) // backstop in case the Fatal log did not stop the process
		case cmdErr.Code != indexOptionsConflictCode:
			m.Logger.Fatal().Msg("Error creating indexes for " + collection_name + " collection : \n" + err.Error())
			panic(err) // backstop in case the Fatal log did not stop the process
		}
	}
}

// readyCollection ensures the connection is up and returns the handle
// registered for collection_name. On failure the int is the status to report:
// 503 when the service is not initialised or unreachable, 404 when the
// collection is not registered in CollectionMap.
func (m *MongoDB) readyCollection(collection_name string) (*mongo.Collection, int, error) {
	if mngoConfig == nil {
		return nil, 503, errors.New("Mongodb service is not initialised")
	}
	if err := m.createClient(mngoConfig.GetUrl()); err != nil {
		return nil, 503, err
	}
	coll, ok := CollectionMap[collection_name]
	if !ok || coll == nil {
		return nil, 404, errors.New("Mongodb unknown collection " + collection_name)
	}
	return coll, 200, nil
}

// DeleteOne removes the document whose _id is id from collection_name.
// It returns the number of deleted documents and a status code
// (200 on success, 404 on a driver error, 503 when the database is unreachable).
func (m *MongoDB) DeleteOne(id string, collection_name string) (int64, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return 0, code, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	opts := options.Delete().SetHint(bson.D{{Key: "_id", Value: 1}})
	result, err := coll.DeleteOne(ctx, bson.M{"_id": id}, opts)
	if err != nil {
		m.Logger.Error().Msg("Couldn't delete resource: " + err.Error())
		return 0, 404, err
	}
	return result.DeletedCount, 200, nil
}

// DeleteMultiple removes every document of collection_name matching all the
// key/value pairs of f. It returns the number of deleted documents and a
// status code (200, 404 on a driver error, 503 when unreachable).
func (m *MongoDB) DeleteMultiple(f map[string]interface{}, collection_name string) (int64, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return 0, code, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	opts := options.Delete().SetHint(bson.D{{Key: "_id", Value: 1}})
	result, err := coll.DeleteMany(ctx, mapToBsonD(f), opts)
	if err != nil {
		m.Logger.Error().Msg("Couldn't delete resource: " + err.Error())
		return 0, 404, err
	}
	return result.DeletedCount, 200, nil
}

// UpdateMultiple applies set to every document of collection_name matching
// filter. It returns the number of modified documents and a status code
// (200, 400 when set cannot be converted to BSON, 404 on a driver error,
// 503 when unreachable).
func (m *MongoDB) UpdateMultiple(set interface{}, filter map[string]interface{}, collection_name string) (int64, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return 0, code, err
	}
	doc, err := toBsonMap(set)
	if err != nil {
		m.Logger.Error().Msg("Couldn't update resource: " + err.Error())
		return 0, 400, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	res, err := coll.UpdateMany(ctx, mapToBsonD(filter), dbs.InputToBson(doc, true))
	if err != nil {
		m.Logger.Error().Msg("Couldn't update resource: " + err.Error())
		return 0, 404, err
	}
	// No upsert option is set, so UpsertedCount would always be 0; report the
	// number of documents actually modified.
	return res.ModifiedCount, 200, nil
}

// UpdateOne applies set to the document of collection_name whose _id is id.
// It returns id and a status code (200, 400 when set cannot be converted to
// BSON, 404 on a driver error, 503 when unreachable).
func (m *MongoDB) UpdateOne(set interface{}, id string, collection_name string) (string, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return "", code, err
	}
	doc, err := toBsonMap(set)
	if err != nil {
		m.Logger.Error().Msg("Couldn't update resource: " + err.Error())
		return "", 400, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	if _, err := coll.UpdateOne(ctx, bson.M{"_id": id}, dbs.InputToBson(doc, true)); err != nil {
		m.Logger.Error().Msg("Couldn't update resource: " + err.Error())
		return "", 404, err
	}
	return id, 200, nil
}

// StoreOne inserts obj into collection_name with its _id forced to id.
// It returns id and a status code (200, 400 when obj cannot be converted to
// BSON, 409 when the insert fails, 503 when unreachable).
func (m *MongoDB) StoreOne(obj interface{}, id string, collection_name string) (string, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return "", code, err
	}
	doc, err := toBsonMap(obj)
	if err != nil {
		m.Logger.Error().Msg("Couldn't insert resource: " + err.Error())
		return "", 400, err
	}
	doc["_id"] = id

	ctx, cancelOp := newOpContext()
	defer cancelOp()

	if _, err := coll.InsertOne(ctx, doc); err != nil {
		m.Logger.Error().Msg("Couldn't insert resource: " + err.Error())
		return "", 409, err
	}
	return id, 200, nil
}

// LoadOne fetches the document of collection_name whose _id is id.
// It returns the driver result and a status code (200, 404 when the lookup
// reports an error, 503 when unreachable).
func (m *MongoDB) LoadOne(id string, collection_name string) (*mongo.SingleResult, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return nil, code, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	res := coll.FindOne(ctx, bson.M{"_id": id})
	if err := res.Err(); err != nil {
		m.Logger.Error().Msg("Couldn't find resource " + id + ". Error : " + err.Error())
		return nil, 404, err
	}
	return res, 200, nil
}

// Search runs the Or/And conditions of filters against collection_name and
// returns a cursor over at most 100 documents. Nil or empty filters match
// everything. Status codes: 200, 404 on a driver error, 503 when unreachable.
func (m *MongoDB) Search(filters *dbs.Filters, collection_name string) (*mongo.Cursor, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return nil, code, err
	}
	opts := options.Find().SetLimit(100)

	ctx, cancelOp := newOpContext()
	defer cancelOp()

	cursor, err := coll.Find(ctx, buildSearchFilter(filters), opts)
	if err != nil {
		return nil, 404, err
	}
	return cursor, 200, nil
}

// LoadFilter returns a cursor over the documents of collection_name matching
// all the key/value pairs of filter. Status codes: 200, 404 on a driver
// error, 503 when unreachable.
func (m *MongoDB) LoadFilter(filter map[string]interface{}, collection_name string) (*mongo.Cursor, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return nil, code, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	res, err := coll.Find(ctx, mapToBsonD(filter))
	if err != nil {
		m.Logger.Error().Msg("Couldn't find any resources. Error : " + err.Error())
		return nil, 404, err
	}
	return res, 200, nil
}

// LoadAll returns a cursor over every document of collection_name.
// Status codes: 200, 404 on a driver error, 503 when unreachable.
func (m *MongoDB) LoadAll(collection_name string) (*mongo.Cursor, int, error) {
	coll, code, err := m.readyCollection(collection_name)
	if err != nil {
		return nil, code, err
	}
	ctx, cancelOp := newOpContext()
	defer cancelOp()

	res, err := coll.Find(ctx, bson.D{})
	if err != nil {
		m.Logger.Error().Msg("Couldn't find any resources. Error : " + err.Error())
		return nil, 404, err
	}
	return res, 200, nil
}