diff --git a/dalmdl/corefdb/bucket.go b/dalmdl/corefdb/bucket.go index 3b84a0b28231253ffddf510155fb593067895187..6b646f781b4f8ae3b0a639a2ed90620968c315a1 100644 --- a/dalmdl/corefdb/bucket.go +++ b/dalmdl/corefdb/bucket.go @@ -54,7 +54,8 @@ const ( // IndexKeyValSeperator - IndexKeyValSeperator = "=" // FileType - represents key for type of file. Used whenever we need to set file type field in json - FileType = "fileType" + FileType = "fileType" + FileTypeFDBIndex = "FDBIndex" // GetAll - wraper gjson query to return all results. GetAll = "#[*]" ) @@ -239,7 +240,7 @@ func (fdb *FDB) bucketNameResolver(bucketID string, rs *gjson.Result) (string, e name = rs.Get(name).String() } if name == "" { - return name, errormdl.Wrap("Bucket name should not be empty: " + bucketID) + return name, errormdl.Wrap("Bucket name should not be empty: " + bucket.BucketNameQuery) } return name, nil } @@ -259,7 +260,7 @@ func (fdb *FDB) resolveIndex(index *Index, rs *gjson.Result) (string, error) { indexName = rs.Get(index.IndexNameQuery).String() } if indexName == "" { - return path, errormdl.Wrap("primary key value not provided:" + index.IndexID) + return path, errormdl.Wrap("required attribute not provided:" + index.IndexNameQuery) } path = filepath.Join(path, indexName) return path, nil @@ -280,11 +281,8 @@ func createIndexJSON(index *Index, rs *gjson.Result) (string, error) { // updateIndexJSON - update JSON with index field data func updateIndexJSON(index *Index, existingData string, rs *gjson.Result) (string, error) { json := existingData - loggermdl.LogError("Existing json - ", json) var err error for _, indexField := range index.IndexFields { - // loggermdl.LogError("indexField - ", indexField) - // loggermdl.LogError("rs.Get(indexField.Query).Value() - ", rs.Get(indexField.Query).Value()) if rs.Get(indexField.Query).Value() == nil { continue } @@ -313,7 +311,6 @@ func UpdateIndexLazyObjectInCache(indexID string, indexData *Index) error { // lazy cache must be present for provided indexID lazyObj, ok := IndexLazyObjHolder.Get(indexID) if !ok { - // loggermdl.LogError(IndexLazyObjHolder.GetItems()) loggermdl.LogError("index not found in lazy writer cache") return errormdl.Wrap("index not found in lazy writer cache") } @@ -332,7 +329,6 @@ func UpdateIndexLazyObjectInCache(indexID string, indexData *Index) error { } IndexLazyObjHolder.SetNoExpiration(indexID, idxLazyData) - loggermdl.LogError(indexID, " updated with - ", idxLazyData) return nil } @@ -407,7 +403,6 @@ func AppendDataInLazyObjectInCache(bucketID string, data gjson.Result) error { AppendLazyObjHolder.SetNoExpiration(bucketID, bucketLazyData) mutex.Unlock() - loggermdl.LogError(bucketID, " updated with - ", bucketLazyData) return nil } @@ -459,7 +454,6 @@ func (fdb *FDB) readDataFile(path []string) (*gjson.Result, error) { sb := strings.Builder{} sb.WriteString("[") for i := 0; i < len(path); i++ { - loggermdl.LogError("path[i] - ", path[i]) updatedPath := filepath.Join(fdb.DBPath, path[i]) ba := []byte{'{', '}'} var err error @@ -470,7 +464,7 @@ func (fdb *FDB) readDataFile(path []string) (*gjson.Result, error) { return nil, err } - loggermdl.LogError("File data for index = ", i, " - ", string(ba)) + // loggermdl.LogError("File data for index = ", i, " - ", string(ba)) _, err = sb.Write(ba) if err != nil { loggermdl.LogError(err) @@ -481,9 +475,7 @@ func (fdb *FDB) readDataFile(path []string) (*gjson.Result, error) { } sb.WriteString("]") finalResult := strings.Replace(sb.String(), ",]", "]", 1) - loggermdl.LogError("Final result - ", finalResult) rs := gjson.Parse(finalResult) - loggermdl.LogError("Final data - ", rs.String()) return &rs, nil } @@ -524,6 +516,12 @@ func GetDataByConcat(dbName, indexID string, query ...string) (*gjson.Result, er } func saveDataInNormalBucket(filePath string, rs *gjson.Result) error { + // fileLock, err := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(fileLock) + // if errormdl.CheckErr(err) != nil { + // loggermdl.LogError(err) + // return errormdl.CheckErr(err) + // } err := filemdl.WriteFile(filePath, []byte(rs.String()), true, false) if errormdl.CheckErr(err) != nil { loggermdl.LogError(err) @@ -552,28 +550,21 @@ func saveDataInPackBucket(fdb *FDB, filePath string, index *Index, rs *gjson.Res if len(fileType) == 0 { return errormdl.Wrap("please specify fileType") } - var f *os.File - var err error isFilePresent := filemdl.FileAvailabilityCheck(filePath) if !isFilePresent { dir, _ := filepath.Split(filePath) - err = filemdl.CreateDirectoryRecursive(dir) - if errormdl.CheckErr(err) != nil { - loggermdl.LogError(err) - return errormdl.CheckErr(err) - } - // initailize file with headers and fdb index entry - f, err = filemdl.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_SYNC, 0777) + err := filemdl.CreateDirectoryRecursive(dir) if errormdl.CheckErr(err) != nil { loggermdl.LogError(err) return errormdl.CheckErr(err) } - err := initializeWithHeader(f) + + err = initializeWithHeader(filePath) if err != nil { loggermdl.LogError(err) return err } - err = f.Close() + err = addFDBIndexEntryFile(filePath, bucket, index.IndexFields, rs) if err != nil { loggermdl.LogError("fail to add fdb index entry in file: ", err) @@ -620,13 +611,21 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul if !ok { return errormdl.Wrap("infileIndex for specified fileType not found") } + // fileLock, err := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(fileLock) + + // if errormdl.CheckErr(err) != nil { + // loggermdl.LogError(err) + // return errormdl.CheckErr(err) + // } + f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777) + defer f.Close() if errormdl.CheckErr(err) != nil { loggermdl.LogError(err) return errormdl.CheckErr(err) } // if file is being created for first time - defer f.Close() fileStatus := fileStatusReady // if isFilePresent { fileStatus, err = getFileStatus(f) @@ -703,7 +702,6 @@ func addDataToSpecialBucketFile(filePath string, bucket *Bucket, rs *gjson.Resul return err } f.Sync() - } else { // retry after timeout } @@ -739,10 +737,8 @@ func getInFileIndexData(f *os.File) (string, error) { func updateDataInNormalBucket(fdb *FDB, filePaths []string, rs *gjson.Result) (*gjson.Result, []string, []error) { resultStr := "[]" errList := []error{} - loggermdl.LogError("filePaths", filePaths) filesUpdated, updatedCnt := make([]string, len(filePaths)), 0 for _, filePath := range filePaths { - loggermdl.LogError("path", filePath) path := filepath.Join(fdb.DBPath, filePath) data, err := updateFileData(path, *rs) if err != nil { @@ -759,15 +755,17 @@ func updateDataInNormalBucket(fdb *FDB, filePaths []string, rs *gjson.Result) (* loggermdl.LogError("invalid path for update - ", filePath) } } - loggermdl.LogError("updatedCnt", updatedCnt) - loggermdl.LogError("filesUpdated ", filesUpdated) - loggermdl.LogError("filesUpdatedslice ", filesUpdated[:updatedCnt]) - loggermdl.LogError("filesUpdatedslice ", filesUpdated[:updatedCnt-1]) result := gjson.Parse(resultStr) return &result, filesUpdated[:updatedCnt], errList } func updateFileData(filePath string, rs gjson.Result) (*gjson.Result, error) { + // fileLock, err := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(fileLock) + // if errormdl.CheckErr(err) != nil { + // loggermdl.LogError(err) + // return nil, errormdl.CheckErr(err) + // } data, err := filemdl.FastReadFile(filePath) if err != nil { return nil, err @@ -794,21 +792,19 @@ func updateDataInFileIndexBucket(fdb *FDB, bucket *Bucket, filePaths []string, r dataUpdatedAtFilePathsCnt := 0 for _, path := range filePaths { filePath := filepath.Join(fdb.DBPath, path) + resultArray, err := updateDataWithInFileIndex(filePath, bucket, rs, infileIndexQuery) - if err == ErrNoDataFound { - continue - } if err != nil { errList = append(errList, err) continue } + // dataUpdatedAtFilePaths = append(dataUpdatedAtFilePaths, path) dataUpdatedAtFilePaths[dataUpdatedAtFilePathsCnt] = path dataUpdatedAtFilePathsCnt++ finalResultArray = append(finalResultArray, resultArray.Array()...) } - loggermdl.LogError("finalResultArray", finalResultArray) resultListStr := "[]" for _, resultObj := range finalResultArray { resultListStr, _ = sjson.Set(resultListStr, "-1", resultObj.Value()) @@ -836,7 +832,13 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result if !filemdl.FileAvailabilityCheck(filePath) { return nil, errormdl.Wrap("file not found: " + filePath) } + // lock, err := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(lock) + // if err != nil { + // return nil, errormdl.Wrap("fail to update data" + err.Error()) + // } f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777) + defer f.Close() if err != nil { return nil, errormdl.Wrap("fail to update data" + err.Error()) } @@ -845,7 +847,6 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result loggermdl.LogError(err) return nil, err } - loggermdl.LogError("indexDataString", indexDataString) infileIndexQueries = append(infileIndexQueries, `#[fileType==`+requestedFileType+`]`) indexRows := gjson.Parse(indexDataString) indexRecordsToUpdate := indexRows @@ -856,13 +857,10 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result } indexRecordsToUpdate = indexRecordsToUpdate.Get(query + "#") } - loggermdl.LogError("indexRecordsToUpdate - ", indexRecordsToUpdate) indexRecordsToUpdateObjs := indexRecordsToUpdate.Array() - loggermdl.LogError("Len indexRecordsToUpdateObjs - ", len(indexRecordsToUpdateObjs)) if len(indexRecordsToUpdateObjs) == 0 { return nil, ErrNoDataFound } - loggermdl.LogError("indexRowObjs", indexRecordsToUpdateObjs) // updating first record resultArrayStr := "[]" for _, recordToUpdateIndexRow := range indexRecordsToUpdateObjs { @@ -878,7 +876,6 @@ func updateDataWithInFileIndex(filePath string, bucket *Bucket, rs *gjson.Result } func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gjson.Result, rs, existingIndexRows gjson.Result) (*gjson.Result, error) { - loggermdl.LogError("recordToUpdateIndexRow", recordToUpdateIndexRow) fileStartOffset := recordToUpdateIndexRow.Get("startOffset").Int() dataSize := recordToUpdateIndexRow.Get("dataSize").Int() if fileStartOffset == 0 || dataSize == 0 { @@ -894,7 +891,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj if err != nil { return nil, err } - loggermdl.LogError("existingData", existingData) updatedDataStr := strings.TrimSpace(string(existingData)) rs.ForEach(func(key, val gjson.Result) bool { @@ -902,7 +898,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj updatedDataStr, _ = sjson.Set(updatedDataStr, key.String(), val.Value()) return true }) - loggermdl.LogError("updatedDataStr", updatedDataStr) newDataSize := int64(len(updatedDataStr)) footerStartOffset := getFooterOffset(f) updatedFooterOffset := footerStartOffset @@ -927,7 +922,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj // if err != nil { // return nil, err // } - loggermdl.LogError("existingIndexRows - ", existingIndexRows) // update startofffset and datasize in infile index // updateIndexRow updatedIndexRowStr := recordToUpdateIndexRow.String() @@ -941,7 +935,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj }) updatedIndexRowStr, _ = sjson.Set(updatedIndexRowStr, "startOffset", fileStartOffset) updatedIndexRowStr, _ = sjson.Set(updatedIndexRowStr, "dataSize", newDataSize) - loggermdl.LogError("updatedIndexRowStr - ", updatedIndexRowStr) updatedIndexRows, err := updateIndexRow(existingIndexRows, recordToUpdateIndexRow, gjson.Parse(updatedIndexRowStr)) @@ -982,8 +975,6 @@ func updateDataInFileIndex(f *os.File, bucket *Bucket, recordToUpdateIndexRow gj } func updateIndexRow(indexRows gjson.Result, previousIndexRow gjson.Result, updatedRow gjson.Result) (gjson.Result, error) { - loggermdl.LogError("Index rows to update - ", indexRows.String()) - loggermdl.LogError("Ilength - ", len(indexRows.Array())) indexRowObjs := indexRows.Array() if len(indexRowObjs) == 0 { return indexRows, errormdl.Wrap("no data found to update") @@ -1014,27 +1005,70 @@ func updateIndexRow(indexRows gjson.Result, previousIndexRow gjson.Result, updat return gjson.Parse(updatedIndexDataString), nil } -func initializeWithHeader(f *os.File) error { - _, err := f.WriteAt([]byte(strconv.Itoa(fileStatusReady)), fileStatusOffsetInFile) +func initializeWithHeader(filePath string) error { + // fileLock, err := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(fileLock) + // if errormdl.CheckErr(err) != nil { + // loggermdl.LogError(err) + // return errormdl.CheckErr(err) + // } + // initailize file with headers and fdb index entry + f, err := filemdl.OpenFile(filePath, os.O_CREATE|os.O_RDWR|os.O_SYNC, 0777) + defer f.Close() + if errormdl.CheckErr(err) != nil { + loggermdl.LogError(err) + return errormdl.CheckErr(err) + } + _, err = f.WriteAt([]byte(strconv.Itoa(fileStatusReady)), fileStatusOffsetInFile) + if err != nil { + loggermdl.LogError(err) + return err + } // isFile ready for upload =0 _, err = f.WriteAt([]byte("0"), isReadyForUploadOffsetInFile) + if err != nil { + loggermdl.LogError(err) + return err + } _, err = f.WriteAt([]byte("0"), isUpdatedAndNotCommitedOffsetInFile) + if err != nil { + loggermdl.LogError(err) + return err + } _, err = f.WriteAt([]byte("0"), isReorgRequiredOffsetInFile) + if err != nil { + loggermdl.LogError(err) + return err + } _, err = f.WriteAt([]byte("0"), isReindexRequiredOffsetInFile) + if err != nil { + loggermdl.LogError(err) + return err + } // _, err = f.WriteAt([]byte(appendPaddingToNumber(sizeReservedForHeaders, 15)), footerOffsetInFile) err = setFooterOffset(f, sizeReservedForHeaders+int64(len(lineBreak))) + if err != nil { + loggermdl.LogError(err) + return err + } err = setFooterSize(f, 0) - + if err != nil { + loggermdl.LogError(err) + return err + } _, err = f.WriteAt([]byte("filehash"), filehashOffest) + if err != nil { + loggermdl.LogError(err) + return err + } timestamp := strconv.FormatInt(time.Now().Unix(), 10) _, err = f.WriteAt([]byte(timestamp), lastUpdatedOffset) - _, err = f.WriteAt([]byte("\r\n"), sizeReservedForHeaders) - if err != nil { loggermdl.LogError(err) return err } - return nil + _, err = f.WriteAt([]byte("\r\n"), sizeReservedForHeaders) + return err } func appendPaddingToNumber(value int64, padNumber int) string { @@ -1160,7 +1194,6 @@ func encryptData(data, key []byte) (dataOut []byte, err error) { } func decryptData(data, key []byte) (dataOut []byte, err error) { - // loggermdl.LogDebug("data", string(data)) dataOut, err = securitymdl.AESDecrypt(data, key) if errormdl.CheckErr1(err) != nil { @@ -1197,8 +1230,6 @@ func addFileDataInFile(f *os.File, offset int64, data string, breakLine bool) (i if breakLine { dataBytes = append(dataBytes, []byte(lineBreak)...) } - s := fmt.Sprintf("%#v", string(dataBytes)) - fmt.Println(s) dataSize, err := filemdl.WriteFileAtOffset(f, offset, dataBytes) return dataSize, err } @@ -1227,7 +1258,6 @@ func getFileDataFromFile(f *os.File, startOffset, dataSize int64) ([]byte, error } return dataByte, nil } - // loggermdl.LogDebug("dataByte", string(dataByte)) return dataByte, err } @@ -1248,8 +1278,6 @@ func readDataFromNormalBucket(fdb *FDB, paths []string) (*gjson.Result, error) { } func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileType string, inFileIndexQuery []string) (string, error) { - loggermdl.LogError("Infile index query - ", inFileIndexQuery) - loggermdl.LogError("requestedFileType - ", requestedFileType) _, ok := bucket.InFileIndexMap[requestedFileType] if !ok { return "", errormdl.Wrap("infileIndex for specified fileType not found: " + requestedFileType) @@ -1258,6 +1286,7 @@ func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileT return "", errormdl.Wrap("file not found at:" + filePath) } f, err := filemdl.Open(filePath) + defer f.Close() if err != nil { loggermdl.LogError("err while opening file: ", filePath, err) return "", err @@ -1267,7 +1296,6 @@ func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileT loggermdl.LogError("index data not found: ", filePath, err) return "", err } - // loggermdl.LogDebug("indexDataString", indexDataString) indexData := gjson.Parse(indexDataString) indexRows := indexData.Get(`#[fileType==` + requestedFileType + `]#`) for i := 0; i < len(inFileIndexQuery); i++ { @@ -1282,9 +1310,7 @@ func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileT indexRows.ForEach(func(key, indexRow gjson.Result) bool { // read files startOffSet := indexRow.Get("startOffset").Int() - loggermdl.LogError("startOffSet - ", startOffSet) dataSize := indexRow.Get("dataSize").Int() - loggermdl.LogError("dataSize - ", dataSize) if startOffSet == 0 || dataSize == 0 { return true } @@ -1315,7 +1341,14 @@ func deletDataFromInfileIndex(filePath string, fileType string, infileIndexQueri // get update index records // save updated index // update index size header + // lock, err := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(lock) + // if err != nil { + // return err + // } + f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777) + defer f.Close() if err != nil { return err } @@ -1331,7 +1364,6 @@ func deletDataFromInfileIndex(filePath string, fileType string, infileIndexQueri for _, query := range infileIndexQueries { indexRecordsToDelete = indexRecordsToDelete.Get(query + "#") } - loggermdl.LogError(indexRecordsToDelete) indexRowsToDelete := indexRecordsToDelete.Array() if len(indexRowsToDelete) == 0 { return ErrNoDataFound @@ -1416,6 +1448,8 @@ func ReindexOnSpecialBucket(fdbName string, indexID string) error { } pathToStartWalk = filepath.Join(pathToStartWalk, bucket.BucketNameQuery) } + bucketID := index.BucketSequence[len(index.BucketSequence)-1] + bucket := fdb.buckets[bucketID] indexDataMap := make(map[string]string) if !filemdl.FileAvailabilityCheck(pathToStartWalk) { return errormdl.Wrap("invalid path: " + pathToStartWalk) @@ -1428,32 +1462,15 @@ func ReindexOnSpecialBucket(fdbName string, indexID string) error { } if !info.IsDir() { - f, err := filemdl.OpenFile(filePath, os.O_RDWR|os.O_SYNC, 0777) - if err != nil { - loggermdl.LogError(err) - return nil - } - infileIndexString, err := getInFileIndexData(f) - if err != nil { - loggermdl.LogError(err) - return nil - } - // loggermdl.LogDebug("infileIndexString", infileIndexString) - infileIndexData := gjson.Parse(infileIndexString) - fdbIndexData := infileIndexData.Get(`#[fileType=="FDBIndex"]`) - if fdbIndexData.String() == "" { - return nil - } - startOffset := fdbIndexData.Get("startOffset").Int() - dataSize := fdbIndexData.Get("dataSize").Int() - // dataByte, err := filemdl.ReadFileFromOffset(f, startOffset, dataSize) - dataByte, err := getFileDataFromFile(f, startOffset, dataSize) + result, err := getFileDataUsingInFileIndex(filePath, bucket, FileTypeFDBIndex, []string{`#[fileType=="FDBIndex"]`}) if err != nil { loggermdl.LogError(err) return nil } + data := gjson.Parse(result) + data = data.Get("0") pathToSave := strings.TrimPrefix(filePath, fdb.DBPath+string(filepath.Separator)) - valString, _ := sjson.Delete(string(dataByte), "fileType") + valString, _ := sjson.Delete(data.String(), "fileType") // val= gjson.Parse(valString) indexDataMap[pathToSave] = strings.TrimSpace(valString) } @@ -1641,7 +1658,9 @@ func deleteDataFromNormalBucket(fdbPath string, index *Index, paths []string) (r // } // } // delete file - err = filemdl.DeleteFile(path) + + err = deleteNormalFile(path) + if err != nil { errList = append(errList, errormdl.Wrap("unable to delete file : "+path+err.Error())) continue @@ -1660,6 +1679,15 @@ func deleteDataFromNormalBucket(fdbPath string, index *Index, paths []string) (r return } +func deleteNormalFile(filePath string) error { + // lock, ferr := filemdl.AcquireFileLock(filePath) + // defer filemdl.ReleaseFileLock(lock) + + // if ferr != nil { + // return errormdl.Wrap("unable to delete file : " + filePath + ferr.Error()) + // } + return filemdl.DeleteFile(filePath) +} func deleteDataFromInFileIndexBucket(fdbPath string, bucket *Bucket, paths []string, rs *gjson.Result, infileIndexQueries []string) (recordsDeleted int, errList []error) { fileType := rs.Get("fileType").String() @@ -1675,6 +1703,7 @@ func deleteDataFromInFileIndexBucket(fdbPath string, bucket *Bucket, paths []str noDataFoundCnt := 0 for _, filePath := range paths { path := filepath.Join(fdbPath, filePath) + err := deletDataFromInfileIndex(path, fileType, infileIndexQueries) if err == ErrNoDataFound { noDataFoundCnt++ @@ -1718,7 +1747,6 @@ func SaveDataInFDB(dbName, indexID string, rs *gjson.Result) error { // check is infile index present // if yes - loggermdl.LogError("Bucket type - ", *bucket) switch bucket.BucketType { case BucketTypeSimple: err := saveDataInNormalBucket(filePath, rs) @@ -1759,7 +1787,6 @@ func SaveDataInFDB(dbName, indexID string, rs *gjson.Result) error { } return nil }) - // TODO: Currently index data is overwritten by new data. err = UpdateIndexLazyObjectInCache(indexID, index) if err != nil { @@ -1790,7 +1817,6 @@ func ReadDataFromFDB(dbName, indexID string, rs *gjson.Result, query []string, i // loggermdl.LogDebug(query) index.indexObj.View(func(tx *buntdb.Tx) error { return tx.Ascend(index.IndexID, func(key, value string) bool { - loggermdl.LogError("Key - ", key, " Value- ", value) rsJSON := gjson.Parse("[" + value + "]") for i := 0; i < len(query); i++ { loggermdl.LogError(query[i]) @@ -1807,17 +1833,14 @@ func ReadDataFromFDB(dbName, indexID string, rs *gjson.Result, query []string, i }) }) resultToReturn := gjson.Result{} - loggermdl.LogError("File paths- ", filePaths) if len(filePaths) == 0 { loggermdl.LogError("files not found") return &resultToReturn, nil } - loggermdl.LogError("Bucket type - ", bucket.BucketType) resultArray := "[]" if bucket.BucketType == BucketTypePack { for _, path := range filePaths { filePath := filepath.Join(fdb.DBPath, path) - loggermdl.LogError("file path - ", filePath) requestedFileType := rs.Get("fileType").String() if len(requestedFileType) == 0 { return &resultToReturn, errormdl.Wrap("please specify fileType") @@ -1829,13 +1852,11 @@ func ReadDataFromFDB(dbName, indexID string, rs *gjson.Result, query []string, i loggermdl.LogError(err) return &resultToReturn, err } - loggermdl.LogError("Result - ", result) // loggermdl.LogDebug("result", result) for _, val := range gjson.Parse(result).Array() { resultArray, _ = sjson.Set(resultArray, "-1", val.Value()) } } - loggermdl.LogError("resultArray", resultArray) resultToReturn = gjson.Parse(resultArray) return &resultToReturn, nil } else if bucket.BucketType == BucketTypeSimple { @@ -1878,7 +1899,6 @@ func UpdateDataInFDB(dbName, indexID string, rs *gjson.Result, query []string, i // loggermdl.LogDebug(query) index.indexObj.View(func(tx *buntdb.Tx) error { return tx.Ascend(index.IndexID, func(key, value string) bool { - loggermdl.LogError("key = ", key, " val = ", value) rsJSON := gjson.Parse("[" + value + "]") for i := 0; i < len(query); i++ { if query[i] == GetAll { @@ -1894,7 +1914,6 @@ func UpdateDataInFDB(dbName, indexID string, rs *gjson.Result, query []string, i return true }) }) - loggermdl.LogError("indexKeyValMap", indexKeyValMap) // loggermdl.LogDebug("filePaths", filePaths) resultToReturn := gjson.Result{} @@ -1919,9 +1938,7 @@ func UpdateDataInFDB(dbName, indexID string, rs *gjson.Result, query []string, i loggermdl.LogError("no data found to update") return &resultToReturn, []error{ErrNoDataFound} } - loggermdl.LogError("dataUpdatedAtFilePaths", dataUpdatedAtFilePaths) // path = path + ".json" - loggermdl.LogError("indexKeyValMap", indexKeyValMap) index.indexObj.Update(func(tx *buntdb.Tx) error { // optimization : if data not changed then dont update for _, path := range dataUpdatedAtFilePaths { @@ -1962,7 +1979,6 @@ func DeleteDataFromFDB(dbName string, indexID string, rs *gjson.Result, queries loggermdl.LogError("index not found: " + indexID) return recordsDeleted, []error{errormdl.Wrap("index not found: " + indexID)} } - filePaths := []string{} // loggermdl.LogDebug(query) index.indexObj.View(func(tx *buntdb.Tx) error { diff --git a/dalmdl/corefdb/bucket_test.go b/dalmdl/corefdb/bucket_test.go index 538e2ac9015b2aedfc437c74c47c257b657b1df1..52a51257967424e8175990fd5f85d691c659abbf 100644 --- a/dalmdl/corefdb/bucket_test.go +++ b/dalmdl/corefdb/bucket_test.go @@ -18,12 +18,13 @@ var bucketInstance *Bucket var indexInstance *Index func init() { - Init(true, false, "") + Init(true, false, false, "") db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false) if err != nil { log.Fatal("CreateFDBInstance = ", err) } + dbInstance = db // step 1: create bucket bucket := db.GetNewBucket("Candidates", false, &Bucket{}) diff --git a/dalmdl/corefdb/buntdbmdl.go b/dalmdl/corefdb/buntdbmdl.go index e035dc2025b49a3d0a5659ec74ad86a2b2734d38..9d0dc329fb7cf4c1383b9dc15c41e129cc71dff8 100644 --- a/dalmdl/corefdb/buntdbmdl.go +++ b/dalmdl/corefdb/buntdbmdl.go @@ -278,14 +278,11 @@ func GetLazyCallBackFunc(funcName string) (lazywriter.SaveDataFn, error) { } var lazyCallBackFnAppendBucket lazywriter.SaveDataFn = func(bucketId string, data *lazywriter.LazyCacheObject) { - loggermdl.LogError("Saving ", bucketId, " with data ", data) dataInLazyMemory, ok := data.InterfaceData.(string) if !ok { return } _, fileName := filepath.Split(data.FileName) - // loggermdl.LogError("dir ", dir, "name", name) - loggermdl.LogError("filepath to write append data lazily", data.Identifier) fileName = filepath.Join(data.FileName, fileName) _, _, err := filemdl.AppendDataToFile(fileName, []byte(dataInLazyMemory+"\r\n"), true) if err != nil { diff --git a/filemdl/filemdl.go b/filemdl/filemdl.go index 7d9267906a6220fc80f62bf5b76b1f0946b55a0e..c15031288a955bdc289700d5158e28e572c78d2f 100644 --- a/filemdl/filemdl.go +++ b/filemdl/filemdl.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/juju/fslock" + "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/cachemdl" "corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl" @@ -147,10 +149,12 @@ func AppendFile(filename string, text string) (int, error) { //DeleteFile deletes provided file path func DeleteFile(filePath string) error { + path, linkErr := os.Readlink(filePath) if errormdl.CheckErr1(linkErr) == nil { filePath = path } + err := os.Remove(filePath) if errormdl.CheckErr(err) != nil { loggermdl.LogError(err) @@ -754,3 +758,19 @@ func WriteFileAtOffset(f *os.File, startOffset int64, bytesToWrite []byte) (int6 return int64(n), nil } + +// AcquireFileLock - +func AcquireFileLock(filePath string) (*fslock.Lock, error) { + lock := fslock.New(filePath) + lockErr := lock.Lock() + if lockErr != nil { + loggermdl.LogError("failed to acquire lock > " + lockErr.Error()) + return nil, errormdl.Wrap("failed to acquire lock > " + lockErr.Error()) + } + return lock, nil +} + +// ReleaseFileLock +func ReleaseFileLock(lock *fslock.Lock) error { + return lock.Unlock() +} diff --git a/routebuildermdl/serviceCachemdl.go b/routebuildermdl/serviceCachemdl.go index 647889fbf21851ebc39321db2acbe65e062643d3..f076c94cd25eb73c2007b8763303c866da077e83 100644 --- a/routebuildermdl/serviceCachemdl.go +++ b/routebuildermdl/serviceCachemdl.go @@ -175,8 +175,8 @@ func CallService(name string, rs *gjson.Result, isRestricted bool, isRoleBased b serviceCache := tmpSvcCache res, _, err := serviceCache.Service(rs, p) if err != nil { - loggermdl.LogError("Service execution failed for ", name) - return nil, errormdl.Wrap("Service execution failed for " + name) + loggermdl.LogError("Service execution failed for ", name, " : ", err) + return nil, err } return res, nil } diff --git a/workerpoolmdl/workerpoolmdlv2.go b/workerpoolmdl/workerpoolmdlv2.go new file mode 100644 index 0000000000000000000000000000000000000000..50369d9c3ed775b47e55895906436a1704693971 --- /dev/null +++ b/workerpoolmdl/workerpoolmdlv2.go @@ -0,0 +1,112 @@ +package workerpoolmdl + +import ( + "context" +) + +// PoolWithContext is a worker group that runs a number of tasks at a +// configured concurrency. +type PoolWithContext struct { + Tasks []*TaskWithContext + concurrency int + tasksChan chan *TaskWithContext + CancelHandler *cancelHandler + IsPoolCanceled bool +} + +type cancelHandler struct { + Ctx context.Context + CancelFunc context.CancelFunc +} + +// NewPoolWithContext initializes a new pool with the given tasks and +// at the given concurrency. +func NewPoolWithContext(tasks []*TaskWithContext, concurrency int) *PoolWithContext { + cntx, cancelFunction := context.WithCancel(context.Background()) + + obj := cancelHandler{} + + obj.Ctx = cntx + obj.CancelFunc = cancelFunction + + return &PoolWithContext{ + Tasks: tasks, + concurrency: concurrency, + tasksChan: make(chan *TaskWithContext), + CancelHandler: &obj, + } +} + +// Run runs all work within the pool +func (p *PoolWithContext) Run() { + + for i := 0; i < p.concurrency; i++ { + go p.work() + } + + for _, task := range p.Tasks { + + if p.IsPoolCanceled { + return + } else { + p.tasksChan <- task + } + + } + + close(p.tasksChan) + +} + +// The work loop for any single goroutine. +func (p *PoolWithContext) work() { + for task := range p.tasksChan { + task.Run(p) + } +} + +// TaskWithContext encapsulates a work item that should go in a work +// pool. +type TaskWithContext struct { + // Err holds an error that occurred during a task. Its + // result is only meaningful after Run has been called + // for the pool that holds it. + Err error + + Data interface{} + + f func(data interface{}) error +} + +// NewTaskWithContext initializes a new task based on a given work +// function. +func NewTaskWithContext(d interface{}, f func(data interface{}) error) *TaskWithContext { + return &TaskWithContext{Data: d, f: f} +} + +// Run runs a Task and does appropriate accounting via a +func (t *TaskWithContext) Run(p *PoolWithContext) { + + for { + select { + + case <-p.CancelHandler.Ctx.Done(): + return + + default: + + t.Err = t.f(t.Data) + + return + } + } + +} + +//Cancel all tasks +func (p *PoolWithContext) Cancel() { + if p != nil { + p.CancelHandler.CancelFunc() + p.IsPoolCanceled = true + } +} diff --git a/workerpoolmdl/workerpoolmdlv2_test.go b/workerpoolmdl/workerpoolmdlv2_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f6ea611effa3c2f9537e7b029edf92ad20ce22d9 --- /dev/null +++ b/workerpoolmdl/workerpoolmdlv2_test.go @@ -0,0 +1,71 @@ +package workerpoolmdl + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var count int +var countMutex = &sync.Mutex{} +var cancelTriggered bool + +//TestWorkerPoolWithContext - test for cancel trigger if count >= 500 +func TestWorkerPoolWithContext(t *testing.T) { + count = 0 + + tasks := []*TaskWithContext{} + + for index := 0; index < 1000; index++ { + tasks = append(tasks, NewTaskWithContext(index, incrementCount)) + } + + pool := NewPoolWithContext(tasks, 10) + + ticker := time.NewTicker(1 * time.Millisecond) + + go func() { + for range ticker.C { + if count > 500 { + fmt.Println("cancelling tasks...") + pool.Cancel() + return + } + } + }() + + pool.Run() + + assert.GreaterOrEqual(t, count, 500, "Count be greater than or equals to 500") + +} + +//TestWorkerpoolWithoutCancel - test without cancel trigger +func TestWorkerpoolWithoutCancel(t *testing.T) { + count = 0 + + tasks := []*TaskWithContext{} + + for index := 0; index < 1000; index++ { + tasks = append(tasks, NewTaskWithContext(index, incrementCount)) + } + + pool := NewPoolWithContext(tasks, 10) + + pool.Run() + + assert.Equal(t, count, 1000, "Count should be equals to 1000") +} + +//incrementCount- increment count by 1 +func incrementCount(data interface{}) error { + + countMutex.Lock() + count++ + countMutex.Unlock() + + return nil +}