Commit ff2d11ad authored by Ajit Jagtap's avatar Ajit Jagtap
Browse files

Merge branch 'feature_lazy_append' into 'devbranch'

feature of lazy append

See merge request !116
parents 74df42fc e82d416b
Branches
Tags
2 merge requests!11915 Oct MEP Merge Dev to Stg,!116feature of lazy append
Showing with 207 additions and 28 deletions
......@@ -7,6 +7,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/lazywriter"
......@@ -280,8 +281,8 @@ func updateIndexJSON(index *Index, existingData string, rs *gjson.Result) (strin
loggermdl.LogError("Existing json - ", json)
var err error
for _, indexField := range index.IndexFields {
loggermdl.LogError("indexField - ", indexField)
loggermdl.LogError("rs.Get(indexField.Query).Value() - ", rs.Get(indexField.Query).Value())
// loggermdl.LogError("indexField - ", indexField)
// loggermdl.LogError("rs.Get(indexField.Query).Value() - ", rs.Get(indexField.Query).Value())
if rs.Get(indexField.Query).Value() == nil {
continue
}
......@@ -310,7 +311,7 @@ func UpdateIndexLazyObjectInCache(indexID string, indexData *Index) error {
// lazy cache must be present for provided indexID
lazyObj, ok := IndexLazyObjHolder.Get(indexID)
if !ok {
loggermdl.LogError(IndexLazyObjHolder.GetItems())
// loggermdl.LogError(IndexLazyObjHolder.GetItems())
loggermdl.LogError("index not found in lazy writer cache")
return errormdl.Wrap("index not found in lazy writer cache")
}
......@@ -333,6 +334,81 @@ func UpdateIndexLazyObjectInCache(indexID string, indexData *Index) error {
return nil
}
func getUpdatedFilePath(filePath string, data gjson.Result) (string, error) {
updatedFilePath := ""
filePathSplit := strings.Split(filePath, string(filepath.Separator))
for _, pathChunk := range filePathSplit {
if strings.Index(pathChunk, "$$") != -1 {
indexFieldQuery := strings.TrimPrefix(pathChunk, "$$")
value := data.Get(indexFieldQuery)
if value.Exists() {
updatedFilePath = filepath.Join(updatedFilePath, value.String())
} else {
return "", errormdl.Wrap("index filed value not provided: " + indexFieldQuery)
}
} else {
updatedFilePath = filepath.Join(updatedFilePath, pathChunk)
}
}
return updatedFilePath, nil
}
// AppendDataInLazyObjectInCache - updates index data in lay writer cache
func AppendDataInLazyObjectInCache(bucketID string, data gjson.Result) error {
// lazy cache must be present for provided bucketID
// loggermdl.LogError("AppendDataInLazyObjectInCache")
mutex := sync.Mutex{}
mutex.Lock()
lazyObj, ok := AppendLazyObjHolder.Get(bucketID)
if !ok {
loggermdl.LogError(AppendLazyObjHolder.GetItems())
return errormdl.Wrap("lazyObject not found in lazywriter cache")
}
bucketLazyData, ok := lazyObj.(lazywriter.LazyCacheObject)
if !ok {
loggermdl.LogError("interface type is not lazywriter.LazyCacheObject")
return errormdl.Wrap("interface type is not lazywriter.LazyCacheObject")
}
filePath := bucketLazyData.FileName
if filePath == "" {
loggermdl.LogError("filepath not found in lazywriter")
return errormdl.Wrap("filepath not found in lazywriter")
}
updatedFilePath, err := getUpdatedFilePath(filePath, data)
if err != nil {
loggermdl.LogError(err)
return err
}
bucketLazyData.FileName = updatedFilePath
updatedData := ""
if bucketLazyData.InterfaceData == nil {
updatedData = data.String()
} else {
previousData, ok := bucketLazyData.InterfaceData.(string)
if !ok {
return errormdl.Wrap("previous lazy data is not of type string")
}
updatedData = previousData + string(lineBreak) + data.String()
}
// bucketLazyData.GJSONData = indexData
bucketLazyData.InterfaceData = updatedData
if ok := AppendMaster.SaveOrUpdateDataInCache(bucketLazyData); !ok {
loggermdl.LogError("failed to update index data in lazy cache")
return errormdl.Wrap("failed to update index data in lazy cache")
}
AppendLazyObjHolder.SetNoExpiration(bucketID, bucketLazyData)
mutex.Unlock()
loggermdl.LogError(bucketID, " updated with - ", bucketLazyData)
return nil
}
// searchDataInPack - searchDataInPack
func searchDataInPack(paths []string, searchTillPath string, restoreFileFromPack bool) (*gjson.Result, error) {
dirPath, _ := filepath.Split(paths[0])
......@@ -505,19 +581,19 @@ func saveDataInPackBucket(fdb *FDB, filePath string, index *Index, rs *gjson.Res
return addDataToSpecialBucketFile(filePath, bucket, rs)
}
func saveDataInAppendBucket(filePath string, rs *gjson.Result) error {
func saveDataInAppendBucket(filePath string, rs *gjson.Result, bucket *Bucket) error {
// loggermdl.LogDebug("is singlepack", bucket.IsSinglePackType)
// fullPath := filepath.Join(fdb.DBPath, filePath)
// bucketPath, _ := filepath.Split(fullPath)
// fileName := bucketPath + bucket.BucketNameQuery
_, _, err := filemdl.AppendDataToFile(filePath, []byte(rs.String()+lineBreak), true)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return errormdl.CheckErr(err)
// loggermdl.LogError("isLazyWriterEnabled", isLazyWriterEnabled)
if isLazyWriterEnabled {
err := AppendDataInLazyObjectInCache(bucket.BucketID, *rs)
return err
}
return nil
// else
_, _, err := filemdl.AppendDataToFile(filePath, []byte(rs.String()+lineBreak), true)
return err
}
func addFDBIndexEntryFile(filePath string, bucket *Bucket, infileIndexFields []IndexField, rs *gjson.Result) error {
// adding FDBIndex as infileIndex
FDBIndexInFileIndexMap := InFileIndex{
......@@ -1078,7 +1154,7 @@ func encryptData(data, key []byte) (dataOut []byte, err error) {
}
func decryptData(data, key []byte) (dataOut []byte, err error) {
loggermdl.LogDebug("data", string(data))
// loggermdl.LogDebug("data", string(data))
dataOut, err = securitymdl.AESDecrypt(data, key)
if errormdl.CheckErr1(err) != nil {
......@@ -1145,7 +1221,7 @@ func getFileDataFromFile(f *os.File, startOffset, dataSize int64) ([]byte, error
}
return dataByte, nil
}
loggermdl.LogDebug("dataByte", string(dataByte))
// loggermdl.LogDebug("dataByte", string(dataByte))
return dataByte, err
}
......@@ -1185,7 +1261,7 @@ func getFileDataUsingInFileIndex(filePath string, bucket *Bucket, requestedFileT
loggermdl.LogError("index data not found: ", filePath, err)
return "", err
}
loggermdl.LogDebug("indexDataString", indexDataString)
// loggermdl.LogDebug("indexDataString", indexDataString)
indexData := gjson.Parse(indexDataString)
indexRows := indexData.Get(`#[fileType==` + requestedFileType + `]#`)
for i := 0; i < len(inFileIndexQuery); i++ {
......@@ -1641,14 +1717,13 @@ func SaveDataInFDB(dbName, indexID string, rs *gjson.Result) error {
return err
}
case BucketTypePack:
err := saveDataInPackBucket(fdb, filePath, index, rs)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return errormdl.CheckErr(err)
}
case BucketTypeAppend:
err := saveDataInAppendBucket(filePath, rs)
err := saveDataInAppendBucket(filePath, rs, bucket)
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
return errormdl.CheckErr(err)
......@@ -1874,7 +1949,7 @@ func DeleteDataFromFDB(dbName string, indexID string, rs *gjson.Result, queries
// loggermdl.LogDebug(query)
index.indexObj.View(func(tx *buntdb.Tx) error {
return tx.Ascend(index.IndexID, func(key, value string) bool {
loggermdl.LogDebug(key, value)
// loggermdl.LogDebug(key, value)
rsJSON := gjson.Parse("[" + value + "]")
for i := 0; i < len(queries); i++ {
rsJSON = rsJSON.Get(queries[i] + "#")
......
......@@ -10,6 +10,7 @@ import (
"path/filepath"
"sync"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/lazywriter"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/cachemdl"
......@@ -22,6 +23,10 @@ import (
const (
// INDEXFOLDER -INDEXFOLDER
INDEXFOLDER = "index"
// LazyCallBackFnAppendBucket - LazyCallBackFnAppendBucket
LazyCallBackFnAppendBucket = "LazyWriterAppendBucketCallBackFn"
// LazyCallBackFnSaveIndex - LazyCallBackFnSaveIndex
LazyCallBackFnSaveIndex = "LazyWriterCallBackFnAppendBucketSaveIndex"
)
var databases cachemdl.FastCacheHelper
......@@ -29,6 +34,7 @@ var defaultDB string
var isSecurityEnabled = false
var isCompressionEnabled = false
var isLazyWriterEnabled = false
var defaultSecurityKey = []byte{}
func init() {
......@@ -36,10 +42,11 @@ func init() {
}
// Init - initializes bundbmdl
func Init(isSecurityRequired, isCompressionRequired bool, securityKey string) {
func Init(isSecurityRequired, isCompressionRequired, isLazyWriterEnable bool, securityKey string) {
defaultSecurityKey = []byte(securityKey)
isSecurityEnabled = isSecurityRequired
isCompressionEnabled = isCompressionRequired
isLazyWriterEnabled = isLazyWriterEnable
}
// FDB - FDB
......@@ -257,5 +264,49 @@ func getFDBIndexData(path string, index *Index, dbPath string) (map[string]strin
return indexDataMap, ferr
}
return indexDataMap, nil
}
// GetLazyCallBackFunc - return callback functions for lazywriter
func GetLazyCallBackFunc(funcName string) (lazywriter.SaveDataFn, error) {
if funcName == LazyCallBackFnAppendBucket {
return lazyCallBackFnAppendBucket, nil
} else if funcName == LazyCallBackFnSaveIndex {
return lazyCallBackFnSaveIndex, nil
} else {
return nil, errormdl.Wrap("func not found for: " + funcName)
}
}
var lazyCallBackFnAppendBucket lazywriter.SaveDataFn = func(bucketId string, data *lazywriter.LazyCacheObject) {
loggermdl.LogError("Saving ", bucketId, " with data ", data)
dataInLazyMemory, ok := data.InterfaceData.(string)
if !ok {
return
}
_, fileName := filepath.Split(data.FileName)
// loggermdl.LogError("dir ", dir, "name", name)
loggermdl.LogError("filepath to write append data lazily", data.Identifier)
fileName = filepath.Join(data.FileName, fileName)
_, _, err := filemdl.AppendDataToFile(fileName, []byte(dataInLazyMemory+"\r\n"), true)
if err != nil {
loggermdl.LogError(err)
return
}
data.InterfaceData = nil
err = AppendMaster.ClearLazyObjInterfaceData(data.Identifier)
AppendLazyObjHolder.Set(data.Identifier, *data)
return
}
var lazyCallBackFnSaveIndex lazywriter.SaveDataFn = func(indexID string, data *lazywriter.LazyCacheObject) {
indexData, ok := data.InterfaceData.(*Index)
if !ok {
return
}
err := LogFDBIndexFile(data.FileName, indexData)
if err != nil {
loggermdl.LogError(err)
return
}
}
......@@ -12,26 +12,41 @@ import (
// IndexMaster - Holds master object for index files
var IndexMaster lazywriter.LazyFDBHelper
// AppendMaster - Holds master object for Append files
var AppendMaster lazywriter.LazyFDBHelper
// IndexLazyObjHolder - Holds lazy cache objects for indexes, indexID as key and lazy Object as value
var IndexLazyObjHolder cachemdl.FastCacheHelper
// AppendLazyObjHolder - Holds lazy cache objects for Append files, filepath as key and lazy Object as value
var AppendLazyObjHolder cachemdl.FastCacheHelper
const (
maxObjetsCnt int = 100000
maxRetryCnt int = 5
intervalTime int = 5
sleepTime int = 5
maxObjetsCnt int = 100000
maxRetryCnt int = 5
intervalTimeForLazyIndexWrite int = 60
intervalTimeForLazyAppendWrite int = 60
sleepTime int = 5
// NoExpiration -
NoExpiration time.Duration = -1
lazyIndexProcessName string = "lazyIndex"
lazyIndexProcessName string = "lazyIndex"
lazyAppendProcessName string = "lazyAppend"
)
func init() {
// loggermdl.LogError("init lazyIndex")
// start process for lazy index file operations
IndexMaster.StartProcess(maxObjetsCnt, lazyIndexProcessName, intervalTime, sleepTime, maxRetryCnt, false)
IndexMaster.StartProcess(maxObjetsCnt, lazyIndexProcessName, intervalTimeForLazyIndexWrite, sleepTime, maxRetryCnt, false)
// loggermdl.LogError("Lazy writer process started")
// initialize a map to hold lazy writer objects for the index file data
IndexLazyObjHolder.Setup(10000, NoExpiration, NoExpiration)
// start process for lazy Append file operations
AppendMaster.StartProcess(maxObjetsCnt, lazyAppendProcessName, intervalTimeForLazyAppendWrite, sleepTime, maxRetryCnt, false)
// initialize a map to hold lazy writer objects for the append file data
AppendLazyObjHolder.Setup(10000, NoExpiration, NoExpiration)
// loggermdl.LogError("cache setup done")
}
......@@ -4,6 +4,8 @@ import (
"sync"
"time"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"github.com/tidwall/gjson"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/fdb"
......@@ -70,11 +72,12 @@ type LazyCacheObject struct {
}
// SaveDataFn - This is an user defined callback function executed to presist data. If not provided default save function will be executed.
type SaveDataFn func(key string, value LazyCacheObject)
type SaveDataFn func(key string, value *LazyCacheObject)
// StartProcess
func (lfd *LazyFDBHelper) StartProcess(objectCount int, taskName string,
intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
// loggermdl.LogDebug("StartProcess")
//This is Default value that is why used as
lfd.INTERVAL_TIME_SEC = 5
lfd.RETRY_SLEEP_TIME_SEC = 1
......@@ -122,6 +125,41 @@ func (lfd *LazyFDBHelper) StartProcess(objectCount int, taskName string,
lazyMutex.Unlock()
}
// ClearLazyObjInterfaceData -
func (lfd *LazyFDBHelper) ClearLazyObjInterfaceData(identifier string) error {
for item := range lfd.gc.Items() {
//TODO: catch errors
cachedObject, ok := lfd.gc.Get(item)
if !ok {
return errormdl.Wrap("error occured while getting " + item + " from gcache")
}
cachedObjectActual, _ := cachedObject.(LazyCacheObject)
if cachedObjectActual.Identifier == identifier {
if cachedObjectActual.ChangeCount > 0 {
cachedObjectActual.IsLocked = true
// saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
cachedObjectActual.ChangeCount = 0
cachedObjectActual.IsLocked = false
lazyMutex.Lock()
lfd.DISK_WRITE_COUNT++
lazyMutex.Unlock()
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[cachedObjectActual.FileName]
// lazyCacheObject.DISK_WRITE_COUNT++
// PerformanceAnalyser[cachedObjectActual.FileName] = lazyCacheObject
// }
}
lazyMutex.Lock()
lfd.gc.Delete(cachedObjectActual.FileName)
lazyMutex.Unlock()
}
}
return nil
}
// saveObjectsToFdb is private method which will be called to save object on hdd
// this will fetch all objects in store and save it one by one
// right now it is not using any threading
......@@ -144,7 +182,7 @@ func (lfd *LazyFDBHelper) saveObjectsToFdb() {
saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
} else {
loggermdl.LogError("Executing custom function")
cacheObjActual.SaveFn(item, cacheObjActual)
cacheObjActual.SaveFn(item, &cacheObjActual)
}
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment