Commit ef83b300 authored by Kunal Taitkar's avatar Kunal Taitkar
Browse files

Workerpool with cancellable context

parent ff2d11ad
2 merge requests!11915 Oct MEP Merge Dev to Stg,!117Workerpool with cancellable context
Showing with 183 additions and 0 deletions
package workerpoolmdl
import (
"context"
)
// PoolWithContext is a worker group that runs a number of tasks at a
// configured concurrency.
type PoolWithContext struct {
Tasks []*TaskWithContext
concurrency int
tasksChan chan *TaskWithContext
CancelHandler *cancelHandler
IsPoolCanceled bool
}
type cancelHandler struct {
Ctx context.Context
CancelFunc context.CancelFunc
}
// NewPoolWithContext initializes a new pool with the given tasks and
// at the given concurrency.
func NewPoolWithContext(tasks []*TaskWithContext, concurrency int) *PoolWithContext {
cntx, cancelFunction := context.WithCancel(context.Background())
obj := cancelHandler{}
obj.Ctx = cntx
obj.CancelFunc = cancelFunction
return &PoolWithContext{
Tasks: tasks,
concurrency: concurrency,
tasksChan: make(chan *TaskWithContext),
CancelHandler: &obj,
}
}
// Run runs all work within the pool
func (p *PoolWithContext) Run() {
for i := 0; i < p.concurrency; i++ {
go p.work()
}
for _, task := range p.Tasks {
if p.IsPoolCanceled {
return
} else {
p.tasksChan <- task
}
}
close(p.tasksChan)
}
// The work loop for any single goroutine.
func (p *PoolWithContext) work() {
for task := range p.tasksChan {
task.Run(p)
}
}
// TaskWithContext encapsulates a work item that should go in a work
// pool.
type TaskWithContext struct {
// Err holds an error that occurred during a task. Its
// result is only meaningful after Run has been called
// for the pool that holds it.
Err error
Data interface{}
f func(data interface{}) error
}
// NewTaskWithContext initializes a new task based on a given work
// function.
func NewTaskWithContext(d interface{}, f func(data interface{}) error) *TaskWithContext {
return &TaskWithContext{Data: d, f: f}
}
// Run runs a Task and does appropriate accounting via a
func (t *TaskWithContext) Run(p *PoolWithContext) {
for {
select {
case <-p.CancelHandler.Ctx.Done():
return
default:
t.Err = t.f(t.Data)
return
}
}
}
//Cancel all tasks
func (p *PoolWithContext) Cancel() {
if p != nil {
p.CancelHandler.CancelFunc()
p.IsPoolCanceled = true
}
}
package workerpoolmdl
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
var count int
var countMutex = &sync.Mutex{}
var cancelTriggered bool
//TestWorkerPoolWithContext - test for cancel trigger if count >= 500
func TestWorkerPoolWithContext(t *testing.T) {
count = 0
tasks := []*TaskWithContext{}
for index := 0; index < 1000; index++ {
tasks = append(tasks, NewTaskWithContext(index, incrementCount))
}
pool := NewPoolWithContext(tasks, 10)
ticker := time.NewTicker(1 * time.Millisecond)
go func() {
for range ticker.C {
if count > 500 {
fmt.Println("cancelling tasks...")
pool.Cancel()
return
}
}
}()
pool.Run()
assert.GreaterOrEqual(t, count, 500, "Count be greater than or equals to 500")
}
//TestWorkerpoolWithoutCancel - test without cancel trigger
func TestWorkerpoolWithoutCancel(t *testing.T) {
count = 0
tasks := []*TaskWithContext{}
for index := 0; index < 1000; index++ {
tasks = append(tasks, NewTaskWithContext(index, incrementCount))
}
pool := NewPoolWithContext(tasks, 10)
pool.Run()
assert.Equal(t, count, 1000, "Count should be equals to 1000")
}
//incrementCount- increment count by 1
func incrementCount(data interface{}) error {
countMutex.Lock()
count++
countMutex.Unlock()
return nil
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment