diff --git a/Source/AffineThreadPool.Test.cs b/Source/AffineThreadPool.Test.cs
index c5de8a8..9267cc6 100644
--- a/Source/AffineThreadPool.Test.cs
+++ b/Source/AffineThreadPool.Test.cs
@@ -142,6 +142,7 @@ namespace Nuclex.Support {
#endregion // class WaitTask
+#if false
#region class ThrowingDisposable
/// Throws an exception when it is disposed
@@ -169,6 +170,7 @@ namespace Nuclex.Support {
);
}
+#endif
/// Tests whether the QueueUserWorkItem() method is working
[Test]
@@ -200,21 +202,21 @@ namespace Nuclex.Support {
///
[Test]
public void TestExceptionFromUserWorkItem() {
- using(ManualResetEvent assertEvent = new ManualResetEvent(false)) {
- AffineThreadPool.AssertionDelegate oldAssertionHandler =
- AffineThreadPool.AssertionHandler;
+ using(ManualResetEvent exceptionEvent = new ManualResetEvent(false)) {
+ AffineThreadPool.ExceptionDelegate oldExceptionHandler =
+ AffineThreadPool.ExceptionHandler;
- AffineThreadPool.AssertionHandler = delegate(
- bool condition, string message, string details
- ) { assertEvent.Set(); };
+ AffineThreadPool.ExceptionHandler = delegate(Exception exception) {
+ exceptionEvent.Set();
+ };
try {
AffineThreadPool.QueueUserWorkItem(
delegate(object state) { throw new KeyNotFoundException(); }
);
- Assert.IsTrue(assertEvent.WaitOne(1000));
+ Assert.IsTrue(exceptionEvent.WaitOne(1000));
}
finally {
- AffineThreadPool.AssertionHandler = oldAssertionHandler;
+ AffineThreadPool.ExceptionHandler = oldExceptionHandler;
}
}
}
@@ -229,81 +231,87 @@ namespace Nuclex.Support {
}
///
- /// Tests whether the thread pool can handle an exception from a user work item
+ /// Verifies that the ProcessThread instance for a system thread id can
+ /// be determined using the GetProcessThread() method
///
[Test]
- public void TestExceptionFromDisposableState() {
- using(ManualResetEvent assertEvent = new ManualResetEvent(false)) {
- AffineThreadPool.AssertionDelegate oldAssertionHandler =
- AffineThreadPool.AssertionHandler;
+ public void TestGetProcessThread() {
+ Thread.BeginThreadAffinity();
+ try {
+ int threadId = AffineThreadPool.GetCurrentThreadId();
- AffineThreadPool.AssertionHandler = delegate(
- bool condition, string message, string details
- ) { assertEvent.Set(); };
+ Assert.IsNotNull(AffineThreadPool.GetProcessThread(threadId));
+ Assert.IsNull(AffineThreadPool.GetProcessThread(0));
+ }
+ finally {
+ Thread.EndThreadAffinity();
+ }
- try {
- int eventCount = AffineThreadPool.CpuCores;
- WaitTask[] tasks = new WaitTask[eventCount];
+ }
- int createdTasks = 0;
- try {
+ ///
+ /// Verifies that the waiting work items count and active thread count are
+ /// updated by the thread pool.
+ ///
+ [Test]
+ public void TestWaitingWorkItemsProperty() {
+ int eventCount = AffineThreadPool.CpuCores;
+ WaitTask[] tasks = new WaitTask[eventCount];
- // Create the tasks, counting up the created task counter. If an exception
- // occurs, we will roll back from there.
- for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
- tasks[createdTasks] = new WaitTask();
- }
+ int createdTasks = 0;
+ try {
+ // CHECK: Is there danger that the thread pool still has not finished
+ // queued items for other unit tests, thereby failing to meet
+ // our expected task counts?
- // Schedule the blocking tasks in the thread pool so it will not be able
- // to process the next task we add to the queue
- for(int index = 0; index < eventCount; ++index) {
- AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
- }
-
- // Wait for the tasks to start so they aren't aborted by EmptyQueue()
- for(int index = 0; index < eventCount; ++index) {
- Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000));
- }
- Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
- Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks);
-
- // Add a task to the queue whose state implements a faulty IDisposable
- AffineThreadPool.QueueUserWorkItem(
- delegate(object state) { }, new ThrowingDisposable()
- );
-
- Assert.AreEqual(1, AffineThreadPool.WaitingCallbacks);
-
- // Now clear the thread pool. This should cause the faulty IDisposable
- // to be disposed and then throw its exception.
- AffineThreadPool.EmptyQueue();
-
- // Make sure our custom assertion handler has been triggered
- Assert.IsTrue(assertEvent.WaitOne(1000));
-
- Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
- Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks);
-
- // Let the thread pool finish its active tasks
- for(int index = 0; index < eventCount; ++index) {
- tasks[index].WaitEvent.Set();
- }
-
- // Wait for the tasks to end before we dispose them
- for(int index = 0; index < eventCount; ++index) {
- Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000));
- }
- }
- finally {
- for(--createdTasks; createdTasks >= 0; --createdTasks) {
- tasks[createdTasks].Dispose();
- }
- }
+ // Create the tasks, counting up the created task counter. If an exception
+ // occurs, we will roll back from there.
+ for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
+ tasks[createdTasks] = new WaitTask();
}
- finally {
- AffineThreadPool.AssertionHandler = oldAssertionHandler;
+
+ // Schedule the blocking tasks in the thread pool so it will not be able
+ // to process the next task we add to the queue
+ for(int index = 0; index < eventCount; ++index) {
+ AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
}
- } // using
+
+ // Wait for the tasks to start so they aren't preempted by the tasks we're
+ // going to add (which would finish immediately). The affine thread pool
+ // works on a first come first serve basis, but we don't want to rely on this
+ // implementation detail in the unit test.
+ for(int index = 0; index < eventCount; ++index) {
+ Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000));
+ }
+
+ // All Thread should now be active and no work items should be waiting
+ Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
+ Assert.AreEqual(0, AffineThreadPool.WaitingWorkItems);
+
+ // Add a task to the queue and make sure the waiting work item count goes up
+ AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
+ Assert.AreEqual(1, AffineThreadPool.WaitingWorkItems);
+
+ // The same again. Now we should have 2 work items sitting in the queue
+ AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
+ Assert.AreEqual(2, AffineThreadPool.WaitingWorkItems);
+
+ // Let the WaitTasks finish so we're not blocking the thread pool any longer
+ for(int index = 0; index < eventCount; ++index) {
+ tasks[index].WaitEvent.Set();
+ }
+
+ // Wait for the tasks to end before we get rid of them
+ for(int index = 0; index < eventCount; ++index) {
+ Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000));
+ }
+ }
+ finally {
+ for(--createdTasks; createdTasks >= 0; --createdTasks) {
+ tasks[createdTasks].Dispose();
+ }
+ }
+
}
}
diff --git a/Source/AffineThreadPool.cs b/Source/AffineThreadPool.cs
index 6eb3f53..923926e 100644
--- a/Source/AffineThreadPool.cs
+++ b/Source/AffineThreadPool.cs
@@ -49,15 +49,9 @@ namespace Nuclex.Support {
public static readonly int CpuCores = Environment.ProcessorCount;
#endif
- /// Delegate used by the thread pool to handle assertion checks
- /// Condition that will be asserted
- /// Message explaining what causes the assertion to fail
- ///
- /// Detailed description of the exact cause of the assertion failure
- ///
- public delegate void AssertionDelegate(
- bool condition, string message, string details
- );
+ /// Delegate used by the thread pool to report unhandled exceptions
+ /// Exception that has not been handled
+ public delegate void ExceptionDelegate(Exception exception);
#region class UserWorkItem
@@ -93,10 +87,11 @@ namespace Nuclex.Support {
workerThreads = new List(CpuCores);
inUseThreads = 0;
- // We can only use these hardware thread indices on the XBox 360
#if XBOX360
+ // We can only use these hardware thread indices on the XBox 360
hardwareThreads = new Queue(new int[] { 5, 4, 3, 1 });
#else
+ // We can use all cores in the PC, starting from index 1
hardwareThreads = new Queue(CpuCores);
for(int core = CpuCores; core >= 1; --core) {
hardwareThreads.Enqueue(core);
@@ -153,34 +148,6 @@ namespace Nuclex.Support {
}
- /// Empties the work queue of any queued work items
- public static void EmptyQueue() {
- lock(userWorkItems) {
- try {
- while(userWorkItems.Count > 0) {
- UserWorkItem callback = userWorkItems.Dequeue();
- IDisposable disposableState = callback.State as IDisposable;
- if(disposableState != null) {
- disposableState.Dispose();
- }
- }
- }
- catch(Exception) { // Make sure an error isn't thrown.
- AssertionHandler(
- false,
- "Unhandled exception disposing the state of a user work item",
- "The AffineThreadPool.EmptyQueue() method tried to dispose of any states" +
- "associated with waiting user work items. One of the states implementing" +
- "IDisposable threw an exception during Dispose()."
- );
- }
-
- // Clear all waiting items and reset the number of worker threads currently needed
- // to be 0 (there is nothing for threads to do)
- userWorkItems.Clear();
- }
- }
-
/// Gets the number of threads at the disposal of the thread pool
public static int MaxThreads { get { return CpuCores; } }
@@ -190,7 +157,7 @@ namespace Nuclex.Support {
///
/// Gets the number of callback delegates currently waiting in the thread pool
///
- public static int WaitingCallbacks {
+ public static int WaitingWorkItems {
get {
lock(userWorkItems) {
return userWorkItems.Count;
@@ -201,27 +168,37 @@ namespace Nuclex.Support {
/// A thread worker function that processes items from the work queue
private static void ProcessQueuedItems() {
+ // Get the system/hardware thread index this thread is going to use. We hope that
+ // the threads more or less start after each other, but there is no guarantee that
+ // tasks will be handled by the CPU cores in the order the queue was filled with.
+ // This could be added, though, by using a WaitHandle so the thread creator could
+ // wait for each thread to take one entry out of the queue.
int hardwareThreadIndex;
lock(hardwareThreads) {
hardwareThreadIndex = hardwareThreads.Dequeue();
}
#if XBOX360
- // MSDN states that SetProcessorAffinity() should be called from the thread
- // whose affinity is being changed.
+ // On the XBox 360, the only way to get a thread to run on another core is to
+ // explicitly move it to that core. MSDN states that SetProcessorAffinity() should
+ // be called from the thread whose affinity is being changed.
Thread.CurrentThread.SetProcessorAffinity(new int[] { hardwareThreadIndex });
#else
// Prevent this managed thread from impersonating another system thread.
- // Threads in .NET can take
+ // In .NET, managed threads can supposedly be moved to different system threads
+ // and, more worryingly, even fibers. This should make sure we're sitting on
+ // a normal system thread and stay with that thread during our lifetime.
Thread.BeginThreadAffinity();
- ProcessThread thread = getCurrentProcessThread();
+ // Assign the ideal processor, but don't force it. It's not a good idea to
+ // circumvent the thread scheduler of a desktop machine, so we try to play nice.
+ int threadId = GetCurrentThreadId();
+ ProcessThread thread = GetProcessThread(threadId);
if(thread != null) {
thread.IdealProcessor = hardwareThreadIndex;
}
#endif
-
// Keep processing tasks indefinitely
for(; ; ) {
UserWorkItem workItem = getNextWorkItem();
@@ -232,13 +209,11 @@ namespace Nuclex.Support {
Interlocked.Increment(ref inUseThreads);
workItem.Callback(workItem.State);
}
- catch(Exception) { // Make sure we don't throw here.
- AssertionHandler(
- false,
- "Unhandled exception in queued user work item",
- "An unhandled exception travelled up to the AffineThreadPool from" +
- "a queued user work item that was being executed"
- );
+ catch(Exception exception) { // Make sure we don't throw here.
+ ExceptionDelegate exceptionHandler = ExceptionHandler;
+ if(exceptionHandler != null) {
+ exceptionHandler(exception);
+ }
}
finally {
Interlocked.Decrement(ref inUseThreads);
@@ -249,9 +224,7 @@ namespace Nuclex.Support {
#if !XBOX360
/// Retrieves the ProcessThread for the calling thread
/// The ProcessThread for the calling thread
- private static ProcessThread getCurrentProcessThread() {
- int threadId = GetCurrentThreadId();
-
+ internal static ProcessThread GetProcessThread(int threadId) {
ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
for(int index = 0; index < threads.Count; ++index) {
if(threads[index].Id == threadId) {
@@ -290,25 +263,15 @@ namespace Nuclex.Support {
}
- /// Default assertion handler for the affine thread pool
- /// Condition which is being asserted
- /// Message explaining what causes the assertion to fail
- ///
- /// Detailed description of the exact cause of the assertion failure
- ///
- public static void DefaultAssertionHandler(
- bool condition, string message, string details
- ) {
- Trace.Assert(condition, message, details);
- }
-
/// Delegate used to handle assertion checks in the code
- public static AssertionDelegate AssertionHandler = DefaultAssertionHandler;
+ public static volatile ExceptionDelegate ExceptionHandler;
+#if !XBOX360
/// Retrieves the calling thread's thread id
/// The thread is of the calling thread
[DllImport("kernel32.dll")]
- private static extern int GetCurrentThreadId();
+ internal static extern int GetCurrentThreadId();
+#endif
/// Available hardware threads the thread pool threads pick from
private static Queue hardwareThreads;