diff --git a/dalmdl/coremongo/coremongo.go b/dalmdl/coremongo/coremongo.go new file mode 100644 index 0000000000000000000000000000000000000000..ebd329a4d54477165109cede3174d2e54b57a756 --- /dev/null +++ b/dalmdl/coremongo/coremongo.go @@ -0,0 +1,752 @@ +package coremongo + +import ( + "context" + "encoding/json" + "strconv" + "strings" + "sync" + "time" + + "go.mongodb.org/mongo-driver/mongo/readpref" + + "go.mongodb.org/mongo-driver/bson/primitive" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + + "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/statemdl" + + "github.com/tidwall/gjson" + + "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl" + "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl" +) + +// MongoHost -MongoHost +type MongoHost struct { + HostName string `json:"hostName"` + Server string `json:"server"` + Port int `json:"port"` + Username string `json:"username"` + Password string `json:"password"` + Database string `json:"database"` + IsDefault bool `json:"isDefault"` + MaxIdleConns int `json:"maxIdleConns" ` + MaxOpenConns int `json:"maxOpenConns"` + ConnMaxLifetime time.Duration `json:"connMaxLifetime" ` + IsDisabled bool `json:"isDisabled" ` +} + +var instances map[string]*mongo.Client +var mutex sync.Mutex +var once sync.Once + +var config map[string]MongoHost + +var defaultHost string + +func init() { + config = make(map[string]MongoHost) +} + +// InitUsingJSON initializes Mongo Connections for give JSON data +func InitUsingJSON(configs []MongoHost) error { + var sessionError error + once.Do(func() { + defer mutex.Unlock() + mutex.Lock() + config = make(map[string]MongoHost) + instances = make(map[string]*mongo.Client) + for _, hostDetails := range configs { + if hostDetails.IsDisabled { + continue + } + clientOption := options.Client() + clientOption.SetHosts([]string{bindMongoServerWithPort(hostDetails.Server, hostDetails.Port)}). + SetConnectTimeout(hostDetails.ConnMaxLifetime). + SetMaxPoolSize(uint64(hostDetails.MaxOpenConns)). + SetReadPreference(readpref.Primary()). + SetDirect(true) // important if in cluster, connect to primary only. + if hostDetails.Username != "" { + cred := options.Credential{} + cred.Username = hostDetails.Username + cred.Password = hostDetails.Password + cred.AuthSource = hostDetails.Database + clientOption.SetAuth(cred) + } + client, err := mongo.NewClient(clientOption) + if err != nil { + sessionError = err + loggermdl.LogError(sessionError) + return + } + err = client.Connect(context.Background()) + if err != nil { + sessionError = err + loggermdl.LogError(sessionError) + return + } + err = client.Ping(context.Background(), readpref.Primary()) + if err != nil { + sessionError = err + loggermdl.LogError("failed to connect to primary - ", sessionError) + return + } + instances[hostDetails.HostName] = client + if hostDetails.IsDefault { + defaultHost = hostDetails.HostName + } + config[hostDetails.HostName] = hostDetails + } + }) + return sessionError +} + +// DeleteSession -DeleteSession +func DeleteSession(hostName string) error { + defer mutex.Unlock() + mutex.Lock() + if _, ok := instances[hostName]; !ok { + return errormdl.Wrap("NO_HOST_FOUND") + } + delete(instances, hostName) + return nil +} + +// InitNewSession - InitNewSession +func InitNewSession(hostDetails MongoHost) error { + defer mutex.Unlock() + mutex.Lock() + if instances == nil { + instances = make(map[string]*mongo.Client) + } + if _, ok := instances[hostDetails.HostName]; ok { + return errormdl.Wrap("DUPLICATE_HOSTNAME") + } + clientOption := options.Client() + clientOption.SetHosts([]string{bindMongoServerWithPort(hostDetails.Server, hostDetails.Port)}). + SetConnectTimeout(hostDetails.ConnMaxLifetime). + SetMaxPoolSize(uint64(hostDetails.MaxOpenConns)). + SetReadPreference(readpref.Primary()). + SetDirect(true) // important if in cluster, connect to primary only. + if hostDetails.Username != "" { + cred := options.Credential{} + cred.Username = hostDetails.Username + cred.Password = hostDetails.Password + cred.AuthSource = hostDetails.Database + clientOption.SetAuth(cred) + } + client, err := mongo.NewClient(clientOption) + if err != nil { + loggermdl.LogError(err) + return err + } + err = client.Connect(context.Background()) + if err != nil { + loggermdl.LogError(err) + return err + } + instances[hostDetails.HostName] = client + return nil +} + +//GetMongoConnection method +func GetMongoConnection(hostName string) (*mongo.Client, error) { + mutex.Lock() + defer mutex.Unlock() + if instances == nil { + return nil, errormdl.Wrap("MONGO_INIT_NOT_DONE") + } + if hostName == "" { + if instance, ok := instances[defaultHost]; ok { + statemdl.MongoHits() + err := instance.Ping(context.Background(), readpref.Primary()) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + return instance, nil + } + } + if instance, ok := instances[hostName]; ok { + statemdl.MongoHits() + err := instance.Ping(context.Background(), readpref.Primary()) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + return instance, nil + } + return nil, errormdl.Wrap("Session not found for instance: " + hostName) +} + +// MongoDAO mongo DAO struct +type MongoDAO struct { + hostName string + collectionName string +} + +// GetMongoDAOWithHost return mongo DAO instance +func GetMongoDAOWithHost(host, collection string) *MongoDAO { + return &MongoDAO{ + hostName: host, + collectionName: collection, + } +} + +// GetMongoDAO return mongo DAO instance +func GetMongoDAO(collection string) *MongoDAO { + return &MongoDAO{ + collectionName: collection, + } +} + +// SaveData Save data in mongo db +func (mg *MongoDAO) SaveData(data interface{}) (string, error) { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return "", errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if errormdl.CheckBool(!ok) { + return "", errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + opts, insertError := collection.InsertOne(context.Background(), data) + if errormdl.CheckErr1(insertError) != nil { + return "", errormdl.CheckErr1(insertError) + } + return opts.InsertedID.(primitive.ObjectID).Hex(), nil +} + +// UpdateAll update all +func (mg *MongoDAO) UpdateAll(selector map[string]interface{}, data interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + + _, updateError := collection.UpdateMany(context.Background(), selector, bson.M{"$set": data}) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + return nil +} + +// Update will update single entry +func (mg *MongoDAO) Update(selector map[string]interface{}, data interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + _, updateError := collection.UpdateOne(context.Background(), selector, bson.M{"$set": data}) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + return nil +} + +// GetData will return query for selector +func (mg *MongoDAO) GetData(selector map[string]interface{}) (*gjson.Result, error) { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return nil, errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + + cur, err := collection.Find(context.Background(), selector) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + defer cur.Close(context.Background()) + var results []interface{} + for cur.Next(context.Background()) { + var result bson.M + err := cur.Decode(&result) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + results = append(results, result) + } + ba, marshalError := json.Marshal(results) + if errormdl.CheckErr2(marshalError) != nil { + return nil, errormdl.CheckErr2(marshalError) + } + rs := gjson.ParseBytes(ba) + return &rs, nil +} + +// DeleteData will delete data given for selector +func (mg *MongoDAO) DeleteData(selector map[string]interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + _, deleteError := collection.DeleteOne(context.Background(), selector) + if errormdl.CheckErr1(deleteError) != nil { + return errormdl.CheckErr1(deleteError) + } + return deleteError +} + +// DeleteAll will delete all the matching data given for selector +func (mg *MongoDAO) DeleteAll(selector map[string]interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + _, deleteError := collection.DeleteMany(context.Background(), selector) + if errormdl.CheckErr1(deleteError) != nil { + return errormdl.CheckErr1(deleteError) + } + return deleteError +} + +// GetProjectedData will return query for selector and projector +func (mg *MongoDAO) GetProjectedData(selector map[string]interface{}, projector map[string]interface{}) (*gjson.Result, error) { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return nil, errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + ops := &options.FindOptions{} + ops.Projection = projector + cur, err := collection.Find(context.Background(), selector, ops) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + defer cur.Close(context.Background()) + var results []interface{} + for cur.Next(context.Background()) { + var result bson.M + err := cur.Decode(&result) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + results = append(results, result) + } + + ba, marshalError := json.Marshal(results) + if errormdl.CheckErr2(marshalError) != nil { + return nil, errormdl.CheckErr2(marshalError) + } + rs := gjson.ParseBytes(ba) + return &rs, nil +} + +// GetAggregateData - return result using aggregation query +func (mg *MongoDAO) GetAggregateData(selector interface{}) (*gjson.Result, error) { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return nil, errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return nil, errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + cur, err := collection.Aggregate(context.Background(), selector) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + defer cur.Close(context.Background()) + var results []interface{} + for cur.Next(context.Background()) { + var result bson.M + err := cur.Decode(&result) + if err != nil { + loggermdl.LogError(err) + return nil, err + } + results = append(results, result) + } + ba, marshalError := json.Marshal(results) + if errormdl.CheckErr2(marshalError) != nil { + return nil, errormdl.CheckErr2(marshalError) + } + rs := gjson.ParseBytes(ba) + return &rs, nil +} + +// Upsert will update single entry +func (mg *MongoDAO) Upsert(selector map[string]interface{}, data interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + ops := options.UpdateOptions{} + ops.SetUpsert(true) + _, updateError := collection.UpdateOne(context.Background(), selector, bson.M{"$set": data}, &ops) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + // loggermdl.LogDebug("=======UpsertedCount=====", upserted.UpsertedCount) + return nil +} + +// PushData - append in array +func (mg *MongoDAO) PushData(selector map[string]interface{}, data interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + _, updateError := collection.UpdateMany(context.Background(), selector, bson.M{"$push": data}) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + + return nil +} + +// CustomUpdate - CustomUpdate +func (mg *MongoDAO) CustomUpdate(selector map[string]interface{}, data interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + _, updateError := collection.UpdateMany(context.Background(), selector, data) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + return nil +} + +// CustomUpdateOne - CustomUpdateOne +func (mg *MongoDAO) CustomUpdateOne(selector map[string]interface{}, data interface{}) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + _, updateError := collection.UpdateOne(context.Background(), selector, data) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + return nil +} + +/************************* BULK Functionalities ******************************/ + +// BulkSaveData ata Save data in mongo db in bulk +func (mg *MongoDAO) BulkSaveData(data []interface{}) error { + if checkBulkInput(data) { + return nil + } + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if errormdl.CheckBool(!ok) { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + opts := &options.BulkWriteOptions{} + opts.SetOrdered(true) + var models []mongo.WriteModel + for i := 0; i < len(data); i++ { + model := mongo.NewInsertOneModel() + model.SetDocument(data[i]) + models = append(models, model) + } + _, insertError := collection.BulkWrite(context.Background(), models, opts) + if errormdl.CheckErr1(insertError) != nil { + loggermdl.LogError(insertError) + return errormdl.CheckErr1(insertError) + } + + return nil +} + +// BulkUpdateData update data in mongo db in bulk +func (mg *MongoDAO) BulkUpdateData(data []interface{}) error { + if checkBulkInput(data) { + return nil + } + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if errormdl.CheckBool(!ok) { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + opts := &options.BulkWriteOptions{} + opts.SetOrdered(true) + var models []mongo.WriteModel + for i := 0; i < len(data); i++ { + model := mongo.NewUpdateOneModel() + model.SetFilter(data[i]) + i++ + model.SetUpdate(data[i]) + models = append(models, model) + } + + _, insertError := collection.BulkWrite(context.Background(), models, opts) + if errormdl.CheckErr1(insertError) != nil { + loggermdl.LogError(insertError) + return errormdl.CheckErr1(insertError) + } + return nil +} + +// BulkDeleteData delete data in mongo db in bulk +func (mg *MongoDAO) BulkDeleteData(data []interface{}) error { + if checkBulkInput(data) { + return nil + } + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if errormdl.CheckBool(!ok) { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + opts := &options.BulkWriteOptions{} + opts.SetOrdered(true) + var models []mongo.WriteModel + for i := 0; i < len(data); i++ { + model := mongo.NewDeleteOneModel() + model.SetFilter(data[i]) + models = append(models, model) + } + _, insertError := collection.BulkWrite(context.Background(), models, opts) + if errormdl.CheckErr1(insertError) != nil { + loggermdl.LogError(insertError) + return errormdl.CheckErr1(insertError) + } + return nil +} + +// BulkUpsertData Upsert data in mongo db in bulk +func (mg *MongoDAO) BulkUpsertData(data []interface{}) error { + if checkBulkInput(data) { + return nil + } + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if errormdl.CheckBool(!ok) { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + opts := &options.BulkWriteOptions{} + opts.SetOrdered(true) + var models []mongo.WriteModel + for i := 0; i < len(data); i++ { + model := mongo.NewUpdateOneModel() + model.SetUpsert(true) + model.SetFilter(data[i]) + i++ + model.SetUpdate(data[i]) + models = append(models, model) + } + + _, insertError := collection.BulkWrite(context.Background(), models, opts) + if errormdl.CheckErr1(insertError) != nil { + loggermdl.LogError(insertError) + return errormdl.CheckErr1(insertError) + } + return nil +} + +func checkBulkInput(d []interface{}) bool { + return len(d) == 0 +} + +func bindMongoServerWithPort(server string, port int) string { + // if port is empty then used default port 27017 & bind to server ip + var serverURI string + if port <= 0 || strings.TrimSpace(strconv.Itoa(port)) == "" { + serverURI = server + ":27017" + } else { + serverURI = server + ":" + strconv.Itoa(port) + } + return serverURI +} + +/************************* Queue Functionalities ******************************/ + +// FindAndUpdateOne .. +func (mg *MongoDAO) FindAndUpdateOne() (interface{}, error) { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return "", errormdl.CheckErr(sessionError) + } + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return "", errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + collection := session.Database(db.Database).Collection(mg.collectionName) + // 5) Create the search filter + filter := bson.M{"startTime": nil} + // 6) Create the update + update := bson.M{ + "$set": bson.M{"startTime": time.Now()}, + } + // 7) Create an instance of an options and set the desired options + opt := options.FindOneAndUpdateOptions{ + Sort: bson.M{"createdOn": 1}, + } + // 8) Find one result and update it + result := collection.FindOneAndUpdate(context.Background(), filter, update, &opt) + + doc := bson.M{} + decodeErr := result.Decode(&doc) + return doc, decodeErr +} + +// UpdateOneDequeue .. +func (mg *MongoDAO) UpdateOneDequeue(data string) error { + session, sessionError := GetMongoConnection(mg.hostName) + if errormdl.CheckErr(sessionError) != nil { + return errormdl.CheckErr(sessionError) + } + + if mg.hostName == "" { + mg.hostName = defaultHost + } + db, ok := config[mg.hostName] + if !ok { + return errormdl.Wrap("No_Configuration_Found_For_Host: " + mg.hostName) + } + filter := bson.M{"payload.filename": data} + // filter := bson.M{"testName": data} + // 6) Create the update + update := bson.M{ + "$set": bson.M{"endTime": time.Now()}, + } + collection := session.Database(db.Database).Collection(mg.collectionName) + result, updateError := collection.UpdateOne(context.Background(), filter, update) + loggermdl.LogDebug(result.ModifiedCount) + if errormdl.CheckErr1(updateError) != nil { + return errormdl.CheckErr1(updateError) + } + return nil +} diff --git a/dalmdl/coremongo/girdfs.go b/dalmdl/coremongo/girdfs.go new file mode 100644 index 0000000000000000000000000000000000000000..15abd4561347b2ce33cc26bac314a59355b5e44d --- /dev/null +++ b/dalmdl/coremongo/girdfs.go @@ -0,0 +1,135 @@ +package coremongo + +import ( + "bytes" + "context" + "errors" + "io" + "strings" + "time" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/gridfs" + "go.mongodb.org/mongo-driver/mongo/options" +) + +//SaveFileToGridFS - Saves file to gridfs +func SaveFileToGridFS(db *mongo.Database, bucketName, fileName string, source io.Reader) (string, string, error) { + + bucketName = strings.TrimSpace(bucketName) + fileName = strings.TrimSpace(fileName) + + //Validations + if db == nil { + return "", "", errors.New("db Required") + } else if bucketName == "" { + return "", "", errors.New("bucketName required") + } else if source == nil { + return "", "", errors.New("invalid source") + } + + //Set bucket config + bucketOptions := options.BucketOptions{} + bucketOptions.Name = &bucketName + + //Get bucket instance + dbBucket, bucketError := gridfs.NewBucket(db, &bucketOptions) + if bucketError != nil { + return "", "", bucketError + } + + //Upload incomming file to bucket + fileID, fileError := dbBucket.UploadFromStream(fileName, source) + if fileError != nil { + return "", "", fileError + } + + //Return generated fileId and file name + return fileID.String(), fileName, nil +} + +//GetFileFromGridFS - Gets file from gridfs +func GetFileFromGridFS(db *mongo.Database, bucketName, fileName string) ([]byte, error) { + + bucketName = strings.TrimSpace(bucketName) + fileName = strings.TrimSpace(fileName) + + //Validations + if db == nil { + return nil, errors.New("db Required") + } else if bucketName == "" { + return nil, errors.New("bucketName required") + } else if fileName == "" { + return nil, errors.New("fileName required'") + } + + //Set bucket config + bucketOptions := options.BucketOptions{} + bucketOptions.Name = &bucketName + + //Get bucket instance + dbBucket, bucketError := gridfs.NewBucket(db, &bucketOptions) + if bucketError != nil { + return nil, bucketError + } + + //Read file from DB + w := bytes.NewBuffer(make([]byte, 0)) + _, getFileError := dbBucket.DownloadToStreamByName(fileName, w) + if getFileError != nil { + return nil, getFileError + } + + fileBytes := w.Bytes() + + //Return bytes + return fileBytes, nil + +} + +//GetDBInstance - Gets database intance +func GetDBInstance(serverIPAddress, port, dbName string, timeOutInSeconds int) (*mongo.Database, error) { + + serverIPAddress = strings.TrimSpace(serverIPAddress) + dbName = strings.TrimSpace(dbName) + port = strings.TrimSpace(port) + + //Validations + if serverIPAddress == "" { + return nil, errors.New("serverIPAddress required") + } else if dbName == "" { + return nil, errors.New("dbName required") + } else if timeOutInSeconds <= 0 { + return nil, errors.New("valid timeOutInSeconds required") + } + + ipElements := strings.Split(serverIPAddress, ".") + if len(ipElements) != 4 { + return nil, errors.New("invalid serverIPAddress") + } + + if port == "" { + port = "27017" + } + + //Connection string + connectionString := "mongodb://" + serverIPAddress + ":" + port + client, connectionError := mongo.NewClient(options.Client().ApplyURI(connectionString)) + if connectionError != nil { + return nil, connectionError + } + + //Context with timeout + ctx, _ := context.WithTimeout(context.Background(), time.Duration(timeOutInSeconds)*time.Second) + contextError := client.Connect(ctx) + + if contextError != nil { + return nil, contextError + } + + //Create a db instance + db := client.Database(dbName) + + //Return db instance + return db, nil +}