You need to sign in or sign up before continuing.
Newer
Older
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/fdb"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"github.com/noaway/heartbeat"
)
// var PerformanceAnalyser = map[string]LazyCacheObject{}
// var isDebugMode = false

// lazyMutex is the package-wide lock taken around the helper's read/write
// counter updates, around cache removal, and around starting the heartbeat task.
var lazyMutex = new(sync.Mutex)

// MIN_DATA_LENGTH is the smallest serialized payload size, in bytes, accepted
// by SaveOrUpdateDataInCache and saveDataToFDB; shorter payloads are rejected.
const MIN_DATA_LENGTH = 10
// LazyFDBHelper Helps to Save and Get cache object
Heartbeat *heartbeat.Task
IsProcessRunning bool
NumberOfUpdateAttempts int
MAX_NUMBER_OF_RETRY int
RETRY_SLEEP_TIME_SEC time.Duration
INTERVAL_TIME_SEC int
MEMORY_WRITE_COUNT int
MEMORY_READ_COUNT int
DISK_READ_COUNT int
DISK_WRITE_COUNT int
CacheExpirationTime time.Duration
CacheCleanUpInterval time.Duration
}
// LazyCacheObject is a
type LazyCacheObject struct {
// Identifier will be used to identify the LazyCacheObject
Identifier string
// This will be used as a Key for GC cache
FileName string
// Data is for storing byte array of an object; right now it is not used.
// This will hold object which developer wants to fetch or store
InterfaceData interface{}
//This number indicates how many times InterfaceData data is changed
ChangeCount int
// This will tell if object is locked for file saving or not.
IsLocked bool
MEMORY_WRITE_COUNT int
MEMORY_READ_COUNT int
DISK_READ_COUNT int
DISK_WRITE_COUNT int
// SaveDataFn is a user-defined callback executed to persist data. It receives
// the cache key and the cached object. If an object carries no callback, the
// default save function is executed instead.
type SaveDataFn func(key string, value LazyCacheObject)
// StartProcess configures the helper, creates its in-memory cache and starts
// the heartbeat task that periodically saves changed cache objects to disk.
//
// Parameters:
//   - objectCount: not used by the current implementation.
//   - taskName: name given to the heartbeat task.
//   - intervalTime: heartbeat interval; values <= 0 keep the default of 5.
//   - sleepTime: retry pause in milliseconds; values <= 0 keep the default.
//   - maxNumberOfRetry: retry limit; values <= 0 fall back to 3.
//   - isDebugMode: not used by the current implementation.
//
// If the heartbeat task cannot be created the error is logged and no
// background saving is started; the cache itself is still initialised.
func (lfd *LazyFDBHelper) StartProcess(objectCount int, taskName string,
	intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
	// Defaults, overridden below when the caller supplies a positive value.
	lfd.INTERVAL_TIME_SEC = 5
	// NOTE(review): as a time.Duration a bare 1 is one nanosecond — confirm
	// this default is intended.
	lfd.RETRY_SLEEP_TIME_SEC = 1

	if intervalTime > 0 {
		lfd.INTERVAL_TIME_SEC = intervalTime
	}
	if sleepTime > 0 {
		lfd.RETRY_SLEEP_TIME_SEC = time.Duration(sleepTime) * time.Millisecond
	}
	if maxNumberOfRetry <= 0 {
		lfd.MAX_NUMBER_OF_RETRY = 3
	} else {
		lfd.MAX_NUMBER_OF_RETRY = maxNumberOfRetry
	}

	// Create the cache first so the helper stays usable even when the
	// heartbeat task cannot be created.
	// TODO: Use Fast Cache here
	lfd.gc = cache.New(lfd.CacheExpirationTime, lfd.CacheCleanUpInterval)

	// Start heartbeat event.
	task, err := heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)
	lfd.Heartbeat = task
	if err != nil || task == nil {
		// Previously the error was discarded and Start was called on a nil task.
		loggermdl.LogError("error occurred while creating heartbeat task ", err)
		return
	}

	// Init timer.
	lazyMutex.Lock()
	defer lazyMutex.Unlock()
	lfd.Heartbeat.Start(func() error {
		// If a pass is already running, skip this tick instead of running
		// two passes at once.
		if lfd.IsProcessRunning {
			return nil
		}
		// Process each object in cache and save to hdd.
		lfd.saveObjectsToFdb()
		return nil
	})
}
// saveObjectsToFdb is private method which will be called to save object on hdd
// this will fetch all objects in store and save it one by one
// right now it is not using any threading
lfd.IsProcessRunning = true
// Fetch All Rows and then save into db
//TODO: catch errors
cacheObjectraw, _ := lfd.gc.Get(item)
cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)
if cacheObjActual.ChangeCount > 0 {
cacheObjActual.IsLocked = true
// TODO: Catch errors from save function
if cacheObjActual.SaveFn == nil {
loggermdl.LogError("Executing default function")
saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
} else {
loggermdl.LogError("Executing custom function")
cacheObjActual.SaveFn(item, cacheObjActual)
}
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lfd.DISK_WRITE_COUNT++
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
// }
lazyMutex.Unlock()
loggermdl.LogError("changes saved to disk at ", cacheObjActual.FileName)
}
}
lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1
lfd.IsProcessRunning = false
}
// SaveOrUpdateDataInCache this method will Save object in cache if unavailable
func (lfd *LazyFDBHelper) SaveOrUpdateDataInCache(newObject LazyCacheObject) bool {
jsonString := newObject.GJSONData.String()
byteArray := []byte(jsonString)
if jsonString == "" {
var marshalError error
byteArray, marshalError = ffjson.Marshal(newObject.InterfaceData)
if marshalError != nil {
loggermdl.LogError("error occured while marshaling data ", marshalError)
return false
}
}
// check data length before saving. If less than assumed data length, return false
lengthOfData := len(byteArray)
if lengthOfData < MIN_DATA_LENGTH {
loggermdl.LogError("data size is less than minimun expected data length. Actual data length: ", lengthOfData)
loggermdl.LogError("data received: ", string(byteArray))
return false
}
retryCount := 0
retrylabel:
// Get prev object and then save new one
//TODO: catch errors
dataFromGC, ok := lfd.gc.Get(newObject.FileName)
lfd.gc.Set(newObject.FileName, newObject, lfd.CacheExpirationTime)
} else {
oldObject, _ := dataFromGC.(LazyCacheObject)
if oldObject.IsLocked && retryCount < lfd.MAX_NUMBER_OF_RETRY {
retryCount++
// Sleep for few sec so that other thread will release lock
time.Sleep(lfd.RETRY_SLEEP_TIME_SEC)
goto retrylabel
}
newObject.ChangeCount = oldObject.ChangeCount + 1
lfd.gc.Set(newObject.FileName, newObject, lfd.CacheExpirationTime)
}
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lazyMutex.Unlock()
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[newObject.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[newObject.FileName] = lazyCacheObject
// }
return true
}
//saveDataToFDB data to hard disk.
// This saves data by marshaling using json.Marshal
func saveDataToFDB(filePath string, objectData interface{}, GJSONData gjson.Result) bool {
stringData := GJSONData.String()
byteArray := []byte(stringData)
if stringData == "" {
var marshalError error
byteArray, marshalError = ffjson.Marshal(objectData)
if marshalError != nil {
loggermdl.LogError("error occured while marshaling data ", marshalError)
return false
}
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
// check data length before saving. If less than assumed data length, return false
lengthOfData := len(byteArray)
if lengthOfData < MIN_DATA_LENGTH {
loggermdl.LogError("data size is less than minimun expected data length. Actual data length: ", lengthOfData)
loggermdl.LogError("data received: ", string(byteArray))
return false
}
saveError := fdb.SaveDataToFDBWithoutQueue(filePath, byteArray, true, false)
if saveError != nil {
loggermdl.LogError("error occured while saving data ", saveError)
return false
}
return true
}
// use it when req.
// func GetBytes(key interface{}) ([]byte, error) {
// var buf bytes.Buffer
// enc := gob.NewEncoder(&buf)
// err := enc.Encode(key)
// if err != nil {
// return nil, err
// }
// return buf.Bytes(), nil
// }
// RemoveDataFromCache removes every cached object whose Identifier equals
// identifier. An object with pending changes (ChangeCount > 0) is first saved
// to disk with saveDataToFDB so that removing it does not drop those changes.
//
// NOTE(review): in this copy of the file the lazyMutex critical section after
// the save was empty, so nothing was ever removed. The Delete call has been
// restored from the function's name and doc comment — confirm against the
// upstream source.
func (lfd *LazyFDBHelper) RemoveDataFromCache(identifier string) {
	for item := range lfd.gc.Items() {
		cachedObject, ok := lfd.gc.Get(item)
		if !ok {
			loggermdl.LogError("error occured while getting ", item, " from gcache")
			continue
		}

		cachedObjectActual, isCacheObject := cachedObject.(LazyCacheObject)
		if !isCacheObject || cachedObjectActual.Identifier != identifier {
			continue
		}

		// Flush pending changes before the entry disappears.
		if cachedObjectActual.ChangeCount > 0 {
			saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
			lazyMutex.Lock()
			lfd.DISK_WRITE_COUNT++
			lazyMutex.Unlock()
		}

		lazyMutex.Lock()
		lfd.gc.Delete(item)
		lazyMutex.Unlock()
	}
}
//New Add new Key and value
// func (lfd *LazyFDBHelper) SetToCache(newObject LazyCacheObject) error {
// lfd.MEMORY_WRITE_COUNT++
// return lfd.gc.Set(newObject.FileName, newObject.InterfaceData)
// }
// GetFromCache looks up the cache entry stored under newObject.FileName.
// It returns the cached value and whether the key was found, and counts the
// access in MEMORY_READ_COUNT.
func (lfd *LazyFDBHelper) GetFromCache(newObject LazyCacheObject) (interface{}, bool) {
	// Only the counter update is guarded; the lookup happens outside the lock.
	lazyMutex.Lock()
	lfd.MEMORY_READ_COUNT++
	lazyMutex.Unlock()

	cachedValue, found := lfd.gc.Get(newObject.FileName)
	return cachedValue, found
}
// GetAll objects from gc
// func (lfd *LazyFDBHelper) GetAllFromCache() map[interface{}]interface{} {
// return lfd.gc.Items()
// }
// GetCacheLength returns the item count reported by the underlying cache.
func (lfd *LazyFDBHelper) GetCacheLength() int {
	itemCount := lfd.gc.ItemCount()
	return itemCount
}
// PurgeCache first saves all data inside FDB and finally purge all Cache
// cachedObjectList := lfd.gc.GetALL()
for item := range lfd.gc.Items() {
cachedObject, ok := lfd.gc.Get(item)
if !ok {
loggermdl.LogError("error occured while getting ", item, " from gcache")
}
cachedObjectActual, conversionSuccessful := cachedObject.(LazyCacheObject)
if conversionSuccessful && cachedObjectActual.ChangeCount > 0 {
cachedObjectActual.IsLocked = true
saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
cachedObjectActual.ChangeCount = 0
cachedObjectActual.IsLocked = false
lazyMutex.Lock()
lfd.DISK_WRITE_COUNT++
lazyMutex.Unlock()
}
}
lazyMutex.Lock()
lazyMutex.Unlock()
}
// =================================== > Lazywriter for appending data to file < ============================================
// StartProcessForAppend configures the helper, creates its in-memory cache
// and starts the heartbeat task that periodically appends changed cache
// objects to their files.
//
// Parameters:
//   - objectCount: not used by the current implementation.
//   - taskName: name given to the heartbeat task.
//   - intervalTime: heartbeat interval; values <= 0 keep the default of 5.
//   - sleepTime: retry pause in milliseconds; values <= 0 keep the default.
//   - maxNumberOfRetry: retry limit; values <= 0 fall back to 3.
//   - isDebugMode: not used by the current implementation.
//
// If the heartbeat task cannot be created the error is logged and no
// background appending is started; the cache itself is still initialised.
func (lfd *LazyFDBHelper) StartProcessForAppend(objectCount int, taskName string,
	intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
	// Defaults, overridden below when the caller supplies a positive value.
	lfd.INTERVAL_TIME_SEC = 5
	// NOTE(review): as a time.Duration a bare 1 is one nanosecond — confirm
	// this default is intended.
	lfd.RETRY_SLEEP_TIME_SEC = 1

	if intervalTime > 0 {
		lfd.INTERVAL_TIME_SEC = intervalTime
	}
	if sleepTime > 0 {
		lfd.RETRY_SLEEP_TIME_SEC = time.Duration(sleepTime) * time.Millisecond
	}
	if maxNumberOfRetry <= 0 {
		lfd.MAX_NUMBER_OF_RETRY = 3
	} else {
		lfd.MAX_NUMBER_OF_RETRY = maxNumberOfRetry
	}

	// Create the cache first so the helper stays usable even when the
	// heartbeat task cannot be created.
	lfd.gc = cache.New(lfd.CacheExpirationTime, lfd.CacheCleanUpInterval)

	// Start heartbeat event.
	task, err := heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)
	lfd.Heartbeat = task
	if err != nil || task == nil {
		// Previously the error was discarded and Start was called on a nil task.
		loggermdl.LogError("error occurred while creating heartbeat task ", err)
		return
	}

	// Init timer.
	lazyMutex.Lock()
	defer lazyMutex.Unlock()
	lfd.Heartbeat.Start(func() error {
		// If a pass is already running, skip this tick instead of running
		// two passes at once.
		if lfd.IsProcessRunning {
			return nil
		}
		// Process each object in cache and append it to its file on hdd.
		lfd.appendObjectsToFdb()
		return nil
	})
}
// appendObjectsToFdb is private method which will be called to append object/data to file on hdd
// this will fetch all objects in store and append data to respective file it one by one
// right now it is not using any threading
func (lfd *LazyFDBHelper) appendObjectsToFdb() {
lfd.IsProcessRunning = true
// Fetch All Rows and then save into db
//TODO: catch errors
cacheObjectraw, _ := lfd.gc.Get(item)
cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)
if cacheObjActual.ChangeCount > 0 {
cacheObjActual.IsLocked = true
appendDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lfd.DISK_WRITE_COUNT++
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
// }
lazyMutex.Unlock()
}
}
lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1
lfd.IsProcessRunning = false
}
// appendDataToFDB appends data to the file at filePath on hard drive.
//
// The bytes appended are GJSONData.String() when that is non-empty; otherwise
// objectData is marshaled with ffjson. A '\n' is added after the payload as a
// separator between two different objects.
//
// Failures are logged. When marshaling fails nothing is appended, so the file
// does not receive a stray empty record.
func appendDataToFDB(filePath string, objectData interface{}, GJSONData gjson.Result) {
	jsonString := GJSONData.String()
	byteArray := []byte(jsonString)
	if jsonString == "" {
		var marshalError error
		byteArray, marshalError = ffjson.Marshal(objectData)
		if marshalError != nil {
			loggermdl.LogError("error occured while marshaling data ", marshalError)
			// Without this return only the separator would be written.
			return
		}
	}

	byteArray = append(byteArray, '\n')
	saveError := fdb.AppendDataToFDBWithoutQueue(filePath, byteArray, false)
	if saveError != nil {
		loggermdl.LogError("error occured while saving data ", saveError)
	}
}
// RemoveDataFromCacheForAppend Removes Data From Cache
func (lfd *LazyFDBHelper) RemoveDataFromCacheForAppend(identifier string) {
for item := range cachedObjectList {
//TODO: catch errors
cachedObject, ok := lfd.gc.Get(item)
if !ok {
loggermdl.LogError("error occured while getting ", item, " from gcache")
}
cachedObjectActual, _ := cachedObject.(LazyCacheObject)
if cachedObjectActual.Identifier == identifier {
if cachedObjectActual.ChangeCount > 0 {
cachedObjectActual.IsLocked = true
appendDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
cachedObjectActual.ChangeCount = 0
cachedObjectActual.IsLocked = false
lazyMutex.Lock()
lfd.DISK_WRITE_COUNT++
lazyMutex.Unlock()
// if isDebugMode {appendDataToFDB
// lazyCacheObject := PerformanceAnalyser[cachedObjectActual.FileName]
// lazyCacheObject.DISK_WRITE_COUNT++
// PerformanceAnalyser[cachedObjectActual.FileName] = lazyCacheObject
// }
}
lazyMutex.Lock()