Newer
Older
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/fdb"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"github.com/noaway/heartbeat"
)
// var PerformanceAnalyser = map[string]LazyCacheObject{}
// var isDebugMode = false
// lazyMutex is a single package-level mutex shared by every LazyFDBHelper.
// The code in this file takes it around counter updates, cache deletions and
// the start of the heartbeat task.
var lazyMutex = &sync.Mutex{}
// MIN_DATA_LENGTH is the minimum payload size, in bytes, accepted by the save
// paths; shorter payloads are logged and rejected.
const MIN_DATA_LENGTH = 10
// LazyFDBHelper Helps to Save and Get cache object
// Heartbeat is the periodic task created with heartbeat.NewTast and started in StartProcess / StartProcessForAppend.
Heartbeat *heartbeat.Task
// IsProcessRunning is set while a flush pass is running; the heartbeat callback returns early when it is true.
IsProcessRunning bool
// NumberOfUpdateAttempts is incremented once at the end of every flush pass.
NumberOfUpdateAttempts int
// MAX_NUMBER_OF_RETRY bounds how often SaveOrUpdateDataInCache retries while the existing entry is locked (set to 3 when the caller passes a non-positive value).
MAX_NUMBER_OF_RETRY int
// RETRY_SLEEP_TIME_SEC is the pause between those retries. Despite the name it is a time.Duration:
// it is set to sleepTime milliseconds, or left at 1 (one nanosecond) when sleepTime is not positive.
RETRY_SLEEP_TIME_SEC time.Duration
// INTERVAL_TIME_SEC is the interval handed to heartbeat.NewTast (default 5; presumably seconds — confirm against the heartbeat package).
INTERVAL_TIME_SEC int
// MEMORY_WRITE_COUNT is incremented whenever an entry is written into the cache.
MEMORY_WRITE_COUNT int
// MEMORY_READ_COUNT is incremented on every GetFromCache call.
MEMORY_READ_COUNT int
// DISK_READ_COUNT is not updated by any code visible in this part of the file.
DISK_READ_COUNT int
// DISK_WRITE_COUNT is incremented when a changed entry is flushed or removed.
DISK_WRITE_COUNT int
// CacheExpirationTime is passed to cache.New and used as the per-entry expiration on every Set.
CacheExpirationTime time.Duration
// CacheCleanUpInterval is passed to cache.New.
CacheCleanUpInterval time.Duration
}
// LazyCacheObject is the value stored in the cache for one file: the data to
// persist plus the bookkeeping used by the periodic flush.
// NOTE(review): other code in this file reads GJSONData and SaveFn on this
// type, and no closing brace follows the last field here — confirm the
// declaration is complete.
type LazyCacheObject struct {
// Identifier will be used to identify the LazyCacheObject
// (the Remove*/Clear* methods match entries on it).
Identifier string
// FileName is used as the cache key and as the path handed to the save/append functions.
FileName string
// Data is for storing byte array of an object; right now it is not used.
// InterfaceData holds the object which the developer wants to fetch or store.
InterfaceData interface{}
// ChangeCount indicates how many times the data changed since the last flush;
// entries with ChangeCount > 0 are the ones written to disk.
ChangeCount int
// IsLocked tells whether the object is locked for file saving or not.
IsLocked bool
// Per-object counters; no code visible in this part of the file updates them.
MEMORY_WRITE_COUNT int
MEMORY_READ_COUNT int
DISK_READ_COUNT int
DISK_WRITE_COUNT int
// SaveDataFn is a user-defined callback executed to persist a cache entry.
// When an entry carries no callback, the default save function is executed instead.
type SaveDataFn func(key string, value *LazyCacheObject)
// StartProcess configures the helper, creates its cache and starts the
// heartbeat task whose callback runs saveObjectsToFdb.
//
// intervalTime is handed to heartbeat.NewTast as the task interval (5 when not
// positive). sleepTime is taken as the number of milliseconds to wait between
// retries; when it is not positive the duration stays at 1, i.e. one
// nanosecond, because the field is a time.Duration. maxNumberOfRetry falls
// back to 3 when not positive. objectCount and isDebugMode are not used.
func (lfd *LazyFDBHelper) StartProcess(objectCount int, taskName string,
	intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
	interval := 5
	if intervalTime > 0 {
		interval = intervalTime
	}
	lfd.INTERVAL_TIME_SEC = interval

	retrySleep := time.Duration(1)
	if sleepTime > 0 {
		retrySleep = time.Duration(sleepTime) * time.Millisecond
	}
	lfd.RETRY_SLEEP_TIME_SEC = retrySleep

	retries := maxNumberOfRetry
	if retries <= 0 {
		retries = 3
	}
	lfd.MAX_NUMBER_OF_RETRY = retries

	// NOTE(review): the second result of NewTast is discarded; if task
	// creation can fail, Heartbeat is nil and Start below will panic.
	lfd.Heartbeat, _ = heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)
	lfd.gc = cache.New(lfd.CacheExpirationTime, lfd.CacheCleanUpInterval)

	// Callback run by the heartbeat: skip the tick while a previous flush
	// pass is still running, otherwise flush the cache to disk.
	flush := func() error {
		if !lfd.IsProcessRunning {
			lfd.saveObjectsToFdb()
		}
		return nil
	}

	lazyMutex.Lock()
	lfd.Heartbeat.Start(flush)
	lazyMutex.Unlock()
}
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// ClearLazyObjInterfaceData deletes every cache entry whose Identifier equals
// identifier without writing it to disk (the save call is disabled).
//
// For a matching entry with pending changes (ChangeCount > 0) DISK_WRITE_COUNT
// is still incremented. The method returns an error as soon as a key listed by
// the cache can no longer be fetched, and nil otherwise.
func (lfd *LazyFDBHelper) ClearLazyObjInterfaceData(identifier string) error {
	for key := range lfd.gc.Items() {
		raw, found := lfd.gc.Get(key)
		if !found {
			return errormdl.Wrap("error occured while getting " + key + " from gcache")
		}

		// A failed assertion leaves entry as the zero value.
		entry, _ := raw.(LazyCacheObject)
		if entry.Identifier != identifier {
			continue
		}

		if entry.ChangeCount > 0 {
			// entry is a local copy that is deleted right below, so resetting
			// its ChangeCount / IsLocked would not be observable; only the
			// counter update is.
			lazyMutex.Lock()
			lfd.DISK_WRITE_COUNT++
			lazyMutex.Unlock()
		}

		lazyMutex.Lock()
		lfd.gc.Delete(entry.FileName)
		lazyMutex.Unlock()
	}
	return nil
}
// saveObjectsToFdb is the private flush pass invoked by the heartbeat callback.
// It saves cache entries that have pending changes (ChangeCount > 0) to disk,
// one by one, without spawning goroutines.
// NOTE(review): the func signature, the loop that declares `item` and the call
// of the custom SaveFn are not visible in this span — confirm against the
// complete file before relying on the control flow shown here.
lfd.IsProcessRunning = true
// Fetch the entry for this key; a miss or a failed type assertion yields the
// zero value, whose ChangeCount of 0 makes the entry be skipped.
//TODO: catch errors
cacheObjectraw, _ := lfd.gc.Get(item)
cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)
if cacheObjActual.ChangeCount > 0 {
// The lock flag is set on the local copy only.
cacheObjActual.IsLocked = true
// TODO: Catch errors from save function
// Without a custom SaveFn the default saveDataToFDB is used.
if cacheObjActual.SaveFn == nil {
loggermdl.LogError("Executing default function")
saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
} else {
loggermdl.LogError("Executing custom function")
// Store the entry back with its change counter reset and the lock cleared.
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lfd.DISK_WRITE_COUNT++
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
// }
lazyMutex.Unlock()
loggermdl.LogError("changes saved to disk at ", cacheObjActual.FileName)
}
}
// One more completed pass; allow the next heartbeat tick to run again.
lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1
lfd.IsProcessRunning = false
}
// SaveOrUpdateDataInCache stores newObject in the cache under newObject.FileName.
// The payload (GJSONData as a string, or InterfaceData marshalled with ffjson
// when GJSONData is empty) must be at least MIN_DATA_LENGTH bytes; otherwise
// nothing is cached and false is returned. When an entry already exists and is
// locked, the call sleeps RETRY_SLEEP_TIME_SEC and retries up to
// MAX_NUMBER_OF_RETRY times, then overwrites the entry with ChangeCount set to
// the previous count plus one. Returns true once the object is cached.
func (lfd *LazyFDBHelper) SaveOrUpdateDataInCache(newObject LazyCacheObject) bool {
jsonString := newObject.GJSONData.String()
byteArray := []byte(jsonString)
// Fall back to marshalling InterfaceData when no JSON payload was supplied.
if jsonString == "" {
var marshalError error
byteArray, marshalError = ffjson.Marshal(newObject.InterfaceData)
if marshalError != nil {
loggermdl.LogError("error occured while marshaling data ", marshalError)
return false
}
}
// check data length before saving. If less than assumed data length, return false
lengthOfData := len(byteArray)
if lengthOfData < MIN_DATA_LENGTH {
loggermdl.LogError("data size is less than minimun expected data length. Actual data length: ", lengthOfData)
loggermdl.LogError("data received: ", string(byteArray))
return false
}
retryCount := 0
retrylabel:
// Get prev object and then save new one
//TODO: catch errors
dataFromGC, ok := lfd.gc.Get(newObject.FileName)
// NOTE(review): the condition that opens this if/else (presumably a
// cache-miss check on ok) is not visible in this span — confirm.
lfd.gc.Set(newObject.FileName, newObject, lfd.CacheExpirationTime)
} else {
oldObject, _ := dataFromGC.(LazyCacheObject)
if oldObject.IsLocked && retryCount < lfd.MAX_NUMBER_OF_RETRY {
retryCount++
// Sleep for a short while so that the other goroutine can release the lock
time.Sleep(lfd.RETRY_SLEEP_TIME_SEC)
goto retrylabel
}
// Either unlocked or out of retries: overwrite and record one more change.
newObject.ChangeCount = oldObject.ChangeCount + 1
lfd.gc.Set(newObject.FileName, newObject, lfd.CacheExpirationTime)
}
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lazyMutex.Unlock()
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[newObject.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[newObject.FileName] = lazyCacheObject
// }
return true
}
//saveDataToFDB data to hard disk.
// This saves data by marshaling using json.Marshal
func saveDataToFDB(filePath string, objectData interface{}, GJSONData gjson.Result) bool {
stringData := GJSONData.String()
byteArray := []byte(stringData)
if stringData == "" {
var marshalError error
byteArray, marshalError = ffjson.Marshal(objectData)
if marshalError != nil {
loggermdl.LogError("error occured while marshaling data ", marshalError)
return false
}
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// check data length before saving. If less than assumed data length, return false
lengthOfData := len(byteArray)
if lengthOfData < MIN_DATA_LENGTH {
loggermdl.LogError("data size is less than minimun expected data length. Actual data length: ", lengthOfData)
loggermdl.LogError("data received: ", string(byteArray))
return false
}
saveError := fdb.SaveDataToFDBWithoutQueue(filePath, byteArray, true, false)
if saveError != nil {
loggermdl.LogError("error occured while saving data ", saveError)
return false
}
return true
}
// use it when req.
// func GetBytes(key interface{}) ([]byte, error) {
// var buf bytes.Buffer
// enc := gob.NewEncoder(&buf)
// err := enc.Encode(key)
// if err != nil {
// return nil, err
// }
// return buf.Bytes(), nil
// }
// RemoveDataFromCache removes every cache entry whose Identifier equals
// identifier. An entry with pending changes (ChangeCount > 0) is first written
// to disk with saveDataToFDB; DISK_WRITE_COUNT is incremented only when that
// write succeeds. The entry is removed from the cache even if the write fails
// (saveDataToFDB logs the failure).
//
// Keys that can no longer be fetched are logged and skipped, and values that
// are not a LazyCacheObject are skipped, instead of being processed as an
// empty object.
func (lfd *LazyFDBHelper) RemoveDataFromCache(identifier string) {
	for item := range lfd.gc.Items() {
		cachedObject, ok := lfd.gc.Get(item)
		if !ok {
			loggermdl.LogError("error occured while getting ", item, " from gcache")
			continue
		}

		cachedObjectActual, isLazyObject := cachedObject.(LazyCacheObject)
		if !isLazyObject || cachedObjectActual.Identifier != identifier {
			continue
		}

		if cachedObjectActual.ChangeCount > 0 {
			// Flush pending changes before the entry disappears from the cache.
			if saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData) {
				lazyMutex.Lock()
				lfd.DISK_WRITE_COUNT++
				lazyMutex.Unlock()
			}
		}

		// Delete by the key the cache reported, under the same mutex the
		// other removal path in this file uses.
		lazyMutex.Lock()
		lfd.gc.Delete(item)
		lazyMutex.Unlock()
	}
}
//New Add new Key and value
// func (lfd *LazyFDBHelper) SetToCache(newObject LazyCacheObject) error {
// lfd.MEMORY_WRITE_COUNT++
// return lfd.gc.Set(newObject.FileName, newObject.InterfaceData)
// }
// GetFromCache looks up the entry stored under newObject.FileName and counts
// the lookup in MEMORY_READ_COUNT. It returns the cached value and whether the
// key was found.
func (lfd *LazyFDBHelper) GetFromCache(newObject LazyCacheObject) (interface{}, bool) {
	lazyMutex.Lock()
	lfd.MEMORY_READ_COUNT += 1
	lazyMutex.Unlock()

	value, found := lfd.gc.Get(newObject.FileName)
	return value, found
}
// GetAll objects from gc
// func (lfd *LazyFDBHelper) GetAllFromCache() map[interface{}]interface{} {
// return lfd.gc.Items()
// }
// GetCacheLength reports the number of items in the cache, as returned by the
// cache's ItemCount method.
func (lfd *LazyFDBHelper) GetCacheLength() int {
	itemCount := lfd.gc.ItemCount()
	return itemCount
}
// PurgeCache first saves all changed cache entries to FDB and is meant to
// purge the whole cache afterwards.
// NOTE(review): the func signature is not visible above this loop, and no
// purge/flush call appears between the final Lock/Unlock pair — confirm
// against the complete file.
// cachedObjectList := lfd.gc.GetALL()
for item := range lfd.gc.Items() {
cachedObject, ok := lfd.gc.Get(item)
if !ok {
// A miss is only logged; the nil value then fails the type assertion below
// and the entry is skipped.
loggermdl.LogError("error occured while getting ", item, " from gcache")
}
cachedObjectActual, conversionSuccessful := cachedObject.(LazyCacheObject)
if conversionSuccessful && cachedObjectActual.ChangeCount > 0 {
// These flag/counter changes apply to the local copy only; it is not stored back.
cachedObjectActual.IsLocked = true
saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
cachedObjectActual.ChangeCount = 0
cachedObjectActual.IsLocked = false
lazyMutex.Lock()
lfd.DISK_WRITE_COUNT++
lazyMutex.Unlock()
}
}
lazyMutex.Lock()
lazyMutex.Unlock()
}
// =================================== > Lazywriter for appending data to file < ============================================
// StartProcessForAppend configures the helper, creates its cache and starts
// the heartbeat task whose callback runs appendObjectsToFdb.
//
// intervalTime is handed to heartbeat.NewTast as the task interval (5 when not
// positive). sleepTime is taken as the number of milliseconds to wait between
// retries; when it is not positive the duration stays at 1, i.e. one
// nanosecond, because the field is a time.Duration. maxNumberOfRetry falls
// back to 3 when not positive. objectCount and isDebugMode are not used.
func (lfd *LazyFDBHelper) StartProcessForAppend(objectCount int, taskName string,
	intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
	interval := 5
	if intervalTime > 0 {
		interval = intervalTime
	}
	lfd.INTERVAL_TIME_SEC = interval

	retrySleep := time.Duration(1)
	if sleepTime > 0 {
		retrySleep = time.Duration(sleepTime) * time.Millisecond
	}
	lfd.RETRY_SLEEP_TIME_SEC = retrySleep

	retries := maxNumberOfRetry
	if retries <= 0 {
		retries = 3
	}
	lfd.MAX_NUMBER_OF_RETRY = retries

	// NOTE(review): the second result of NewTast is discarded; if task
	// creation can fail, Heartbeat is nil and Start below will panic.
	lfd.Heartbeat, _ = heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)
	lfd.gc = cache.New(lfd.CacheExpirationTime, lfd.CacheCleanUpInterval)

	// Callback run by the heartbeat: skip the tick while a previous append
	// pass is still running, otherwise append the changed entries to disk.
	appendTick := func() error {
		if !lfd.IsProcessRunning {
			lfd.appendObjectsToFdb()
		}
		return nil
	}

	lazyMutex.Lock()
	lfd.Heartbeat.Start(appendTick)
	lazyMutex.Unlock()
}
// appendObjectsToFdb is private method which will be called to append object/data to file on hdd
// this will fetch all objects in store and append data to respective file it one by one
// right now it is not using any threading
func (lfd *LazyFDBHelper) appendObjectsToFdb() {
lfd.IsProcessRunning = true
// Fetch All Rows and then save into db
//TODO: catch errors
cacheObjectraw, _ := lfd.gc.Get(item)
cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)
if cacheObjActual.ChangeCount > 0 {
cacheObjActual.IsLocked = true
appendDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
lazyMutex.Lock()
lfd.MEMORY_WRITE_COUNT++
lfd.DISK_WRITE_COUNT++
// if isDebugMode {
// lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
// }
lazyMutex.Unlock()
}
}
lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1
lfd.IsProcessRunning = false
}
// appendDataToFDB appends one object to the file at filePath.
//
// The payload is GJSONData rendered as a string; when that is empty,
// objectData is marshalled with ffjson.Marshal instead. A '\n' is added after
// the payload as the separator between two objects. Marshal and write failures
// are logged; on a marshal failure nothing is appended, so the file never
// receives a bare separator line.
func appendDataToFDB(filePath string, objectData interface{}, GJSONData gjson.Result) {
	jsonString := GJSONData.String()
	byteArray := []byte(jsonString)
	if jsonString == "" {
		var marshalError error
		byteArray, marshalError = ffjson.Marshal(objectData)
		if marshalError != nil {
			loggermdl.LogError("error occured while marshaling data ", marshalError)
			return
		}
	}

	byteArray = append(byteArray, '\n')
	saveError := fdb.AppendDataToFDBWithoutQueue(filePath, byteArray, false)
	if saveError != nil {
		loggermdl.LogError("error occured while saving data ", saveError)
	}
}
// RemoveDataFromCacheForAppend Removes Data From Cache
func (lfd *LazyFDBHelper) RemoveDataFromCacheForAppend(identifier string) {
for item := range cachedObjectList {
//TODO: catch errors
cachedObject, ok := lfd.gc.Get(item)
if !ok {
loggermdl.LogError("error occured while getting ", item, " from gcache")
}
cachedObjectActual, _ := cachedObject.(LazyCacheObject)
if cachedObjectActual.Identifier == identifier {
if cachedObjectActual.ChangeCount > 0 {
cachedObjectActual.IsLocked = true
appendDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
cachedObjectActual.ChangeCount = 0
cachedObjectActual.IsLocked = false
lazyMutex.Lock()
lfd.DISK_WRITE_COUNT++
lazyMutex.Unlock()
// if isDebugMode {appendDataToFDB
// lazyCacheObject := PerformanceAnalyser[cachedObjectActual.FileName]
// lazyCacheObject.DISK_WRITE_COUNT++
// PerformanceAnalyser[cachedObjectActual.FileName] = lazyCacheObject
// }
}
lazyMutex.Lock()