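// Package filemdl queues file write requests in an in-memory list and writes
// them from a single background goroutine, batching requests by unique path.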
package filemdl
import (
"bytes"
"container/list"
"errors"
"io"
"sync"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/constantmdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/workerpoolmdl"
)
// Filemdl is the queued file writer. Save appends write requests to
// requestList and a background goroutine drains them in batches.
type Filemdl struct {
// requestList holds the pending enqueObject values in arrival order.
requestList *list.List
// lock is held around the PushBack, Front and Remove calls on requestList.
lock sync.Mutex
// taskCount is the batch size: listTasks runs the collected tasks once
// this many have been gathered.
taskCount int
// concurrency is passed to workerpoolmdl.NewPool when a batch is run.
concurrency int
// Error, when non-nil, makes Save fail immediately. GetInstance sets it
// when the package instance was not initialised.
Error error
}
// enqueObject is one queued write request, together with the pipe used to
// report the outcome back to the caller waiting in Save.
type enqueObject struct {
// writer is the pipe end on which the write task sends an error message.
writer *io.PipeWriter
// Reader is the pipe end Save reads from; any bytes received are treated
// as an error message.
Reader *io.PipeReader
// FilePath is the destination path handed to WriteFile.
FilePath string
// Data is the content handed to WriteFile.
Data []byte
// makeDir and createBackup are forwarded unchanged to WriteFile.
makeDir bool
createBackup bool
}
// instance is the package-wide Filemdl returned by GetInstance.
var instance *Filemdl
// once wraps the construction of instance inside init.
var once sync.Once
// trigger makes sure the background writing goroutine is started only once,
// by the first Save call that gets past the Error check.
var trigger sync.Once
// init initalises once and initiate new list object
func init() {
once.Do(func() {
instance = &Filemdl{
requestList: list.New(),
}
instance.taskCount = constantmdl.TASKCOUNT
instance.concurrency = constantmdl.TASKCOUNT
// Init initializes Filemdl object with parameters
func Init(taskCount, concurrency int) {
instance.taskCount = taskCount
instance.concurrency = concurrency
if taskCount == 0 {
instance.taskCount = constantmdl.TASKCOUNT
instance.concurrency = constantmdl.TASKCOUNT
}
}
// GetInstance returns the package-level Filemdl. If the instance or its
// request list is missing, it is replaced by an empty Filemdl whose Error
// field is set, so that every later Save call fails with that error.
func GetInstance() *Filemdl {
	if instance != nil && instance.requestList != nil {
		return instance
	}

	const notInitialised = "filemdl does not initialise"
	instance = &Filemdl{}
	loggermdl.LogError(notInitialised)
	instance.Error = errors.New(notInitialised)
	return instance
}
// getEnqueueObject builds the queue entry for one write request. Each entry
// gets its own io.Pipe: the write task uses the writer end and Save reads
// the result from the reader end.
func getEnqueueObject(filePath string, ba []byte, makeDir, createBackup bool) enqueObject {
	pipeReader, pipeWriter := io.Pipe()
	return enqueObject{
		writer:       pipeWriter,
		Reader:       pipeReader,
		FilePath:     filePath,
		Data:         ba,
		makeDir:      makeDir,
		createBackup: createBackup,
	}
}
// enqueueingRequest creates a queue entry for the given write and appends it
// to the back of the request list while holding the lock. The list stores a
// copy of the entry; the returned pointer refers to the caller's copy, which
// shares the same pipe ends.
func (filemdl *Filemdl) enqueueingRequest(filePath string, ba []byte, makeDir, createBackup bool) *enqueObject {
	request := getEnqueueObject(filePath, ba, makeDir, createBackup)

	filemdl.lock.Lock()
	defer filemdl.lock.Unlock()
	filemdl.requestList.PushBack(request)

	return &request
}
// Save queues a write of ba to filePath and blocks until the result of that
// write has been read back from the request's pipe.
//
// makeDir and createBackup are stored on the request and forwarded to
// WriteFile by the write task. The first call starts the background writing
// goroutine; later calls only enqueue.
//
// It returns filemdl.Error if the instance was not initialised, an error
// wrapping the message sent by the write task if the write failed, or the
// read error if the result could not be read from the pipe. It returns nil
// when the pipe ends without carrying any data.
func (filemdl *Filemdl) Save(filePath string, ba []byte, makeDir, createBackup bool) error {
	if filemdl.Error != nil {
		loggermdl.LogError(filemdl.Error)
		return filemdl.Error
	}

	en := filemdl.enqueueingRequest(filePath, ba, makeDir, createBackup)
	// Release the read end on every return path, not only on success.
	defer en.Reader.Close()

	trigger.Do(func() {
		go filemdl.triggerWritingData()
	})

	// ReadFrom blocks until the writer end of the pipe is closed.
	// NOTE(review): the closing call is presumably in writeQueueData, whose
	// tail should be checked to confirm it always closes the writer.
	var buf bytes.Buffer
	_, readErr := buf.ReadFrom(en.Reader)

	// Any payload on the pipe is the error text of a failed write.
	if buf.Len() != 0 {
		err := errormdl.Wrap(buf.String())
		loggermdl.LogError(err)
		return err
	}
	// No payload, but the read itself failed: do not report success.
	if readErr != nil {
		loggermdl.LogError(readErr)
		return readErr
	}
	return nil
}
// triggerWritingData is the body of the background writing goroutine that
// Save starts once through trigger. It hands the current front of the
// request list to listTasks, which loops forever collecting and running
// write tasks, so this function does not return.
//
// It deliberately does not return when the list is empty: trigger is a
// sync.Once, so nothing would restart the writer, and listTasks already
// handles a nil element by polling the list again.
func (filemdl *Filemdl) triggerWritingData() {
	filemdl.listTasks([]*workerpoolmdl.Task{}, filemdl.getFirstElement(), make(map[string]string))
}
// listTasks is the endless collection loop of the background writer.
//
// Starting at el, it walks the request list and turns each request into a
// worker-pool task, allowing at most one task per file path in a batch
// (fileList records the paths already taken). A request that joins the
// batch is removed from the list; a request whose path is already in the
// batch stays queued for a later batch. The batch is run when the end of
// the list is reached or when it holds taskCount tasks, after which
// collection restarts from the front of the list.
//
// The loop has no exit, so the declared results are never produced.
//
// NOTE(review): when the list is empty and no tasks are pending, the loop
// polls getFirstElement without waiting.
func (filemdl *Filemdl) listTasks(tasks []*workerpoolmdl.Task, el *list.Element, fileList map[string]string) ([]*workerpoolmdl.Task, *list.Element, map[string]string) {
	for {
		if el == nil {
			// End of the list: flush whatever has been collected.
			if len(tasks) > 0 {
				tasks, fileList = filemdl.runTask(tasks)
			}
			el = filemdl.getFirstElement()
			continue
		}
		if len(tasks) == filemdl.taskCount {
			// Batch is full: run it and restart from the front.
			tasks, fileList = filemdl.runTask(tasks)
			el = filemdl.getFirstElement()
			continue
		}

		en := el.Value.(enqueObject)
		// Only one task per file path may be in a batch.
		_, alreadyBatched := fileList[en.FilePath]
		if !alreadyBatched {
			tasks = append(tasks, workerpoolmdl.NewTask(en, writeQueueData))
			fileList[en.FilePath] = ""
		}

		// Advance and unlink under the lock: enqueueingRequest's PushBack
		// rewrites the links of the last element concurrently. Next must be
		// read before Remove, which clears the element's links.
		filemdl.lock.Lock()
		next := el.Next()
		if !alreadyBatched {
			filemdl.requestList.Remove(el)
		}
		filemdl.lock.Unlock()
		el = next
	}
}
// runTask executes one batch: it builds a worker pool over tasks with the
// configured concurrency and calls Run on it. It returns a fresh empty task
// slice and a fresh empty path map for the next batch.
func (filemdl *Filemdl) runTask(tasks []*workerpoolmdl.Task) ([]*workerpoolmdl.Task, map[string]string) {
	pool := workerpoolmdl.NewPool(tasks, filemdl.concurrency)
	pool.Run()
	return []*workerpoolmdl.Task{}, make(map[string]string)
}
// getFirstElement returns the element at the front of the request list, or
// nil when the list is empty. The list is read while holding the lock.
func (filemdl *Filemdl) getFirstElement() *list.Element {
	filemdl.lock.Lock()
	defer filemdl.lock.Unlock()

	front := filemdl.requestList.Front()
	return front
}
// deleteElement removes one element from the request list while holding the
// lock. When el is non-nil the element before el is removed; when el is nil
// tmp is removed instead.
func (filemdl *Filemdl) deleteElement(el, tmp *list.Element) {
	filemdl.lock.Lock()
	victim := tmp
	if el != nil {
		victim = el.Prev()
	}
	filemdl.requestList.Remove(victim)
	filemdl.lock.Unlock()
}
// writeQueueData writes data in the file or any other stream
func writeQueueData(raw interface{}) error {
en := raw.(enqueObject)
err := WriteFile(en.FilePath, en.Data, en.makeDir, en.createBackup)
// if error found in writing on any destinatio stream, error will be written on same pipe instance
if errormdl.CheckErr(err) != nil {
loggermdl.LogError(err)
en.writer.Write([]byte(errormdl.CheckErr(err).Error()))
}