From 09247541f201b78de2c42c9583900bbd2f099236 Mon Sep 17 00:00:00 2001 From: Markus Ewald Date: Thu, 17 Sep 2009 19:50:30 +0000 Subject: [PATCH] Removed the AssertionDelegate stuff and added an UnhandledException delegate to the ThreadPool - maybe I'll turn this into an event analogous to Application.ThreadException; removed the EmptyQueue method which was a flawed concept anyway (especially the disposing of state objects!); rewrote the unit tests to work with the new and improved AffineThreadPool class - there's one test that fails when run through NCover, indicating some synchronization problem that I have yet to track down! git-svn-id: file:///srv/devel/repo-conversion/nusu@176 d2e56fa2-650e-0410-a79f-9358c0239efd --- Source/AffineThreadPool.Test.cs | 156 +++++++++++++++++--------------- Source/AffineThreadPool.cs | 99 +++++++------------- 2 files changed, 113 insertions(+), 142 deletions(-) diff --git a/Source/AffineThreadPool.Test.cs b/Source/AffineThreadPool.Test.cs index c5de8a8..9267cc6 100644 --- a/Source/AffineThreadPool.Test.cs +++ b/Source/AffineThreadPool.Test.cs @@ -142,6 +142,7 @@ namespace Nuclex.Support { #endregion // class WaitTask +#if false #region class ThrowingDisposable /// Throws an exception when it is disposed @@ -169,6 +170,7 @@ namespace Nuclex.Support { ); } +#endif /// Tests whether the QueueUserWorkItem() method is working [Test] @@ -200,21 +202,21 @@ namespace Nuclex.Support { /// [Test] public void TestExceptionFromUserWorkItem() { - using(ManualResetEvent assertEvent = new ManualResetEvent(false)) { - AffineThreadPool.AssertionDelegate oldAssertionHandler = - AffineThreadPool.AssertionHandler; + using(ManualResetEvent exceptionEvent = new ManualResetEvent(false)) { + AffineThreadPool.ExceptionDelegate oldExceptionHandler = + AffineThreadPool.ExceptionHandler; - AffineThreadPool.AssertionHandler = delegate( - bool condition, string message, string details - ) { assertEvent.Set(); }; + AffineThreadPool.ExceptionHandler = delegate(Exception exception) { + exceptionEvent.Set(); + }; try { AffineThreadPool.QueueUserWorkItem( delegate(object state) { throw new KeyNotFoundException(); } ); - Assert.IsTrue(assertEvent.WaitOne(1000)); + Assert.IsTrue(exceptionEvent.WaitOne(1000)); } finally { - AffineThreadPool.AssertionHandler = oldAssertionHandler; + AffineThreadPool.ExceptionHandler = oldExceptionHandler; } } } @@ -229,81 +231,87 @@ namespace Nuclex.Support { } /// - /// Tests whether the thread pool can handle an exception from a user work item + /// Verifies that the ProcessThread instance for a system thread id can + /// be determined using the GetProcessThread() method /// [Test] - public void TestExceptionFromDisposableState() { - using(ManualResetEvent assertEvent = new ManualResetEvent(false)) { - AffineThreadPool.AssertionDelegate oldAssertionHandler = - AffineThreadPool.AssertionHandler; + public void TestGetProcessThread() { + Thread.BeginThreadAffinity(); + try { + int threadId = AffineThreadPool.GetCurrentThreadId(); - AffineThreadPool.AssertionHandler = delegate( - bool condition, string message, string details - ) { assertEvent.Set(); }; + Assert.IsNotNull(AffineThreadPool.GetProcessThread(threadId)); + Assert.IsNull(AffineThreadPool.GetProcessThread(0)); + } + finally { + Thread.EndThreadAffinity(); + } - try { - int eventCount = AffineThreadPool.CpuCores; - WaitTask[] tasks = new WaitTask[eventCount]; + } - int createdTasks = 0; - try { + /// + /// Verifies that the waiting work items count and active thread count are + /// updated by the thread pool. + /// + [Test] + public void TestWaitingWorkItemsProperty() { + int eventCount = AffineThreadPool.CpuCores; + WaitTask[] tasks = new WaitTask[eventCount]; - // Create the tasks, counting up the created task counter. If an exception - // occurs, we will roll back from there. - for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) { - tasks[createdTasks] = new WaitTask(); - } + int createdTasks = 0; + try { + // CHECK: Is there danger that the thread pool still has not finished + // queued items for other unit tests, thereby failing to meet + // our expected task counts? - // Schedule the blocking tasks in the thread pool so it will not be able - // to process the next task we add to the queue - for(int index = 0; index < eventCount; ++index) { - AffineThreadPool.QueueUserWorkItem(tasks[index].Callback); - } - - // Wait for the tasks to start so they aren't aborted by EmptyQueue() - for(int index = 0; index < eventCount; ++index) { - Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000)); - } - Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); - Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks); - - // Add a task to the queue whose state implements a faulty IDisposable - AffineThreadPool.QueueUserWorkItem( - delegate(object state) { }, new ThrowingDisposable() - ); - - Assert.AreEqual(1, AffineThreadPool.WaitingCallbacks); - - // Now clear the thread pool. This should cause the faulty IDisposable - // to be disposed and then throw its exception. - AffineThreadPool.EmptyQueue(); - - // Make sure our custom assertion handler has been triggered - Assert.IsTrue(assertEvent.WaitOne(1000)); - - Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); - Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks); - - // Let the thread pool finish its active tasks - for(int index = 0; index < eventCount; ++index) { - tasks[index].WaitEvent.Set(); - } - - // Wait for the tasks to end before we dispose them - for(int index = 0; index < eventCount; ++index) { - Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000)); - } - } - finally { - for(--createdTasks; createdTasks >= 0; --createdTasks) { - tasks[createdTasks].Dispose(); - } - } + // Create the tasks, counting up the created task counter. If an exception + // occurs, we will roll back from there. + for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) { + tasks[createdTasks] = new WaitTask(); } - finally { - AffineThreadPool.AssertionHandler = oldAssertionHandler; + + // Schedule the blocking tasks in the thread pool so it will not be able + // to process the next task we add to the queue + for(int index = 0; index < eventCount; ++index) { + AffineThreadPool.QueueUserWorkItem(tasks[index].Callback); } - } // using + + // Wait for the tasks to start so they aren't preempted by the tasks we're + // going to add (which would finish immediately). The affine thread pool + // works on a first come first serve basis, but we don't want to rely on this + // implementation detail in the unit test. + for(int index = 0; index < eventCount; ++index) { + Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000)); + } + + // All Thread should now be active and no work items should be waiting + Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); + Assert.AreEqual(0, AffineThreadPool.WaitingWorkItems); + + // Add a task to the queue and make sure the waiting work item count goes up + AffineThreadPool.QueueUserWorkItem(delegate(object state) { }); + Assert.AreEqual(1, AffineThreadPool.WaitingWorkItems); + + // The same again. Now we should have 2 work items sitting in the queue + AffineThreadPool.QueueUserWorkItem(delegate(object state) { }); + Assert.AreEqual(2, AffineThreadPool.WaitingWorkItems); + + // Let the WaitTasks finish so we're not blocking the thread pool any longer + for(int index = 0; index < eventCount; ++index) { + tasks[index].WaitEvent.Set(); + } + + // Wait for the tasks to end before we get rid of them + for(int index = 0; index < eventCount; ++index) { + Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000)); + } + } + finally { + for(--createdTasks; createdTasks >= 0; --createdTasks) { + tasks[createdTasks].Dispose(); + } + } + } } diff --git a/Source/AffineThreadPool.cs b/Source/AffineThreadPool.cs index 6eb3f53..923926e 100644 --- a/Source/AffineThreadPool.cs +++ b/Source/AffineThreadPool.cs @@ -49,15 +49,9 @@ namespace Nuclex.Support { public static readonly int CpuCores = Environment.ProcessorCount; #endif - /// Delegate used by the thread pool to handle assertion checks - /// Condition that will be asserted - /// Message explaining what causes the assertion to fail - /// - /// Detailed description of the exact cause of the assertion failure - /// - public delegate void AssertionDelegate( - bool condition, string message, string details - ); + /// Delegate used by the thread pool to report unhandled exceptions + /// Exception that has not been handled + public delegate void ExceptionDelegate(Exception exception); #region class UserWorkItem @@ -93,10 +87,11 @@ namespace Nuclex.Support { workerThreads = new List(CpuCores); inUseThreads = 0; - // We can only use these hardware thread indices on the XBox 360 #if XBOX360 + // We can only use these hardware thread indices on the XBox 360 hardwareThreads = new Queue(new int[] { 5, 4, 3, 1 }); #else + // We can use all cores in the PC, starting from index 1 hardwareThreads = new Queue(CpuCores); for(int core = CpuCores; core >= 1; --core) { hardwareThreads.Enqueue(core); @@ -153,34 +148,6 @@ namespace Nuclex.Support { } - /// Empties the work queue of any queued work items - public static void EmptyQueue() { - lock(userWorkItems) { - try { - while(userWorkItems.Count > 0) { - UserWorkItem callback = userWorkItems.Dequeue(); - IDisposable disposableState = callback.State as IDisposable; - if(disposableState != null) { - disposableState.Dispose(); - } - } - } - catch(Exception) { // Make sure an error isn't thrown. - AssertionHandler( - false, - "Unhandled exception disposing the state of a user work item", - "The AffineThreadPool.EmptyQueue() method tried to dispose of any states" + - "associated with waiting user work items. One of the states implementing" + - "IDisposable threw an exception during Dispose()." - ); - } - - // Clear all waiting items and reset the number of worker threads currently needed - // to be 0 (there is nothing for threads to do) - userWorkItems.Clear(); - } - } - /// Gets the number of threads at the disposal of the thread pool public static int MaxThreads { get { return CpuCores; } } @@ -190,7 +157,7 @@ namespace Nuclex.Support { /// /// Gets the number of callback delegates currently waiting in the thread pool /// - public static int WaitingCallbacks { + public static int WaitingWorkItems { get { lock(userWorkItems) { return userWorkItems.Count; @@ -201,27 +168,37 @@ namespace Nuclex.Support { /// A thread worker function that processes items from the work queue private static void ProcessQueuedItems() { + // Get the system/hardware thread index this thread is going to use. We hope that + // the threads more or less start after each other, but there is no guarantee that + // tasks will be handled by the CPU cores in the order the queue was filled with. + // This could be added, though, by using a WaitHandle so the thread creator could + // wait for each thread to take one entry out of the queue. int hardwareThreadIndex; lock(hardwareThreads) { hardwareThreadIndex = hardwareThreads.Dequeue(); } #if XBOX360 - // MSDN states that SetProcessorAffinity() should be called from the thread - // whose affinity is being changed. + // On the XBox 360, the only way to get a thread to run on another core is to + // explicitly move it to that core. MSDN states that SetProcessorAffinity() should + // be called from the thread whose affinity is being changed. Thread.CurrentThread.SetProcessorAffinity(new int[] { hardwareThreadIndex }); #else // Prevent this managed thread from impersonating another system thread. - // Threads in .NET can take + // In .NET, managed threads can supposedly be moved to different system threads + // and, more worryingly, even fibers. This should make sure we're sitting on + // a normal system thread and stay with that thread during our lifetime. Thread.BeginThreadAffinity(); - ProcessThread thread = getCurrentProcessThread(); + // Assign the ideal processor, but don't force it. It's not a good idea to + // circumvent the thread scheduler of a desktop machine, so we try to play nice. + int threadId = GetCurrentThreadId(); + ProcessThread thread = GetProcessThread(threadId); if(thread != null) { thread.IdealProcessor = hardwareThreadIndex; } #endif - // Keep processing tasks indefinitely for(; ; ) { UserWorkItem workItem = getNextWorkItem(); @@ -232,13 +209,11 @@ namespace Nuclex.Support { Interlocked.Increment(ref inUseThreads); workItem.Callback(workItem.State); } - catch(Exception) { // Make sure we don't throw here. - AssertionHandler( - false, - "Unhandled exception in queued user work item", - "An unhandled exception travelled up to the AffineThreadPool from" + - "a queued user work item that was being executed" - ); + catch(Exception exception) { // Make sure we don't throw here. + ExceptionDelegate exceptionHandler = ExceptionHandler; + if(exceptionHandler != null) { + exceptionHandler(exception); + } } finally { Interlocked.Decrement(ref inUseThreads); @@ -249,9 +224,7 @@ namespace Nuclex.Support { #if !XBOX360 /// Retrieves the ProcessThread for the calling thread /// The ProcessThread for the calling thread - private static ProcessThread getCurrentProcessThread() { - int threadId = GetCurrentThreadId(); - + internal static ProcessThread GetProcessThread(int threadId) { ProcessThreadCollection threads = Process.GetCurrentProcess().Threads; for(int index = 0; index < threads.Count; ++index) { if(threads[index].Id == threadId) { @@ -290,25 +263,15 @@ namespace Nuclex.Support { } - /// Default assertion handler for the affine thread pool - /// Condition which is being asserted - /// Message explaining what causes the assertion to fail - /// - /// Detailed description of the exact cause of the assertion failure - /// - public static void DefaultAssertionHandler( - bool condition, string message, string details - ) { - Trace.Assert(condition, message, details); - } - /// Delegate used to handle assertion checks in the code - public static AssertionDelegate AssertionHandler = DefaultAssertionHandler; + public static volatile ExceptionDelegate ExceptionHandler; +#if !XBOX360 /// Retrieves the calling thread's thread id /// The thread is of the calling thread [DllImport("kernel32.dll")] - private static extern int GetCurrentThreadId(); + internal static extern int GetCurrentThreadId(); +#endif /// Available hardware threads the thread pool threads pick from private static Queue hardwareThreads;