From 316e2c379a609cb2d2b97294f843029cfe8124c2 Mon Sep 17 00:00:00 2001 From: Markus Ewald Date: Tue, 15 Sep 2009 19:39:08 +0000 Subject: [PATCH] Added custom CPU core-affine thread pool implementation (which doesn't work on the XBox 360 because Monitor.Wait() and Monitor.Pulse() are not supported - but the code is so nice I want to capture this state in Subversion :D) git-svn-id: file:///srv/devel/repo-conversion/nusu@172 d2e56fa2-650e-0410-a79f-9358c0239efd --- Nuclex.Support (Xbox 360).csproj | 6 +- Nuclex.Support.csproj | 4 + Source/AffineThreadPool.Test.cs | 313 +++++++++++++++++++++++++ Source/AffineThreadPool.cs | 388 +++++++++++++++++++++++++++++++ Source/AssertHelper.cs | 2 +- 5 files changed, 711 insertions(+), 2 deletions(-) create mode 100644 Source/AffineThreadPool.Test.cs create mode 100644 Source/AffineThreadPool.cs diff --git a/Nuclex.Support (Xbox 360).csproj b/Nuclex.Support (Xbox 360).csproj index 5d4f7ed..26b1dec 100644 --- a/Nuclex.Support (Xbox 360).csproj +++ b/Nuclex.Support (Xbox 360).csproj @@ -1,4 +1,4 @@ - + {DFFEAB70-51B8-4714-BCA6-79B733BBC520} {2DF5C3F4-5A5F-47a9-8E94-23B4456F55E2};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC} @@ -61,6 +61,9 @@ + + AffineThreadPool.cs + AssertHelper.cs @@ -130,6 +133,7 @@ ReverseComparer.cs + PrototypeFactory.cs diff --git a/Nuclex.Support.csproj b/Nuclex.Support.csproj index 059af82..d35843c 100644 --- a/Nuclex.Support.csproj +++ b/Nuclex.Support.csproj @@ -47,6 +47,9 @@ + + AffineThreadPool.cs + AssertHelper.cs @@ -116,6 +119,7 @@ ReverseComparer.cs + PrototypeFactory.cs diff --git a/Source/AffineThreadPool.Test.cs b/Source/AffineThreadPool.Test.cs new file mode 100644 index 0000000..c5de8a8 --- /dev/null +++ b/Source/AffineThreadPool.Test.cs @@ -0,0 +1,313 @@ +#region CPL License +/* +Nuclex Framework +Copyright (C) 2002-2009 Nuclex Development Labs + +This library is free software; you can redistribute it and/or +modify it under the terms of the IBM Common Public License as +published by the IBM Corporation; either version 1.0 of the +License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +IBM Common Public License for more details. + +You should have received a copy of the IBM Common Public +License along with this library +*/ +#endregion + +#if UNITTEST + +using System; +using System.Collections.Generic; +using System.Threading; + +using NUnit.Framework; + +namespace Nuclex.Support { + + /// Unit Test for the CPU core-affine thread pool + [TestFixture] + public class AffineThreadPoolTest { + + #region class TestTask + + /// ThreadPool task that can be used for testing + private class TestTask : IDisposable { + + /// Initializes a new test task + public TestTask() { + this.callbackEvent = new ManualResetEvent(false); + } + + /// Immediately releases all resources owned by the instance + public void Dispose() { + if(this.callbackEvent != null) { + this.callbackEvent.Close(); + this.callbackEvent = null; + } + } + + /// Callback that can be added to the thread pool as a task + /// User defined state + public void Callback(object state) { + this.LastCallbackState = state; + this.callbackEvent.Set(); + } + + /// Event that will be set when the callback is executed + public ManualResetEvent CallbackEvent { + get { return this.callbackEvent; } + } + + /// + /// State parameter that was provide when the callback was called + /// + public volatile object LastCallbackState; + + /// Event that will be set when the callback is invoked + private ManualResetEvent callbackEvent; + + } + + #endregion // class TestTask + + #region class WaitTask + + /// ThreadPool task that can be used for testing + private class WaitTask : IDisposable { + + /// Initializes a new test task + public WaitTask() { + this.startEvent = new ManualResetEvent(false); + this.finishEvent = new ManualResetEvent(false); + this.waitEvent = new ManualResetEvent(false); + } + + /// Immediately releases all resources owned by the instance + public void Dispose() { + if(this.waitEvent != null) { + this.waitEvent.Close(); + this.waitEvent = null; + } + if(this.finishEvent != null) { + this.finishEvent.Close(); + this.finishEvent = null; + } + if(this.startEvent != null) { + this.startEvent.Close(); + this.startEvent = null; + } + } + + /// Callback that can be added to the thread pool as a task + /// User defined state + public void Callback(object state) { + this.LastCallbackState = state; + this.startEvent.Set(); + this.waitEvent.WaitOne(); + this.finishEvent.Set(); + } + + /// Event that will be set when the callback has started + public ManualResetEvent StartEvent { + get { return this.startEvent; } + } + + /// Event that will be set when the callback has finished + public ManualResetEvent FinishEvent { + get { return this.finishEvent; } + } + + /// Event that blocks the callback + public ManualResetEvent WaitEvent { + get { return this.waitEvent; } + } + + /// + /// State parameter that was provide when the callback was called + /// + public volatile object LastCallbackState; + + /// Event that will be set when the callback has started + private ManualResetEvent startEvent; + /// Event that will be set when the callback has finished + private ManualResetEvent finishEvent; + /// Event used to block the callback + private ManualResetEvent waitEvent; + + } + + #endregion // class WaitTask + + #region class ThrowingDisposable + + /// Throws an exception when it is disposed + private class ThrowingDisposable : IDisposable { + + /// Immediately releases all resources owned by the instance + public void Dispose() { + throw new ArithmeticException("Simulated exception for unit testing"); + } + + } + + #endregion // class ThrowingDisposable + + /// + /// Verifies that the Thread Pool's default assertion handler is working + /// + [Test] + public void TestDefaultAssertionHandler() { + + // We can't test a failing assertion because our tests need to run + // unattended on a build server without blocking for user input. + AffineThreadPool.DefaultAssertionHandler( + true, "Unit test", "This should not fail" + ); + + } + + /// Tests whether the QueueUserWorkItem() method is working + [Test] + public void TestQueueUserWorkItem() { + using(TestTask task = new TestTask()) { + AffineThreadPool.QueueUserWorkItem(task.Callback); + Assert.IsTrue(task.CallbackEvent.WaitOne(1000)); + } + } + + /// + /// Verifies that the QueueUserWorkItem() method is passing the state parameter + /// on to the callback + /// + [Test] + public void TestQueueUserWorkItemWithState() { + using(TestTask task = new TestTask()) { + object state = new object(); + + AffineThreadPool.QueueUserWorkItem(task.Callback, state); + + Assert.IsTrue(task.CallbackEvent.WaitOne(1000)); + Assert.AreSame(state, task.LastCallbackState); + } + } + + /// + /// Tests whether the thread pool can handle an exception from a user work item + /// + [Test] + public void TestExceptionFromUserWorkItem() { + using(ManualResetEvent assertEvent = new ManualResetEvent(false)) { + AffineThreadPool.AssertionDelegate oldAssertionHandler = + AffineThreadPool.AssertionHandler; + + AffineThreadPool.AssertionHandler = delegate( + bool condition, string message, string details + ) { assertEvent.Set(); }; + try { + AffineThreadPool.QueueUserWorkItem( + delegate(object state) { throw new KeyNotFoundException(); } + ); + Assert.IsTrue(assertEvent.WaitOne(1000)); + } + finally { + AffineThreadPool.AssertionHandler = oldAssertionHandler; + } + } + } + + /// + /// Verifies that the affine thread pool's maximum thread count equals + /// the number of logical processors in the system + /// + [Test] + public void TestMaxThreadsProperty() { + Assert.AreEqual(Environment.ProcessorCount, AffineThreadPool.MaxThreads); + } + + /// + /// Tests whether the thread pool can handle an exception from a user work item + /// + [Test] + public void TestExceptionFromDisposableState() { + using(ManualResetEvent assertEvent = new ManualResetEvent(false)) { + AffineThreadPool.AssertionDelegate oldAssertionHandler = + AffineThreadPool.AssertionHandler; + + AffineThreadPool.AssertionHandler = delegate( + bool condition, string message, string details + ) { assertEvent.Set(); }; + + try { + int eventCount = AffineThreadPool.CpuCores; + WaitTask[] tasks = new WaitTask[eventCount]; + + int createdTasks = 0; + try { + + // Create the tasks, counting up the created task counter. If an exception + // occurs, we will roll back from there. + for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) { + tasks[createdTasks] = new WaitTask(); + } + + // Schedule the blocking tasks in the thread pool so it will not be able + // to process the next task we add to the queue + for(int index = 0; index < eventCount; ++index) { + AffineThreadPool.QueueUserWorkItem(tasks[index].Callback); + } + + // Wait for the tasks to start so they aren't aborted by EmptyQueue() + for(int index = 0; index < eventCount; ++index) { + Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000)); + } + Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); + Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks); + + // Add a task to the queue whose state implements a faulty IDisposable + AffineThreadPool.QueueUserWorkItem( + delegate(object state) { }, new ThrowingDisposable() + ); + + Assert.AreEqual(1, AffineThreadPool.WaitingCallbacks); + + // Now clear the thread pool. This should cause the faulty IDisposable + // to be disposed and then throw its exception. + AffineThreadPool.EmptyQueue(); + + // Make sure our custom assertion handler has been triggered + Assert.IsTrue(assertEvent.WaitOne(1000)); + + Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); + Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks); + + // Let the thread pool finish its active tasks + for(int index = 0; index < eventCount; ++index) { + tasks[index].WaitEvent.Set(); + } + + // Wait for the tasks to end before we dispose them + for(int index = 0; index < eventCount; ++index) { + Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000)); + } + } + finally { + for(--createdTasks; createdTasks >= 0; --createdTasks) { + tasks[createdTasks].Dispose(); + } + } + } + finally { + AffineThreadPool.AssertionHandler = oldAssertionHandler; + } + } // using + } + + } + +} // namespace Nuclex.Support + +#endif // UNITTEST diff --git a/Source/AffineThreadPool.cs b/Source/AffineThreadPool.cs new file mode 100644 index 0000000..125ace5 --- /dev/null +++ b/Source/AffineThreadPool.cs @@ -0,0 +1,388 @@ +#region CPL License +/* +Nuclex Framework +Copyright (C) 2002-2009 Nuclex Development Labs + +This library is free software; you can redistribute it and/or +modify it under the terms of the IBM Common Public License as +published by the IBM Corporation; either version 1.0 of the +License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +IBM Common Public License for more details. + +You should have received a copy of the IBM Common Public +License along with this library +*/ +#endregion + +using System; +using System.Threading; +using System.Collections.Generic; +using System.Diagnostics; + +namespace Nuclex.Support { + + /// Alternative Thread pool providing one thread for each core + /// + /// + /// Unlike the normal thread pool, the affine thread pool provides only as many + /// threads as there are CPU cores available on the current platform. This makes + /// it more suitable for tasks you want to spread across all available cpu cores + /// explicitely, however, it's not a good match if you just want to run a series + /// of tasks asynchronously. + /// + /// + /// Implementation based on original code provided by Stephen Toub + /// (stoub at microsoft ignorethis dot com) + /// + /// + public static class AffineThreadPool { + + /// Number of CPU cores available on the system +#if XBOX360 + public static readonly int CpuCores = 4; +#else + public static readonly int CpuCores = Environment.ProcessorCount; +#endif + + /// Delegate used by the thread pool to handle assertion checks + /// Condition that will be asserted + /// Message explaining what causes the assertion to fail + /// + /// Detailed description of the exact cause of the assertion failure + /// + public delegate void AssertionDelegate( + bool condition, string message, string details + ); + + #region class Semaphore + + /// + /// Lightweight variant of Dijkstra's PV Semaphore semaphore based on + /// the Monitor class. + /// + /// + /// Based on code by Stephen Toub (stoub at microsoft ignorethis dot com). + /// + private class Semaphore { + +#if false // The Thread Pool doesn't use this. + /// Initializes the semaphore as a binary semaphore (mutex) + public Semaphore() : this(1) { } +#endif + + /// Initializes the semaphore as a counting semaphore + /// + /// Initial number of threads that can take out units from this semaphore + /// + /// + /// Thrown if the count argument is less than 1 + /// + public Semaphore(int count) { +#if false // The Thread Pool doesn't make this mistake. + if(count < 0) { + throw new ArgumentException( + "Semaphore must have a count of at least 0.", "count" + ); + } +#endif + this.count = count; + } + + /// V the semaphore (add 1 unit to it). + public void Release() { v(); } + + /// P the semaphore (take out 1 unit from it). + public void WaitOne() { p(); } + + /// P the semaphore (take out 1 unit from it). + private void p() { + + // Lock so we can work in peace. This works because lock is actually + // built around Monitor. + lock(this) { + + // Wait until a unit becomes available. We need to wait in a loop in case + // someone else wakes up before us. This could happen if the Monitor.Pulse + // statements were changed to Monitor.PulseAll statements in order to + // introduce some randomness into the order in which threads are woken. + while(this.count <= 0) { + Monitor.Wait(this, Timeout.Infinite); + } + + --this.count; + + } + + } + + /// V the semaphore (add 1 unit to it). + private void v() { + + // Lock so we can work in peace. This works because lock is actually + // built around Monitor. + lock(this) { + + // Release our hold on the unit of control. Then tell everyone + // waiting on this object that there is a unit available. + ++this.count; + Monitor.Pulse(this); + + } + + } + + /// + /// Resets the semaphore to the specified count. Should be used cautiously. + /// + public void Reset(int count) { + lock(this) { + this.count = count; + } + } + + /// The number of units alloted by this semaphore. + private int count; + + } + + #endregion // class Semaphore + + #region class WaitingCallback + + /// Used to hold a callback delegate and the state for that delegate. + private struct UserWorkItem { + + /// Initialize the callback holding object. + /// Callback delegate for the callback. + /// State with which to call the callback delegate. + public UserWorkItem(WaitCallback callback, object state) { + this.Callback = callback; + this.State = state; + } + + /// Callback delegate for the callback. + public WaitCallback Callback; + /// State with which to call the callback delegate. + public object State; + + } + + #endregion // class WaitingCallback + + /// Initializes the thread pool + static AffineThreadPool() { + + // Create our thread stores; we handle synchronization ourself + // as we may run into situations where multiple operations need to be atomic. + // We keep track of the threads we've created just for good measure; not actually + // needed for any core functionality. + waitingCallbacks = new Queue(CpuCores * 4); + workerThreads = new List(CpuCores); + inUseThreads = 0; + + // We can only use these hardware thread indices on the XBox 360 +#if XBOX360 + XboxHardwareThreads = new Queue(new int[] { 5, 4, 3, 1 }); +#endif + + // Create our "thread needed" event + workerThreadNeeded = new Semaphore(0); + + // Create all of the worker threads + for(int index = 0; index < CpuCores; index++) { + + // Create a new thread and add it to the list of threads. + Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems)); + workerThreads.Add(newThread); + + // Configure the new thread and start it + newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString(); + newThread.IsBackground = true; + newThread.Start(); + } + + } + + /// Queues a user work item to the thread pool + /// + /// A WaitCallback representing the delegate to invoke when a thread in the + /// thread pool picks up the work item + /// + public static void QueueUserWorkItem(WaitCallback callback) { + + // Queue the delegate with no state + QueueUserWorkItem(callback, null); + + } + + /// Queues a user work item to the thread pool. + /// + /// A WaitCallback representing the delegate to invoke when a thread in the + /// thread pool picks up the work item + /// + /// + /// The object that is passed to the delegate when serviced from the thread pool + /// + public static void QueueUserWorkItem(WaitCallback callback, object state) { + + // Create a waiting callback that contains the delegate and its state. + // Add it to the processing queue, and signal that data is waiting. + UserWorkItem waiting = new UserWorkItem(callback, state); + lock(waitingCallbacks) { + waitingCallbacks.Enqueue(waiting); + } + + // Decrement the semaphore into the negative range, so the worker threads will + // be woken up until no more tasks are available. + workerThreadNeeded.Release(); + + } + + /// Empties the work queue of any queued work items + public static void EmptyQueue() { + lock(waitingCallbacks) { + try { + while(waitingCallbacks.Count > 0) { + UserWorkItem callback = waitingCallbacks.Dequeue(); + IDisposable disposableState = callback.State as IDisposable; + if(disposableState != null) { + disposableState.Dispose(); + } + } + } + catch(Exception) { // Make sure an error isn't thrown. + AssertionHandler( + false, + "Unhandled exception disposing the state of a user work item", + "The AffineThreadPool.EmptyQueue() method tried to dispose of any states" + + "associated with waiting user work items. One of the states implementing" + + "IDisposable threw an exception during Dispose()." + ); + } + + // Clear all waiting items and reset the number of worker threads currently needed + // to be 0 (there is nothing for threads to do) + waitingCallbacks.Clear(); + workerThreadNeeded.Reset(0); + } + } + + /// Gets the number of threads at the disposal of the thread pool + public static int MaxThreads { get { return CpuCores; } } + + /// Gets the number of currently active threads in the thread pool + public static int ActiveThreads { get { return inUseThreads; } } + + /// + /// Gets the number of callback delegates currently waiting in the thread pool + /// + public static int WaitingCallbacks { + get { + lock(waitingCallbacks) { + return waitingCallbacks.Count; + } + } + } + + /// A thread worker function that processes items from the work queue + private static void ProcessQueuedItems() { +#if XBOX360 + // MSDN states that SetProcessorAffinity() should be called from the thread + // whose affinity is being changed. + int hardwareThreadIndex; + lock(XboxHardwareThreads) { + hardwareThreadIndex = XboxHardwareThreads.Dequeue(); + } + Thread.CurrentThread.SetProcessorAffinity(new int[] { hardwareThreadIndex }); +#endif + + // Keep processing tasks indefinitely + for(; ; ) { + UserWorkItem workItem = getNextWorkItem(); + + // Execute the work item we just picked up. Make sure to accurately + // record how many callbacks are currently executing. + try { + Interlocked.Increment(ref inUseThreads); + workItem.Callback(workItem.State); + } + catch(Exception) { // Make sure we don't throw here. + AssertionHandler( + false, + "Unhandled exception in queued user work item", + "An unhandled exception travelled up to the AffineThreadPool from" + + "a queued user work item that was being executed" + ); + } + finally { + Interlocked.Decrement(ref inUseThreads); + } + } + } + + /// Obtains the next work item from the queue + /// The next work item in the queue + /// + /// If the queue is empty, the call will block until an item is added to + /// the queue and the calling thread was the one picking it up. + /// + private static UserWorkItem getNextWorkItem() { + + // Get the next item in the queue. If there is nothing there, go to sleep + // for a while until we're woken up when a callback is waiting. + for(; ; ) { + + // Try to get the next callback available. We need to lock on the + // queue in order to make our count check and retrieval atomic. + lock(waitingCallbacks) { + if(waitingCallbacks.Count > 0) { + return waitingCallbacks.Dequeue(); + } + } + + // If we can't get one, go to sleep. + workerThreadNeeded.WaitOne(); + + } + + } + + /// Default assertion handler for the affine thread pool + /// Condition which is being asserted + /// Message explaining what causes the assertion to fail + /// + /// Detailed description of the exact cause of the assertion failure + /// + public static void DefaultAssertionHandler( + bool condition, string message, string details + ) { + Trace.Assert(condition, message, details); + } + + /// Delegate used to handle assertion checks in the code + public static AssertionDelegate AssertionHandler = DefaultAssertionHandler; + +#if XBOX360 + /// XNA games on the XBox 360 can use only 4 of 6 hardware threads + private static Queue XboxHardwareThreads; +#endif + /// Queue of all the callbacks waiting to be executed. + private static Queue waitingCallbacks; + /// + /// Used to signal that a worker thread is needed for processing. Note that multiple + /// threads may be needed simultaneously and as such we use a semaphore instead of + /// an auto reset event. + /// + private static Semaphore workerThreadNeeded; + /// List of all worker threads at the disposal of the thread pool. + private static List workerThreads; + /// Number of threads currently active. + private static int inUseThreads; + + } + +} // namespace Nuclex.Support diff --git a/Source/AssertHelper.cs b/Source/AssertHelper.cs index 1a60731..d3be35e 100644 --- a/Source/AssertHelper.cs +++ b/Source/AssertHelper.cs @@ -25,7 +25,7 @@ using System; using NUnit.Framework; // Decide: -// - Move (part of) this to Nuclex.Support? +// - Move (part of) this to Nuclex.Support? [done] // - Create new Assemblies Nuclex.NUnit.dll and Nuclex.NUnit.Xna.dll? namespace Nuclex.Support {