Commit 3d8fbe87 authored by Vijay Kumar Chauhan's avatar Vijay Kumar Chauhan
Browse files

Merge branch 'SG_AggregateOptions' into 'devbranch'

#26 Added aggregate options to find operation and update, insert many operations.

See merge request !287
parents 330b5809 8b7e3ca9
2 merge requests!288Release v1.2.16,!287#26 Added aggregate options to find operation and update, insert many operations.
Showing with 119 additions and 4 deletions
...@@ -189,8 +189,10 @@ type MongoDAO struct { ...@@ -189,8 +189,10 @@ type MongoDAO struct {
MetaData MetaData MetaData MetaData
} }
type MetaData struct { type MetaData struct {
Findops map[string]interface{} Findops map[string]interface{}
UpdateOps map[string]interface{} UpdateOps map[string]interface{}
AggregateOptions map[string]interface{}
InsertManyOptions map[string]interface{}
} }
// GetMongoDAOWithHost return mongo DAO instance // GetMongoDAOWithHost return mongo DAO instance
...@@ -230,6 +232,42 @@ func (mg *MongoDAO) SaveData(data interface{}) (string, error) { ...@@ -230,6 +232,42 @@ func (mg *MongoDAO) SaveData(data interface{}) (string, error) {
return getInsertedId(opts.InsertedID), nil return getInsertedId(opts.InsertedID), nil
} }
// Insert many data in mongo db
func (mg *MongoDAO) SaveBulkData(data []interface{}) (string, error) {
session, sessionError := GetMongoConnection(mg.hostName)
if errormdl.CheckErr(sessionError) != nil {
return "", errormdl.CheckErr(sessionError)
}
if mg.hostName == "" {
mg.hostName = defaultHost
}
db, ok := config[mg.hostName]
if errormdl.CheckBool(!ok) {
return "", errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
}
collection := session.Database(db.Database).Collection(mg.collectionName)
insertManyOptions := options.InsertMany()
if mg.MetaData.InsertManyOptions != nil {
marshaledQuery, err := json.Marshal(mg.MetaData.InsertManyOptions)
if err != nil {
return "", errormdl.Wrap("cannot marshal:query option")
}
obj := gjson.ParseBytes(marshaledQuery)
if obj.Get("ordered").Value() != nil {
insertManyOptions.SetOrdered(obj.Get("ordered").Bool())
}
if obj.Get("bypassDocumentValidation").Value() != nil {
insertManyOptions.SetBypassDocumentValidation(obj.Get("bypassDocumentValidation").Bool())
}
}
opts, insertManyError := collection.InsertMany(context.Background(), data, insertManyOptions)
if errormdl.CheckErr1(insertManyError) != nil {
return "", errormdl.CheckErr1(insertManyError)
}
return getInsertedId(opts.InsertedIDs), nil
}
// UpdateAll update all // UpdateAll update all
func (mg *MongoDAO) UpdateAll(selector map[string]interface{}, data interface{}) error { func (mg *MongoDAO) UpdateAll(selector map[string]interface{}, data interface{}) error {
session, sessionError := GetMongoConnection(mg.hostName) session, sessionError := GetMongoConnection(mg.hostName)
...@@ -364,6 +402,16 @@ func (mg *MongoDAO) SetFindOps(findOps map[string]interface{}) { ...@@ -364,6 +402,16 @@ func (mg *MongoDAO) SetFindOps(findOps map[string]interface{}) {
mg.MetaData.Findops = findOps mg.MetaData.Findops = findOps
} }
// Set Aggregate Options
func (mg *MongoDAO) SetAggregateOptions(aggregateOpts map[string]interface{}) {
mg.MetaData.AggregateOptions = aggregateOpts
}
// Set InsertMany Options
func (mg *MongoDAO) SetInsertManyOptions(insertManyOptions map[string]interface{}) {
mg.MetaData.InsertManyOptions = insertManyOptions
}
// GetProjectedData will return query for selector and projector // GetProjectedData will return query for selector and projector
func (mg *MongoDAO) GetProjectedData(selector map[string]interface{}, projector map[string]interface{}) (*gjson.Result, error) { func (mg *MongoDAO) GetProjectedData(selector map[string]interface{}, projector map[string]interface{}) (*gjson.Result, error) {
session, sessionError := GetMongoConnection(mg.hostName) session, sessionError := GetMongoConnection(mg.hostName)
...@@ -399,7 +447,7 @@ func (mg *MongoDAO) GetProjectedData(selector map[string]interface{}, projector ...@@ -399,7 +447,7 @@ func (mg *MongoDAO) GetProjectedData(selector map[string]interface{}, projector
m := value.Map() m := value.Map()
// fmt.Println(m) // fmt.Println(m)
for k, v := range m { for k, v := range m {
sort = append(sort, bson.E{k, v.Value()}) sort = append(sort, bson.E{Key: k, Value: v.Value()})
} }
return true return true
}) })
...@@ -447,7 +495,34 @@ func (mg *MongoDAO) GetAggregateData(selector interface{}) (*gjson.Result, error ...@@ -447,7 +495,34 @@ func (mg *MongoDAO) GetAggregateData(selector interface{}) (*gjson.Result, error
return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
} }
collection := session.Database(db.Database).Collection(mg.collectionName) collection := session.Database(db.Database).Collection(mg.collectionName)
cur, err := collection.Aggregate(context.Background(), selector) aggregateOpts := &options.AggregateOptions{}
if mg.MetaData.AggregateOptions != nil {
marshaledQuery, err := json.Marshal(mg.MetaData.AggregateOptions)
if err != nil {
return nil, errormdl.Wrap("cannot marshal:query option")
}
obj := gjson.ParseBytes(marshaledQuery)
loggermdl.LogError("Aggregate Options", obj)
if obj.Get("AllowDiskUse").Value() != nil {
aggregateOpts.SetAllowDiskUse(obj.Get("AllowDiskUse").Bool())
}
if obj.Get("BypassDocumentValidation").Value() != nil {
aggregateOpts.SetBypassDocumentValidation(obj.Get("BypassDocumentValidation").Bool())
}
if obj.Get("Custom").Value() != nil {
custom := obj.Get("Custom").Value().(map[string]interface{})
aggregateOpts.SetCustom(primitive.M(custom))
}
if obj.Get("MaxTime").Value() != nil {
maxTime := time.Duration(obj.Get("MaxTime").Int())
aggregateOpts.SetMaxTime(maxTime)
}
if obj.Get("MaxAwaitTime").Value() != nil {
maxAwaitTime := time.Duration(obj.Get("MaxAwaitTime").Int())
aggregateOpts.SetMaxAwaitTime(maxAwaitTime)
}
}
cur, err := collection.Aggregate(context.Background(), selector, aggregateOpts)
if err != nil { if err != nil {
loggermdl.LogError(err) loggermdl.LogError(err)
return nil, err return nil, err
...@@ -655,6 +730,7 @@ func (mg *MongoDAO) BulkSaveData(data []interface{}) error { ...@@ -655,6 +730,7 @@ func (mg *MongoDAO) BulkSaveData(data []interface{}) error {
} }
// BulkUpdateData update data in mongo db in bulk // BulkUpdateData update data in mongo db in bulk
// Updating only first document from resultset
func (mg *MongoDAO) BulkUpdateData(data []interface{}) error { func (mg *MongoDAO) BulkUpdateData(data []interface{}) error {
if checkBulkInput(data) { if checkBulkInput(data) {
return nil return nil
...@@ -691,7 +767,46 @@ func (mg *MongoDAO) BulkUpdateData(data []interface{}) error { ...@@ -691,7 +767,46 @@ func (mg *MongoDAO) BulkUpdateData(data []interface{}) error {
return nil return nil
} }
// BulkUpdateData update data in mongo db in bulk
// Updating All document from resultset
func (mg *MongoDAO) BulkUpdate(data []interface{}) error {
if checkBulkInput(data) {
return nil
}
session, sessionError := GetMongoConnection(mg.hostName)
if errormdl.CheckErr(sessionError) != nil {
return errormdl.CheckErr(sessionError)
}
if mg.hostName == "" {
mg.hostName = defaultHost
}
db, ok := config[mg.hostName]
if errormdl.CheckBool(!ok) {
return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
}
collection := session.Database(db.Database).Collection(mg.collectionName)
opts := &options.BulkWriteOptions{}
opts.SetOrdered(true)
var models []mongo.WriteModel
for i := 0; i < len(data); i++ {
model := mongo.NewUpdateManyModel()
model.SetFilter(data[i])
i++
model.SetUpdate(bson.M{"$set": data[i]})
models = append(models, model)
}
_, insertError := collection.BulkWrite(context.Background(), models, opts)
if errormdl.CheckErr1(insertError) != nil {
loggermdl.LogError(insertError)
return errormdl.CheckErr1(insertError)
}
return nil
}
// BulkDeleteData delete data in mongo db in bulk // BulkDeleteData delete data in mongo db in bulk
// Deleting only first document from resultset
func (mg *MongoDAO) BulkDeleteData(data []interface{}) error { func (mg *MongoDAO) BulkDeleteData(data []interface{}) error {
if checkBulkInput(data) { if checkBulkInput(data) {
return nil return nil
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment