Commit d2647d21 authored by Ajit Jagtap's avatar Ajit Jagtap
Browse files

Merge branch 'fdb_code_refactor' into 'devbranch'

Fdb code refactor

See merge request !178
parents c95f4f00 0cdca0f1
Branches
Tags
2 merge requests!180Mep release 07042020,!178Fdb code refactor
Showing with 4238 additions and 4664 deletions
This diff is collapsed.
package bucket
import (
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/filetype"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/locker"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/utiliymdl/guidmdl"
"github.com/tidwall/gjson"
)
// AppendBucket is a bucket whose files are append-only: records can be
// inserted, but Find/Update/Delete are not supported.
type AppendBucket struct {
	Bucket
}
// NewAppendBucket returns an append-only bucket with a fresh GUID.
// bucketNameQuery must be non-empty; when isDynamicName is true it is a
// gjson query resolved per record, otherwise a literal name.
// NOTE(review): isLazyEnable is currently unused — confirm intent.
func NewAppendBucket(bucketNameQuery string, isDynamicName bool, isLazyEnable bool, bucketPath string) (*AppendBucket, error) {
	if bucketNameQuery == "" {
		return nil, errormdl.Wrap("please provide value of bucketNameQuery")
	}
	bucket := AppendBucket{}
	bucket.Bucket = Bucket{
		BucketID:        guidmdl.GetGUID(),
		BucketNameQuery: bucketNameQuery,
		IsDynamicName:   isDynamicName,
		BucketPath:      bucketPath,
	}
	return &bucket, nil
}
// Insert appends data to the append file at filePath, using the
// bucket's security provider and a per-file locker.
func (ab *AppendBucket) Insert(filePath string, data *gjson.Result) error {
	locker := locker.NewLocker(filePath)
	appendFile, err := filetype.NewAppendFile(filePath, ab.Bucket.SecurityProvider, locker)
	// Check the open error BEFORE deferring Close: the original deferred
	// first, which dereferences a nil/invalid handle when the open fails.
	if err != nil {
		return err
	}
	defer appendFile.Close()
	return appendFile.Write(data)
}
// Find is not supported on append-only buckets; it always returns an error.
func (ab *AppendBucket) Find(filePaths []string, queries []string, data *gjson.Result) (string, error) {
	return "", errormdl.Wrap("operation not allowed")
}
// Update is not supported on append-only buckets; it always returns an error.
func (ab *AppendBucket) Update(filePaths []string, queries []string, data *gjson.Result) (*gjson.Result, []error) {
	return nil, []error{errormdl.Wrap("operation not allowed")}
}
// Delete is not supported on append-only buckets; it always returns an error.
func (ab *AppendBucket) Delete(filePaths, queries []string, data *gjson.Result) (recordsDeletedCnt int, errList []error) {
	return 0, []error{errormdl.Wrap("operation not allowed")}
}
package bucket
import (
"strings"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/index"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/securityprovider"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"github.com/tidwall/gjson"
)
const (
	// PathSeperator separates chunks of a bucket path.
	// (Spelling kept as-is: the name is exported API.)
	PathSeperator = "/"
	// DynamicPathPrefix marks a path chunk that must be resolved from
	// the record (via gjson query) instead of used literally.
	DynamicPathPrefix = "$$"
)
// PathProvider resolves the storage path for a given record.
type PathProvider interface {
	GetPath(rs *gjson.Result) (string, error)
}
// Securable accepts a security provider used to protect bucket files.
type Securable interface {
	Secure(securityprovider.SecurityProvider)
}
// Store is the CRUD contract implemented by every bucket kind.
type Store interface {
	// Insert writes a record to the file at the given path.
	Insert(string, *gjson.Result) error
	// Find returns matching records as a JSON array string.
	Find([]string, []string, *gjson.Result) (string, error)
	// Update applies data to matching records, returning the updated set
	// plus per-file errors.
	Update([]string, []string, *gjson.Result) (*gjson.Result, []error)
	// Delete removes matching records, returning the deleted count plus
	// per-file errors.
	Delete([]string, []string, *gjson.Result) (int, []error)
}
// MediaStore is the contract for buckets that can store binary media
// alongside JSON metadata.
type MediaStore interface {
	WriteMedia(filePath string, mediaData []byte, rs *gjson.Result) (string, error)
	ReadMedia(filePath string, recordID string) ([]byte, *gjson.Result, error)
	UpdateMedia(filePath string, recordID string, mediaData []byte, rs *gjson.Result) (err error)
	UpsertMedia(filePath string, recordID string, mediaData []byte, rs *gjson.Result) (string, error)
}
// Bucket holds the metadata common to all bucket kinds.
type Bucket struct {
	// BucketID is a GUID assigned when the bucket is created.
	BucketID string `json:"bucketId"`
	// IsDynamicName reports whether BucketNameQuery is a gjson query
	// resolved against each record rather than a literal name.
	IsDynamicName   bool   `json:"isDynamicName"`
	BucketNameQuery string `json:"bucketNameQuery"`
	// NOTE(review): JSON tag "indices" does not match the field name —
	// confirm which spelling downstream data relies on before renaming.
	Indexes []string `json:"indices"`
	// BucketPath may contain "$$"-prefixed chunks resolved per record.
	BucketPath       string `json:"bucketPath"`
	SecurityProvider securityprovider.SecurityProvider
}
// AddIndex registers the index ID on the bucket and appends the bucket
// ID to the index's bucket sequence. Returns an error for a nil index.
func (bucket *Bucket) AddIndex(index *index.Index) error {
	if index == nil {
		return errormdl.Wrap("index value is nil")
	}
	bucket.Indexes = append(bucket.Indexes, index.IndexID)
	index.BucketSequence = append(index.BucketSequence, bucket.BucketID)
	return nil
}
// GetPath builds the bucket's storage path for the given record.
// Each "/"-separated chunk of BucketPath prefixed with "$$" is resolved
// via a gjson query against rs; the (possibly dynamic) bucket name is
// appended as the final element. Errors if any dynamic part is empty.
func (bucket *Bucket) GetPath(rs *gjson.Result) (string, error) {
	path := ""
	for _, chunk := range strings.Split(bucket.BucketPath, PathSeperator) {
		segment := chunk
		if strings.HasPrefix(chunk, DynamicPathPrefix) {
			field := strings.TrimSpace(strings.TrimPrefix(chunk, DynamicPathPrefix))
			segment = strings.TrimSpace(rs.Get(field).String())
			if segment == "" {
				return "", errormdl.Wrap("please provide value for bucket name: " + field)
			}
		}
		path += PathSeperator + segment
	}
	name := bucket.BucketNameQuery
	if bucket.IsDynamicName {
		name = rs.Get(name).String()
	}
	if name == "" {
		return name, errormdl.Wrap("please provide value for bucket name: " + bucket.BucketNameQuery)
	}
	return strings.TrimPrefix(path+PathSeperator+name, PathSeperator), nil
}
// Secure sets the provider used to encrypt/decrypt this bucket's files.
// The parameter is renamed from "securityprovider", which shadowed the
// imported securityprovider package inside the function body.
func (bucket *Bucket) Secure(provider securityprovider.SecurityProvider) {
	bucket.SecurityProvider = provider
}
package bucket
import (
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/filetype"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/locker"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/utiliymdl/guidmdl"
)
// PackBucket stores multiple typed records per pack file, using a
// per-fileType in-file index schema for lookups.
type PackBucket struct {
	Bucket
	// InFileIndexSchemaMap maps a record's fileType to its in-file index
	// definition; every operation validates the fileType against it.
	InFileIndexSchemaMap map[string]filetype.InFileIndex `json:"inFileIndexMap"`
	// TODO: filepointer cache
	// NOTE(review): packFiles is never read or written in this file —
	// confirm whether the planned file-pointer cache still needs it.
	packFiles map[string]filetype.PackFile
}
// NewPackBucket returns a pack bucket with a fresh GUID. A nil
// inFileIndexSchemaMap is replaced with an empty map so later lookups
// never hit a nil map.
func NewPackBucket(bucketNameQuery string, isDynamicName bool, bucketPath string, inFileIndexSchemaMap map[string]filetype.InFileIndex) (*PackBucket, error) {
	if bucketNameQuery == "" {
		return nil, errormdl.Wrap("please provide value of bucketNameQuery")
	}
	packBucket := PackBucket{}
	packBucket.Bucket = Bucket{
		BucketID:        guidmdl.GetGUID(),
		BucketNameQuery: bucketNameQuery,
		IsDynamicName:   isDynamicName,
		BucketPath:      bucketPath,
	}
	packBucket.InFileIndexSchemaMap = inFileIndexSchemaMap
	if packBucket.InFileIndexSchemaMap == nil {
		packBucket.InFileIndexSchemaMap = make(map[string]filetype.InFileIndex)
	}
	return &packBucket, nil
}
// TODO: add fdb index data call

// Insert writes data to the pack file at filePath. The record must
// carry a "fileType" field whose in-file index schema is registered.
func (pb *PackBucket) Insert(filePath string, data *gjson.Result) error {
	requestedFileType := data.Get("fileType").String()
	if len(requestedFileType) == 0 {
		return errormdl.Wrap("please specify fileType")
	}
	if _, ok := pb.InFileIndexSchemaMap[requestedFileType]; !ok {
		return errormdl.Wrap("filetype not found: " + requestedFileType)
	}
	locker := locker.NewLocker(filePath)
	packFile, err := filetype.NewPackFile(filePath, pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
	// Check the open error BEFORE deferring Close: the original deferred
	// first, which dereferences a nil/invalid handle when the open fails.
	if err != nil {
		return err
	}
	defer packFile.Close()
	return packFile.Write(data)
}
// Find reads records matching queries (restricted to the requested
// fileType) from each pack file and returns them as one JSON array
// string. An open error aborts with ""; a read error aborts with the
// results accumulated so far.
func (pb *PackBucket) Find(filePaths []string, queries []string, data *gjson.Result) (string, error) {
	requestedFileType := data.Get("fileType").String()
	if len(requestedFileType) == 0 {
		return "", errormdl.Wrap("please specify fileType")
	}
	if _, ok := pb.InFileIndexSchemaMap[requestedFileType]; !ok {
		return "", errormdl.Wrap("filetype not found: " + requestedFileType)
	}
	// Quote the value, consistent with the query built in Update/Delete.
	queries = append(queries, `#[fileType=="`+requestedFileType+`"]`)
	resultArray := "[]"
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		packFile, err := filetype.NewPackFile(filePaths[i], pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
		// Open error checked before any Close: the original deferred
		// Close first (nil deref on failure) and piled defers up in the
		// loop, keeping every file open until Find returned.
		if err != nil {
			return "", err
		}
		result, readErr := packFile.Read(queries, data)
		packFile.Close() // close eagerly, once per iteration
		if readErr != nil {
			return resultArray, readErr
		}
		for _, val := range gjson.Parse(result).Array() {
			resultArray, _ = sjson.Set(resultArray, "-1", val.Value())
		}
	}
	return resultArray, nil
}
// Update applies data to records matching queries (restricted to the
// requested fileType) in each pack file. Per-file failures are
// collected in the returned error list; the first return value is the
// JSON array of all updated records.
func (pb *PackBucket) Update(filePaths []string, queries []string, data *gjson.Result) (*gjson.Result, []error) {
	requestedFileType := data.Get("fileType").String()
	if len(requestedFileType) == 0 {
		loggermdl.LogError("please specify fileType")
		return nil, []error{errormdl.Wrap("please specify fileType")}
	}
	if _, ok := pb.InFileIndexSchemaMap[requestedFileType]; !ok {
		return nil, []error{errormdl.Wrap("filetype not found: " + requestedFileType)}
	}
	queries = append(queries, `#[fileType=="`+requestedFileType+`"]`)
	finalResultArray := []gjson.Result{}
	errList := []error{}
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		packFile, err := filetype.NewPackFile(filePaths[i], pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
		// Open error checked before any Close: the original deferred
		// Close first (nil deref on failure) and delayed every Close to
		// function exit. A leftover debug log was also removed here.
		if err != nil {
			errList = append(errList, err)
			continue
		}
		resultArray, updateErr := packFile.Update(queries, data)
		packFile.Close() // close eagerly, once per iteration
		if updateErr != nil {
			errList = append(errList, updateErr)
			continue
		}
		finalResultArray = append(finalResultArray, resultArray.Array()...)
	}
	resultListStr := "[]"
	for _, resultObj := range finalResultArray {
		resultListStr, _ = sjson.Set(resultListStr, "-1", resultObj.Value())
	}
	result := gjson.Parse(resultListStr)
	return &result, errList
}
// Delete removes records matching queries (restricted to the requested
// fileType) from each pack file. It returns the total deleted count and
// per-file errors; if no file had matching data, a single "data not
// found" error replaces the list.
func (pb *PackBucket) Delete(filePaths []string, queries []string, data *gjson.Result) (recordsDeletedCnt int, errList []error) {
	fileType := data.Get("fileType").String()
	if len(fileType) == 0 {
		loggermdl.LogError("fileType value not provided")
		return recordsDeletedCnt, []error{errormdl.Wrap("please specify fileType")}
	}
	if _, ok := pb.InFileIndexSchemaMap[fileType]; !ok {
		loggermdl.LogError("infileIndex for specified fileType not found")
		return recordsDeletedCnt, []error{errormdl.Wrap("infileIndex for specified fileType not found")}
	}
	queries = append(queries, `#[fileType=="`+fileType+`"]`)
	noDataFoundCnt := 0
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		packFile, err := filetype.NewPackFile(filePaths[i], pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
		// Open error checked before any Close: the original deferred
		// Close first (nil deref on failure) and delayed every Close to
		// function exit.
		if err != nil {
			errList = append(errList, err)
			continue
		}
		deletedRecordsCnt, removeErr := packFile.Remove(queries)
		packFile.Close() // close eagerly, once per iteration
		if removeErr != nil {
			if removeErr.Error() == "not found" {
				noDataFoundCnt++
				continue
			}
			errList = append(errList, removeErr)
			continue
		}
		recordsDeletedCnt += deletedRecordsCnt
	}
	if noDataFoundCnt == len(filePaths) {
		errList = []error{errormdl.Wrap("data not found")}
	}
	return
}
// WriteMedia stores mediaData with rs as its metadata in the pack file
// at filePath and returns the new record's ID.
func (pb *PackBucket) WriteMedia(filePath string, mediaData []byte, rs *gjson.Result) (recordID string, err error) {
	locker := locker.NewLocker(filePath)
	packFile, err := filetype.NewPackFile(filePath, pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
	// Check the open error before deferring Close (nil deref otherwise).
	if err != nil {
		return "", err
	}
	defer packFile.Close()
	return packFile.WriteMedia(mediaData, rs)
}
// ReadMedia returns the media bytes and metadata stored under recordID
// in the pack file at filePath.
func (pb *PackBucket) ReadMedia(filePath string, recordID string) ([]byte, *gjson.Result, error) {
	locker := locker.NewLocker(filePath)
	packFile, err := filetype.NewPackFile(filePath, pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
	// Check the open error before deferring Close (nil deref otherwise).
	if err != nil {
		return nil, nil, err
	}
	defer packFile.Close()
	return packFile.ReadMedia(recordID)
}
// UpdateMedia replaces the media bytes and metadata stored under
// recordID in the pack file at filePath.
func (pb *PackBucket) UpdateMedia(filePath string, recordID string, mediaData []byte, rs *gjson.Result) (err error) {
	locker := locker.NewLocker(filePath)
	packFile, err := filetype.NewPackFile(filePath, pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
	// Check the open error before deferring Close (nil deref otherwise).
	if err != nil {
		return err
	}
	defer packFile.Close()
	return packFile.UpdateMedia(recordID, mediaData, rs)
}
// UpsertMedia updates the media stored under recordID, or inserts it if
// absent, returning the resulting record ID.
func (pb *PackBucket) UpsertMedia(filePath string, recordID string, mediaData []byte, rs *gjson.Result) (string, error) {
	locker := locker.NewLocker(filePath)
	packFile, err := filetype.NewPackFile(filePath, pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
	// Check the open error before deferring Close (nil deref otherwise).
	if err != nil {
		return recordID, err
	}
	defer packFile.Close()
	return packFile.UpsertMedia(recordID, mediaData, rs)
}
// DeleteMedia is a stub: media deletion is not implemented yet and the
// call currently succeeds without doing anything.
func (pb *PackBucket) DeleteMedia(filePath string, recordID string) error {
	// TODO: implement media delete
	return nil
}
// Reorg compacts each pack file in filePaths, collecting per-file
// errors. Fixes two defects in the original: pack files were never
// closed (handle leak), and the accumulated errList was discarded by a
// final "return nil".
func (pb *PackBucket) Reorg(filePaths []string) (errList []error) {
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		packFile, err := filetype.NewPackFile(filePaths[i], pb.InFileIndexSchemaMap, pb.Bucket.SecurityProvider, locker)
		if err != nil {
			errList = append(errList, err)
			continue
		}
		err = packFile.Reorg()
		packFile.Close() // close eagerly, once per iteration
		if err != nil {
			errList = append(errList, err)
		}
	}
	return errList
}
package bucket
import (
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/locker"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/securityprovider"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/corefdb/filetype"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/utiliymdl/guidmdl"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
// SimpleBucket stores one record per file.
type SimpleBucket struct {
	Bucket
	// TODO: implement lazy
	// EnableLazy marks the bucket for lazy writes (not implemented yet).
	EnableLazy bool
	// NOTE(review): securityProvider and Locker appear unused in this
	// file — the CRUD methods use Bucket.SecurityProvider and create a
	// per-file locker; confirm before removing.
	securityProvider securityprovider.SecurityProvider
	Locker           locker.Locker
}
// NewSimpleBucket returns a one-record-per-file bucket with a fresh
// GUID. bucketNameQuery must be non-empty; when isDynamicName is true
// it is resolved per record via gjson.
func NewSimpleBucket(bucketNameQuery string, isDynamicName bool, isLazyEnable bool, bucketPath string) (*SimpleBucket, error) {
	if bucketNameQuery == "" {
		return nil, errormdl.Wrap("please provide value of bucketNameQuery")
	}
	bucket := SimpleBucket{EnableLazy: isLazyEnable}
	bucket.Bucket = Bucket{
		BucketID:        guidmdl.GetGUID(),
		BucketNameQuery: bucketNameQuery,
		IsDynamicName:   isDynamicName,
		BucketPath:      bucketPath,
	}
	return &bucket, nil
}
// Insert writes data to the simple file at filePath.
func (sb *SimpleBucket) Insert(filePath string, data *gjson.Result) error {
	locker := locker.NewLocker(filePath)
	simpleFile, err := filetype.NewSimpleFile(filePath, sb.Bucket.SecurityProvider, locker)
	// Check the open error BEFORE deferring Close: the original deferred
	// first, which dereferences a nil/invalid handle when the open fails.
	if err != nil {
		return err
	}
	defer simpleFile.Close()
	return simpleFile.Write(data)
}
// Find reads each file in filePaths and returns the parsed records as
// one JSON array string. An open error aborts with ""; a read error
// aborts with the results accumulated so far.
// NOTE(review): the queries parameter is unused here (interface parity
// with other buckets) — confirm whether filtering was intended.
func (sb *SimpleBucket) Find(filePaths []string, queries []string, data *gjson.Result) (string, error) {
	resultArray := "[]"
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		simpleFile, err := filetype.NewSimpleFile(filePaths[i], sb.Bucket.SecurityProvider, locker)
		// Open error checked before any Close: the original deferred
		// Close first (nil deref on failure) and piled defers up in the
		// loop, keeping every file open until Find returned.
		if err != nil {
			return "", err
		}
		result, readErr := simpleFile.Read(data)
		simpleFile.Close() // close eagerly, once per iteration
		if readErr != nil {
			return resultArray, readErr
		}
		resultArray, _ = sjson.Set(resultArray, "-1", gjson.ParseBytes(result).Value())
	}
	return resultArray, nil
}
// Update applies data to each file in filePaths, collecting per-file
// errors and returning the updated records as a JSON array.
func (sb *SimpleBucket) Update(filePaths []string, queries []string, data *gjson.Result) (*gjson.Result, []error) {
	errList := []error{}
	resultListStr := "[]"
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		simpleFile, err := filetype.NewSimpleFile(filePaths[i], sb.Bucket.SecurityProvider, locker)
		// Open error checked before any Close: the original deferred
		// Close first (nil deref on failure) and delayed every Close to
		// function exit.
		if err != nil {
			errList = append(errList, err)
			continue
		}
		updatedData, updateErr := simpleFile.Update(data)
		simpleFile.Close() // close eagerly, once per iteration
		if updateErr != nil {
			errList = append(errList, updateErr)
			continue
		}
		resultListStr, _ = sjson.Set(resultListStr, "-1", updatedData.Value())
	}
	result := gjson.Parse(resultListStr)
	return &result, errList
}
// Delete removes each file in filePaths, returning the number deleted
// and per-file errors. If every file reported "not found", a single
// "no data found" error replaces the list.
func (sb *SimpleBucket) Delete(filePaths, queries []string, data *gjson.Result) (recordsDeletedCnt int, errList []error) {
	noDataFoundCnt := 0
	for i := range filePaths {
		locker := locker.NewLocker(filePaths[i])
		simpleFile, err := filetype.NewSimpleFile(filePaths[i], sb.Bucket.SecurityProvider, locker)
		// Open error checked before any Close: the original deferred
		// Close first (nil deref on failure) and delayed every Close to
		// function exit.
		if err != nil {
			errList = append(errList, err)
			continue
		}
		removeErr := simpleFile.Remove()
		simpleFile.Close() // close eagerly, once per iteration
		if removeErr != nil {
			if removeErr.Error() == "not found" {
				noDataFoundCnt++
				continue
			}
			errList = append(errList, removeErr)
			continue
		}
		recordsDeletedCnt++
	}
	if noDataFoundCnt == len(filePaths) {
		errList = []error{errormdl.Wrap("no data found")}
	}
	return
}
This diff is collapsed.
package corefdb
import (
"os"
"strings"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/filemdl/filepack"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
"path/filepath"
"sync"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/lazywriter"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/cachemdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/filemdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"github.com/tidwall/buntdb"
)
const (
	// INDEXFOLDER is the folder under DBPath where index files live.
	INDEXFOLDER = "index"
	// LazyCallBackFnAppendBucket names the lazywriter callback that
	// flushes buffered append-bucket data to disk.
	LazyCallBackFnAppendBucket = "LazyWriterAppendBucketCallBackFn"
	// LazyCallBackFnSaveIndex names the lazywriter callback that
	// persists index data to its index file.
	LazyCallBackFnSaveIndex = "LazyWriterCallBackFnAppendBucketSaveIndex"
)
// databases caches FDB instances keyed by database name.
var databases cachemdl.FastCacheHelper

// defaultDB is the name used when GetFDBInstance is called with "".
var defaultDB string
var isLazyWriterEnabled = false

// defaultSecurityKey holds the key set by Init when security is enabled.
var defaultSecurityKey = []byte{}
var isSecurityEnabled = false

// fileFpCache caches open file pointers (populated in init).
var fileFpCache Cache
// init sets up the database cache and the file-pointer cache.
func init() {
	databases.Setup(1, 1000, 1000)
	// NOTE(review): the error from NewCache is discarded; a nil fpCache
	// would panic on the dereference below — confirm this is acceptable
	// at package init.
	fpCache, _ := NewCache()
	fileFpCache = *fpCache
}
// Init configures package-level security, compression, and lazy-writer
// settings, and forwards the security/compression flags to filepack.
func Init(isSecurityRequired, isCompressionRequired, isLazyWriterEnable bool, securityKey string) {
	filepack.Init(isSecurityRequired, isCompressionRequired, securityKey)
	defaultSecurityKey = []byte(securityKey)
	isLazyWriterEnabled = isLazyWriterEnable
	isSecurityEnabled = isSecurityRequired
}
// FDB is a file-based database instance: bucket files stored under
// DBPath, with registered indexes and buckets tracked in memory.
type FDB struct {
	DBName            string
	DBPath            string `json:"dbPath"`
	EnableSecurity    bool   `json:"enableSec"` // if enabled, fdb files will be encrypted
	EnableCompression bool   `json:"enableCmp"` // if enabled, fdb files will be compressed and then encrypted
	// indices maps index ID to its definition.
	// NOTE(review): indexMux/bLocker look intended to guard the maps,
	// but no lock usage is visible in this chunk — confirm.
	indices  map[string]*Index
	indexMux sync.Mutex
	// buckets maps bucket ID to its definition.
	buckets map[string]*Bucket
	bLocker sync.Mutex
	// restoreFileFromPack controls restoring files from pack on Get.
	restoreFileFromPack bool
}
// CreateFDBInstance creates an FDB instance, registers it in the
// package-level database cache under dbName, and optionally marks it
// as the default database.
func CreateFDBInstance(dbPath, dbName string, isDefault bool) (*FDB, error) {
	instance := &FDB{
		DBName:              dbName,
		DBPath:              dbPath,
		indices:             map[string]*Index{},
		indexMux:            sync.Mutex{},
		buckets:             map[string]*Bucket{},
		bLocker:             sync.Mutex{},
		restoreFileFromPack: true,
	}
	if isDefault {
		defaultDB = dbName
	}
	databases.SetNoExpiration(dbName, instance)
	return instance, nil
}
// EnableFDBSecurity enables file encryption for this FDB instance.
// Passing false is a no-op: security can only be switched on here,
// never off.
func (fdb *FDB) EnableFDBSecurity(sec bool) {
	if sec {
		fdb.EnableSecurity = true
	}
}
// EnableFDBCompression enables file compression for this FDB instance.
// (The original comment said "enables security" — copy-paste slip.)
// Passing false is a no-op: compression can only be switched on here.
func (fdb *FDB) EnableFDBCompression(cmp bool) {
	if cmp {
		fdb.EnableCompression = true
	}
}
// GetBucketByName returns the bucket whose name query equals
// bucketName, or nil when no such bucket is registered.
func (f *FDB) GetBucketByName(bucketName string) *Bucket {
	for _, b := range f.buckets {
		if b.BucketNameQuery == bucketName {
			return b
		}
	}
	return nil
}
// SetFileRestoreFormPackFlag sets whether Get operations restore files
// from pack. (The "Form"/"From" typo in the name is kept: exported API.)
func (f *FDB) SetFileRestoreFormPackFlag(restoreFile bool) {
	f.restoreFileFromPack = restoreFile
}
// GetFDBInstance returns the FDB registered under dbName; an empty
// dbName resolves to the default database.
func GetFDBInstance(dbName string) (*FDB, error) {
	if dbName == "" {
		dbName = defaultDB
	}
	raw, found := databases.Get(dbName)
	if !found {
		loggermdl.LogError("Database instance not found")
		return nil, errormdl.Wrap("Database instance not found")
	}
	db, castOK := raw.(*FDB)
	if !castOK {
		loggermdl.LogError("Can not cast object into *FDB")
		return nil, errormdl.Wrap("Can not cast object into *FDB")
	}
	return db, nil
}
// GetFDBIndex returns the index registered under indexName and whether
// it was found.
func (f *FDB) GetFDBIndex(indexName string) (*Index, bool) {
	index, ok := f.indices[indexName]
	return index, ok
}
// CreateIndex creates the given index by delegating to its own
// CreateIndex method.
func (f *FDB) CreateIndex(index *Index) error {
	return index.CreateIndex()
}
// GetBucketIndexes returns all indexes registered on the bucket whose
// name query equals bucketName. Errors if the bucket is unknown or any
// of its index IDs has no registered index.
func (f *FDB) GetBucketIndexes(bucketName string) ([]*Index, error) {
	var target *Bucket
	for _, b := range f.buckets {
		if b.BucketNameQuery == bucketName {
			target = b
			break
		}
	}
	if target == nil {
		return nil, errormdl.Wrap("Bucket not found")
	}
	indexList := make([]*Index, 0)
	for _, indexID := range target.Indices {
		idx, ok := f.indices[indexID]
		if !ok {
			return nil, errormdl.Wrap("index not found")
		}
		indexList = append(indexList, idx)
	}
	return indexList, nil
}
// ReIndex rebuilds the index identified by indexID from the data files
// on disk: it locates the deepest static directory for the index's
// bucket chain, extracts indexed fields from every file under it,
// recreates the index store, and re-adds all entries.
func (f *FDB) ReIndex(indexID string) error {
	index, found := f.GetFDBIndex(indexID)
	if !found {
		return errormdl.Wrap("index not found")
	}
	// find path to start file walk
	// Dynamic bucket names can't be resolved without a record, so the
	// walk starts at the last statically-named ancestor.
	pathToStartWalk := f.DBPath
	for _, bucketID := range index.BucketSequence {
		bucket := f.buckets[bucketID]
		if bucket.IsDynamicName {
			break
		}
		pathToStartWalk = filepath.Join(pathToStartWalk, bucket.BucketNameQuery)
	}
	// get required data for index file
	indexDataMap, err := getFDBIndexData(pathToStartWalk, index, f.DBPath)
	if err != nil {
		loggermdl.LogError(err)
		return err
	}
	// create or replace index
	// NOTE(review): fns is built but never used after this loop —
	// confirm whether it was meant to be handed to the index store.
	var fns []func(a, b string) bool
	for _, idx := range index.IndexFields {
		fns = append(fns, buntdb.IndexJSON(idx.FieldName))
	}
	err = index.CloseStore()
	if err != nil {
		return err
	}
	indexFilePath := filepath.Join(f.DBPath, INDEXFOLDER, index.IndexID)
	if filemdl.FileAvailabilityCheck(indexFilePath) {
		err = filemdl.DeleteFile(indexFilePath)
		if err != nil {
			return err
		}
	}
	err = index.ReplaceIndex()
	if err != nil {
		loggermdl.LogError(err)
		return err
	}
	// update index file by reading all data and updating index file
	return index.AddEntries(indexDataMap)
}
// getFDBIndexData walks every file under path and returns a map of
// file path (relative to dbPath) to a JSON object containing the
// index's fields extracted from that file's content.
func getFDBIndexData(path string, index *Index, dbPath string) (map[string]string, error) {
	indexDataMap := make(map[string]string)
	if !filemdl.FileAvailabilityCheck(path) {
		return nil, errormdl.Wrap("invalid path: " + path)
	}
	// Normalize dbPath with a trailing separator so TrimPrefix below
	// yields a clean relative path.
	dbPath = filepath.Join(dbPath) + string(filepath.Separator)
	ferr := filemdl.Walk(path, func(filePath string, info os.FileInfo, err error) error {
		if err != nil {
			// log and keep walking; a single bad entry should not
			// abort the whole index rebuild
			loggermdl.LogError(err)
			return nil
		}
		if !info.IsDir() {
			fileData, err := filemdl.ReadFile(filePath)
			if err != nil {
				// unreadable file: log and skip
				loggermdl.LogError(err)
				return nil
			}
			dataObj := gjson.Parse(string(fileData))
			indexDataObj := ""
			for _, indexField := range index.IndexFields {
				indexDataObj, _ = sjson.Set(indexDataObj, indexField.FieldName, dataObj.Get(indexField.Query).String())
			}
			if indexDataObj != "" {
				pathToSave := strings.TrimPrefix(filePath, dbPath)
				indexDataMap[pathToSave] = indexDataObj
			}
		}
		return nil
	})
	if ferr != nil {
		return indexDataMap, ferr
	}
	return indexDataMap, nil
}
// GetLazyCallBackFunc returns the lazywriter callback registered under
// funcName, or an error for an unknown name.
func GetLazyCallBackFunc(funcName string) (lazywriter.SaveDataFn, error) {
	switch funcName {
	case LazyCallBackFnAppendBucket:
		return lazyCallBackFnAppendBucket, nil
	case LazyCallBackFnSaveIndex:
		return lazyCallBackFnSaveIndex, nil
	default:
		return nil, errormdl.Wrap("func not found for: " + funcName)
	}
}
// lazyCallBackFnAppendBucket flushes lazily-buffered append-bucket data
// to its file, then clears the buffered copy both on the cache object
// and in the lazywriter's holder.
var lazyCallBackFnAppendBucket lazywriter.SaveDataFn = func(bucketId string, data *lazywriter.LazyCacheObject) {
	dataInLazyMemory, ok := data.InterfaceData.(string)
	if !ok {
		// nothing buffered (or unexpected type): nothing to flush
		return
	}
	_, _, err := filemdl.AppendDataToFile(data.FileName, []byte(dataInLazyMemory+"\r\n"), true)
	if err != nil {
		loggermdl.LogError(err)
		return
	}
	// drop the in-memory copy now that it is on disk
	data.InterfaceData = nil
	err = AppendMaster.ClearLazyObjInterfaceData(data.Identifier)
	if err != nil {
		loggermdl.LogError(err)
		return
	}
	AppendLazyObjHolder.Set(data.Identifier, *data)
}
// lazyCallBackFnSaveIndex persists a lazily-held index to its index
// file via LogFDBIndexFile.
var lazyCallBackFnSaveIndex lazywriter.SaveDataFn = func(indexID string, data *lazywriter.LazyCacheObject) {
	indexData, ok := data.InterfaceData.(*Index)
	if !ok {
		// no index buffered (or unexpected type): nothing to persist
		return
	}
	err := LogFDBIndexFile(data.FileName, indexData)
	if err != nil {
		loggermdl.LogError(err)
		return
	}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment