diff --git a/Nuclex.Support (net-4.0).csproj b/Nuclex.Support (net-4.0).csproj
index c5593b1..9378f3d 100644
--- a/Nuclex.Support (net-4.0).csproj
+++ b/Nuclex.Support (net-4.0).csproj
@@ -215,6 +215,7 @@
LicenseKey.cs
+
CommandLine.cs
diff --git a/Source/FloatHelper.Test.cs b/Source/FloatHelper.Test.cs
index accf4d4..15a27c5 100644
--- a/Source/FloatHelper.Test.cs
+++ b/Source/FloatHelper.Test.cs
@@ -148,7 +148,12 @@ namespace Nuclex.Support {
// If both are negative -> fine
// If both are positive -> fine
// If different -> Measure both distances to zero in ulps and sum them
- public void NegativeZeroEqualsPositiveZero() {
+ ///
+ /// Verifies that the negative floating point zero is within one ulp of the positive
+ /// floating point zero and vice versa
+ ///
+ [Test]
+ public void NegativeZeroFloatEqualsPositiveZero() {
float zero = 0.0f;
float zeroPlusOneUlp = FloatHelper.ReinterpretAsFloat(
FloatHelper.ReinterpretAsInt(zero) + 1
@@ -163,6 +168,26 @@ namespace Nuclex.Support {
Assert.IsTrue(FloatHelper.AreAlmostEqual(zero, zeroMinusOneUlp, 1));
}
+ ///
+ /// Verifies that the negative double precision floating point zero is within one ulp
+ /// of the positive double precision floating point zero and vice versa
+ ///
+ [Test]
+ public void NegativeZeroDoubleEqualsPositiveZero() {
+ double zero = 0.0;
+ double zeroPlusOneUlp = FloatHelper.ReinterpretAsDouble(
+ FloatHelper.ReinterpretAsLong(zero) + 1
+ );
+ double zeroMinusOneUlp = -zeroPlusOneUlp;
+
+ bool test = FloatHelper.AreAlmostEqual(zeroMinusOneUlp, zeroPlusOneUlp, 1);
+
+ Assert.IsFalse(FloatHelper.AreAlmostEqual(zero, zeroPlusOneUlp, 0));
+ Assert.IsTrue(FloatHelper.AreAlmostEqual(zero, zeroPlusOneUlp, 1));
+ Assert.IsFalse(FloatHelper.AreAlmostEqual(zero, zeroMinusOneUlp, 0));
+ Assert.IsTrue(FloatHelper.AreAlmostEqual(zero, zeroMinusOneUlp, 1));
+ }
+
}
} // namespace Nuclex.Support
diff --git a/Source/IntegerHelper.cs b/Source/IntegerHelper.cs
index e06f7c5..4ce1256 100644
--- a/Source/IntegerHelper.cs
+++ b/Source/IntegerHelper.cs
@@ -77,7 +77,7 @@ namespace Nuclex.Support {
return value;
}
- /// Returns the number of bits set in an
+ /// Returns the number of bits set in an integer
/// Value whose bits will be counted
/// The number of bits set in the integer
public static int CountBits(this int value) {
diff --git a/Source/ParallelBackgroundWorker.Test.cs b/Source/ParallelBackgroundWorker.Test.cs
new file mode 100644
index 0000000..f8835e1
--- /dev/null
+++ b/Source/ParallelBackgroundWorker.Test.cs
@@ -0,0 +1,239 @@
+#region CPL License
+/*
+Nuclex Framework
+Copyright (C) 2002-2013 Nuclex Development Labs
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the IBM Common Public License as
+published by the IBM Corporation; either version 1.0 of the
+License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+IBM Common Public License for more details.
+
+You should have received a copy of the IBM Common Public
+License along with this library
+*/
+#endregion
+
+using System;
+using System.IO;
+
+#if UNITTEST
+
+using NUnit.Framework;
+using System.Threading;
+using System.Collections.Generic;
+
+namespace Nuclex.Support {
+
+ /// Unit Test for the parallel background worker class
+ [TestFixture]
+ public class ParallelBackgroundWorkerTest {
+
+ private class TestWorker : ParallelBackgroundWorker {
+
+ /// Initializes a new parallel background worker with unlimited threads
+ public TestWorker() : base() { }
+
+ ///
+ /// Initializes a new parallel background worker running the specified number
+ /// of tasks in parallel
+ ///
+ ///
+ /// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
+ /// unused (if negative).
+ ///
+ ///
+ /// If a negative number of threads is used, at least one thread will be always
+ /// be created, so specifying -2 on a single-core system will still occupy
+ /// the only core.
+ ///
+ public TestWorker(int threadCount) : base(threadCount) {}
+
+ ///
+ /// Initializes a new parallel background worker that uses the specified name for
+ /// its worker threads.
+ ///
+ /// Name that will be assigned to the worker threads
+ public TestWorker(string name) : base(name) {}
+
+ ///
+ /// Initializes a new parallel background worker that uses the specified name for
+ /// its worker threads and running the specified number of tasks in parallel.
+ ///
+ /// Name that will be assigned to the worker threads
+ ///
+ /// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
+ /// unused (if negative).
+ ///
+ ///
+ /// If a negative number of threads is used, at least one thread will be always
+ /// be created, so specifying -2 on a single-core system will still occupy
+ /// the only core.
+ ///
+ public TestWorker(string name, int threadCount) : base(name, threadCount) { }
+
+ /// Called in a thread to execute a single task
+ /// Task that should be executed
+ ///
+ /// Cancellation token through which the method can be signalled to cancel
+ ///
+ protected override void Run(object task, CancellationToken cancellationToken) {
+ if(this.ThrowException) {
+ throw new Exception("Something went wrong");
+ }
+
+ if(this.WaitEvent != null) {
+ this.WaitEvent.WaitOne();
+ }
+
+ this.WasCancelled = cancellationToken.IsCancellationRequested;
+
+ if(this.Tasks != null) {
+ this.Tasks.Add(task);
+ }
+ }
+
+ public bool ThrowException;
+ public ManualResetEvent WaitEvent;
+ public bool WasCancelled;
+ public ICollection Tasks;
+
+ }
+
+ /// Verifies that the background worker has a default constructor
+ [Test]
+ public void CanBeDefaultConstructed() {
+ using(new TestWorker()) { }
+ }
+
+ ///
+ /// Verifies that a background worker can be constructed that uses a fixed number
+ /// of threads
+ ///
+ [Test]
+ public void CanUseFixedNumberOfThreads() {
+ using(new TestWorker(4)) { }
+ }
+
+ ///
+ /// Verifies that a background worker can be constructed that leaves free a fixed
+ /// number of CPU cores
+ ///
+ [Test]
+ public void CanPreserveFixedNumberOfCores() {
+ using(new TestWorker(-2)) { }
+ }
+
+ ///
+ /// Verifies that a background worker can be constructed using a specific name
+ /// for its worker threads
+ ///
+ [Test]
+ public void CanUseNamedThreads() {
+ using(new TestWorker("Test Task Thread")) { }
+ }
+
+ ///
+ /// Verifies that a background worker can be constructed that uses a fixed number
+ /// of threads using a specific name
+ ///
+ [Test]
+ public void CanUseFixedNumberOfNamedThreads() {
+ using(new TestWorker("Test Task Thread", 4)) { }
+ }
+
+ ///
+ /// Verifies that a background worker can be constructed that leaves free a fixed
+ /// number of CPU cores and uses a specific name for its worker threads.
+ ///
+ [Test]
+ public void CanPreserveFixedNumberOfCoresAndUseNamedThreads() {
+ using(new TestWorker("Test Task Thread", -2)) { }
+ }
+
+ ///
+ /// Verifies that exceptions happening inside the tasks are collected and re-thrown
+ /// in the Join() method.
+ ///
+ [Test]
+ public void ExceptionsAreReThrownInJoin() {
+ using(var testWorker = new TestWorker()) {
+ testWorker.ThrowException = true;
+ testWorker.AddTask(new object());
+ testWorker.AddTask(new object());
+
+ Assert.Throws(
+ () => {
+ testWorker.Join();
+ }
+ );
+
+ try {
+ testWorker.Join();
+ Assert.Fail(
+ "Calling ParallelBackgroundWorker.Join() multiple times should re-throw " +
+ "exceptions multiple times"
+ );
+ }
+ catch(AggregateException aggregateException) {
+ Assert.AreEqual(2, aggregateException.InnerExceptions.Count);
+ }
+ }
+ }
+
+ ///
+ /// Verifies that tasks can be cancelled while they are running
+ ///
+ [Test]
+ public void TasksCanBeCancelled() {
+ using(var waitEvent = new ManualResetEvent(false)) {
+ using(var testWorker = new TestWorker()) {
+ testWorker.WaitEvent = waitEvent;
+
+ testWorker.AddTask(new object());
+ testWorker.CancelRunningTasks();
+
+ waitEvent.Set();
+
+ Assert.IsTrue(testWorker.Wait(1000));
+
+ Assert.IsTrue(testWorker.WasCancelled);
+ }
+ } // disposes waitEvent
+ }
+
+ /// Verifies that calling Join() waits for all queued tasks
+ [Test]
+ public void JoinWaitsForQueuedTasks() {
+ var tasks = new List(100);
+ for(int index = 0; index < 100; ++index) {
+ tasks.Add(new object());
+ }
+
+ using(var waitEvent = new ManualResetEvent(false)) {
+ using(var testWorker = new TestWorker(2)) {
+ testWorker.WaitEvent = waitEvent;
+ testWorker.Tasks = new List();
+ for(int index = 0; index < 100; ++index) {
+ testWorker.AddTask(tasks[index]);
+ }
+
+ CollectionAssert.IsEmpty(testWorker.Tasks);
+
+ waitEvent.Set();
+ testWorker.Join();
+
+ CollectionAssert.AreEquivalent(tasks, testWorker.Tasks);
+ }
+ } // disposes waitEvent
+ }
+
+ }
+
+} // namespace Nuclex.Support
+
+#endif // UNITTEST
diff --git a/Source/ParallelBackgroundWorker.cs b/Source/ParallelBackgroundWorker.cs
index 6dbed19..3572b69 100644
--- a/Source/ParallelBackgroundWorker.cs
+++ b/Source/ParallelBackgroundWorker.cs
@@ -17,6 +17,12 @@ namespace Nuclex.Support {
public static readonly int Processors = Environment.ProcessorCount;
#endif
+ ///
+ /// Timeout after which Dispose() will stop waiting for unfinished tasks and
+ /// free resources anyway
+ ///
+ private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds
+
/// Initializes a new parallel background worker with unlimited threads
public ParallelBackgroundWorker() : this(int.MaxValue) { }
@@ -40,11 +46,12 @@ namespace Nuclex.Support {
threadCount = Math.Max(1, Processors + threadCount);
}
- this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
- this.runningThreads = new List();
+ this.queueSynchronizationRoot = new object();
+ this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
this.tasks = new Queue();
this.threadTerminatedEvent = new AutoResetEvent(false);
this.cancellationTokenSource = new CancellationTokenSource();
+ this.exceptions = new ConcurrentBag();
}
///
@@ -81,7 +88,8 @@ namespace Nuclex.Support {
if(this.threadTerminatedEvent != null) {
CancelPendingTasks();
CancelRunningTasks();
- Join();
+
+ Wait(DefaultDisposeTimeoutMilliseconds);
this.threadTerminatedEvent.Dispose();
this.threadTerminatedEvent = null;
@@ -92,7 +100,6 @@ namespace Nuclex.Support {
}
}
-
/// Adds a task for processing by the background worker threads
/// Task that will be processed in the background
public void AddTask(TTask task) {
@@ -100,26 +107,28 @@ namespace Nuclex.Support {
return;
}
+ bool needNewThread;
+
lock(this.queueSynchronizationRoot) {
this.tasks.Enqueue(task);
- if(this.runningThreads.Count < this.threadCount) {
- //Task newThread = new Task(this.runQueuedTasksInThreadDelegate, );
+ needNewThread = (this.runningThreadCount < this.threadCount);
+ if(needNewThread) {
+ ++this.runningThreadCount;
}
}
- // Thread 1:
- // lock() {
- // - take task
- // - or deregister and exit
- // }
- //
- // Thread 2:
- // lock() {
- // - put task
- // - if too few threads, register and add
- // }
-
+ if(needNewThread) {
+ Task newThread = new Task(
+ this.runQueuedTasksInThreadDelegate,
+ // this.cancellationTokenSource.Token, // DO NOT PASS THIS!
+ // Passing the cancellation token makes tasks that have been queued but
+ // not started yet cease to execute at all - meaning our runningThreadCount
+ // goes out of sync and Dispose() / Wait() / Join() sit around endlessly!
+ TaskCreationOptions.LongRunning
+ );
+ newThread.Start();
+ }
}
/// Cancels all tasks that are currently executing
@@ -134,11 +143,36 @@ namespace Nuclex.Support {
}
}
- /// Waits until all executing and queued tasks have been processed
+ ///
+ /// Waits until all executing and queued tasks have been processed and throws an
+ /// exception if any errors have occurred
+ ///
public void Join() {
- while(this.runningThreads.Count > 0) {
+ while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
this.threadTerminatedEvent.WaitOne();
}
+
+ if(this.exceptions.Count > 0) {
+ throw new AggregateException(this.exceptions);
+ }
+ }
+
+ ///
+ /// Waits until all executing and queued tasks have been processed or
+ /// the timeout is reached
+ ///
+ /// Milliseconds after which the wait times out
+ ///
+ /// True if all tasks have been processed, false if the timeout was reached
+ ///
+ public bool Wait(int timeoutMilliseconds) {
+ while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
+ if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
+ return false;
+ }
+ }
+
+ return true;
}
/// Called in a thread to execute a single task
@@ -151,17 +185,23 @@ namespace Nuclex.Support {
///
/// Runs queued tasks of the parallel background worker until the queue is empty
///
- /// Threading task in which this worker is running
- private void runQueuedTasksInThread(object thisTaskAsObject) {
+ private void runQueuedTasksInThread() {
string previousThreadName = null;
if(!string.IsNullOrEmpty(this.threadName)) {
previousThreadName = Thread.CurrentThread.Name;
Thread.CurrentThread.Name = this.threadName;
}
try {
-#if false
- TTask task;
- while(this.tasks.TryDequeue(out task)) {
+ for(;;) {
+ TTask task;
+ lock(this.queueSynchronizationRoot) {
+ if(this.tasks.Count == 0) {
+ --this.runningThreadCount;
+ break;
+ }
+ task = this.tasks.Dequeue();
+ }
+
try {
Run(task, this.cancellationTokenSource.Token);
}
@@ -170,11 +210,7 @@ namespace Nuclex.Support {
}
}
- lock(this.runningThreads) {
- this.runningThreads.Remove((Task)thisTaskAsObject);
- }
this.threadTerminatedEvent.Set();
-#endif
}
finally {
if(!string.IsNullOrEmpty(this.threadName)) {
@@ -236,19 +272,11 @@ namespace Nuclex.Support {
private object queueSynchronizationRoot;
/// Delegate for the runQueuedTasksInThread() method
- private Action runQueuedTasksInThreadDelegate;
+ private Action runQueuedTasksInThreadDelegate;
/// Tasks remaining to be processed
private Queue tasks;
/// Threads that are currently executing tasks
- private IList runningThreads;
-
- // Idea:
- // private int runningThreadCount;
- // Before the task looks for new work, it will decrement this
- // if the task gets new work, it will increment this again.
- // - if it would be above threadCount now, put work back in the queue
- // AddTask() increments this after pushing new work
- // - if it would be above threadCount, do not create a new thread
+ private int runningThreadCount;
/// Exceptions that have occurred while executing tasks
private ConcurrentBag exceptions;