Removed the AssertionDelegate stuff and added an UnhandledException delegate to the ThreadPool - maybe I'll turn this into an event analogous to Application.ThreadException; removed the EmptyQueue method which was a flawed concept anyway (especially the disposing of state objects!); rewrote the unit tests to work with the new and improved AffineThreadPool class - there's one test that fails when run through NCover, indicating some synchronization problem that I have yet to track down!
git-svn-id: file:///srv/devel/repo-conversion/nusu@176 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
parent
7a7e71d0c3
commit
09247541f2
|
@ -142,6 +142,7 @@ namespace Nuclex.Support {
|
||||||
|
|
||||||
#endregion // class WaitTask
|
#endregion // class WaitTask
|
||||||
|
|
||||||
|
#if false
|
||||||
#region class ThrowingDisposable
|
#region class ThrowingDisposable
|
||||||
|
|
||||||
/// <summary>Throws an exception when it is disposed</summary>
|
/// <summary>Throws an exception when it is disposed</summary>
|
||||||
|
@ -169,6 +170,7 @@ namespace Nuclex.Support {
|
||||||
);
|
);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/// <summary>Tests whether the QueueUserWorkItem() method is working</summary>
|
/// <summary>Tests whether the QueueUserWorkItem() method is working</summary>
|
||||||
[Test]
|
[Test]
|
||||||
|
@ -200,21 +202,21 @@ namespace Nuclex.Support {
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public void TestExceptionFromUserWorkItem() {
|
public void TestExceptionFromUserWorkItem() {
|
||||||
using(ManualResetEvent assertEvent = new ManualResetEvent(false)) {
|
using(ManualResetEvent exceptionEvent = new ManualResetEvent(false)) {
|
||||||
AffineThreadPool.AssertionDelegate oldAssertionHandler =
|
AffineThreadPool.ExceptionDelegate oldExceptionHandler =
|
||||||
AffineThreadPool.AssertionHandler;
|
AffineThreadPool.ExceptionHandler;
|
||||||
|
|
||||||
AffineThreadPool.AssertionHandler = delegate(
|
AffineThreadPool.ExceptionHandler = delegate(Exception exception) {
|
||||||
bool condition, string message, string details
|
exceptionEvent.Set();
|
||||||
) { assertEvent.Set(); };
|
};
|
||||||
try {
|
try {
|
||||||
AffineThreadPool.QueueUserWorkItem(
|
AffineThreadPool.QueueUserWorkItem(
|
||||||
delegate(object state) { throw new KeyNotFoundException(); }
|
delegate(object state) { throw new KeyNotFoundException(); }
|
||||||
);
|
);
|
||||||
Assert.IsTrue(assertEvent.WaitOne(1000));
|
Assert.IsTrue(exceptionEvent.WaitOne(1000));
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
AffineThreadPool.AssertionHandler = oldAssertionHandler;
|
AffineThreadPool.ExceptionHandler = oldExceptionHandler;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -229,81 +231,87 @@ namespace Nuclex.Support {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Tests whether the thread pool can handle an exception from a user work item
|
/// Verifies that the ProcessThread instance for a system thread id can
|
||||||
|
/// be determined using the GetProcessThread() method
|
||||||
/// </summary>
|
/// </summary>
|
||||||
[Test]
|
[Test]
|
||||||
public void TestExceptionFromDisposableState() {
|
public void TestGetProcessThread() {
|
||||||
using(ManualResetEvent assertEvent = new ManualResetEvent(false)) {
|
Thread.BeginThreadAffinity();
|
||||||
AffineThreadPool.AssertionDelegate oldAssertionHandler =
|
try {
|
||||||
AffineThreadPool.AssertionHandler;
|
int threadId = AffineThreadPool.GetCurrentThreadId();
|
||||||
|
|
||||||
AffineThreadPool.AssertionHandler = delegate(
|
Assert.IsNotNull(AffineThreadPool.GetProcessThread(threadId));
|
||||||
bool condition, string message, string details
|
Assert.IsNull(AffineThreadPool.GetProcessThread(0));
|
||||||
) { assertEvent.Set(); };
|
}
|
||||||
|
finally {
|
||||||
|
Thread.EndThreadAffinity();
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
}
|
||||||
int eventCount = AffineThreadPool.CpuCores;
|
|
||||||
WaitTask[] tasks = new WaitTask[eventCount];
|
|
||||||
|
|
||||||
int createdTasks = 0;
|
/// <summary>
|
||||||
try {
|
/// Verifies that the waiting work items count and active thread count are
|
||||||
|
/// updated by the thread pool.
|
||||||
|
/// </summary>
|
||||||
|
[Test]
|
||||||
|
public void TestWaitingWorkItemsProperty() {
|
||||||
|
int eventCount = AffineThreadPool.CpuCores;
|
||||||
|
WaitTask[] tasks = new WaitTask[eventCount];
|
||||||
|
|
||||||
// Create the tasks, counting up the created task counter. If an exception
|
int createdTasks = 0;
|
||||||
// occurs, we will roll back from there.
|
try {
|
||||||
for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
|
// CHECK: Is there danger that the thread pool still has not finished
|
||||||
tasks[createdTasks] = new WaitTask();
|
// queued items for other unit tests, thereby failing to meet
|
||||||
}
|
// our expected task counts?
|
||||||
|
|
||||||
// Schedule the blocking tasks in the thread pool so it will not be able
|
// Create the tasks, counting up the created task counter. If an exception
|
||||||
// to process the next task we add to the queue
|
// occurs, we will roll back from there.
|
||||||
for(int index = 0; index < eventCount; ++index) {
|
for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
|
||||||
AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
|
tasks[createdTasks] = new WaitTask();
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the tasks to start so they aren't aborted by EmptyQueue()
|
|
||||||
for(int index = 0; index < eventCount; ++index) {
|
|
||||||
Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000));
|
|
||||||
}
|
|
||||||
Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
|
|
||||||
Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks);
|
|
||||||
|
|
||||||
// Add a task to the queue whose state implements a faulty IDisposable
|
|
||||||
AffineThreadPool.QueueUserWorkItem(
|
|
||||||
delegate(object state) { }, new ThrowingDisposable()
|
|
||||||
);
|
|
||||||
|
|
||||||
Assert.AreEqual(1, AffineThreadPool.WaitingCallbacks);
|
|
||||||
|
|
||||||
// Now clear the thread pool. This should cause the faulty IDisposable
|
|
||||||
// to be disposed and then throw its exception.
|
|
||||||
AffineThreadPool.EmptyQueue();
|
|
||||||
|
|
||||||
// Make sure our custom assertion handler has been triggered
|
|
||||||
Assert.IsTrue(assertEvent.WaitOne(1000));
|
|
||||||
|
|
||||||
Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
|
|
||||||
Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks);
|
|
||||||
|
|
||||||
// Let the thread pool finish its active tasks
|
|
||||||
for(int index = 0; index < eventCount; ++index) {
|
|
||||||
tasks[index].WaitEvent.Set();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the tasks to end before we dispose them
|
|
||||||
for(int index = 0; index < eventCount; ++index) {
|
|
||||||
Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
finally {
|
|
||||||
for(--createdTasks; createdTasks >= 0; --createdTasks) {
|
|
||||||
tasks[createdTasks].Dispose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
finally {
|
|
||||||
AffineThreadPool.AssertionHandler = oldAssertionHandler;
|
// Schedule the blocking tasks in the thread pool so it will not be able
|
||||||
|
// to process the next task we add to the queue
|
||||||
|
for(int index = 0; index < eventCount; ++index) {
|
||||||
|
AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
|
||||||
}
|
}
|
||||||
} // using
|
|
||||||
|
// Wait for the tasks to start so they aren't preempted by the tasks we're
|
||||||
|
// going to add (which would finish immediately). The affine thread pool
|
||||||
|
// works on a first come first serve basis, but we don't want to rely on this
|
||||||
|
// implementation detail in the unit test.
|
||||||
|
for(int index = 0; index < eventCount; ++index) {
|
||||||
|
Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000));
|
||||||
|
}
|
||||||
|
|
||||||
|
// All Thread should now be active and no work items should be waiting
|
||||||
|
Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
|
||||||
|
Assert.AreEqual(0, AffineThreadPool.WaitingWorkItems);
|
||||||
|
|
||||||
|
// Add a task to the queue and make sure the waiting work item count goes up
|
||||||
|
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
|
||||||
|
Assert.AreEqual(1, AffineThreadPool.WaitingWorkItems);
|
||||||
|
|
||||||
|
// The same again. Now we should have 2 work items sitting in the queue
|
||||||
|
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
|
||||||
|
Assert.AreEqual(2, AffineThreadPool.WaitingWorkItems);
|
||||||
|
|
||||||
|
// Let the WaitTasks finish so we're not blocking the thread pool any longer
|
||||||
|
for(int index = 0; index < eventCount; ++index) {
|
||||||
|
tasks[index].WaitEvent.Set();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the tasks to end before we get rid of them
|
||||||
|
for(int index = 0; index < eventCount; ++index) {
|
||||||
|
Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
for(--createdTasks; createdTasks >= 0; --createdTasks) {
|
||||||
|
tasks[createdTasks].Dispose();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,15 +49,9 @@ namespace Nuclex.Support {
|
||||||
public static readonly int CpuCores = Environment.ProcessorCount;
|
public static readonly int CpuCores = Environment.ProcessorCount;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/// <summary>Delegate used by the thread pool to handle assertion checks</summary>
|
/// <summary>Delegate used by the thread pool to report unhandled exceptions</summary>
|
||||||
/// <param name="condition">Condition that will be asserted</param>
|
/// <param name="exception">Exception that has not been handled</param>
|
||||||
/// <param name="message">Message explaining what causes the assertion to fail</param>
|
public delegate void ExceptionDelegate(Exception exception);
|
||||||
/// <param name="details">
|
|
||||||
/// Detailed description of the exact cause of the assertion failure
|
|
||||||
/// </param>
|
|
||||||
public delegate void AssertionDelegate(
|
|
||||||
bool condition, string message, string details
|
|
||||||
);
|
|
||||||
|
|
||||||
#region class UserWorkItem
|
#region class UserWorkItem
|
||||||
|
|
||||||
|
@ -93,10 +87,11 @@ namespace Nuclex.Support {
|
||||||
workerThreads = new List<Thread>(CpuCores);
|
workerThreads = new List<Thread>(CpuCores);
|
||||||
inUseThreads = 0;
|
inUseThreads = 0;
|
||||||
|
|
||||||
// We can only use these hardware thread indices on the XBox 360
|
|
||||||
#if XBOX360
|
#if XBOX360
|
||||||
|
// We can only use these hardware thread indices on the XBox 360
|
||||||
hardwareThreads = new Queue<int>(new int[] { 5, 4, 3, 1 });
|
hardwareThreads = new Queue<int>(new int[] { 5, 4, 3, 1 });
|
||||||
#else
|
#else
|
||||||
|
// We can use all cores in the PC, starting from index 1
|
||||||
hardwareThreads = new Queue<int>(CpuCores);
|
hardwareThreads = new Queue<int>(CpuCores);
|
||||||
for(int core = CpuCores; core >= 1; --core) {
|
for(int core = CpuCores; core >= 1; --core) {
|
||||||
hardwareThreads.Enqueue(core);
|
hardwareThreads.Enqueue(core);
|
||||||
|
@ -153,34 +148,6 @@ namespace Nuclex.Support {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Empties the work queue of any queued work items</summary>
|
|
||||||
public static void EmptyQueue() {
|
|
||||||
lock(userWorkItems) {
|
|
||||||
try {
|
|
||||||
while(userWorkItems.Count > 0) {
|
|
||||||
UserWorkItem callback = userWorkItems.Dequeue();
|
|
||||||
IDisposable disposableState = callback.State as IDisposable;
|
|
||||||
if(disposableState != null) {
|
|
||||||
disposableState.Dispose();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch(Exception) { // Make sure an error isn't thrown.
|
|
||||||
AssertionHandler(
|
|
||||||
false,
|
|
||||||
"Unhandled exception disposing the state of a user work item",
|
|
||||||
"The AffineThreadPool.EmptyQueue() method tried to dispose of any states" +
|
|
||||||
"associated with waiting user work items. One of the states implementing" +
|
|
||||||
"IDisposable threw an exception during Dispose()."
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear all waiting items and reset the number of worker threads currently needed
|
|
||||||
// to be 0 (there is nothing for threads to do)
|
|
||||||
userWorkItems.Clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>Gets the number of threads at the disposal of the thread pool</summary>
|
/// <summary>Gets the number of threads at the disposal of the thread pool</summary>
|
||||||
public static int MaxThreads { get { return CpuCores; } }
|
public static int MaxThreads { get { return CpuCores; } }
|
||||||
|
|
||||||
|
@ -190,7 +157,7 @@ namespace Nuclex.Support {
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Gets the number of callback delegates currently waiting in the thread pool
|
/// Gets the number of callback delegates currently waiting in the thread pool
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public static int WaitingCallbacks {
|
public static int WaitingWorkItems {
|
||||||
get {
|
get {
|
||||||
lock(userWorkItems) {
|
lock(userWorkItems) {
|
||||||
return userWorkItems.Count;
|
return userWorkItems.Count;
|
||||||
|
@ -201,27 +168,37 @@ namespace Nuclex.Support {
|
||||||
/// <summary>A thread worker function that processes items from the work queue</summary>
|
/// <summary>A thread worker function that processes items from the work queue</summary>
|
||||||
private static void ProcessQueuedItems() {
|
private static void ProcessQueuedItems() {
|
||||||
|
|
||||||
|
// Get the system/hardware thread index this thread is going to use. We hope that
|
||||||
|
// the threads more or less start after each other, but there is no guarantee that
|
||||||
|
// tasks will be handled by the CPU cores in the order the queue was filled with.
|
||||||
|
// This could be added, though, by using a WaitHandle so the thread creator could
|
||||||
|
// wait for each thread to take one entry out of the queue.
|
||||||
int hardwareThreadIndex;
|
int hardwareThreadIndex;
|
||||||
lock(hardwareThreads) {
|
lock(hardwareThreads) {
|
||||||
hardwareThreadIndex = hardwareThreads.Dequeue();
|
hardwareThreadIndex = hardwareThreads.Dequeue();
|
||||||
}
|
}
|
||||||
|
|
||||||
#if XBOX360
|
#if XBOX360
|
||||||
// MSDN states that SetProcessorAffinity() should be called from the thread
|
// On the XBox 360, the only way to get a thread to run on another core is to
|
||||||
// whose affinity is being changed.
|
// explicitly move it to that core. MSDN states that SetProcessorAffinity() should
|
||||||
|
// be called from the thread whose affinity is being changed.
|
||||||
Thread.CurrentThread.SetProcessorAffinity(new int[] { hardwareThreadIndex });
|
Thread.CurrentThread.SetProcessorAffinity(new int[] { hardwareThreadIndex });
|
||||||
#else
|
#else
|
||||||
// Prevent this managed thread from impersonating another system thread.
|
// Prevent this managed thread from impersonating another system thread.
|
||||||
// Threads in .NET can take
|
// In .NET, managed threads can supposedly be moved to different system threads
|
||||||
|
// and, more worryingly, even fibers. This should make sure we're sitting on
|
||||||
|
// a normal system thread and stay with that thread during our lifetime.
|
||||||
Thread.BeginThreadAffinity();
|
Thread.BeginThreadAffinity();
|
||||||
|
|
||||||
ProcessThread thread = getCurrentProcessThread();
|
// Assign the ideal processor, but don't force it. It's not a good idea to
|
||||||
|
// circumvent the thread scheduler of a desktop machine, so we try to play nice.
|
||||||
|
int threadId = GetCurrentThreadId();
|
||||||
|
ProcessThread thread = GetProcessThread(threadId);
|
||||||
if(thread != null) {
|
if(thread != null) {
|
||||||
thread.IdealProcessor = hardwareThreadIndex;
|
thread.IdealProcessor = hardwareThreadIndex;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
// Keep processing tasks indefinitely
|
// Keep processing tasks indefinitely
|
||||||
for(; ; ) {
|
for(; ; ) {
|
||||||
UserWorkItem workItem = getNextWorkItem();
|
UserWorkItem workItem = getNextWorkItem();
|
||||||
|
@ -232,13 +209,11 @@ namespace Nuclex.Support {
|
||||||
Interlocked.Increment(ref inUseThreads);
|
Interlocked.Increment(ref inUseThreads);
|
||||||
workItem.Callback(workItem.State);
|
workItem.Callback(workItem.State);
|
||||||
}
|
}
|
||||||
catch(Exception) { // Make sure we don't throw here.
|
catch(Exception exception) { // Make sure we don't throw here.
|
||||||
AssertionHandler(
|
ExceptionDelegate exceptionHandler = ExceptionHandler;
|
||||||
false,
|
if(exceptionHandler != null) {
|
||||||
"Unhandled exception in queued user work item",
|
exceptionHandler(exception);
|
||||||
"An unhandled exception travelled up to the AffineThreadPool from" +
|
}
|
||||||
"a queued user work item that was being executed"
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
Interlocked.Decrement(ref inUseThreads);
|
Interlocked.Decrement(ref inUseThreads);
|
||||||
|
@ -249,9 +224,7 @@ namespace Nuclex.Support {
|
||||||
#if !XBOX360
|
#if !XBOX360
|
||||||
/// <summary>Retrieves the ProcessThread for the calling thread</summary>
|
/// <summary>Retrieves the ProcessThread for the calling thread</summary>
|
||||||
/// <returns>The ProcessThread for the calling thread</returns>
|
/// <returns>The ProcessThread for the calling thread</returns>
|
||||||
private static ProcessThread getCurrentProcessThread() {
|
internal static ProcessThread GetProcessThread(int threadId) {
|
||||||
int threadId = GetCurrentThreadId();
|
|
||||||
|
|
||||||
ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
|
ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
|
||||||
for(int index = 0; index < threads.Count; ++index) {
|
for(int index = 0; index < threads.Count; ++index) {
|
||||||
if(threads[index].Id == threadId) {
|
if(threads[index].Id == threadId) {
|
||||||
|
@ -290,25 +263,15 @@ namespace Nuclex.Support {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Default assertion handler for the affine thread pool</summary>
|
|
||||||
/// <param name="condition">Condition which is being asserted</param>
|
|
||||||
/// <param name="message">Message explaining what causes the assertion to fail</param>
|
|
||||||
/// <param name="details">
|
|
||||||
/// Detailed description of the exact cause of the assertion failure
|
|
||||||
/// </param>
|
|
||||||
public static void DefaultAssertionHandler(
|
|
||||||
bool condition, string message, string details
|
|
||||||
) {
|
|
||||||
Trace.Assert(condition, message, details);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// <summary>Delegate used to handle assertion checks in the code</summary>
|
/// <summary>Delegate used to handle assertion checks in the code</summary>
|
||||||
public static AssertionDelegate AssertionHandler = DefaultAssertionHandler;
|
public static volatile ExceptionDelegate ExceptionHandler;
|
||||||
|
|
||||||
|
#if !XBOX360
|
||||||
/// <summary>Retrieves the calling thread's thread id</summary>
|
/// <summary>Retrieves the calling thread's thread id</summary>
|
||||||
/// <returns>The thread is of the calling thread</returns>
|
/// <returns>The thread is of the calling thread</returns>
|
||||||
[DllImport("kernel32.dll")]
|
[DllImport("kernel32.dll")]
|
||||||
private static extern int GetCurrentThreadId();
|
internal static extern int GetCurrentThreadId();
|
||||||
|
#endif
|
||||||
|
|
||||||
/// <summary>Available hardware threads the thread pool threads pick from</summary>
|
/// <summary>Available hardware threads the thread pool threads pick from</summary>
|
||||||
private static Queue<int> hardwareThreads;
|
private static Queue<int> hardwareThreads;
|
||||||
|
|
Loading…
Reference in New Issue
Block a user