Commit 68472fcf authored by Kunal Taitkar's avatar Kunal Taitkar
Browse files

Merge branch 'devbranch' into kunalt_downloadManager

parents 39677d66 078aea73
Branches
Tags
2 merge requests!11331 Aug MEP Merge Dev to Stg,!111Timeout added in downloadhelper.mdl
Showing with 879 additions and 440 deletions
This diff is collapsed.
......@@ -169,7 +169,7 @@ func TestDeleteData(t *testing.T) {
log.Fatal(err)
}
_, errList := DeleteData("myfdb", i.IndexID, `#[name=="vivek"]`)
errList := DeleteData("myfdb", i.IndexID, `#[name=="vivek"]`)
if len(errList) > 0 {
loggermdl.LogError(errList)
// log.Fatal(err)
......@@ -271,14 +271,14 @@ func BenchmarkReindex(b *testing.B) {
}
}
func TestWriteDataToBucket(t *testing.T) {
func TestSaveDataInInfileBucket(t *testing.T) {
db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false)
if err != nil {
log.Fatal("CreateFDBInstance = ", err)
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -293,8 +293,8 @@ func TestWriteDataToBucket(t *testing.T) {
FileType: "Exam",
IndexFields: []IndexField{
IndexField{
FieldName: "marks",
Query: "marks",
FieldName: "examId",
Query: "examId",
},
},
}
......@@ -330,6 +330,7 @@ func TestWriteDataToBucket(t *testing.T) {
data, _ = sjson.Set(data, "examName", "unit2")
data, _ = sjson.Set(data, "totalQuestion", 50)
data, _ = sjson.Set(data, "marks", 26)
data, _ = sjson.Set(data, "examId", "MATH001")
data, _ = sjson.Set(data, "fileType", "Exam")
//////// ------------vijay
......@@ -348,27 +349,110 @@ func TestWriteDataToBucket(t *testing.T) {
// data, _ = sjson.Set(data, "fileType", "Exam")
// studentList := []gjson.Result{}
// for index := 300; index < 1000; index++ {
// for index := 0; index < 5; index++ {
// data, _ = sjson.Set(data, "studentId", index+1)
// studentList = append(studentList, gjson.Parse(data))
// }
// step 3: save data
// for _, studentObj := range studentList {
// // data, _ = sjson.Set(data, "studentId", v+1)
// // studentObj := gjson.Parse(data)
// err = WriteDataToBucket("myfdb", i.IndexID, &studentObj)
// err = SaveDataInFDB("myfdb", i.IndexID, &studentObj)
// if err != nil {
// log.Fatal(err)
// }
// }
studentObj := gjson.Parse(data)
err = WriteDataToBucket("myfdb", i.IndexID, &studentObj)
err = SaveDataInFDB("myfdb", i.IndexID, &studentObj)
if err != nil {
log.Fatal(err)
}
indexFilePath := filepath.Join(db.DBPath, INDEXFOLDER, i.IndexNameQuery)
err = LogFDBIndexFile(indexFilePath, i)
if err != nil {
log.Fatal(err)
}
}
func TestSaveDataInNormalBucket(t *testing.T) {
db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false)
if err != nil {
log.Fatal("CreateFDBInstance = ", err)
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeSimple)
i, err := db.GetNewIndex("studentId", true)
if err != nil {
log.Fatal(err)
}
i.SetBucket(b)
fields := []IndexField{
IndexField{
FieldName: "name",
Query: "name",
},
}
i.SetFields(fields...)
// step 2: create index
err = db.CreateIndex(i)
if err != nil {
log.Fatal(err)
}
// data, _ := sjson.Set("", "name", "ajay")
// data, _ = sjson.Set(data, "studentId", 10013)
// data, _ = sjson.Set(data, "class", "TY_MCA")
// data, _ = sjson.Set(data, "age", 24)
// data, _ = sjson.Set(data, "fileType", "Profile")
data, _ := sjson.Set("", "name", "ajay")
data, _ = sjson.Set(data, "studentId", 1234)
data, _ = sjson.Set(data, "examName", "unit2")
data, _ = sjson.Set(data, "totalQuestion", 50)
data, _ = sjson.Set(data, "marks", 26)
data, _ = sjson.Set(data, "examId", "MATH001")
data, _ = sjson.Set(data, "fileType", "Exam")
//////// ------------vijay
// data, _ := sjson.Set("", "name", "vijay")
// data, _ = sjson.Set(data, "studentId", 10014)
// data, _ = sjson.Set(data, "class", "SY_MCA")
// data, _ = sjson.Set(data, "age", 23)
// data, _ = sjson.Set(data, "fileType", "Profile")
// data, _ := sjson.Set("", "studentId", 10014)
// data, _ = sjson.Set(data, "name", "vijay")
// data, _ = sjson.Set(data, "examName", "Unit3")
// data, _ = sjson.Set(data, "totalQuestion", 100)
// data, _ = sjson.Set(data, "marks", 32)
// data, _ = sjson.Set(data, "fileType", "Exam")
// studentList := []gjson.Result{}
// for index := 0; index < 5; index++ {
// data, _ = sjson.Set(data, "studentId", index+1)
// studentList = append(studentList, gjson.Parse(data))
// }
// for _, studentObj := range studentList {
// err = SaveDataInFDB("myfdb", i.IndexID, &studentObj)
// if err != nil {
// log.Fatal(err)
// }
// }
studentObj := gjson.Parse(data)
err = SaveDataInFDB("myfdb", i.IndexID, &studentObj)
if err != nil {
log.Fatal(err)
}
indexFilePath := filepath.Join(db.DBPath, INDEXFOLDER, i.IndexNameQuery)
err = LogFDBIndexFile(indexFilePath, i)
if err != nil {
log.Fatal(err)
}
}
func TestGetDataFromInFileBucket(t *testing.T) {
db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false)
if err != nil {
......@@ -376,7 +460,7 @@ func TestGetDataFromInFileBucket(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -439,7 +523,7 @@ func TestGetDataFromInFileBucket(t *testing.T) {
studentObj := gjson.Parse(data)
// step 3: save data
result, err := GetDataFromDb("myfdb", i.IndexID, &studentObj, queries, inFileIndexQueries)
result, err := ReadDataFromFDB("myfdb", i.IndexID, &studentObj, queries, inFileIndexQueries)
if err != nil {
log.Fatal(err)
}
......@@ -453,7 +537,7 @@ func TestUpdateData(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -526,7 +610,7 @@ func TestUpdateData(t *testing.T) {
infileIndexQueries := []string{`#[marks==32]`}
studentObj := gjson.Parse(data)
// step 3: save data
updatedData, err := UpdateBucketData("myfdb", i.IndexID, &studentObj, queries, infileIndexQueries)
updatedData, err := UpdateDataInFDB("myfdb", i.IndexID, &studentObj, queries, infileIndexQueries)
if err != nil {
log.Fatal(err)
}
......@@ -540,7 +624,7 @@ func TestDeleteRecordsFromInFileIndexBucket(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -604,7 +688,7 @@ func TestReindexOnSpecialBucket(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexFDB := InFileIndex{
FileType: "FDBIndex",
IndexFields: []IndexField{
......@@ -643,7 +727,7 @@ func TestLogFDBIndexFile(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -691,7 +775,7 @@ func TestLogFDBIndexFile(t *testing.T) {
data, _ = sjson.Set(data, "fileType", "Exam")
studentList := []gjson.Result{}
for index := 0; index < 10; index++ {
for index := 0; index < 1; index++ {
data, _ = sjson.Set(data, "studentId", index+1)
studentList = append(studentList, gjson.Parse(data))
}
......@@ -699,7 +783,7 @@ func TestLogFDBIndexFile(t *testing.T) {
for _, studentObj := range studentList {
// data, _ = sjson.Set(data, "studentId", v+1)
// studentObj := gjson.Parse(data)
err = WriteDataToBucket("myfdb", i.IndexID, &studentObj)
err = SaveDataInFDB("myfdb", i.IndexID, &studentObj)
if err != nil {
log.Fatal(err)
}
......@@ -720,7 +804,7 @@ func TestLoadFDBIndexFromFile(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -778,7 +862,7 @@ func TestLoadFDBIndexFromFile(t *testing.T) {
studentObj := gjson.Parse(data)
// step 3: save data
result, err := GetDataFromDb("myfdb", i.IndexID, &studentObj, queries, inFileIndexQueries)
result, err := ReadDataFromFDB("myfdb", i.IndexID, &studentObj, queries, inFileIndexQueries)
if err != nil {
log.Fatal(err)
}
......@@ -792,7 +876,7 @@ func TestSaveDataInAppendBucket(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("appendBucket", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypeAppend)
// loggermdl.LogDebug(infileIndexExam)
i, err := db.GetNewIndex("studentProfId", true)
......@@ -827,7 +911,7 @@ func TestSaveDataInAppendBucket(t *testing.T) {
// data2, _ = sjson.Set(data2, "marks", 26)
// data2, _ = sjson.Set(data2, "fileType", "Exam")
// studentExamObj := gjson.Parse(data2)
err = SaveDataInAppendBucket("myfdb", i.IndexID, &studentProfObj)
err = SaveDataInFDB("myfdb", i.IndexID, &studentProfObj)
if err != nil {
log.Fatal(err)
}
......@@ -849,7 +933,7 @@ func TestLoadFDBEncryptedData(t *testing.T) {
}
// step 1: create bucket
b := db.GetNewBucket("appendBucket", false, &Bucket{})
b.SetBucketType(BucketTypeInfileIndex)
b.SetBucketType(BucketTypePack)
infileIndexProfile := InFileIndex{
FileType: "Profile",
......@@ -903,3 +987,122 @@ func TestLoadFDBEncryptedData(t *testing.T) {
})
})
}
func TestDeleteDataFromInfileBucket(t *testing.T) {
// TestSaveDataInFDB(t)
db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false)
if err != nil {
log.Fatal("CreateFDBInstance = ", err)
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeSimple)
infileIndexProfile := InFileIndex{
FileType: "Profile",
IndexFields: []IndexField{
IndexField{
FieldName: "class",
Query: "class",
},
},
}
infileIndexExam := InFileIndex{
FileType: "Exam",
IndexFields: []IndexField{
IndexField{
FieldName: "examId",
Query: "examId",
},
},
}
b.SetInFileIndex(infileIndexProfile)
b.SetInFileIndex(infileIndexExam)
i, err := db.GetNewIndex("studentId", true)
if err != nil {
log.Fatal(err)
}
i.SetBucket(b)
fields := []IndexField{
IndexField{
FieldName: "name",
Query: "name",
},
}
i.SetFields(fields...)
// step 2: create index
err = db.CreateIndex(i)
if err != nil {
log.Fatal(err)
}
indexFilePath := filepath.Join(db.DBPath, INDEXFOLDER, i.IndexNameQuery)
err = LoadFDBIndexFromFile(indexFilePath, db, i.IndexID)
if err != nil {
log.Fatal(err)
}
// data, _ := sjson.Set("", "fileType", "Exam")
// data, _ = sjson.Set(data, "studentId", 1234)
studentObj := gjson.Result{}
// err = SaveDataInFDB("myfdb", i.IndexID, &studentObj)
// if err != nil {
// log.Fatal(err)
// }
queries := []string{`#[name=="ajay"]`}
infileIndexQueries := []string{`#[examId==MATH001]`}
// studentObj := gjson.Parse(data)
errList := DeleteDataFromFDB("myfdb", i.IndexID, &studentObj, queries, infileIndexQueries)
if len(errList) > 0 {
log.Fatal(errList)
}
}
func TestDeleteDataFromNormalBucket(t *testing.T) {
db, err := CreateFDBInstance("/home/vivekn/fdb_data", "myfdb", false)
if err != nil {
log.Fatal("CreateFDBInstance = ", err)
}
// step 1: create bucket
b := db.GetNewBucket("bucketPackType", false, &Bucket{})
b.SetBucketType(BucketTypeSimple)
i, err := db.GetNewIndex("studentId", true)
if err != nil {
log.Fatal(err)
}
i.SetBucket(b)
fields := []IndexField{
IndexField{
FieldName: "name",
Query: "name",
},
}
i.SetFields(fields...)
// step 2: create index
err = db.CreateIndex(i)
if err != nil {
log.Fatal(err)
}
indexFilePath := filepath.Join(db.DBPath, INDEXFOLDER, i.IndexNameQuery)
err = LoadFDBIndexFromFile(indexFilePath, db, i.IndexID)
if err != nil {
log.Fatal(err)
}
queries := []string{`#[name=="ajay"]`}
infileIndexQueries := []string{`#[examId==MATH001]`}
// data, _ := sjson.Set("", "fileType", "Exam")
// data, _ := sjson.Set(data, "studentId", 1234)
studentObj := gjson.Result{}
errList := DeleteDataFromFDB("myfdb", i.IndexID, &studentObj, queries, infileIndexQueries)
if len(errList) > 0 {
log.Fatal(errList)
}
}
package corefdb
import (
"time"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/cachemdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/dalmdl/lazywriter"
)
// Create Master lazy object
// IndexMaster - Holds master object for index files
var IndexMaster lazywriter.LazyFDBHelper
// IndexLazyObjHolder - Holds lazy cache objects for indexes, indexID as key and lazy Object as value
var IndexLazyObjHolder cachemdl.FastCacheHelper
const (
maxObjetsCnt int = 100000
maxRetryCnt int = 5
intervalTime int = 5
sleepTime int = 5
lazyIndexProcessName string = "lazyIndex"
)
func init() {
// start process for lazy index file operations
IndexMaster.StartProcess(maxObjetsCnt, lazyIndexProcessName, intervalTime, sleepTime, maxRetryCnt, false)
// loggermdl.LogError("Lazy writer process started")
// initialize a map to hold lazy writer objects for the index file data
IndexLazyObjHolder.Setup(10000, time.Second*600, time.Second*600)
// loggermdl.LogError("cache setup done")
}
......@@ -65,8 +65,13 @@ type LazyCacheObject struct {
MEMORY_READ_COUNT int
DISK_READ_COUNT int
DISK_WRITE_COUNT int
SaveFn SaveDataFn
}
// SaveDataFn - This is an user defined callback function executed to presist data. If not provided default save function will be executed.
type SaveDataFn func(key string, value LazyCacheObject)
// StartProcess
func (lfd *LazyFDBHelper) StartProcess(objectCount int, taskName string,
intervalTime int, sleepTime int, maxNumberOfRetry int, isDebugMode bool) {
......@@ -133,7 +138,14 @@ func (lfd *LazyFDBHelper) saveObjectsToFdb() {
if cacheObjActual.ChangeCount > 0 {
cacheObjActual.IsLocked = true
saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
// TODO: Catch errors from save function
if cacheObjActual.SaveFn == nil {
loggermdl.LogError("Executing default function")
saveDataToFDB(cacheObjActual.FileName, cacheObjActual.InterfaceData, cacheObjActual.GJSONData)
} else {
loggermdl.LogError("Executing custom function")
cacheObjActual.SaveFn(item, cacheObjActual)
}
cacheObjActual.ChangeCount = 0
cacheObjActual.IsLocked = false
lfd.gc.Set(cacheObjActual.FileName, cacheObjActual, lfd.CacheExpirationTime)
......@@ -148,6 +160,7 @@ func (lfd *LazyFDBHelper) saveObjectsToFdb() {
// PerformanceAnalyser[cacheObjActual.FileName] = lazyCacheObject
// }
lazyMutex.Unlock()
loggermdl.LogError("changes saved to disk at ", cacheObjActual.FileName)
}
}
lfd.NumberOfUpdateAttempts = lfd.NumberOfUpdateAttempts + 1
......@@ -209,6 +222,7 @@ retrylabel:
// lazyCacheObject.MEMORY_WRITE_COUNT++
// PerformanceAnalyser[newObject.FileName] = lazyCacheObject
// }
loggermdl.LogError("data updated in cache")
return true
}
......
......@@ -2,6 +2,7 @@ package mysql
import (
"database/sql"
"database/sql/driver"
"strconv"
"strings"
"sync"
......@@ -229,7 +230,7 @@ func (md *MySQLDAO) ExecQuery(query string, args ...interface{}) (string, error)
return "", errormdl.CheckErr(connectionError)
}
pingError := connection.Ping()
if errormdl.CheckErr(pingError) != nil {
if errormdl.CheckErr(pingError) != nil && pingError != driver.ErrBadConn {
loggermdl.LogError(pingError)
return "", errormdl.CheckErr(pingError)
}
......@@ -262,7 +263,7 @@ func (md *MySQLDAO) SelectQuery(query string, args ...interface{}) (*gjson.Resul
// loggermdl.LogSpot(connection)
pingError := connection.Ping()
if errormdl.CheckErr(pingError) != nil {
if errormdl.CheckErr(pingError) != nil && pingError != driver.ErrBadConn {
loggermdl.LogError(pingError)
return nil, errormdl.CheckErr(pingError)
}
......
......@@ -22,8 +22,16 @@ var (
filePtrs cachemdl.FastCacheHelper
)
const (
// TempDir - Serves as tmp directory for atomic file operations
TempDir = "tmp"
)
func init() {
filePtrs.Setup(100000, time.Hour*72, time.Hour*72)
if !FileAvailabilityCheck(TempDir) {
CreateDirectory(TempDir) // Create directory "tmp" at app root
}
}
// TODO: Symbolic link evelution for this package not supported in windows .symlink file
......@@ -627,7 +635,7 @@ func FastWriteFile(filePath string, data []byte, makeDir bool, createBackup bool
func writeFileSafely(filePath string, data []byte, perm os.FileMode) error {
_, name := filepath.Split(filePath)
tmpFile, err := ioutil.TempFile("", name)
tmpFile, err := ioutil.TempFile(TempDir, name)
if err != nil {
return err
}
......@@ -659,8 +667,9 @@ func writeFileSafely(filePath string, data []byte, perm os.FileMode) error {
return errormdl.Wrap("can't set filemode on tempfile: " + tmpFileName + ", error: " + err.Error())
}
}
loggermdl.LogError(tmpFileName)
if err := AtomicReplaceFile(tmpFileName, filePath); err != nil {
loggermdl.LogError("Atomic replace failed - ", err)
return errormdl.Wrap("cannot replace " + filePath + " with " + tmpFileName)
}
return nil
......
......@@ -47,6 +47,7 @@ func Init(tomlFilepath string) error {
type Email struct {
from string
replyTo string
to []string
cc []string
bcc []string
......@@ -56,9 +57,10 @@ type Email struct {
plainBody string // alternate text if template fails
}
func NewEmail(to, cc, bcc, attachments []string, from, subject, body string) *Email {
func NewEmail(to, cc, bcc, attachments []string, from, replyTo, subject, body string) *Email {
return &Email{
from: from,
replyTo: replyTo,
to: to,
cc: cc,
bcc: bcc,
......@@ -148,6 +150,7 @@ func (email *Email) Send() error {
message := gomail.NewMessage()
message.SetHeader("From", email.from)
message.SetHeader("To", email.to...)
message.SetHeader("Reply-To", email.replyTo)
message.SetHeader("Cc", email.cc...)
message.SetHeader("Bcc", email.bcc...)
message.SetHeader("Subject", email.subject)
......
......@@ -58,6 +58,7 @@ func (email *Email) SendMailFromSLS(hostName string) error {
message := gomail.NewMessage()
message.SetHeader("From", email.from)
message.SetHeader("To", email.to...)
message.SetHeader("Reply-To", email.replyTo)
message.SetHeader("Cc", email.cc...)
message.SetHeader("Bcc", email.bcc...)
message.SetHeader("Subject", email.subject)
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment