Commit 01f729d6 authored by Vivek Naik's avatar Vivek Naik
Browse files

defering release lock

parent 9536d7ad
2 merge requests!11915 Oct MEP Merge Dev to Stg,!118added filelock while performing data manipulation on fdb bucket
Showing with 56 additions and 58 deletions
......@@ -23,7 +23,6 @@ import (
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/utiliymdl/guidmdl"
"github.com/juju/fslock"
"github.com/tidwall/buntdb"
"github.com/tidwall/gjson"
)
......@@ -515,12 +514,12 @@ func GetDataByConcat(dbName, indexID string, query ...string) (*gjson.Result, er
}
func saveDataInNormalBucket(filePath string, rs *gjson.Result) error {
fileLock, err := acquireFileLock(filePath)
fileLock, err := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(fileLock)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return errormdl.CheckErr(err)
}
defer releaseFileLock(fileLock)
err = filemdl.WriteFile(filePath, []byte(rs.String()), true, false)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
......@@ -610,7 +609,9 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul
if !ok {
return errormdl.Wrap("infileIndex for specified fileType not found")
}
fileLock, err := acquireFileLock(filePath)
fileLock, err := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(fileLock)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return errormdl.CheckErr(err)
......@@ -622,11 +623,6 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul
return errormdl.CheckErr(err)
}
// if file is being created for first time
defer func(filelock *fslock.Lock) {
f.Close()
releaseFileLock(fileLock)
}(fileLock)
fileStatus := fileStatusReady
// if isFilePresent {
fileStatus, err = getFileStatus(f)
......@@ -703,7 +699,7 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul
return err
}
f.Sync()
f.Close()
} else {
// retry after timeout
}
......@@ -762,12 +758,12 @@ func updateDataInNormalBucket(fdb *FDB, filePaths []string, rs *gjson.Result) (*
}
func updateFileData(filePath string, rs gjson.Result) (*gjson.Result, error) {
fileLock, err := acquireFileLock(filePath)
fileLock, err := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(fileLock)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return nil, errormdl.CheckErr(err)
}
defer releaseFileLock(fileLock)
data, err := filemdl.FastReadFile(filePath)
if err != nil {
return nil, err
......@@ -794,16 +790,8 @@ func updateDataInFileIndexBucket(fdb *FDB, bucket *Bucket, filePaths []string, r
dataUpdatedAtFilePathsCnt := 0
for _, path := range filePaths {
filePath := filepath.Join(fdb.DBPath, path)
lock, err := acquireFileLock(filePath)
if err != nil {
errList = append(errList, err)
continue
}
resultArray, err := updateDataWithInFileIndex(filePath, bucket, rs, infileIndexQuery)
ferr := releaseFileLock(lock)
if ferr != nil {
errList = append(errList, ferr)
}
if err != nil {
errList = append(errList, err)
continue
......@@ -842,6 +830,11 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result
if !filemdl.FileAvailabilityCheck(filePath) {
return nil, errormdl.Wrap("file not found: " + filePath)
}
lock, err := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(lock)
if err != nil {
return nil, errormdl.Wrap("fail to update data" + err.Error())
}
f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777)
if err != nil {
return nil, errormdl.Wrap("fail to update data" + err.Error())
......@@ -1006,12 +999,12 @@ func updateIndexRow(indexRows gjson.Result, previousIndexRow gjson.Result, updat
}
func initializeWithHeader(filePath string) error {
fileLock, err := acquireFileLock(filePath)
fileLock, err := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(fileLock)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return errormdl.CheckErr(err)
}
defer releaseFileLock(fileLock)
// initailize file with headers and fdb index entry
f, err := filemdl.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_SYNC, 0777)
if errormdl.CheckErr(err) != nil {
......@@ -1310,6 +1303,12 @@ func deletDataFromInfileIndex(filePath string, fileType string, infileIndexQueri
// get update index records
// save updated index
// update index size header
lock, err := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(lock)
if err != nil {
return err
}
f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777)
if err != nil {
return err
......@@ -1635,16 +1634,9 @@ func deleteDataFromNormalBucket(fdbPath string, index *Index, paths []string) (r
// }
// }
// delete file
lock, ferr := acquireFileLock(path)
if ferr != nil {
errList = append(errList, errormdl.Wrap("unable to delete file : "+path+ferr.Error()))
continue
}
err = filemdl.DeleteFile(path)
ferr = releaseFileLock(lock)
if ferr != nil {
errList = append(errList, errormdl.Wrap("unable to delete file : "+path+ferr.Error()))
}
err = deleteNormalFile(path)
if err != nil {
errList = append(errList, errormdl.Wrap("unable to delete file : "+path+err.Error()))
continue
......@@ -1663,6 +1655,15 @@ func deleteDataFromNormalBucket(fdbPath string, index *Index, paths []string) (r
return
}
func deleteNormalFile(filePath string) error {
lock, ferr := filemdl.AcquireFileLock(filePath)
defer filemdl.ReleaseFileLock(lock)
if ferr != nil {
return errormdl.Wrap("unable to delete file : " + filePath + ferr.Error())
}
return filemdl.DeleteFile(filePath)
}
func deleteDataFromInFileIndexBucket(fdbPath string, bucket *Bucket, paths []string, rs *gjson.Result, infileIndexQueries []string) (recordsDeleted int, errList []error) {
fileType := rs.Get("fileType").String()
......@@ -1678,19 +1679,10 @@ func deleteDataFromInFileIndexBucket(fdbPath string, bucket *Bucket, paths []str
noDataFoundCnt := 0
for _, filePath := range paths {
path := filepath.Join(fdbPath, filePath)
lock, err := acquireFileLock(path)
if err != nil {
errList = append(errList, err)
continue
}
err = deletDataFromInfileIndex(path, fileType, infileIndexQueries)
err := deletDataFromInfileIndex(path, fileType, infileIndexQueries)
if err == ErrNoDataFound {
noDataFoundCnt++
// continue
}
err = releaseFileLock(lock)
if err != nil {
errList = append(errList, err)
continue
}
if err != nil {
......@@ -1856,20 +1848,6 @@ func ReadDataFromFDB(dbName, indexID string, rs *gjson.Result, query []string, i
}
func acquireFileLock(filePath string) (*fslock.Lock, error) {
lock := fslock.New(filePath)
lockErr := lock.Lock()
if lockErr != nil {
loggermdl.LogError("failed to acquire lock > " + lockErr.Error())
return nil, errormdl.Wrap("failed to acquire lock > " + lockErr.Error())
}
return lock, nil
}
func releaseFileLock(lock *fslock.Lock) error {
return lock.Unlock()
}
// UpdateDataInFDB - UpdateDataInFDB
func UpdateDataInFDB(dbName, indexID string, rs *gjson.Result, query []string, infileIndexQuery []string) (*gjson.Result, []error) {
var errList []error
......
......@@ -9,6 +9,8 @@ import (
"strings"
"time"
"github.com/juju/fslock"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/cachemdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
......@@ -147,10 +149,12 @@ func AppendFile(filename string, text string) (int, error) {
//DeleteFile deletes provided file path
func DeleteFile(filePath string) error {
path, linkErr := os.Readlink(filePath)
if errormdl.CheckErr1(linkErr) == nil {
filePath = path
}
err := os.Remove(filePath)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
......@@ -754,3 +758,19 @@ func WriteFileAtOffset(f *os.File, startOffset int64, bytesToWrite []byte) (int6
return int64(n), nil
}
// AcquireFileLock -
func AcquireFileLock(filePath string) (*fslock.Lock, error) {
lock := fslock.New(filePath)
lockErr := lock.Lock()
if lockErr != nil {
loggermdl.LogError("failed to acquire lock > " + lockErr.Error())
return nil, errormdl.Wrap("failed to acquire lock > " + lockErr.Error())
}
return lock, nil
}
// ReleaseFileLock
func ReleaseFileLock(lock *fslock.Lock) error {
return lock.Unlock()
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment