diff --git a/dalmdl/corefdb/corefdb.go b/dalmdl/corefdb/corefdb.go index fce9f2156e52bcad00dbb1ab5c1869eeae2b9d01..e49f737b527f57578367e299b3e453508c14296f 100644 --- a/dalmdl/corefdb/corefdb.go +++ b/dalmdl/corefdb/corefdb.go @@ -777,6 +777,65 @@ func ReorganizeFiles(dbName string) (errList []error) { return errList } +func ReorganizeFDBBucketData(dbName, indexId string, queries []string) (errList []error) { + fdb, err := GetFDBInstance(dbName) + if err != nil { + loggermdl.LogError("Error occured while fetching DB instance", err) + return []error{errormdl.Wrap("Error occured while fetching DB instance")} + } + index, found := fdb.GetFDBIndex(indexId) + if !found { + loggermdl.LogError("index not found") + return []error{errormdl.Wrap("index not found")} + } + bktCnt := len(index.BucketSequence) + if bktCnt == 0 { + loggermdl.LogError("no buckets available") + return []error{errormdl.Wrap("no buckets available")} + } + bucketID := index.BucketSequence[bktCnt-1] + bucketObj, ok := fdb.buckets[bucketID] + if !ok { + loggermdl.LogError("Bucket not found: " + bucketID) + return []error{errormdl.Wrap("Bucket not found: " + bucketID)} + } + + if packBucketObj, ok := bucketObj.(*bucket.PackBucket); ok { + var indexKeyValMap map[string]string + var err error + if len(queries) > 0 { + indexKeyValMap, err = index.GetEntriesByQueries(queries) + } else { + indexKeyValMap, err = index.GetAllEntries() + } + if err != nil { + loggermdl.LogError("index data not found", err) + return []error{errormdl.Wrap("index data not found")} + } + if indexKeyValMap == nil || len(indexKeyValMap) == 0 { + loggermdl.LogError("no data found to reorganize") + return []error{} + } + filePaths := make([]string, len(indexKeyValMap)) + i := 0 + for filePath := range indexKeyValMap { + sourceFile, err := filepath.Abs(filepath.Join(fdb.DBPath, filePath)) + if err != nil { + errList = append(errList, errormdl.Wrap("Error occured during reOrg of file data")) + continue + } + filePaths[i] = sourceFile + i++ + } + reorgErrs := packBucketObj.Reorg(filePaths[:i]) + if len(reorgErrs) > 0 { + errList = append(errList, reorgErrs...) + } + } + + return errList +} + func (fdb *FDB) ResolvePath(index *index.Index, rs *gjson.Result) (string, error) { path := "" for _, bucketID := range index.BucketSequence { diff --git a/dalmdl/corefdb/corefdb_test.go b/dalmdl/corefdb/corefdb_test.go index f89a54d4de7e8308cfe263166c375aada2324edd..9793bd0f79ccd11294fae63712a20fedf2f42ec3 100644 --- a/dalmdl/corefdb/corefdb_test.go +++ b/dalmdl/corefdb/corefdb_test.go @@ -755,3 +755,18 @@ func TestReorg(t *testing.T) { loggermdl.LogError("elapsed", elapsed) } + +func TestReorgBucket(t *testing.T) { + indexID := "studentPack" + + now := time.Now() + requeries := []string{} + errList := ReorganizeFDBBucketData(dbName, indexID, requeries) + if len(errList) > 0 { + log.Fatal(errList) + } + elapsed := time.Since(now) + + loggermdl.LogError("elapsed", elapsed) + +} diff --git a/dalmdl/corefdb/filetype/pack.go b/dalmdl/corefdb/filetype/pack.go index 32c3fed2f0423cbb38028ca25040d4ef857395e2..7bff9ba7285d2eb3e089540ab841e48965018ec6 100644 --- a/dalmdl/corefdb/filetype/pack.go +++ b/dalmdl/corefdb/filetype/pack.go @@ -191,15 +191,15 @@ func (p *PackFile) Write(rs *gjson.Result) (err error) { } func (p *PackFile) Read(queries []string, data *gjson.Result) (string, error) { + p.Locker.Lock() + defer func() { + p.Locker.Unlock() + }() filePath := p.Fp.Name() if !filemdl.FileAvailabilityCheck(filePath) { return "", errormdl.Wrap("file not found at:" + filePath) } - p.Locker.Lock() - defer func() { - p.Locker.Unlock() - }() indexDataString := "" var err error if p.InfileIndexRows == nil { @@ -266,13 +266,13 @@ func (p *PackFile) Update(queries []string, rs *gjson.Result) (gjson.Result, err if !ok { return updatedData, errormdl.Wrap("infileIndex schema for specified fileType not found: " + fileType) } - if !filemdl.FileAvailabilityCheck(p.Fp.Name()) { - return updatedData, errormdl.Wrap("file not found: " + p.Fp.Name()) - } p.Locker.Lock() defer func() { p.Locker.Unlock() }() + if !filemdl.FileAvailabilityCheck(p.Fp.Name()) { + return updatedData, errormdl.Wrap("file not found: " + p.Fp.Name()) + } indexDataString := "[]" var err error @@ -370,16 +370,16 @@ func (p *PackFile) Remove(queries []string) (recordsDeletedCnt int, err error) { func (p *PackFile) WriteMedia(mediaData []byte, rs *gjson.Result) (recordID string, err error) { - p.Locker.Lock() - defer func() { - p.Locker.Unlock() - }() - fileHash, err := securitymdl.GetHash(rs.String()) if err != nil { loggermdl.LogError("error writing to bucket: ", err) return recordID, err } + + p.Locker.Lock() + defer func() { + p.Locker.Unlock() + }() previousIndexData := "[]" if p.InfileIndexRows == nil { previousIndexData, err = getInFileIndexData(p.Fp) @@ -438,6 +438,11 @@ func (p *PackFile) WriteMedia(mediaData []byte, rs *gjson.Result) (recordID stri } func (p *PackFile) UpdateMedia(recordID string, mediaData []byte, rs *gjson.Result) (err error) { + + p.Locker.Lock() + defer func() { + p.Locker.Unlock() + }() isValid, err := isValidPackFile(p.Fp) if err != nil { return err @@ -447,11 +452,6 @@ func (p *PackFile) UpdateMedia(recordID string, mediaData []byte, rs *gjson.Resu } inFileIndexQueries := []string{`#[recordID=` + recordID + `]`} - - p.Locker.Lock() - defer func() { - p.Locker.Unlock() - }() indexDataString := "[]" if p.InfileIndexRows == nil { @@ -527,6 +527,10 @@ func (p *PackFile) UpdateMedia(recordID string, mediaData []byte, rs *gjson.Resu } func (p *PackFile) UpsertMedia(recordID string, mediaData []byte, rs *gjson.Result) (string, error) { + p.Locker.Lock() + defer func() { + p.Locker.Unlock() + }() isValid, err := isValidPackFile(p.Fp) if err != nil { return recordID, err @@ -541,11 +545,6 @@ func (p *PackFile) UpsertMedia(recordID string, mediaData []byte, rs *gjson.Resu inFileIndexQueries := []string{`#[recordID=` + recordID + `]`} - p.Locker.Lock() - - defer func() { - p.Locker.Unlock() - }() f := p.Fp indexDataString := "[]" @@ -618,6 +617,11 @@ func (p *PackFile) UpsertMedia(recordID string, mediaData []byte, rs *gjson.Resu func (p *PackFile) ReadMedia(recordID string) ([]byte, *gjson.Result, error) { dataByte := []byte{} var metaData *gjson.Result + p.Locker.Lock() + defer func() { + p.Locker.Unlock() + }() + isValid, err := isValidPackFile(p.Fp) if err != nil { return dataByte, metaData, err @@ -629,11 +633,6 @@ func (p *PackFile) ReadMedia(recordID string) ([]byte, *gjson.Result, error) { inFileIndexQueries := []string{`#[recordID=` + recordID + `]`} - p.Locker.Lock() - defer func() { - p.Locker.Unlock() - }() - f = p.Fp indexDataString := "[]" if p.InfileIndexRows == nil { @@ -682,13 +681,7 @@ func (p *PackFile) RemoveMedia(recordID string) error { } func (p *PackFile) Reorg() error { - f := p.Fp - p.Locker.Lock() - defer func() { - p.Locker.Unlock() - }() - loggermdl.LogDebug("reorg FilePath", p.FilePath) _, sourceFileName := filepath.Split(p.FilePath) desFileName := sourceFileName + "_" + strconv.FormatInt(time.Now().Unix(), 10) tempFilepath, err := filepath.Abs(filepath.Join(filemdl.TempDir, desFileName)) @@ -704,6 +697,10 @@ func (p *PackFile) Reorg() error { return errormdl.CheckErr(createError) } } + p.Locker.Lock() + defer func() { + p.Locker.Unlock() + }() fpTemp, err := os.OpenFile(tempFilepath, os.O_CREATE|os.O_RDWR, os.ModePerm) if err != nil { return err @@ -998,7 +995,6 @@ func initializeFile(fp *os.File) (err error) { err = filemdl.CreateDirectoryRecursive(dir) if errormdl.CheckErr(err) != nil { loggermdl.LogError(err) - err = err return }