From 39e29b8e8c524edc4a012ecc8b6639054f55017d Mon Sep 17 00:00:00 2001
From: somnath ghorpade <somnathg@mkcl.org>
Date: Wed, 11 Sep 2024 09:58:31 +0530
Subject: [PATCH] Added mongo new functions and test cases.

---
 dalmdl/coremongo/coremongo.go      | 226 ++++++++++++++++++++++++++++-
 dalmdl/coremongo/coremongo_test.go |  82 ++++++++++-
 2 files changed, 304 insertions(+), 4 deletions(-)

diff --git a/dalmdl/coremongo/coremongo.go b/dalmdl/coremongo/coremongo.go
index 20817f2..eddf29d 100644
--- a/dalmdl/coremongo/coremongo.go
+++ b/dalmdl/coremongo/coremongo.go
@@ -152,7 +152,7 @@ func InitNewSession(hostDetails MongoHost) error {
 	return nil
 }
 
-//GetMongoConnection method
+// GetMongoConnection method
 func GetMongoConnection(hostName string) (*mongo.Client, error) {
 	mutex.Lock()
 	defer mutex.Unlock()
@@ -397,7 +397,7 @@ func (mg *MongoDAO) DeleteAll(selector map[string]interface{}) error {
 	return deleteError
 }
 
-//Set Find Options
+// Set Find Options
 func (mg *MongoDAO) SetFindOps(findOps map[string]interface{}) {
 	mg.MetaData.Findops = findOps
 }
@@ -626,7 +626,7 @@ func (mg *MongoDAO) PushData(selector map[string]interface{}, data interface{})
 	return nil
 }
 
-//set update option for custom update
+// set update option for custom update
 func (mg *MongoDAO) SetUpdateOps(updateOps map[string]interface{}) {
 	mg.MetaData.UpdateOps = updateOps
 }
@@ -878,6 +878,226 @@ func (mg *MongoDAO) BulkUpsertData(data []interface{}) error {
 	return nil
 }
 
+// Count Documents
+func (mg *MongoDAO) Count(selector map[string]interface{}) (int64, error) {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return 0, errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return 0, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	count, err := collection.CountDocuments(context.Background(), selector)
+	if err != nil {
+		loggermdl.LogError(err)
+		return 0, err
+	}
+	return count, nil
+}
+
+// GetDistinctValues - Get distinct values
+func (mg *MongoDAO) GetDistinctValues(selector map[string]interface{}, fieldName string) ([]interface{}, error) {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return nil, errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	distinctValues, err := collection.Distinct(context.Background(), fieldName, selector)
+	if err != nil {
+		loggermdl.LogError(err)
+		return nil, err
+	}
+	return distinctValues, nil
+}
+
+// FindOneAndUpdate
+func (mg *MongoDAO) FindOneAndUpdate(selector map[string]interface{}, data interface{}) (*gjson.Result, error) {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return nil, errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	opts := options.FindOneAndUpdate()
+	opts.SetReturnDocument(options.After)
+	result := collection.FindOneAndUpdate(context.Background(), selector, bson.M{"$set": data}, opts)
+	if result.Err() != nil {
+		return nil, result.Err()
+	}
+	var resultData bson.M
+	err := result.Decode(&resultData)
+	if err != nil {
+		return nil, err
+	}
+	ba, marshalError := json.Marshal(resultData)
+	if errormdl.CheckErr2(marshalError) != nil {
+		return nil, errormdl.CheckErr2(marshalError)
+	}
+	rs := gjson.ParseBytes(ba)
+	return &rs, nil
+}
+
+// FindOneAndDelete
+func (mg *MongoDAO) FindOneAndDelete(selector map[string]interface{}) (*gjson.Result, error) {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return nil, errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	result := collection.FindOneAndDelete(context.Background(), selector)
+	if result.Err() != nil {
+		return nil, result.Err()
+	}
+	var resultData bson.M
+	err := result.Decode(&resultData)
+	if err != nil {
+		return nil, err
+	}
+	ba, marshalError := json.Marshal(resultData)
+	if errormdl.CheckErr2(marshalError) != nil {
+		return nil, errormdl.CheckErr2(marshalError)
+	}
+	rs := gjson.ParseBytes(ba)
+	return &rs, nil
+}
+
+// UpdateByID
+func (mg *MongoDAO) UpdateByID(id string, data interface{}) error {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+
+	db, ok := config[mg.hostName]
+	if !ok {
+		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	objectID, err := primitive.ObjectIDFromHex(id)
+	if err != nil {
+		return errormdl.Wrap("Invalid_ObjectID")
+	}
+
+	selector := bson.M{"_id": objectID}
+	_, updateError := collection.UpdateByID(context.Background(), selector, bson.M{"$set": data})
+	if errormdl.CheckErr1(updateError) != nil {
+		return errormdl.CheckErr1(updateError)
+	}
+	return nil
+}
+
+// ReplaceOne
+func (mg *MongoDAO) ReplaceOne(selector map[string]interface{}, data interface{}) error {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	_, updateError := collection.ReplaceOne(context.Background(), selector, data)
+	if errormdl.CheckErr1(updateError) != nil {
+		return errormdl.CheckErr1(updateError)
+	}
+	return nil
+}
+
+// FindOneAndReplace
+func (mg *MongoDAO) FindOneAndReplace(selector map[string]interface{}, data interface{}) (*gjson.Result, error) {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return nil, errormdl.CheckErr(sessionError)
+	}
+
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	result := collection.FindOneAndReplace(context.Background(), selector, data)
+	if result.Err() != nil {
+		return nil, result.Err()
+	}
+	var resultData bson.M
+	err := result.Decode(&resultData)
+	if err != nil {
+		return nil, err
+	}
+	ba, marshalError := json.Marshal(resultData)
+	if errormdl.CheckErr2(marshalError) != nil {
+		return nil, errormdl.CheckErr2(marshalError)
+	}
+	rs := gjson.ParseBytes(ba)
+	return &rs, nil
+}
+
+// WatchChangeStream
+func (mg *MongoDAO) WatchChangeStream(pipeline []interface{}) (*mongo.ChangeStream, error) {
+	session, sessionError := GetMongoConnection(mg.hostName)
+	if errormdl.CheckErr(sessionError) != nil {
+		return nil, errormdl.CheckErr(sessionError)
+	}
+	if mg.hostName == "" {
+		mg.hostName = defaultHost
+	}
+	db, ok := config[mg.hostName]
+	if !ok {
+		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
+	}
+	collection := session.Database(db.Database).Collection(mg.collectionName)
+	opts := options.ChangeStream()
+	opts.SetFullDocument(options.UpdateLookup)
+	cs, err := collection.Watch(context.Background(), pipeline, opts)
+	if err != nil {
+		loggermdl.LogError(err)
+		return nil, err
+	}
+	return cs, nil
+}
+
 func checkBulkInput(d []interface{}) bool {
 	return len(d) == 0
 }
diff --git a/dalmdl/coremongo/coremongo_test.go b/dalmdl/coremongo/coremongo_test.go
index 6a4aa6d..92c0b93 100644
--- a/dalmdl/coremongo/coremongo_test.go
+++ b/dalmdl/coremongo/coremongo_test.go
@@ -16,7 +16,7 @@ func init() {
 			Port:         27017,
 			Username:     "",
 			Password:     "",
-			Database:     "",
+			Database:     "CoreStudio",
 			Server:       "localhost",
 			IsDefault:    true,
 			MaxIdleConns: 10,
@@ -353,3 +353,83 @@ func TestBulkUpsertData(t *testing.T) {
 	}
 	assert.NoError(t, nil, "This should not return error")
 }
+
+func TestGetDistinctValues(t *testing.T) {
+	MongoDao := GetMongoDAO("admin")
+	query := make(map[string]interface{})
+	query["loginId"] = "anandku@mkcl.org"
+	field := "name"
+	data, err := MongoDao.GetDistinctValues(query, field)
+	if err != nil {
+		loggermdl.LogError("err: ", err)
+	}
+	loggermdl.LogInfo("MongoDao: ", data)
+	assert.NoError(t, nil, "This should not return error")
+}
+
+func TestUpdateByID(t *testing.T) {
+	MongoDao := GetMongoDAO("admin")
+	data := make(map[string]interface{})
+	data["name"] = "name"
+	id := `642d0f840ad60fa14a976909`
+	err := MongoDao.UpdateByID(id, data)
+	if err != nil {
+		loggermdl.LogError("err: ", err)
+	}
+	assert.NoError(t, nil, "This should not return error")
+}
+
+func TestReplaceOne(t *testing.T) {
+	MongoDao := GetMongoDAO("admin")
+	data := make(map[string]interface{})
+	data["name"] = "ss"
+	query := make(map[string]interface{})
+	query["loginId"] = "ss@mkcl.org"
+	err := MongoDao.ReplaceOne(query, data)
+	if err != nil {
+		loggermdl.LogError("err: ", err)
+	}
+	assert.NoError(t, nil, "This should not return error")
+}
+
+func TestFindOneAndReplace(t *testing.T) {
+	MongoDao := GetMongoDAO("admin")
+	data := make(map[string]interface{})
+	data["name"] = "name"
+	query := make(map[string]interface{})
+	query["loginId"] = "anandku@mkcl.org"
+	res, err := MongoDao.FindOneAndReplace(query, data)
+	if err != nil {
+		loggermdl.LogError("err: ", err)
+	}
+	loggermdl.LogInfo("MongoDao: ", res)
+	assert.NoError(t, nil, "This should not return error")
+}
+
+func TestWatchChangeStream(t *testing.T) {
+
+	MongoDao := GetMongoDAO("admin")
+	var interfaceQuery []interface{}
+	query := bson.D{{Key: "$match", Value: bson.D{{Key: "operationType", Value: "insert"}}}}
+	for _, d := range query {
+		interfaceQuery = append(interfaceQuery, d)
+	}
+	data, err := MongoDao.WatchChangeStream(interfaceQuery)
+	if err != nil {
+		loggermdl.LogError("err: ", err)
+	}
+	loggermdl.LogInfo("MongoDao: ", data)
+	assert.NoError(t, nil, "This should not return error")
+}
+
+func TestFindOneAndDelete(t *testing.T) {
+	MongoDao := GetMongoDAO("admin")
+	query := make(map[string]interface{})
+	query["name"] = "name"
+	res, err := MongoDao.FindOneAndDelete(query)
+	if err != nil {
+		loggermdl.LogError("err: ", err)
+	}
+	loggermdl.LogInfo("MongoDao: ", res)
+	assert.NoError(t, nil, "This should not return error")
+}
-- 
GitLab