Commit c6012fd3 authored by Ajit Jagtap

Added WorkerPool

parent bac7a2fb
1 merge request: !23 Devbranch to Master
Showing 157 additions and 3 deletions
@@ -26,9 +26,6 @@ func TestGetDataDAO_WithRaw(t *testing.T) {
 	//pass same result again
 	result, err = GetDataDAO("../../testingdata/users.json", "*", true, result)
 	assert.NoError(t, err, "This should not return error")
-	// a := result.String()
-	// assert.NotZero(t, a, "Should give len")
 }
 func TestGetDataDAO_WithFileReadError(t *testing.T) {
 	errormdl.IsTestingNegetiveCaseOn = true
......
package workerpoolmdl

import (
	"sync"
)
// HOW TO USE IT
//
// func MyWorker(data interface{}) error {
// 	fmt.Println("Process Started", data)
// 	// time.Sleep(time.Second * 1)
// 	fmt.Println("Process Ended", data)
// 	return nil
// 	// return errors.New("emit macho dwarf: elf header corrupted")
// }
//
// func main() {
// 	tasks := []*workerpoolmdl.Task{}
// 	for index := 0; index < 100; index++ {
// 		tasks = append(tasks, workerpoolmdl.NewTask(index, MyWorker))
// 	}
// 	p := workerpoolmdl.NewPool(tasks, 100)
// 	p.Run()
//
// 	var numErrors int
// 	for _, task := range p.Tasks {
// 		if task.Err != nil {
// 			fmt.Println(task.Err)
// 			numErrors++
// 		}
// 		if numErrors >= 10 {
// 			fmt.Println("Too many errors.")
// 			break
// 		}
// 	}
// }

// Pool is a worker group that runs a number of tasks at a
// configured concurrency.
type Pool struct {
	Tasks []*Task

	concurrency int
	tasksChan   chan *Task
	wg          sync.WaitGroup
}

// NewPool initializes a new pool with the given tasks and
// at the given concurrency.
func NewPool(tasks []*Task, concurrency int) *Pool {
	return &Pool{
		Tasks:       tasks,
		concurrency: concurrency,
		// tasksChan is unbuffered, so Run hands each task
		// directly to an idle worker.
		tasksChan: make(chan *Task),
	}
}

// Run runs all work within the pool and blocks until it's
// finished.
func (p *Pool) Run() {
	for i := 0; i < p.concurrency; i++ {
		go p.work()
	}

	p.wg.Add(len(p.Tasks))
	for _, task := range p.Tasks {
		p.tasksChan <- task
	}

	// Closing the channel ends each worker's range loop, so all
	// workers return once the queued tasks are drained.
	close(p.tasksChan)

	p.wg.Wait()
}

// The work loop for any single goroutine.
func (p *Pool) work() {
	for task := range p.tasksChan {
		task.Run(&p.wg)
	}
}

// Task encapsulates a work item that should go in a work
// pool.
type Task struct {
	// Err holds an error that occurred during a task. Its
	// result is only meaningful after Run has been called
	// for the pool that holds it.
	Err error

	Data interface{}
	f    func(data interface{}) error
}

// NewTask initializes a new task based on a given work
// function.
func NewTask(d interface{}, f func(data interface{}) error) *Task {
	return &Task{Data: d, f: f}
}

// Run runs a Task and does appropriate accounting via a
// given sync.WaitGroup.
func (t *Task) Run(wg *sync.WaitGroup) {
	t.Err = t.f(t.Data)
	wg.Done()
}
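
For reference, a runnable version of the pattern sketched in the HOW TO USE IT comment above. The import path is hypothetical, and the simulated failure is illustrative only:

package main

import (
	"errors"
	"fmt"

	"example.com/corepkg/workerpoolmdl" // hypothetical import path
)

// MyWorker processes one item and fails on every tenth input to
// show how Task.Err surfaces errors after Run.
func MyWorker(data interface{}) error {
	if n, ok := data.(int); ok && n%10 == 0 {
		return errors.New("simulated failure")
	}
	fmt.Println("processed", data)
	return nil
}

func main() {
	tasks := []*workerpoolmdl.Task{}
	for index := 0; index < 100; index++ {
		tasks = append(tasks, workerpoolmdl.NewTask(index, MyWorker))
	}

	// 100 tasks run on 10 workers; workers are reused as tasks drain.
	p := workerpoolmdl.NewPool(tasks, 10)
	p.Run()

	var numErrors int
	for _, task := range p.Tasks {
		if task.Err != nil {
			fmt.Println(task.Err)
			numErrors++
		}
		if numErrors >= 10 {
			fmt.Println("Too many errors.")
			break
		}
	}
}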

package workerpoolmdl

import (
	"fmt"
	"sync"
	"testing"

	"github.com/stretchr/testify/assert"
)

var globalvar int
var mutex = &sync.Mutex{}

func TestWorkerPool(t *testing.T) {
	globalvar = 0

	// Each task increments a shared counter under a mutex.
	fn := func(data interface{}) error {
		mutex.Lock()
		globalvar++
		mutex.Unlock()
		return nil
	}

	tasks := []*Task{}
	for index := 0; index < 100; index++ {
		tasks = append(tasks, NewTask(index, fn))
	}

	p := NewPool(tasks, 100)
	p.Run()

	var numErrors int
	for _, task := range p.Tasks {
		if task.Err != nil {
			fmt.Println(task.Err)
			numErrors++
		}
		if numErrors >= 10 {
			fmt.Println("Too many errors.")
			break
		}
	}

	assert.Equal(t, 100, globalvar, "Count should match")
}
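
A possible companion test (an assumed addition, not part of this commit) that exercises the error-reporting path: the task function fails on even inputs, so exactly half the tasks should record an error in Task.Err.

func TestWorkerPoolWithErrors(t *testing.T) {
	// Fail on even inputs so half the tasks record an error.
	fn := func(data interface{}) error {
		if n, ok := data.(int); ok && n%2 == 0 {
			return fmt.Errorf("task %d failed", n)
		}
		return nil
	}

	tasks := []*Task{}
	for index := 0; index < 10; index++ {
		tasks = append(tasks, NewTask(index, fn))
	}

	p := NewPool(tasks, 5)
	p.Run()

	numErrors := 0
	for _, task := range p.Tasks {
		if task.Err != nil {
			numErrors++
		}
	}

	assert.Equal(t, 5, numErrors, "Half the tasks should report an error")
}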