Commit b82ec0a1 authored by Ajit Jagtap

Merge branch 'fdb_file_lock' into 'devbranch'

Added file lock while performing data manipulation on FDB bucket

See merge request !118
parents 75f3367d 01f729d6
Showing with 94 additions and 61 deletions
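
The pattern behind these numbers: every function that reads, writes, or deletes a bucket file now takes an exclusive file lock through the new filemdl.AcquireFileLock / filemdl.ReleaseFileLock helpers (added at the end of this diff) and releases it when the function returns. A minimal sketch of that pattern as it is applied in saveDataInNormalBucket and the other touched functions; the function name writeLocked is illustrative only, and here the release is deferred only after the lock has actually been acquired:

// writeLocked mirrors the locking pattern this merge request applies around file writes.
func writeLocked(filePath string, payload []byte) error {
    // Block until an exclusive lock on filePath is held.
    fileLock, err := filemdl.AcquireFileLock(filePath)
    if err != nil {
        loggermdl.LogError(err)
        return err
    }
    // Release the lock when the write (or its error path) is done.
    defer filemdl.ReleaseFileLock(fileLock)
    return filemdl.WriteFile(filePath, payload, true, false)
}
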
@@ -237,7 +237,7 @@ func (fdb *FDB) bucketNameResolver(bucketID string, rs *gjson.Result) (string, e
         name = rs.Get(name).String()
     }
     if name == "" {
-        return name, errormdl.Wrap("Bucket name should not be empty: " + bucketID)
+        return name, errormdl.Wrap("Bucket name should not be empty: " + bucket.BucketNameQuery)
     }
     return name, nil
 }
@@ -257,7 +257,7 @@ func (fdb *FDB) resolveIndex(index *Index, rs *gjson.Result) (string, error) {
         indexName = rs.Get(index.IndexNameQuery).String()
     }
     if indexName == "" {
-        return path, errormdl.Wrap("primary key value not provided:" + index.IndexID)
+        return path, errormdl.Wrap("required attribute not provided:" + index.IndexNameQuery)
     }
     path = filepath.Join(path, indexName)
     return path, nil
@@ -278,11 +278,8 @@ func createIndexJSON(index *Index, rs *gjson.Result) (string, error) {
 // updateIndexJSON - update JSON with index field data
 func updateIndexJSON(index *Index, existingData string, rs *gjson.Result) (string, error) {
     json := existingData
-    loggermdl.LogError("Existing json - ", json)
     var err error
     for _, indexField := range index.IndexFields {
-        // loggermdl.LogError("indexField - ", indexField)
-        // loggermdl.LogError("rs.Get(indexField.Query).Value() - ", rs.Get(indexField.Query).Value())
         if rs.Get(indexField.Query).Value() == nil {
             continue
         }
@@ -311,7 +308,6 @@ func UpdateIndexLazyObjectInCache(indexID string, indexData *Index) error {
     // lazy cache must be present for provided indexID
     lazyObj, ok := IndexLazyObjHolder.Get(indexID)
     if !ok {
-        // loggermdl.LogError(IndexLazyObjHolder.GetItems())
         loggermdl.LogError("index not found in lazy writer cache")
         return errormdl.Wrap("index not found in lazy writer cache")
     }
@@ -330,7 +326,6 @@ func UpdateIndexLazyObjectInCache(indexID string, indexData *Index) error {
     }
     IndexLazyObjHolder.SetNoExpiration(indexID, idxLazyData)
-    loggermdl.LogError(indexID, " updated with - ", idxLazyData)
     return nil
 }
@@ -457,7 +452,6 @@ func (fdb *FDB) readDataFile(path []string) (*gjson.Result, error) {
     sb := strings.Builder{}
     sb.WriteString("[")
     for i := 0; i < len(path); i++ {
-        loggermdl.LogError("path[i] - ", path[i])
         updatedPath := filepath.Join(fdb.DBPath, path[i])
         ba := []byte{'{', '}'}
         var err error
@@ -468,7 +462,7 @@ func (fdb *FDB) readDataFile(path []string) (*gjson.Result, error) {
             return nil, err
         }
-        loggermdl.LogError("File data for index = ", i, " - ", string(ba))
+        // loggermdl.LogError("File data for index = ", i, " - ", string(ba))
         _, err = sb.Write(ba)
         if err != nil {
             loggermdl.LogError(err)
@@ -479,9 +473,7 @@ func (fdb *FDB) readDataFile(path []string) (*gjson.Result, error) {
     }
     sb.WriteString("]")
     finalResult := strings.Replace(sb.String(), ",]", "]", 1)
-    loggermdl.LogError("Final result - ", finalResult)
     rs := gjson.Parse(finalResult)
-    loggermdl.LogError("Final data - ", rs.String())
     return &rs, nil
 }
@@ -522,7 +514,13 @@ func GetDataByConcat(dbName, indexID string, query ...string) (*gjson.Result, er
 }
 func saveDataInNormalBucket(filePath string, rs *gjson.Result) error {
-    err := filemdl.WriteFile(filePath, []byte(rs.String()), true, false)
+    fileLock, err := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(fileLock)
+    if errormdl.CheckErr(err) != nil {
+        loggermdl.LogError(err)
+        return errormdl.CheckErr(err)
+    }
+    err = filemdl.WriteFile(filePath, []byte(rs.String()), true, false)
     if errormdl.CheckErr(err) != nil {
         loggermdl.LogError(err)
         return errormdl.CheckErr(err)
@@ -550,28 +548,21 @@ func saveDataInPackBucket(fdb *FDB, filePath string, index *Index, rs *gjson.Res
     if len(fileType) == 0 {
         return errormdl.Wrap("please specify fileType")
     }
-    var f *os.File
-    var err error
     isFilePresent := filemdl.FileAvailabilityCheck(filePath)
     if !isFilePresent {
         dir, _ := filepath.Split(filePath)
-        err = filemdl.CreateDirectoryRecursive(dir)
+        err := filemdl.CreateDirectoryRecursive(dir)
         if errormdl.CheckErr(err) != nil {
             loggermdl.LogError(err)
             return errormdl.CheckErr(err)
         }
-        // initailize file with headers and fdb index entry
-        f, err = filemdl.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_SYNC, 0777)
-        if errormdl.CheckErr(err) != nil {
-            loggermdl.LogError(err)
-            return errormdl.CheckErr(err)
-        }
-        err := initializeWithHeader(f)
+        err = initializeWithHeader(filePath)
         if err != nil {
             loggermdl.LogError(err)
             return err
         }
-        err = f.Close()
         err = addFDBIndexEntryFile(filePath, bucket, index.IndexFields, rs)
         if err != nil {
             loggermdl.LogError("fail to add fdb index entry in file: ", err)
@@ -618,13 +609,20 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul
     if !ok {
         return errormdl.Wrap("infileIndex for specified fileType not found")
     }
+    fileLock, err := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(fileLock)
+    if errormdl.CheckErr(err) != nil {
+        loggermdl.LogError(err)
+        return errormdl.CheckErr(err)
+    }
     f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777)
     if errormdl.CheckErr(err) != nil {
         loggermdl.LogError(err)
         return errormdl.CheckErr(err)
     }
     // if file is being created for first time
+    defer f.Close()
     fileStatus := fileStatusReady
     // if isFilePresent {
     fileStatus, err = getFileStatus(f)
@@ -701,7 +699,7 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul
             return err
         }
         f.Sync()
-        f.Close()
     } else {
         // retry after timeout
     }
@@ -737,10 +735,8 @@ func getInFileIndexData(f *os.File) (string, error) {
 func updateDataInNormalBucket(fdb *FDB, filePaths []string, rs *gjson.Result) (*gjson.Result, []string, []error) {
     resultStr := "[]"
     errList := []error{}
-    loggermdl.LogError("filePaths", filePaths)
     filesUpdated, updatedCnt := make([]string, len(filePaths)), 0
     for _, filePath := range filePaths {
-        loggermdl.LogError("path", filePath)
         path := filepath.Join(fdb.DBPath, filePath)
         data, err := updateFileData(path, *rs)
         if err != nil {
@@ -757,15 +753,17 @@ func updateDataInNormalBucket(fdb *FDB, filePaths []string, rs *gjson.Result) (*
             loggermdl.LogError("invalid path for update - ", filePath)
         }
     }
-    loggermdl.LogError("updatedCnt", updatedCnt)
-    loggermdl.LogError("filesUpdated ", filesUpdated)
-    loggermdl.LogError("filesUpdatedslice ", filesUpdated[:updatedCnt])
-    loggermdl.LogError("filesUpdatedslice ", filesUpdated[:updatedCnt-1])
     result := gjson.Parse(resultStr)
     return &result, filesUpdated[:updatedCnt], errList
 }
 func updateFileData(filePath string, rs gjson.Result) (*gjson.Result, error) {
+    fileLock, err := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(fileLock)
+    if errormdl.CheckErr(err) != nil {
+        loggermdl.LogError(err)
+        return nil, errormdl.CheckErr(err)
+    }
     data, err := filemdl.FastReadFile(filePath)
     if err != nil {
         return nil, err
@@ -792,21 +790,19 @@ func updateDataInFileIndexBucket(fdb *FDB, bucket *Bucket, filePaths []string, r
     dataUpdatedAtFilePathsCnt := 0
     for _, path := range filePaths {
         filePath := filepath.Join(fdb.DBPath, path)
         resultArray, err := updateDataWithInFileIndex(filePath, bucket, rs, infileIndexQuery)
+        if err == ErrNoDataFound {
+            continue
+        }
         if err != nil {
             errList = append(errList, err)
             continue
         }
         // dataUpdatedAtFilePaths = append(dataUpdatedAtFilePaths, path)
         dataUpdatedAtFilePaths[dataUpdatedAtFilePathsCnt] = path
         dataUpdatedAtFilePathsCnt++
         finalResultArray = append(finalResultArray, resultArray.Array()...)
     }
-    loggermdl.LogError("finalResultArray", finalResultArray)
     resultListStr := "[]"
     for _, resultObj := range finalResultArray {
         resultListStr, _ = sjson.Set(resultListStr, "-1", resultObj.Value())
@@ -834,6 +830,11 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result
     if !filemdl.FileAvailabilityCheck(filePath) {
         return nil, errormdl.Wrap("file not found: " + filePath)
     }
+    lock, err := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(lock)
+    if err != nil {
+        return nil, errormdl.Wrap("fail to update data" + err.Error())
+    }
     f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777)
     if err != nil {
         return nil, errormdl.Wrap("fail to update data" + err.Error())
@@ -843,20 +844,16 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result
         loggermdl.LogError(err)
         return nil, err
     }
-    loggermdl.LogError("indexDataString", indexDataString)
     infileIndexQueries = append(infileIndexQueries, `#[fileType==`+requestedFileType+`]`)
     indexRows := gjson.Parse(indexDataString)
     indexRecordsToUpdate := indexRows
     for _, query := range infileIndexQueries {
         indexRecordsToUpdate = indexRecordsToUpdate.Get(query + "#")
     }
-    loggermdl.LogError("indexRecordsToUpdate - ", indexRecordsToUpdate)
     indexRecordsToUpdateObjs := indexRecordsToUpdate.Array()
-    loggermdl.LogError("Len indexRecordsToUpdateObjs - ", len(indexRecordsToUpdateObjs))
     if len(indexRecordsToUpdateObjs) == 0 {
         return nil, ErrNoDataFound
     }
-    loggermdl.LogError("indexRowObjs", indexRecordsToUpdateObjs)
     // updating first record
     resultArrayStr := "[]"
     for _, recordToUpdateIndexRow := range indexRecordsToUpdateObjs {
@@ -872,7 +869,6 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result
 }
 func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gjson.Result, rs, existingIndexRows gjson.Result) (*gjson.Result, error) {
-    loggermdl.LogError("recordToUpdateIndexRow", recordToUpdateIndexRow)
     fileStartOffset := recordToUpdateIndexRow.Get("startOffset").Int()
     dataSize := recordToUpdateIndexRow.Get("dataSize").Int()
     if fileStartOffset == 0 || dataSize == 0 {
@@ -888,7 +884,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj
     if err != nil {
         return nil, err
     }
-    loggermdl.LogError("existingData", existingData)
     updatedDataStr := strings.TrimSpace(string(existingData))
     rs.ForEach(func(key, val gjson.Result) bool {
@@ -896,7 +891,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj
         updatedDataStr, _ = sjson.Set(updatedDataStr, key.String(), val.Value())
         return true
     })
-    loggermdl.LogError("updatedDataStr", updatedDataStr)
     newDataSize := int64(len(updatedDataStr))
     footerStartOffset := getFooterOffset(f)
     updatedFooterOffset := footerStartOffset
@@ -921,7 +915,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj
     // if err != nil {
     // return nil, err
     // }
-    loggermdl.LogError("existingIndexRows - ", existingIndexRows)
     // update startofffset and datasize in infile index
     // updateIndexRow
     updatedIndexRowStr := recordToUpdateIndexRow.String()
@@ -935,7 +928,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj
     })
     updatedIndexRowStr, _ = sjson.Set(updatedIndexRowStr, "startOffset", fileStartOffset)
     updatedIndexRowStr, _ = sjson.Set(updatedIndexRowStr, "dataSize", newDataSize)
-    loggermdl.LogError("updatedIndexRowStr - ", updatedIndexRowStr)
     updatedIndexRows, err := updateIndexRow(existingIndexRows, recordToUpdateIndexRow, gjson.Parse(updatedIndexRowStr))
@@ -976,8 +968,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj
 }
 func updateIndexRow(indexRows gjson.Result, previousIndexRow gjson.Result, updatedRow gjson.Result) (gjson.Result, error) {
-    loggermdl.LogError("Index rows to update - ", indexRows.String())
-    loggermdl.LogError("Ilength - ", len(indexRows.Array()))
     indexRowObjs := indexRows.Array()
     if len(indexRowObjs) == 0 {
         return indexRows, errormdl.Wrap("no data found to update")
@@ -1008,8 +998,20 @@ func updateIndexRow(indexRows gjson.Result, previousIndexRow gjson.Result, updat
     return gjson.Parse(updatedIndexDataString), nil
 }
-func initializeWithHeader(f *os.File) error {
-    _, err := f.WriteAt([]byte(strconv.Itoa(fileStatusReady)), fileStatusOffsetInFile)
+func initializeWithHeader(filePath string) error {
+    fileLock, err := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(fileLock)
+    if errormdl.CheckErr(err) != nil {
+        loggermdl.LogError(err)
+        return errormdl.CheckErr(err)
+    }
+    // initailize file with headers and fdb index entry
+    f, err := filemdl.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_SYNC, 0777)
+    if errormdl.CheckErr(err) != nil {
+        loggermdl.LogError(err)
+        return errormdl.CheckErr(err)
+    }
+    _, err = f.WriteAt([]byte(strconv.Itoa(fileStatusReady)), fileStatusOffsetInFile)
     // isFile ready for upload =0
     _, err = f.WriteAt([]byte("0"), isReadyForUploadOffsetInFile)
     _, err = f.WriteAt([]byte("0"), isUpdatedAndNotCommitedOffsetInFile)
@@ -1028,7 +1030,8 @@ func initializeWithHeader(f *os.File) error {
         loggermdl.LogError(err)
         return err
     }
-    return nil
+    err = f.Close()
+    return err
 }
 func appendPaddingToNumber(value int64, padNumber int) string {
@@ -1154,7 +1157,6 @@ func encryptData(data, key []byte) (dataOut []byte, err error) {
 }
 func decryptData(data, key []byte) (dataOut []byte, err error) {
-    // loggermdl.LogDebug("data", string(data))
     dataOut, err = securitymdl.AESDecrypt(data, key)
     if errormdl.CheckErr1(err) != nil {
@@ -1221,7 +1223,6 @@ func getFileDataFromFile(f *os.File, startOffset, dataSize int64) ([]byte, error
         }
         return dataByte, nil
     }
-    // loggermdl.LogDebug("dataByte", string(dataByte))
     return dataByte, err
 }
@@ -1261,7 +1262,6 @@ func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileT
         loggermdl.LogError("index data not found: ", filePath, err)
         return "", err
     }
-    // loggermdl.LogDebug("indexDataString", indexDataString)
     indexData := gjson.Parse(indexDataString)
     indexRows := indexData.Get(`#[fileType==` + requestedFileType + `]#`)
     for i := 0; i < len(inFileIndexQuery); i++ {
@@ -1272,9 +1272,7 @@ func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileT
     indexRows.ForEach(func(key, indexRow gjson.Result) bool {
         // read files
         startOffSet := indexRow.Get("startOffset").Int()
-        loggermdl.LogError("startOffSet - ", startOffSet)
         dataSize := indexRow.Get("dataSize").Int()
-        loggermdl.LogError("dataSize - ", dataSize)
         if startOffSet == 0 || dataSize == 0 {
             return true
         }
@@ -1305,6 +1303,12 @@ func deletDataFromInfileIndex(filePath string, fileType string, infileIndexQueri
     // get update index records
     // save updated index
     // update index size header
+    lock, err := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(lock)
+    if err != nil {
+        return err
+    }
     f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777)
     if err != nil {
         return err
@@ -1321,7 +1325,6 @@ func deletDataFromInfileIndex(filePath string, fileType string, infileIndexQueri
     for _, query := range infileIndexQueries {
         indexRecordsToDelete = indexRecordsToDelete.Get(query + "#")
     }
-    loggermdl.LogError(indexRecordsToDelete)
     indexRowsToDelete := indexRecordsToDelete.Array()
     if len(indexRowsToDelete) == 0 {
         return ErrNoDataFound
@@ -1631,7 +1634,9 @@ func deleteDataFromNormalBucket(fdbPath string, index *Index, paths []string) (r
         // }
         // }
         // delete file
-        err = filemdl.DeleteFile(path)
+        err = deleteNormalFile(path)
         if err != nil {
             errList = append(errList, errormdl.Wrap("unable to delete file : "+path+err.Error()))
             continue
@@ -1650,6 +1655,15 @@ func deleteDataFromNormalBucket(fdbPath string, index *Index, paths []string) (r
     return
 }
+func deleteNormalFile(filePath string) error {
+    lock, ferr := filemdl.AcquireFileLock(filePath)
+    defer filemdl.ReleaseFileLock(lock)
+    if ferr != nil {
+        return errormdl.Wrap("unable to delete file : " + filePath + ferr.Error())
+    }
+    return filemdl.DeleteFile(filePath)
+}
 func deleteDataFromInFileIndexBucket(fdbPath string, bucket *Bucket, paths []string, rs *gjson.Result, infileIndexQueries []string) (recordsDeleted int, errList []error) {
     fileType := rs.Get("fileType").String()
@@ -1665,6 +1679,7 @@ func deleteDataFromInFileIndexBucket(fdbPath string, bucket *Bucket, paths []str
     noDataFoundCnt := 0
     for _, filePath := range paths {
         path := filepath.Join(fdbPath, filePath)
         err := deletDataFromInfileIndex(path, fileType, infileIndexQueries)
         if err == ErrNoDataFound {
             noDataFoundCnt++
@@ -1944,7 +1959,7 @@ func DeleteDataFromFDB(dbName string, indexID string, rs *gjson.Result, queries
         loggermdl.LogError("index not found: " + indexID)
         return recordsDeleted, []error{errormdl.Wrap("index not found: " + indexID)}
     }
-    loggermdl.LogError("queries", queries)
     filePaths := []string{}
     // loggermdl.LogDebug(query)
     index.indexObj.View(func(tx *buntdb.Tx) error {
...
@@ -18,12 +18,13 @@ var bucketInstance *Bucket
 var indexInstance *Index
 func init() {
-    Init(true, false, "")
+    Init(true, false, false, "")
     db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false)
     if err != nil {
         log.Fatal("CreateFDBInstance = ", err)
     }
     dbInstance = db
     // step 1: create bucket
     bucket := db.GetNewBucket("Candidates", false, &Bucket{})
...
@@ -278,14 +278,11 @@ func GetLazyCallBackFunc(funcName string) (lazywriter.SaveDataFn, error) {
 }
 var lazyCallBackFnAppendBucket lazywriter.SaveDataFn = func(bucketId string, data *lazywriter.LazyCacheObject) {
-    loggermdl.LogError("Saving ", bucketId, " with data ", data)
     dataInLazyMemory, ok := data.InterfaceData.(string)
     if !ok {
         return
     }
     _, fileName := filepath.Split(data.FileName)
-    // loggermdl.LogError("dir ", dir, "name", name)
-    loggermdl.LogError("filepath to write append data lazily", data.Identifier)
     fileName = filepath.Join(data.FileName, fileName)
     _, _, err := filemdl.AppendDataToFile(fileName, []byte(dataInLazyMemory+"\r\n"), true)
     if err != nil {
...
@@ -9,6 +9,8 @@ import (
     "strings"
     "time"
+    "github.com/juju/fslock"
+
     "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/cachemdl"
     "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
@@ -147,10 +149,12 @@ func AppendFile(filename string, text string) (int, error) {
 //DeleteFile deletes provided file path
 func DeleteFile(filePath string) error {
     path, linkErr := os.Readlink(filePath)
     if errormdl.CheckErr1(linkErr) == nil {
         filePath = path
     }
     err := os.Remove(filePath)
     if errormdl.CheckErr(err) != nil {
         loggermdl.LogError(err)
@@ -754,3 +758,19 @@ func WriteFileAtOffset(f *os.File, startOffset int64, bytesToWrite []byte) (int6
     return int64(n), nil
 }
+
+// AcquireFileLock -
+func AcquireFileLock(filePath string) (*fslock.Lock, error) {
+    lock := fslock.New(filePath)
+    lockErr := lock.Lock()
+    if lockErr != nil {
+        loggermdl.LogError("failed to acquire lock > " + lockErr.Error())
+        return nil, errormdl.Wrap("failed to acquire lock > " + lockErr.Error())
+    }
+    return lock, nil
+}
+
+// ReleaseFileLock
+func ReleaseFileLock(lock *fslock.Lock) error {
+    return lock.Unlock()
+}