Newer
Older
// Deprecated: As of corepkgv2 tag 1.2.15 this package is no longer maintained and will be removed in a future release; use coremongo instead.
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/statemdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/configmdl"
HostName string `json:"hostName"`
Server string `json:"server"`
Port int `json:"port"`
Username string `json:"username"`
Password string `json:"password"`
Database string `json:"database"`
IsDefault bool `json:"isDefault"`
MaxIdleConns int `json:"maxIdleConns" `
MaxOpenConns int `json:"maxOpenConns"`
ConnMaxLifetime time.Duration `json:"connMaxLifetime" `
IsDisabled bool `json:"isDisabled" `
// TomlConfig holds the Mongo host configuration used by this package.
// MongoHosts maps a host name to the connection details of that host.
type TomlConfig struct {
MongoHosts map[string]MongoHost
}
// instances is the registry of open mgo sessions, keyed by host name.
// It stays nil until one of the initialisation functions allocates it.
var instances map[string]*mgo.Session
// func init() {
// instances = make(map[string]*mgo.Session)
// }
// Init initializes Mongo connections for the given TOML file
instances = make(map[string]*mgo.Session)
_, err := configmdl.InitConfig(tomlFilepath, &config)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return
}
for hostName, hostDetails := range config.MongoHosts {
session, err := mgo.DialWithInfo(&mgo.DialInfo{
Addrs: []string{bindMongoServerWithPort(hostDetails.Server, hostDetails.Port)},
Username: hostDetails.Username,
Password: hostDetails.Password,
Database: hostDetails.Database,
})
if err != nil {
sessionError = err
loggermdl.LogError(sessionError)
return
}
instances[hostName] = session
})
return sessionError
}
// InitUsingJSON initializes Mongo connections for the given JSON data
if hostDetails.IsDisabled {
continue
}
Addrs: []string{bindMongoServerWithPort(hostDetails.Server, hostDetails.Port)},
Username: hostDetails.Username,
Password: hostDetails.Password,
Timeout: time.Second * 3,
Database: hostDetails.Database,
})
if err != nil {
sessionError = err
loggermdl.LogError(sessionError)
return
}
config.MongoHosts[hostDetails.HostName] = hostDetails
// DeleteSession removes the session registered under hostName from the
// session registry. It returns a NO_HOST_FOUND error when no session is
// registered under that name.
// NOTE(review): the removed session is only dropped from the registry, it is
// not closed here — confirm whether callers are expected to close it.
func DeleteSession(hostName string) error {
	mutex.Lock()
	defer mutex.Unlock()

	_, registered := instances[hostName]
	if registered {
		delete(instances, hostName)
		return nil
	}
	return errormdl.Wrap("NO_HOST_FOUND")
}
// InitNewSession dials the Mongo server described by hostDetails and registers
// the resulting session under hostDetails.HostName.
//
// It returns a DUPLICATE_HOSTNAME error when a session is already registered
// under that name, or the dial error when the connection cannot be established.
// The session registry is allocated on first use.
//
// NOTE(review): the DAO methods resolve the database name through
// config.MongoHosts, which this function does not update — confirm whether
// hosts added here are meant to be usable through MongoDAO.
func InitNewSession(hostDetails MongoHost) error {
	mutex.Lock()
	defer mutex.Unlock()

	if instances == nil {
		instances = make(map[string]*mgo.Session)
	}
	if _, ok := instances[hostDetails.HostName]; ok {
		return errormdl.Wrap("DUPLICATE_HOSTNAME")
	}

	// Username is sent together with Password, as the other dial sites in this
	// file do. The timeout bounds the dial: the registry mutex is held while
	// dialing, so an unreachable host must not be allowed to block indefinitely.
	session, err := mgo.DialWithInfo(&mgo.DialInfo{
		Addrs:    []string{bindMongoServerWithPort(hostDetails.Server, hostDetails.Port)},
		Username: hostDetails.Username,
		Password: hostDetails.Password,
		Timeout:  time.Second * 3,
		Database: hostDetails.Database,
	})
	if err != nil {
		loggermdl.LogError(err)
		return err
	}
	instances[hostDetails.HostName] = session
	return nil
}
// GetMongoConnection looks up the mgo session registered for hostName.
// It returns a MONGO_INIT_NOT_DONE error while the session registry is nil,
// and an empty hostName is looked up under defaultHost instead.
// When no session matches, the error names the requested instance.
// NOTE(review): several lines of this function (the success returns and some
// closing braces) appear to be missing from this view — confirm against the
// complete file before relying on this description.
func GetMongoConnection(hostName string) (*mgo.Session, error) {
if instances == nil {
return nil, errormdl.Wrap("MONGO_INIT_NOT_DONE")
}
if hostName == "" {
if instance, ok := instances[defaultHost]; ok {
if instance, ok := instances[hostName]; ok {
}
return nil, errormdl.Wrap("Session not found for instance: " + hostName)
}
// MongoDAO is a data-access handle bound to one collection on one Mongo host.
// hostName selects the registered host (empty means the default host) and
// collectionName is the collection every operation of the DAO works on.
type MongoDAO struct {
	hostName, collectionName string
}
// GetMongoDAOWithHost builds a MongoDAO for the given collection on the
// explicitly named host.
func GetMongoDAOWithHost(host, collection string) *MongoDAO {
	dao := new(MongoDAO)
	dao.hostName = host
	dao.collectionName = collection
	return dao
}
// GetMongoDAO builds a MongoDAO for the given collection without a host name;
// the DAO methods substitute defaultHost when the host name is empty.
func GetMongoDAO(collection string) *MongoDAO {
	dao := new(MongoDAO)
	dao.collectionName = collection
	return dao
}
// SaveData inserts data as a new document into the DAO's collection.
// The session is looked up with the host name as given; an empty host name is
// then replaced by defaultHost to find the database configuration.
func (mg *MongoDAO) SaveData(data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if errormdl.CheckBool(!found) {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	database := conn.DB(hostCfg.Database)
	coll := database.C(mg.collectionName)
	if insertErr := coll.Insert(data); errormdl.CheckErr1(insertErr) != nil {
		return errormdl.CheckErr1(insertErr)
	}
	return nil
}
// UpdateAll applies data as a "$set" change through the collection's
// UpdateAll call, using selector to choose the documents.
func (mg *MongoDAO) UpdateAll(selector map[string]interface{}, data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	coll := conn.DB(hostCfg.Database).C(mg.collectionName)
	change := bson.M{"$set": data}
	if _, updateErr := coll.UpdateAll(selector, change); errormdl.CheckErr1(updateErr) != nil {
		return errormdl.CheckErr1(updateErr)
	}
	return nil
}
// Update applies data as a "$set" change through the collection's Update
// call, using selector to choose the document.
func (mg *MongoDAO) Update(selector map[string]interface{}, data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	coll := conn.DB(hostCfg.Database).C(mg.collectionName)
	change := bson.M{"$set": data}
	if updateErr := coll.Update(selector, change); errormdl.CheckErr1(updateErr) != nil {
		return errormdl.CheckErr1(updateErr)
	}
	return nil
}
// GetData runs a find with selector on the DAO's collection and returns the
// matching documents, JSON-encoded and parsed into a gjson.Result.
//
// An error is returned when no session or host configuration is available,
// when the query itself fails, or when the result cannot be marshalled.
func (mg *MongoDAO) GetData(selector map[string]interface{}) (*gjson.Result, error) {
	session, sessionError := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(sessionError) != nil {
		return nil, errormdl.CheckErr(sessionError)
	}
	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	db, ok := config.MongoHosts[mg.hostName]
	if !ok {
		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}
	collection := session.DB(db.Database).C(mg.collectionName)
	var result []interface{}
	// Report a failed query instead of silently returning an empty result.
	findError := collection.Find(selector).All(&result)
	if errormdl.CheckErr1(findError) != nil {
		return nil, errormdl.CheckErr1(findError)
	}
	ba, marshalError := json.Marshal(result)
	if errormdl.CheckErr2(marshalError) != nil {
		return nil, errormdl.CheckErr2(marshalError)
	}
	rs := gjson.ParseBytes(ba)
	return &rs, nil
}
// DeleteData removes data from the DAO's collection through the collection's
// Remove call, using selector to choose what is removed.
func (mg *MongoDAO) DeleteData(selector map[string]interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	database := conn.DB(hostCfg.Database)
	coll := database.C(mg.collectionName)
	removeErr := coll.Remove(selector)
	if errormdl.CheckErr1(removeErr) != nil {
		return errormdl.CheckErr1(removeErr)
	}
	return removeErr
}
// DeleteAll will delete all the matching data given for selector
func (mg *MongoDAO) DeleteAll(selector map[string]interface{}) error {
session, sessionError := GetMongoConnection(mg.hostName)
if errormdl.CheckErr(sessionError) != nil {
return errormdl.CheckErr(sessionError)
}
if mg.hostName == "" {
mg.hostName = defaultHost
}
db, ok := config.MongoHosts[mg.hostName]
if !ok {
return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
}
collection := session.DB(db.Database).C(mg.collectionName)
_, deleteError := collection.RemoveAll(selector)
if errormdl.CheckErr1(deleteError) != nil {
return errormdl.CheckErr1(deleteError)
}
return deleteError
// GetProjectedData runs a find with selector on the DAO's collection, applies
// projector through Select, and returns the resulting documents JSON-encoded
// and parsed into a gjson.Result.
//
// An error is returned when no session or host configuration is available,
// when the query itself fails, or when the result cannot be marshalled.
func (mg *MongoDAO) GetProjectedData(selector map[string]interface{}, projector map[string]interface{}) (*gjson.Result, error) {
	session, sessionError := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(sessionError) != nil {
		return nil, errormdl.CheckErr(sessionError)
	}
	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	db, ok := config.MongoHosts[mg.hostName]
	if !ok {
		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}
	collection := session.DB(db.Database).C(mg.collectionName)
	var result []interface{}
	// Report a failed query instead of silently returning an empty result.
	findError := collection.Find(selector).Select(projector).All(&result)
	if errormdl.CheckErr1(findError) != nil {
		return nil, errormdl.CheckErr1(findError)
	}
	ba, marshalError := json.Marshal(result)
	if errormdl.CheckErr2(marshalError) != nil {
		return nil, errormdl.CheckErr2(marshalError)
	}
	rs := gjson.ParseBytes(ba)
	return &rs, nil
}
// GetAggregateData runs selector as an aggregation pipeline (Collection.Pipe)
// on the DAO's collection and returns the resulting documents JSON-encoded and
// parsed into a gjson.Result.
//
// An error is returned when no session or host configuration is available,
// when the pipeline fails, or when the result cannot be marshalled.
func (mg *MongoDAO) GetAggregateData(selector interface{}) (*gjson.Result, error) {
	session, sessionError := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(sessionError) != nil {
		return nil, errormdl.CheckErr(sessionError)
	}
	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	db, ok := config.MongoHosts[mg.hostName]
	if !ok {
		return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}
	collection := session.DB(db.Database).C(mg.collectionName)
	var result []bson.M
	// Report a failed pipeline instead of silently returning an empty result.
	pipeError := collection.Pipe(selector).All(&result)
	if errormdl.CheckErr1(pipeError) != nil {
		return nil, errormdl.CheckErr1(pipeError)
	}
	ba, marshalError := json.Marshal(result)
	if errormdl.CheckErr2(marshalError) != nil {
		return nil, errormdl.CheckErr2(marshalError)
	}
	rs := gjson.ParseBytes(ba)
	return &rs, nil
}
// Upsert applies data as a "$set" change through the collection's Upsert
// call, using selector to choose the document.
func (mg *MongoDAO) Upsert(selector map[string]interface{}, data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	coll := conn.DB(hostCfg.Database).C(mg.collectionName)
	change := bson.M{"$set": data}
	if _, upsertErr := coll.Upsert(selector, change); errormdl.CheckErr1(upsertErr) != nil {
		return errormdl.CheckErr1(upsertErr)
	}
	return nil
}
// PushData applies data as a "$push" change through the collection's
// UpdateAll call, using selector to choose the documents.
func (mg *MongoDAO) PushData(selector map[string]interface{}, data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	coll := conn.DB(hostCfg.Database).C(mg.collectionName)
	change := bson.M{"$push": data}
	if _, pushErr := coll.UpdateAll(selector, change); errormdl.CheckErr1(pushErr) != nil {
		return errormdl.CheckErr1(pushErr)
	}
	return nil
}
// CustomUpdate hands data to the collection's UpdateAll call exactly as given
// (no "$set" wrapping is added), using selector to choose the documents.
func (mg *MongoDAO) CustomUpdate(selector map[string]interface{}, data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	database := conn.DB(hostCfg.Database)
	coll := database.C(mg.collectionName)
	if _, updateErr := coll.UpdateAll(selector, data); errormdl.CheckErr1(updateErr) != nil {
		return errormdl.CheckErr1(updateErr)
	}
	return nil
}
// CustomUpdateOne hands data to the collection's Update call exactly as given
// (no "$set" wrapping is added), using selector to choose the document.
func (mg *MongoDAO) CustomUpdateOne(selector map[string]interface{}, data interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if !found {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	database := conn.DB(hostCfg.Database)
	coll := database.C(mg.collectionName)
	if updateErr := coll.Update(selector, data); errormdl.CheckErr1(updateErr) != nil {
		return errormdl.CheckErr1(updateErr)
	}
	return nil
}
/************************* BULK Functionalities ******************************/
// BulkSaveData queues every element of data as an insert on a single bulk
// operation against the DAO's collection and runs it. A failed run is logged
// and returned.
func (mg *MongoDAO) BulkSaveData(data []interface{}) error {
	conn, connErr := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(connErr) != nil {
		return errormdl.CheckErr(connErr)
	}

	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	hostCfg, found := config.MongoHosts[mg.hostName]
	if errormdl.CheckBool(!found) {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}

	bulk := conn.DB(hostCfg.Database).C(mg.collectionName).Bulk()
	bulk.Insert(data...)
	if _, runErr := bulk.Run(); errormdl.CheckErr1(runErr) != nil {
		loggermdl.LogError(runErr)
		return errormdl.CheckErr1(runErr)
	}
	return nil
}
// BulkUpdateData applies a batch of updates to the DAO's collection in one
// bulk operation.
//
// data must hold selector/update pairs: data[0] selects the documents changed
// by data[1], data[2] those changed by data[3], and so on. The pairs are
// queued with Bulk.UpdateAll and passed to the driver unchanged.
//
// An error is returned when data has an odd number of elements, when no
// session or host configuration is available, or when the bulk run fails.
func (mg *MongoDAO) BulkUpdateData(data []interface{}) error {
	// Bulk.UpdateAll panics on an odd number of arguments; reject such input
	// with an error instead.
	if len(data)%2 != 0 {
		return errormdl.Wrap("Bulk_Update_Requires_Selector_Update_Pairs")
	}
	session, sessionError := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(sessionError) != nil {
		return errormdl.CheckErr(sessionError)
	}
	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	db, ok := config.MongoHosts[mg.hostName]
	if errormdl.CheckBool(!ok) {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}
	collection := session.DB(db.Database).C(mg.collectionName)
	b := collection.Bulk()
	// Queue the updates; without this the bulk runs empty and data is ignored.
	b.UpdateAll(data...)
	_, updateError := b.Run()
	if errormdl.CheckErr1(updateError) != nil {
		return errormdl.CheckErr1(updateError)
	}
	return nil
}
// BulkDeleteData removes documents from the DAO's collection in one bulk
// operation.
//
// Every element of data is used as a selector and queued with Bulk.RemoveAll.
//
// An error is returned when no session or host configuration is available, or
// when the bulk run fails.
func (mg *MongoDAO) BulkDeleteData(data []interface{}) error {
	session, sessionError := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(sessionError) != nil {
		return errormdl.CheckErr(sessionError)
	}
	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	db, ok := config.MongoHosts[mg.hostName]
	if errormdl.CheckBool(!ok) {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}
	collection := session.DB(db.Database).C(mg.collectionName)
	b := collection.Bulk()
	// Queue the removals; without this the bulk runs empty and data is ignored.
	b.RemoveAll(data...)
	_, deleteError := b.Run()
	if errormdl.CheckErr1(deleteError) != nil {
		return errormdl.CheckErr1(deleteError)
	}
	return nil
}
// BulkUpsertData upserts a batch of documents into the DAO's collection in
// one bulk operation.
//
// data must hold selector/update pairs: data[0] selects the document upserted
// with data[1], data[2] the one upserted with data[3], and so on. The pairs
// are queued with Bulk.Upsert and passed to the driver unchanged.
//
// An error is returned when data has an odd number of elements, when no
// session or host configuration is available, or when the bulk run fails.
func (mg *MongoDAO) BulkUpsertData(data []interface{}) error {
	// Bulk.Upsert panics on an odd number of arguments; reject such input with
	// an error instead.
	if len(data)%2 != 0 {
		return errormdl.Wrap("Bulk_Upsert_Requires_Selector_Update_Pairs")
	}
	session, sessionError := GetMongoConnection(mg.hostName)
	if errormdl.CheckErr(sessionError) != nil {
		return errormdl.CheckErr(sessionError)
	}
	// An empty host name means the default host; resolve it before the config lookup.
	if mg.hostName == "" {
		mg.hostName = defaultHost
	}
	db, ok := config.MongoHosts[mg.hostName]
	if errormdl.CheckBool(!ok) {
		return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName)
	}
	collection := session.DB(db.Database).C(mg.collectionName)
	b := collection.Bulk()
	b.Upsert(data...)
	_, upsertError := b.Run()
	if errormdl.CheckErr1(upsertError) != nil {
		return errormdl.CheckErr1(upsertError)
	}
	return nil
}
func bindMongoServerWithPort(server string, port int) string {
// if port is empty then used default port 27017 & bind to server ip
var serverURI string
if port <= 0 || strings.TrimSpace(strconv.Itoa(port)) == "" {
serverURI = server + ":27017"
serverURI = server + ":" + strconv.Itoa(port)