Added unit tests for parallel background worker; fixed some issues with the parallel background worker; added failing unit test for almost equal checks with doubles; fixed a typo
git-svn-id: file:///srv/devel/repo-conversion/nusu@291 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
parent
a18cb63fc5
commit
feac2b9c89
5 changed files with 334 additions and 41 deletions
|
@ -17,6 +17,12 @@ namespace Nuclex.Support {
|
|||
public static readonly int Processors = Environment.ProcessorCount;
|
||||
#endif
|
||||
|
||||
/// <summary>
|
||||
/// Timeout after which Dispose() will stop waiting for unfinished tasks and
|
||||
/// free resources anyway
|
||||
/// </summary>
|
||||
private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds
|
||||
|
||||
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
|
||||
public ParallelBackgroundWorker() : this(int.MaxValue) { }
|
||||
|
||||
|
@ -40,11 +46,12 @@ namespace Nuclex.Support {
|
|||
threadCount = Math.Max(1, Processors + threadCount);
|
||||
}
|
||||
|
||||
this.runQueuedTasksInThreadDelegate = new Action<object>(runQueuedTasksInThread);
|
||||
this.runningThreads = new List<Task>();
|
||||
this.queueSynchronizationRoot = new object();
|
||||
this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
|
||||
this.tasks = new Queue<TTask>();
|
||||
this.threadTerminatedEvent = new AutoResetEvent(false);
|
||||
this.cancellationTokenSource = new CancellationTokenSource();
|
||||
this.exceptions = new ConcurrentBag<Exception>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
@ -81,7 +88,8 @@ namespace Nuclex.Support {
|
|||
if(this.threadTerminatedEvent != null) {
|
||||
CancelPendingTasks();
|
||||
CancelRunningTasks();
|
||||
Join();
|
||||
|
||||
Wait(DefaultDisposeTimeoutMilliseconds);
|
||||
|
||||
this.threadTerminatedEvent.Dispose();
|
||||
this.threadTerminatedEvent = null;
|
||||
|
@ -92,7 +100,6 @@ namespace Nuclex.Support {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/// <summary>Adds a task for processing by the background worker threads</summary>
|
||||
/// <param name="task">Task that will be processed in the background</param>
|
||||
public void AddTask(TTask task) {
|
||||
|
@ -100,26 +107,28 @@ namespace Nuclex.Support {
|
|||
return;
|
||||
}
|
||||
|
||||
bool needNewThread;
|
||||
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
this.tasks.Enqueue(task);
|
||||
|
||||
if(this.runningThreads.Count < this.threadCount) {
|
||||
//Task newThread = new Task(this.runQueuedTasksInThreadDelegate, );
|
||||
needNewThread = (this.runningThreadCount < this.threadCount);
|
||||
if(needNewThread) {
|
||||
++this.runningThreadCount;
|
||||
}
|
||||
}
|
||||
|
||||
// Thread 1:
|
||||
// lock() {
|
||||
// - take task
|
||||
// - or deregister and exit
|
||||
// }
|
||||
//
|
||||
// Thread 2:
|
||||
// lock() {
|
||||
// - put task
|
||||
// - if too few threads, register and add
|
||||
// }
|
||||
|
||||
if(needNewThread) {
|
||||
Task newThread = new Task(
|
||||
this.runQueuedTasksInThreadDelegate,
|
||||
// this.cancellationTokenSource.Token, // DO NOT PASS THIS!
|
||||
// Passing the cancellation token makes tasks that have been queued but
|
||||
// not started yet cease to execute at all - meaning our runningThreadCount
|
||||
// goes out of sync and Dispose() / Wait() / Join() sit around endlessly!
|
||||
TaskCreationOptions.LongRunning
|
||||
);
|
||||
newThread.Start();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Cancels all tasks that are currently executing</summary>
|
||||
|
@ -134,11 +143,36 @@ namespace Nuclex.Support {
|
|||
}
|
||||
}
|
||||
|
||||
/// <summary>Waits until all executing and queued tasks have been processed</summary>
|
||||
/// <summary>
|
||||
/// Waits until all executing and queued tasks have been processed and throws an
|
||||
/// exception if any errors have occurred
|
||||
/// </summary>
|
||||
public void Join() {
|
||||
while(this.runningThreads.Count > 0) {
|
||||
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
|
||||
this.threadTerminatedEvent.WaitOne();
|
||||
}
|
||||
|
||||
if(this.exceptions.Count > 0) {
|
||||
throw new AggregateException(this.exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits until all executing and queued tasks have been processed or
|
||||
/// the timeout is reached
|
||||
/// </summary>
|
||||
/// <param name="timeoutMilliseconds">Milliseconds after which the wait times out</param>
|
||||
/// <returns>
|
||||
/// True if all tasks have been processed, false if the timeout was reached
|
||||
/// </returns>
|
||||
public bool Wait(int timeoutMilliseconds) {
|
||||
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
|
||||
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/// <summary>Called in a thread to execute a single task</summary>
|
||||
|
@ -151,17 +185,23 @@ namespace Nuclex.Support {
|
|||
/// <summary>
|
||||
/// Runs queued tasks of the parallel background worker until the queue is empty
|
||||
/// </summary>
|
||||
/// <param name="thisTaskAsObject">Threading task in which this worker is running</param>
|
||||
private void runQueuedTasksInThread(object thisTaskAsObject) {
|
||||
private void runQueuedTasksInThread() {
|
||||
string previousThreadName = null;
|
||||
if(!string.IsNullOrEmpty(this.threadName)) {
|
||||
previousThreadName = Thread.CurrentThread.Name;
|
||||
Thread.CurrentThread.Name = this.threadName;
|
||||
}
|
||||
try {
|
||||
#if false
|
||||
TTask task;
|
||||
while(this.tasks.TryDequeue(out task)) {
|
||||
for(;;) {
|
||||
TTask task;
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
if(this.tasks.Count == 0) {
|
||||
--this.runningThreadCount;
|
||||
break;
|
||||
}
|
||||
task = this.tasks.Dequeue();
|
||||
}
|
||||
|
||||
try {
|
||||
Run(task, this.cancellationTokenSource.Token);
|
||||
}
|
||||
|
@ -170,11 +210,7 @@ namespace Nuclex.Support {
|
|||
}
|
||||
}
|
||||
|
||||
lock(this.runningThreads) {
|
||||
this.runningThreads.Remove((Task)thisTaskAsObject);
|
||||
}
|
||||
this.threadTerminatedEvent.Set();
|
||||
#endif
|
||||
}
|
||||
finally {
|
||||
if(!string.IsNullOrEmpty(this.threadName)) {
|
||||
|
@ -236,19 +272,11 @@ namespace Nuclex.Support {
|
|||
private object queueSynchronizationRoot;
|
||||
|
||||
/// <summary>Delegate for the runQueuedTasksInThread() method</summary>
|
||||
private Action<object> runQueuedTasksInThreadDelegate;
|
||||
private Action runQueuedTasksInThreadDelegate;
|
||||
/// <summary>Tasks remaining to be processed</summary>
|
||||
private Queue<TTask> tasks;
|
||||
/// <summary>Threads that are currently executing tasks</summary>
|
||||
private IList<Task> runningThreads;
|
||||
|
||||
// Idea:
|
||||
// private int runningThreadCount;
|
||||
// Before the task looks for new work, it will decrement this
|
||||
// if the task gets new work, it will increment this again.
|
||||
// - if it would be above threadCount now, put work back in the queue
|
||||
// AddTask() increments this after pushing new work
|
||||
// - if it would be above threadCount, do not create a new thread
|
||||
private int runningThreadCount;
|
||||
|
||||
/// <summary>Exceptions that have occurred while executing tasks</summary>
|
||||
private ConcurrentBag<Exception> exceptions;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue