Commit 7276215f authored by Sandeep S. Shewalkar's avatar Sandeep S. Shewalkar
Browse files

Lazywriter changes

1. ForceSaveDataToFDB, StartProcessForAppend, appendObjectsToFdb, appendDataToFDB these methods added newly
parent 70986ea7
Branches
1 merge request!24Lazywriter new methods sss
Showing with 156 additions and 5 deletions
package dalhelper
import (
"encoding/json"
"sync"
"time"
......@@ -71,8 +73,9 @@ func (lfd *LazyFDPHelper) StartProcess(objectCount int, taskName string,
}
// check parameter is valid - if not keep default else set new value
if sleepTime > 0 {
lfd.INTERVAL_TIME_SEC = sleepTime
lfd.RETRY_SLEEP_TIME_SEC = time.Duration(sleepTime) * time.Millisecond
}
if maxNumberOfRetry <= 0 {
......@@ -124,8 +127,8 @@ func (lfd *LazyFDPHelper) saveObjectsToFdb() {
cacheObjActual.IsLocked = true
saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData)
cacheObjActual.ChangeCount = 0
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual)
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual)
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
......@@ -150,7 +153,6 @@ func (lfd *LazyFDPHelper) SaveOrUpdateDataInCache(newObject LazyCacheObject) {
retryCount := 0
retrylabel:
// Get prev object and then save new one
//TODO: catch errors
dataFromGC, getError := lfd.gc.Get(newObject.FileName)
if getError != nil || dataFromGC == nil {
......@@ -162,7 +164,7 @@ retrylabel:
if oldObject.IsLocked && retryCount < lfd.MAX_NUMBER_OF_RETRY {
retryCount++
// Sleep for few sec so that other thread will release lock
time.Sleep(time.Millisecond * lfd.RETRY_SLEEP_TIME_SEC)
time.Sleep(lfd.RETRY_SLEEP_TIME_SEC)
goto retrylabel
}
newObject.ChangeCount = oldObject.ChangeCount + 1
......@@ -190,7 +192,6 @@ func saveDataToFDB(filePath string, objectData interface{}) {
if saveError != nil {
logginghelper.LogError("error occured while saving data ", saveError)
}
}
// use it when req.
......@@ -300,3 +301,153 @@ func (lfd *LazyFDPHelper) PurgeCache() {
lfd.gc.Purge()
lazyMutex.Unlock()
}
// ForceSaveDataToFDB this method saves data to fdb forcefully and put the object inside cache
func (lfd *LazyFDPHelper) ForceSaveDataToFDB(identifier string) {
// Fetch All Rows and then save into db
cachedObjectList := lfd.gc.GetALL()
for item := range cachedObjectList {
//TODO: catch errors
cachedObject, getError := lfd.gc.Get(item)
if getError != nil {
logginghelper.LogError("error occured while getting ", item, " from gcache")
}
cachedObjectActual, _ := cachedObject.(LazyCacheObject)
if cachedObjectActual.Identifier == identifier {
cachedObjectActual.IsLocked = true
saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData)
cachedObjectActual.ChangeCount = 0
cachedObjectActual.IsLocked = false
lazyMutex.Lock()
lfd.DISK_WRITE_COUNT++
lazyMutex.Unlock()
// get data from data base and it inside cache
dataFromFdb, getError := GetDataFromFDB(cachedObjectActual.FileName)
if getError != nil {
logginghelper.LogError("error occured while fetching data from database for file : ", cachedObjectActual.FileName, getError)
lfd.gc.Set(cachedObjectActual.FileName, cachedObjectActual)
} else {
var interfaceData interface{}
unmarshalError := json.Unmarshal(dataFromFdb, &interfaceData)
if unmarshalError != nil {
logginghelper.LogError("error occured while unmarshalling data of file : ", cachedObjectActual.FileName, unmarshalError)
lfd.gc.Set(cachedObjectActual.FileName, cachedObjectActual)
} else {
cachedObjectActual.InterfaceData = interfaceData
lfd.gc.Set(cachedObjectActual.FileName, cachedObjectActual)
}
}
}
}
}
// =================================== > Lazywriter for appending data to file < ============================================
// StartProcessForAppend StartProcessForAppend
func (lfd *LazyFDPHelper) StartProcessForAppend(objectCount int, taskName string,
intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
//This is Default value that is why used as
lfd.INTERVAL_TIME_SEC = 5
lfd.RETRY_SLEEP_TIME_SEC = 1
// isDebugMode = isDebugMode
// check parameter is valid - if not keep default else set new value
if intervalTime > 0 {
lfd.INTERVAL_TIME_SEC = intervalTime
}
// check parameter is valid - if not keep default else set new value
if sleepTime > 0 {
lfd.RETRY_SLEEP_TIME_SEC = time.Duration(sleepTime) * time.Millisecond
}
if maxNumberOfRetry <= 0 {
lfd.MAX_NUMBER_OF_RETRY = 3
} else {
lfd.MAX_NUMBER_OF_RETRY = maxNumberOfRetry
}
//start Heartbeat event
lfd.Heartbeat, _ = heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)
// Use default ARC algo for store
lfd.gc = gcache.New(objectCount).
LFU().
Build()
//Init timer
lazyMutex.Lock()
lfd.Heartbeat.Start(func() error {
// check if process already running
if lfd.IsProcessRunning {
//If process is already running skip processing running it again.
return nil
}
//process eatch object in cache and save to hdd.
lfd.appendObjectsToFdb()
return nil
})
lazyMutex.Unlock()
}
// appendObjectsToFdb is private method which will be called to append object/data to file on hdd
// this will fetch all objects in store and append data to respective file it one by one
// right now it is not using any threading
func (lfd *LazyFDPHelper) appendObjectsToFdb() {
lfd.IsProcessRunning = true
// Fetch All Rows and then save into db
for item := range lfd.gc.GetALL() {
//TODO: catch errors
cacheObjectraw, _ := lfd.gc.Get(item)
cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)
if cacheObjActual.ChangeCount > 0 {
cacheObjActual.IsLocked = true
appendDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData)
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual)
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lfd.DISK_WRITE_COUNT++
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
// }
lazyMutex.Unlock()
}
}
lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1
lfd.IsProcessRunning = false
}
// appendDataToFDB appends data to file on hard drive.
// This appends data by marshaling using json.Marshal
// '\n' will be added as a separator between two different objects
func appendDataToFDB(filePath string, objectData interface{}) {
//TODO: catch errors
byteArray, marshalError := ffjson.Marshal(objectData)
if marshalError != nil {
logginghelper.LogError("error occured while marshaling data ", marshalError)
}
byteArray = append(byteArray, []byte("\n")...)
saveError := AppendDataToFDB(filePath, byteArray)
if saveError != nil {
logginghelper.LogError("error occured while saving data ", saveError)
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment