lazywriter.go 15.1 KiB
Newer Older
Roshan Patil's avatar
Roshan Patil committed
package lazywriter

import (
	"sync"
	"time"

Roshan Patil's avatar
Roshan Patil committed
	"github.com/tidwall/gjson"

Roshan Patil's avatar
Roshan Patil committed
	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/fdb"

	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"

	"github.com/noaway/heartbeat"
Roshan Patil's avatar
Roshan Patil committed
	"github.com/pquerna/ffjson/ffjson"

Roshan Patil's avatar
Roshan Patil committed
	"github.com/patrickmn/go-cache"
Roshan Patil's avatar
Roshan Patil committed
)

// var PerformanceAnalyser = map[string]LazyCacheObject{}

// var isDebugMode = false
var lazyMutex = &sync.Mutex{}

const MIN_DATA_LENGTH = 10

Roshan Patil's avatar
Roshan Patil committed
// LazyFDBHelper Helps to Save and Get cache object
Roshan Patil's avatar
Roshan Patil committed
// also saves information into hard disk
Roshan Patil's avatar
Roshan Patil committed
type LazyFDBHelper struct {
	gc                     *cache.Cache
Roshan Patil's avatar
Roshan Patil committed
	Heartbeat              *heartbeat.Task
	IsProcessRunning       bool
	NumberOfUpdateAttempts int
	MAX_NUMBER_OF_RETRY    int
	RETRY_SLEEP_TIME_SEC   time.Duration
	INTERVAL_TIME_SEC      int
	MEMORY_WRITE_COUNT     int
	MEMORY_READ_COUNT      int
	DISK_READ_COUNT        int
	DISK_WRITE_COUNT       int
Roshan Patil's avatar
Roshan Patil committed
	CacheExpirationTime    time.Duration
	CacheCleanUpInterval   time.Duration
Roshan Patil's avatar
Roshan Patil committed
}

// LazyCacheObject is a
type LazyCacheObject struct {
	// Identifier will be used to identify the LazyCacheObject
	Identifier string
	// This will be used as a Key for GC cache
	FileName string

	// Data is for storing byte array of an object; right now it is not used.
Roshan Patil's avatar
Roshan Patil committed
	GJSONData gjson.Result
Roshan Patil's avatar
Roshan Patil committed

	// This will hold object which developer wants to fetch or store
	InterfaceData interface{}

	//This number indicates how many times InterfaceData data is changed
	ChangeCount int

	// This will tell if object is locked for file saving or not.
	IsLocked bool

	MEMORY_WRITE_COUNT int
	MEMORY_READ_COUNT  int
	DISK_READ_COUNT    int
	DISK_WRITE_COUNT   int
// SaveDataFn - This is an user defined callback function executed to presist data. If not provided default save function will be executed.
type SaveDataFn func(key string, value LazyCacheObject)

Roshan Patil's avatar
Roshan Patil committed
// StartProcess
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) StartProcess(objectCount int, taskName string,
Roshan Patil's avatar
Roshan Patil committed
	intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
	//This is Default value that is why used as
	lfd.INTERVAL_TIME_SEC = 5
	lfd.RETRY_SLEEP_TIME_SEC = 1
	// isDebugMode = isDebugMode
	// check parameter is valid - if not keep default else set new value
	if intervalTime > 0 {
		lfd.INTERVAL_TIME_SEC = intervalTime
	}

	// check parameter is valid - if not keep default else set new value

	if sleepTime > 0 {
		lfd.RETRY_SLEEP_TIME_SEC = time.Duration(sleepTime) * time.Millisecond
	}

	if maxNumberOfRetry <= 0 {
		lfd.MAX_NUMBER_OF_RETRY = 3
	} else {
		lfd.MAX_NUMBER_OF_RETRY = maxNumberOfRetry
	}

	//start Heartbeat event
	lfd.Heartbeat, _ = heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)

	// TODO: Use Fast Cache here
	// Use default ARC algo for store
Roshan Patil's avatar
Roshan Patil committed
	// lfd.gc = gcache.New(objectCount).
	// 	LFU().
	// 	Build()
	lfd.gc = cache.New(lfd.CacheExpirationTime, lfd.CacheCleanUpInterval)
Roshan Patil's avatar
Roshan Patil committed
	//Init timer
	lazyMutex.Lock()
	lfd.Heartbeat.Start(func() error {

		// check if process already running
		if lfd.IsProcessRunning {
			//If process is already running skip processing running it again.
			return nil
		}

		//process eatch object in cache and save to hdd.
		lfd.saveObjectsToFdb()
		return nil
	})
	lazyMutex.Unlock()
}

// saveObjectsToFdb is private method which will be called to save object on hdd
// this will fetch all objects in store and save it one by one
// right now it is not using any threading
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) saveObjectsToFdb() {
Roshan Patil's avatar
Roshan Patil committed
	lfd.IsProcessRunning = true

	// Fetch All Rows and then save into db
Roshan Patil's avatar
Roshan Patil committed
	for item := range lfd.gc.Items() {
Roshan Patil's avatar
Roshan Patil committed

		//TODO: catch errors
		cacheObjectraw, _ := lfd.gc.Get(item)
		cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)

		if cacheObjActual.ChangeCount > 0 {

			cacheObjActual.IsLocked = true
			// TODO: Catch errors from save function
			if cacheObjActual.SaveFn == nil {
				loggermdl.LogError("Executing default function")
				saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
			} else {
				loggermdl.LogError("Executing custom function")
				cacheObjActual.SaveFn(item, cacheObjActual)
			}
Roshan Patil's avatar
Roshan Patil committed
			cacheObjActual.ChangeCount = 0
			cacheObjActual.IsLocked = false
Roshan Patil's avatar
Roshan Patil committed
			lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
Roshan Patil's avatar
Roshan Patil committed

			lazyMutex.Lock()
			lfd.MEMORY_WRITE_COUNT++
			lfd.DISK_WRITE_COUNT++

			// if isDebugMode {
			// 	lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
			// 	lazyCacheObject.MEMORY_WRITE_COUNT++
			// 	PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
			// }
			lazyMutex.Unlock()
Akshay Bharambe's avatar
Akshay Bharambe committed
			loggermdl.LogError("changes saved to disk at ", cacheObjActual.FileName)
Roshan Patil's avatar
Roshan Patil committed
		}
	}
	lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1

	lfd.IsProcessRunning = false
}

// SaveOrUpdateDataInCache this method will Save object in cache if unavailable
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) SaveOrUpdateDataInCache(newObject LazyCacheObject) bool {

	jsonString := newObject.GJSONData.String()
	byteArray := []byte(jsonString)
	if jsonString == "" {
		var marshalError error
		byteArray, marshalError = ffjson.Marshal(newObject.InterfaceData)
		if marshalError != nil {
			loggermdl.LogError("error occured while marshaling data ", marshalError)
			return false
		}
Roshan Patil's avatar
Roshan Patil committed
	}

	// check data length before saving. If less than assumed data length, return false

	lengthOfData := len(byteArray)
	if lengthOfData < MIN_DATA_LENGTH {
		loggermdl.LogError("data size is less than minimun expected data length. Actual data length: ", lengthOfData)
		loggermdl.LogError("data received: ", string(byteArray))

		return false
	}

	retryCount := 0
retrylabel:
	// Get prev object and then save new one
	//TODO: catch errors
Roshan Patil's avatar
Roshan Patil committed
	dataFromGC, ok := lfd.gc.Get(newObject.FileName)
Roshan Patil's avatar
Roshan Patil committed
	if !ok || dataFromGC == nil {
Roshan Patil's avatar
Roshan Patil committed
		newObject.ChangeCount = 1
Roshan Patil's avatar
Roshan Patil committed
		lfd.gc.Set(newObject.FileName, newObject, lfd.CacheExpirationTime)
Roshan Patil's avatar
Roshan Patil committed

	} else {

		oldObject, _ := dataFromGC.(LazyCacheObject)
		if oldObject.IsLocked && retryCount < lfd.MAX_NUMBER_OF_RETRY {
			retryCount++
			// Sleep for few sec so that other thread will release lock
			time.Sleep(lfd.RETRY_SLEEP_TIME_SEC)
			goto retrylabel
		}
		newObject.ChangeCount = oldObject.ChangeCount + 1
Roshan Patil's avatar
Roshan Patil committed
		lfd.gc.Set(newObject.FileName, newObject, lfd.CacheExpirationTime)
Roshan Patil's avatar
Roshan Patil committed
	}
	lazyMutex.Lock()
	lfd.MEMORY_WRITE_COUNT++
	lazyMutex.Unlock()
	// if isDebugMode {
	// 	lazyCacheObject := PerformanceAnalyser[newObject.FileName]
	// 	lazyCacheObject.MEMORY_WRITE_COUNT++
	// 	PerformanceAnalyser[newObject.FileName] = lazyCacheObject
	// }
Akshay Bharambe's avatar
Akshay Bharambe committed
	loggermdl.LogError("data updated in cache")
Roshan Patil's avatar
Roshan Patil committed
	return true
}

//saveDataToFDB data to hard disk.
// This saves data by marshaling using json.Marshal
Roshan Patil's avatar
Roshan Patil committed
func saveDataToFDB(filePath string, objectData interface{}, GJSONData gjson.Result) bool {
Roshan Patil's avatar
Roshan Patil committed
	//TODO: catch errors
Roshan Patil's avatar
Roshan Patil committed
	stringData := GJSONData.String()
	byteArray := []byte(stringData)
	if stringData == "" {
		var marshalError error
		byteArray, marshalError = ffjson.Marshal(objectData)
		if marshalError != nil {
			loggermdl.LogError("error occured while marshaling data ", marshalError)
			return false
		}
Roshan Patil's avatar
Roshan Patil committed
	}
Roshan Patil's avatar
Roshan Patil committed
	// check data length before saving. If less than assumed data length, return false
	lengthOfData := len(byteArray)
	if lengthOfData < MIN_DATA_LENGTH {
		loggermdl.LogError("data size is less than minimun expected data length. Actual data length: ", lengthOfData)
		loggermdl.LogError("data received: ", string(byteArray))
		return false
	}

	saveError := fdb.SaveDataToFDBWithoutQueue(filePath, byteArray, true, false)
	if saveError != nil {
		loggermdl.LogError("error occured while saving data ", saveError)
		return false
	}
	return true
}

// use it when req.
// func GetBytes(key interface{}) ([]byte, error) {
// 	var buf bytes.Buffer
// 	enc := gob.NewEncoder(&buf)
// 	err := enc.Encode(key)
// 	if err != nil {
// 		return nil, err
// 	}
// 	return buf.Bytes(), nil
// }

// RemoveDataFromCache Removes Data From Cache
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) RemoveDataFromCache(identifier string) {
Roshan Patil's avatar
Roshan Patil committed

	// Fetch All Rows and then save into db

Roshan Patil's avatar
Roshan Patil committed
	// cachedObjectList := lfd.gc.Items()
	for item := range lfd.gc.Items() {
Roshan Patil's avatar
Roshan Patil committed

		//TODO: catch errors
Roshan Patil's avatar
Roshan Patil committed
		cachedObject, ok := lfd.gc.Get(item)
		if !ok {
Roshan Patil's avatar
Roshan Patil committed
			loggermdl.LogError("error occured while getting ", item, " from gcache")
		}
		cachedObjectActual, _ := cachedObject.(LazyCacheObject)

		if cachedObjectActual.Identifier == identifier {
			if cachedObjectActual.ChangeCount > 0 {
				cachedObjectActual.IsLocked = true
Roshan Patil's avatar
Roshan Patil committed
				saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
Roshan Patil's avatar
Roshan Patil committed
				cachedObjectActual.ChangeCount = 0
				cachedObjectActual.IsLocked = false
				lazyMutex.Lock()
				lfd.DISK_WRITE_COUNT++
				lazyMutex.Unlock()
				// if isDebugMode {
				// 	lazyCacheObject := PerformanceAnalyser[cachedObjectActual.FileName]
				// 	lazyCacheObject.DISK_WRITE_COUNT++
				// 	PerformanceAnalyser[cachedObjectActual.FileName] = lazyCacheObject
				// }
			}
			lazyMutex.Lock()
Roshan Patil's avatar
Roshan Patil committed
			lfd.gc.Delete(cachedObjectActual.FileName)
Roshan Patil's avatar
Roshan Patil committed
			lazyMutex.Unlock()

		}
	}
}

//New Add new Key and value
Roshan Patil's avatar
Roshan Patil committed
// func  (lfd *LazyFDBHelper) SetToCache(newObject LazyCacheObject) error {
Roshan Patil's avatar
Roshan Patil committed
// 	lfd.MEMORY_WRITE_COUNT++
// 	return lfd.gc.Set(newObject.FileName, newObject.InterfaceData)
// }

//Get object based on key
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) GetFromCache(newObject LazyCacheObject) (interface{}, bool) {
Roshan Patil's avatar
Roshan Patil committed
	lazyMutex.Lock()
	lfd.MEMORY_READ_COUNT++
	lazyMutex.Unlock()
	// if isDebugMode {
	// 	lazyCacheObject := PerformanceAnalyser[newObject.FileName]
	// 	lazyCacheObject.MEMORY_READ_COUNT++
	// 	PerformanceAnalyser[newObject.FileName] = lazyCacheObject
	// }
	return lfd.gc.Get(newObject.FileName)
}

// GetAll objects from gc
Roshan Patil's avatar
Roshan Patil committed
// func (lfd *LazyFDBHelper) GetAllFromCache() map[interface{}]interface{} {
// 	return lfd.gc.Items()
// }
Roshan Patil's avatar
Roshan Patil committed

// GetCacheLength Get Cache Length
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) GetCacheLength() int {
	return lfd.gc.ItemCount()
Roshan Patil's avatar
Roshan Patil committed
}

// PurgeCache first saves all data inside FDB and finally purge all Cache
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) PurgeCache() {
Roshan Patil's avatar
Roshan Patil committed

	// Fetch All Rows and then save into db

Roshan Patil's avatar
Roshan Patil committed
	// cachedObjectList := lfd.gc.GetALL()
	for item := range lfd.gc.Items() {
Roshan Patil's avatar
Roshan Patil committed

		//TODO: catch errors
Roshan Patil's avatar
Roshan Patil committed
		cachedObject, ok := lfd.gc.Get(item)
		if !ok {
Roshan Patil's avatar
Roshan Patil committed
			loggermdl.LogError("error occured while getting ", item, " from gcache")
		}
		cachedObjectActual, conversionSuccessful := cachedObject.(LazyCacheObject)

		if conversionSuccessful && cachedObjectActual.ChangeCount > 0 {
			cachedObjectActual.IsLocked = true
Roshan Patil's avatar
Roshan Patil committed
			saveDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
Roshan Patil's avatar
Roshan Patil committed
			cachedObjectActual.ChangeCount = 0
			cachedObjectActual.IsLocked = false
			lazyMutex.Lock()
			lfd.DISK_WRITE_COUNT++
			lazyMutex.Unlock()
		}
	}
	lazyMutex.Lock()
Roshan Patil's avatar
Roshan Patil committed
	lfd.gc.Flush()
Roshan Patil's avatar
Roshan Patil committed
	lazyMutex.Unlock()
}

// =================================== > Lazywriter for appending data to file < ============================================

// StartProcessForAppend StartProcessForAppend
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) StartProcessForAppend(objectCount int, taskName string,
Roshan Patil's avatar
Roshan Patil committed
	intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
	//This is Default value that is why used as
	lfd.INTERVAL_TIME_SEC = 5
	lfd.RETRY_SLEEP_TIME_SEC = 1
	// isDebugMode = isDebugMode
	// check parameter is valid - if not keep default else set new value
	if intervalTime > 0 {
		lfd.INTERVAL_TIME_SEC = intervalTime
	}

	// check parameter is valid - if not keep default else set new value
	if sleepTime > 0 {
		lfd.RETRY_SLEEP_TIME_SEC = time.Duration(sleepTime) * time.Millisecond
	}

	if maxNumberOfRetry <= 0 {
		lfd.MAX_NUMBER_OF_RETRY = 3
	} else {
		lfd.MAX_NUMBER_OF_RETRY = maxNumberOfRetry
	}

	//start Heartbeat event
	lfd.Heartbeat, _ = heartbeat.NewTast(taskName, lfd.INTERVAL_TIME_SEC)

	// Use default ARC algo for store
Roshan Patil's avatar
Roshan Patil committed
	lfd.gc = cache.New(lfd.CacheExpirationTime, lfd.CacheCleanUpInterval)
Roshan Patil's avatar
Roshan Patil committed

	//Init timer
	lazyMutex.Lock()
	lfd.Heartbeat.Start(func() error {

		// check if process already running
		if lfd.IsProcessRunning {
			//If process is already running skip processing running it again.
			return nil
		}

		//process eatch object in cache and save to hdd.
		lfd.appendObjectsToFdb()
		return nil
	})
	lazyMutex.Unlock()
}

// appendObjectsToFdb is private method which will be called to append object/data to file on hdd
// this will fetch all objects in store and append data to respective file it one by one
// right now it is not using any threading
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) appendObjectsToFdb() {
Roshan Patil's avatar
Roshan Patil committed
	lfd.IsProcessRunning = true

	// Fetch All Rows and then save into db
Roshan Patil's avatar
Roshan Patil committed
	for item := range lfd.gc.Items() {
Roshan Patil's avatar
Roshan Patil committed

		//TODO: catch errors
		cacheObjectraw, _ := lfd.gc.Get(item)
		cacheObjActual, _ := cacheObjectraw.(LazyCacheObject)

		if cacheObjActual.ChangeCount > 0 {

			cacheObjActual.IsLocked = true
Roshan Patil's avatar
Roshan Patil committed
			appendDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
Roshan Patil's avatar
Roshan Patil committed
			cacheObjActual.ChangeCount = 0
			cacheObjActual.IsLocked = false
Roshan Patil's avatar
Roshan Patil committed
			lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
Roshan Patil's avatar
Roshan Patil committed

			lazyMutex.Lock()
			lfd.MEMORY_WRITE_COUNT++
			lfd.DISK_WRITE_COUNT++

			// if isDebugMode {
			// 	lazyCacheObject := PerformanceAnalyser[cacheObjActual.FileName]
			// 	lazyCacheObject.MEMORY_WRITE_COUNT++
			// 	PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
			// }
			lazyMutex.Unlock()
		}
	}
	lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1

	lfd.IsProcessRunning = false
}

// appendDataToFDB appends data to file on hard drive.
// This appends data by marshaling using json.Marshal
// '\n' will be added as a separator between two different objects
Roshan Patil's avatar
Roshan Patil committed
func appendDataToFDB(filePath string, objectData interface{}, GJSONData gjson.Result) {
Roshan Patil's avatar
Roshan Patil committed
	//TODO: catch errors
Roshan Patil's avatar
Roshan Patil committed
	jsonString := GJSONData.String()
	byteArray := []byte(jsonString)
	if jsonString == "" {
		var marshalError error
		byteArray, marshalError = ffjson.Marshal(objectData)
		if marshalError != nil {
			loggermdl.LogError("error occured while marshaling data ", marshalError)
		}
Roshan Patil's avatar
Roshan Patil committed
	}
	byteArray = append(byteArray, []byte("\n")...)
	saveError := fdb.AppendDataToFDBWithoutQueue(filePath, byteArray, false)
	if saveError != nil {
		loggermdl.LogError("error occured while saving data ", saveError)
	}
}

// RemoveDataFromCacheForAppend Removes Data From Cache
Roshan Patil's avatar
Roshan Patil committed
func (lfd *LazyFDBHelper) RemoveDataFromCacheForAppend(identifier string) {
Roshan Patil's avatar
Roshan Patil committed

	// Fetch All Rows and then save into db

Roshan Patil's avatar
Roshan Patil committed
	cachedObjectList := lfd.gc.Items()
Roshan Patil's avatar
Roshan Patil committed
	for item := range cachedObjectList {

		//TODO: catch errors
Roshan Patil's avatar
Roshan Patil committed
		cachedObject, ok := lfd.gc.Get(item)
		if !ok {
Roshan Patil's avatar
Roshan Patil committed
			loggermdl.LogError("error occured while getting ", item, " from gcache")
		}
		cachedObjectActual, _ := cachedObject.(LazyCacheObject)
		if cachedObjectActual.Identifier == identifier {
			if cachedObjectActual.ChangeCount > 0 {
				cachedObjectActual.IsLocked = true
Roshan Patil's avatar
Roshan Patil committed
				appendDataToFDB(cachedObjectActual.FileName, cachedObjectActual.InterfaceData, cachedObjectActual.GJSONData)
Roshan Patil's avatar
Roshan Patil committed
				cachedObjectActual.ChangeCount = 0
				cachedObjectActual.IsLocked = false
				lazyMutex.Lock()
				lfd.DISK_WRITE_COUNT++
				lazyMutex.Unlock()
				// if isDebugMode {appendDataToFDB
				// 	lazyCacheObject := PerformanceAnalyser[cachedObjectActual.FileName]
				// 	lazyCacheObject.DISK_WRITE_COUNT++
				// 	PerformanceAnalyser[cachedObjectActual.FileName] = lazyCacheObject
				// }
			}
			lazyMutex.Lock()
Roshan Patil's avatar
Roshan Patil committed
			lfd.gc.Delete(cachedObjectActual.FileName)
Roshan Patil's avatar
Roshan Patil committed
			lazyMutex.Unlock()

		}
	}
}