filequeueing.go 5.57 KiB
Newer Older
package filemdl

import (
	"bytes"
	"container/list"
	"errors"
	"io"
	"sync"

	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/constantmdl"

	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/errormdl"
	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/loggermdl"
	"corelab.mkcl.org/MKCLOS/coredevelopmentplatform/corepkgv2/workerpoolmdl"
)

// Filemdl Object
type Filemdl struct {
	requestList *list.List
	lock        sync.Mutex
	taskCount   int
	concurrency int
	Error       error
}

// enqueObject is used for enquing writing request in list
type enqueObject struct {
Roshan Patil's avatar
Roshan Patil committed
	writer       *io.PipeWriter
	Reader       *io.PipeReader
	FilePath     string
	Data         []byte
	makeDir      bool
	createBackup bool
}

var instance *Filemdl
var once sync.Once
var trigger sync.Once

Roshan Patil's avatar
Roshan Patil committed
// init initalises once and initiate new list object
func init() {
	once.Do(func() {
		instance = &Filemdl{
			requestList: list.New(),
		}
Roshan Patil's avatar
Roshan Patil committed
		instance.taskCount = constantmdl.TASKCOUNT
		instance.concurrency = constantmdl.TASKCOUNT
Roshan Patil's avatar
Roshan Patil committed
// Init initializes Filemdl object with parameters
func Init(taskCount, concurrency int) {
	instance.taskCount = taskCount
	instance.concurrency = concurrency
	if taskCount == 0 {
		instance.taskCount = constantmdl.TASKCOUNT
		instance.concurrency = constantmdl.TASKCOUNT
	}
}

// GetInstance return single instance of filemdl object
func GetInstance() *Filemdl {
	if instance == nil || instance.requestList == nil {
		instance = &Filemdl{}
		loggermdl.LogError("filemdl does not initialise")
		instance.Error = errors.New("filemdl does not initialise")
	}
	return instance
}

// getEnqueueObject returns new enques object for each write request
Roshan Patil's avatar
Roshan Patil committed
func getEnqueueObject(filePath string, ba []byte, makeDir, createBackup bool) enqueObject {
	r, w := io.Pipe()
	obj := enqueObject{}
	obj.writer = w
	obj.Reader = r
	obj.Data = ba
	obj.FilePath = filePath
Roshan Patil's avatar
Roshan Patil committed
	obj.makeDir = makeDir
	obj.createBackup = createBackup
	return obj
}

// enqueueingRequest push request in list from back
Roshan Patil's avatar
Roshan Patil committed
func (filemdl *Filemdl) enqueueingRequest(filePath string, ba []byte, makeDir, createBackup bool) *enqueObject {
	en := getEnqueueObject(filePath, ba, makeDir, createBackup)
	filemdl.lock.Lock()
	filemdl.requestList.PushBack(en)
	filemdl.lock.Unlock()
	return &en
}

// Save Enque the requested object
Roshan Patil's avatar
Roshan Patil committed
func (filemdl *Filemdl) Save(filePath string, ba []byte, makeDir, createBackup bool) error {
	if filemdl.Error != nil {
		loggermdl.LogError(filemdl.Error)
		return filemdl.Error
	}
Roshan Patil's avatar
Roshan Patil committed
	en := filemdl.enqueueingRequest(filePath, ba, makeDir, createBackup)
	trigger.Do(func() {
		go filemdl.triggerWritingData()
	})
	buf := new(bytes.Buffer)
	buf.ReadFrom(en.Reader)
	if len(buf.Bytes()) != 0 {
		err := errormdl.Wrap(buf.String())
		loggermdl.LogError(err)
		return err
	}
	en.Reader.Close()
	return nil
}

// triggerWritingData triggers the writing thread to write data (only once)
func (filemdl *Filemdl) triggerWritingData() {
	tasks := []*workerpoolmdl.Task{}
	fileList := make(map[string]string)
	deleteStatus := false
	el := filemdl.getFirstElement()
	if el == nil {
		return
	}
	// This logic will execute very first time
	en := el.Value.(enqueObject)
	if _, ok := fileList[en.FilePath]; !ok {
		tasks = append(tasks, workerpoolmdl.NewTask(en, writeQueueData))
		fileList[en.FilePath] = ""
		deleteStatus = true
	}
	tmp := el
	el = el.Next()
	if deleteStatus {
		filemdl.deleteElement(el, tmp)
	}
	tasks, el, fileList = filemdl.listTasks(tasks, el, fileList)
}

// listTasks collect task as per threshold from list
func (filemdl *Filemdl) listTasks(tasks []*workerpoolmdl.Task, el *list.Element, fileList map[string]string) ([]*workerpoolmdl.Task, *list.Element, map[string]string) {
	for {
		deleteStatus := false
		if el == nil {
			if len(tasks) > 0 {
				tasks, fileList = filemdl.runTask(tasks)
			}
			el = filemdl.getFirstElement()
			continue
		}
		if len(tasks) == filemdl.taskCount {
			tasks, fileList = filemdl.runTask(tasks)
			el = filemdl.getFirstElement()
			continue
		}
		en := el.Value.(enqueObject)
		// this will check if task contains only unique files
		if _, ok := fileList[en.FilePath]; !ok {
			tasks = append(tasks, workerpoolmdl.NewTask(en, writeQueueData))
			fileList[en.FilePath] = ""
			deleteStatus = true
		}
		tmp := el
		el = el.Next()
		if deleteStatus {
			filemdl.deleteElement(el, tmp)
		}
	}
}

// runTask runs the collected task from list using workpool
func (filemdl *Filemdl) runTask(tasks []*workerpoolmdl.Task) ([]*workerpoolmdl.Task, map[string]string) {
	p := workerpoolmdl.NewPool(tasks, filemdl.concurrency)
	p.Run()
	tasks = []*workerpoolmdl.Task{}
	fileList := make(map[string]string)
	return tasks, fileList
}

// getFirstElement return front element from list
func (filemdl *Filemdl) getFirstElement() *list.Element {
	defer filemdl.lock.Unlock()
	filemdl.lock.Lock()
	return filemdl.requestList.Front()
}

// deleteElement delete given element from list
func (filemdl *Filemdl) deleteElement(el, tmp *list.Element) {
	filemdl.lock.Lock()
	// if current element is not nil, it will delete its prevevious else delete current element
	if el != nil {
		filemdl.requestList.Remove(el.Prev())
	} else {
		filemdl.requestList.Remove(tmp)
	}
	filemdl.lock.Unlock()
}

// writeQueueData writes data in the file or any other stream
func writeQueueData(raw interface{}) error {
	en := raw.(enqueObject)
Roshan Patil's avatar
Roshan Patil committed
	err := WriteFile(en.FilePath, en.Data, en.makeDir, en.createBackup)
Roshan Patil's avatar
Roshan Patil committed
	defer en.writer.Close()
	// if error found in writing on any destinatio stream, error will be written on same pipe instance
Roshan Patil's avatar
Roshan Patil committed
	if errormdl.CheckErr(err) != nil {
		loggermdl.LogError(err)
		en.writer.Write([]byte(errormdl.CheckErr(err).Error()))
	}