Added custom CPU core-affine thread pool implementation (which doesn't work on the XBox 360 because Monitor.Wait() and Monitor.Pulse() are not supported - but the code is so nice I want to capture this state in Subversion :D)
git-svn-id: file:///srv/devel/repo-conversion/nusu@172 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
parent
6ef2fdb789
commit
316e2c379a
|
@ -1,4 +1,4 @@
|
|||
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
|
||||
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003" ToolsVersion="3.5">
|
||||
<PropertyGroup>
|
||||
<ProjectGuid>{DFFEAB70-51B8-4714-BCA6-79B733BBC520}</ProjectGuid>
|
||||
<ProjectTypeGuids>{2DF5C3F4-5A5F-47a9-8E94-23B4456F55E2};{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}</ProjectTypeGuids>
|
||||
|
@ -61,6 +61,9 @@
|
|||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="Source\AffineThreadPool.Test.cs">
|
||||
<DependentUpon>AffineThreadPool.cs</DependentUpon>
|
||||
</Compile>
|
||||
<Compile Include="Source\AssertHelper.cs" />
|
||||
<Compile Include="Source\AssertHelper.Test.cs">
|
||||
<DependentUpon>AssertHelper.cs</DependentUpon>
|
||||
|
@ -130,6 +133,7 @@
|
|||
<Compile Include="Source\Collections\ReverseComparer.Test.cs">
|
||||
<DependentUpon>ReverseComparer.cs</DependentUpon>
|
||||
</Compile>
|
||||
<Compile Include="Source\AffineThreadPool.cs" />
|
||||
<Compile Include="Source\Plugins\PrototypeFactory.cs" />
|
||||
<Compile Include="Source\Plugins\PrototypeFactory.Test.cs">
|
||||
<DependentUpon>PrototypeFactory.cs</DependentUpon>
|
||||
|
|
|
@ -47,6 +47,9 @@
|
|||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Compile Include="Properties\AssemblyInfo.cs" />
|
||||
<Compile Include="Source\AffineThreadPool.Test.cs">
|
||||
<DependentUpon>AffineThreadPool.cs</DependentUpon>
|
||||
</Compile>
|
||||
<Compile Include="Source\AssertHelper.cs" />
|
||||
<Compile Include="Source\AssertHelper.Test.cs">
|
||||
<DependentUpon>AssertHelper.cs</DependentUpon>
|
||||
|
@ -116,6 +119,7 @@
|
|||
<Compile Include="Source\Collections\ReverseComparer.Test.cs">
|
||||
<DependentUpon>ReverseComparer.cs</DependentUpon>
|
||||
</Compile>
|
||||
<Compile Include="Source\AffineThreadPool.cs" />
|
||||
<Compile Include="Source\Plugins\PrototypeFactory.cs" />
|
||||
<Compile Include="Source\Plugins\PrototypeFactory.Test.cs">
|
||||
<DependentUpon>PrototypeFactory.cs</DependentUpon>
|
||||
|
|
313
Source/AffineThreadPool.Test.cs
Normal file
313
Source/AffineThreadPool.Test.cs
Normal file
|
@ -0,0 +1,313 @@
|
|||
#region CPL License
|
||||
/*
|
||||
Nuclex Framework
|
||||
Copyright (C) 2002-2009 Nuclex Development Labs
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the IBM Common Public License as
|
||||
published by the IBM Corporation; either version 1.0 of the
|
||||
License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
IBM Common Public License for more details.
|
||||
|
||||
You should have received a copy of the IBM Common Public
|
||||
License along with this library
|
||||
*/
|
||||
#endregion
|
||||
|
||||
#if UNITTEST
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace Nuclex.Support {
|
||||
|
||||
/// <summary>Unit Test for the CPU core-affine thread pool</summary>
|
||||
[TestFixture]
|
||||
public class AffineThreadPoolTest {
|
||||
|
||||
#region class TestTask
|
||||
|
||||
/// <summary>ThreadPool task that can be used for testing</summary>
|
||||
private class TestTask : IDisposable {
|
||||
|
||||
/// <summary>Initializes a new test task</summary>
|
||||
public TestTask() {
|
||||
this.callbackEvent = new ManualResetEvent(false);
|
||||
}
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
if(this.callbackEvent != null) {
|
||||
this.callbackEvent.Close();
|
||||
this.callbackEvent = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Callback that can be added to the thread pool as a task</summary>
|
||||
/// <param name="state">User defined state</param>
|
||||
public void Callback(object state) {
|
||||
this.LastCallbackState = state;
|
||||
this.callbackEvent.Set();
|
||||
}
|
||||
|
||||
/// <summary>Event that will be set when the callback is executed</summary>
|
||||
public ManualResetEvent CallbackEvent {
|
||||
get { return this.callbackEvent; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// State parameter that was provide when the callback was called
|
||||
/// </summary>
|
||||
public volatile object LastCallbackState;
|
||||
|
||||
/// <summary>Event that will be set when the callback is invoked</summary>
|
||||
private ManualResetEvent callbackEvent;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class TestTask
|
||||
|
||||
#region class WaitTask
|
||||
|
||||
/// <summary>ThreadPool task that can be used for testing</summary>
|
||||
private class WaitTask : IDisposable {
|
||||
|
||||
/// <summary>Initializes a new test task</summary>
|
||||
public WaitTask() {
|
||||
this.startEvent = new ManualResetEvent(false);
|
||||
this.finishEvent = new ManualResetEvent(false);
|
||||
this.waitEvent = new ManualResetEvent(false);
|
||||
}
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
if(this.waitEvent != null) {
|
||||
this.waitEvent.Close();
|
||||
this.waitEvent = null;
|
||||
}
|
||||
if(this.finishEvent != null) {
|
||||
this.finishEvent.Close();
|
||||
this.finishEvent = null;
|
||||
}
|
||||
if(this.startEvent != null) {
|
||||
this.startEvent.Close();
|
||||
this.startEvent = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Callback that can be added to the thread pool as a task</summary>
|
||||
/// <param name="state">User defined state</param>
|
||||
public void Callback(object state) {
|
||||
this.LastCallbackState = state;
|
||||
this.startEvent.Set();
|
||||
this.waitEvent.WaitOne();
|
||||
this.finishEvent.Set();
|
||||
}
|
||||
|
||||
/// <summary>Event that will be set when the callback has started</summary>
|
||||
public ManualResetEvent StartEvent {
|
||||
get { return this.startEvent; }
|
||||
}
|
||||
|
||||
/// <summary>Event that will be set when the callback has finished</summary>
|
||||
public ManualResetEvent FinishEvent {
|
||||
get { return this.finishEvent; }
|
||||
}
|
||||
|
||||
/// <summary>Event that blocks the callback</summary>
|
||||
public ManualResetEvent WaitEvent {
|
||||
get { return this.waitEvent; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// State parameter that was provide when the callback was called
|
||||
/// </summary>
|
||||
public volatile object LastCallbackState;
|
||||
|
||||
/// <summary>Event that will be set when the callback has started</summary>
|
||||
private ManualResetEvent startEvent;
|
||||
/// <summary>Event that will be set when the callback has finished</summary>
|
||||
private ManualResetEvent finishEvent;
|
||||
/// <summary>Event used to block the callback</summary>
|
||||
private ManualResetEvent waitEvent;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class WaitTask
|
||||
|
||||
#region class ThrowingDisposable
|
||||
|
||||
/// <summary>Throws an exception when it is disposed</summary>
|
||||
private class ThrowingDisposable : IDisposable {
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
throw new ArithmeticException("Simulated exception for unit testing");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endregion // class ThrowingDisposable
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the Thread Pool's default assertion handler is working
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestDefaultAssertionHandler() {
|
||||
|
||||
// We can't test a failing assertion because our tests need to run
|
||||
// unattended on a build server without blocking for user input.
|
||||
AffineThreadPool.DefaultAssertionHandler(
|
||||
true, "Unit test", "This should not fail"
|
||||
);
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Tests whether the QueueUserWorkItem() method is working</summary>
|
||||
[Test]
|
||||
public void TestQueueUserWorkItem() {
|
||||
using(TestTask task = new TestTask()) {
|
||||
AffineThreadPool.QueueUserWorkItem(task.Callback);
|
||||
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the QueueUserWorkItem() method is passing the state parameter
|
||||
/// on to the callback
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestQueueUserWorkItemWithState() {
|
||||
using(TestTask task = new TestTask()) {
|
||||
object state = new object();
|
||||
|
||||
AffineThreadPool.QueueUserWorkItem(task.Callback, state);
|
||||
|
||||
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
|
||||
Assert.AreSame(state, task.LastCallbackState);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests whether the thread pool can handle an exception from a user work item
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestExceptionFromUserWorkItem() {
|
||||
using(ManualResetEvent assertEvent = new ManualResetEvent(false)) {
|
||||
AffineThreadPool.AssertionDelegate oldAssertionHandler =
|
||||
AffineThreadPool.AssertionHandler;
|
||||
|
||||
AffineThreadPool.AssertionHandler = delegate(
|
||||
bool condition, string message, string details
|
||||
) { assertEvent.Set(); };
|
||||
try {
|
||||
AffineThreadPool.QueueUserWorkItem(
|
||||
delegate(object state) { throw new KeyNotFoundException(); }
|
||||
);
|
||||
Assert.IsTrue(assertEvent.WaitOne(1000));
|
||||
}
|
||||
finally {
|
||||
AffineThreadPool.AssertionHandler = oldAssertionHandler;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the affine thread pool's maximum thread count equals
|
||||
/// the number of logical processors in the system
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestMaxThreadsProperty() {
|
||||
Assert.AreEqual(Environment.ProcessorCount, AffineThreadPool.MaxThreads);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests whether the thread pool can handle an exception from a user work item
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestExceptionFromDisposableState() {
|
||||
using(ManualResetEvent assertEvent = new ManualResetEvent(false)) {
|
||||
AffineThreadPool.AssertionDelegate oldAssertionHandler =
|
||||
AffineThreadPool.AssertionHandler;
|
||||
|
||||
AffineThreadPool.AssertionHandler = delegate(
|
||||
bool condition, string message, string details
|
||||
) { assertEvent.Set(); };
|
||||
|
||||
try {
|
||||
int eventCount = AffineThreadPool.CpuCores;
|
||||
WaitTask[] tasks = new WaitTask[eventCount];
|
||||
|
||||
int createdTasks = 0;
|
||||
try {
|
||||
|
||||
// Create the tasks, counting up the created task counter. If an exception
|
||||
// occurs, we will roll back from there.
|
||||
for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
|
||||
tasks[createdTasks] = new WaitTask();
|
||||
}
|
||||
|
||||
// Schedule the blocking tasks in the thread pool so it will not be able
|
||||
// to process the next task we add to the queue
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
|
||||
}
|
||||
|
||||
// Wait for the tasks to start so they aren't aborted by EmptyQueue()
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000));
|
||||
}
|
||||
Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
|
||||
Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks);
|
||||
|
||||
// Add a task to the queue whose state implements a faulty IDisposable
|
||||
AffineThreadPool.QueueUserWorkItem(
|
||||
delegate(object state) { }, new ThrowingDisposable()
|
||||
);
|
||||
|
||||
Assert.AreEqual(1, AffineThreadPool.WaitingCallbacks);
|
||||
|
||||
// Now clear the thread pool. This should cause the faulty IDisposable
|
||||
// to be disposed and then throw its exception.
|
||||
AffineThreadPool.EmptyQueue();
|
||||
|
||||
// Make sure our custom assertion handler has been triggered
|
||||
Assert.IsTrue(assertEvent.WaitOne(1000));
|
||||
|
||||
Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
|
||||
Assert.AreEqual(0, AffineThreadPool.WaitingCallbacks);
|
||||
|
||||
// Let the thread pool finish its active tasks
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
tasks[index].WaitEvent.Set();
|
||||
}
|
||||
|
||||
// Wait for the tasks to end before we dispose them
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000));
|
||||
}
|
||||
}
|
||||
finally {
|
||||
for(--createdTasks; createdTasks >= 0; --createdTasks) {
|
||||
tasks[createdTasks].Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
AffineThreadPool.AssertionHandler = oldAssertionHandler;
|
||||
}
|
||||
} // using
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} // namespace Nuclex.Support
|
||||
|
||||
#endif // UNITTEST
|
388
Source/AffineThreadPool.cs
Normal file
388
Source/AffineThreadPool.cs
Normal file
|
@ -0,0 +1,388 @@
|
|||
#region CPL License
|
||||
/*
|
||||
Nuclex Framework
|
||||
Copyright (C) 2002-2009 Nuclex Development Labs
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the IBM Common Public License as
|
||||
published by the IBM Corporation; either version 1.0 of the
|
||||
License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
IBM Common Public License for more details.
|
||||
|
||||
You should have received a copy of the IBM Common Public
|
||||
License along with this library
|
||||
*/
|
||||
#endregion
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
|
||||
namespace Nuclex.Support {
|
||||
|
||||
/// <summary>Alternative Thread pool providing one thread for each core</summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Unlike the normal thread pool, the affine thread pool provides only as many
|
||||
/// threads as there are CPU cores available on the current platform. This makes
|
||||
/// it more suitable for tasks you want to spread across all available cpu cores
|
||||
/// explicitely, however, it's not a good match if you just want to run a series
|
||||
/// of tasks asynchronously.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Implementation based on original code provided by Stephen Toub
|
||||
/// (stoub at microsoft ignorethis dot com)
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public static class AffineThreadPool {
|
||||
|
||||
/// <summary>Number of CPU cores available on the system</summary>
|
||||
#if XBOX360
|
||||
public static readonly int CpuCores = 4;
|
||||
#else
|
||||
public static readonly int CpuCores = Environment.ProcessorCount;
|
||||
#endif
|
||||
|
||||
/// <summary>Delegate used by the thread pool to handle assertion checks</summary>
|
||||
/// <param name="condition">Condition that will be asserted</param>
|
||||
/// <param name="message">Message explaining what causes the assertion to fail</param>
|
||||
/// <param name="details">
|
||||
/// Detailed description of the exact cause of the assertion failure
|
||||
/// </param>
|
||||
public delegate void AssertionDelegate(
|
||||
bool condition, string message, string details
|
||||
);
|
||||
|
||||
#region class Semaphore
|
||||
|
||||
/// <summary>
|
||||
/// Lightweight variant of Dijkstra's PV Semaphore semaphore based on
|
||||
/// the Monitor class.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// Based on code by Stephen Toub (stoub at microsoft ignorethis dot com).
|
||||
/// </remarks>
|
||||
private class Semaphore {
|
||||
|
||||
#if false // The Thread Pool doesn't use this.
|
||||
/// <summary>Initializes the semaphore as a binary semaphore (mutex)</summary>
|
||||
public Semaphore() : this(1) { }
|
||||
#endif
|
||||
|
||||
/// <summary>Initializes the semaphore as a counting semaphore</summary>
|
||||
/// <param name="count">
|
||||
/// Initial number of threads that can take out units from this semaphore
|
||||
/// </param>
|
||||
/// <exception cref="ArgumentException">
|
||||
/// Thrown if the count argument is less than 1
|
||||
/// </exception>
|
||||
public Semaphore(int count) {
|
||||
#if false // The Thread Pool doesn't make this mistake.
|
||||
if(count < 0) {
|
||||
throw new ArgumentException(
|
||||
"Semaphore must have a count of at least 0.", "count"
|
||||
);
|
||||
}
|
||||
#endif
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
/// <summary>V the semaphore (add 1 unit to it).</summary>
|
||||
public void Release() { v(); }
|
||||
|
||||
/// <summary>P the semaphore (take out 1 unit from it).</summary>
|
||||
public void WaitOne() { p(); }
|
||||
|
||||
/// <summary>P the semaphore (take out 1 unit from it).</summary>
|
||||
private void p() {
|
||||
|
||||
// Lock so we can work in peace. This works because lock is actually
|
||||
// built around Monitor.
|
||||
lock(this) {
|
||||
|
||||
// Wait until a unit becomes available. We need to wait in a loop in case
|
||||
// someone else wakes up before us. This could happen if the Monitor.Pulse
|
||||
// statements were changed to Monitor.PulseAll statements in order to
|
||||
// introduce some randomness into the order in which threads are woken.
|
||||
while(this.count <= 0) {
|
||||
Monitor.Wait(this, Timeout.Infinite);
|
||||
}
|
||||
|
||||
--this.count;
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>V the semaphore (add 1 unit to it).</summary>
|
||||
private void v() {
|
||||
|
||||
// Lock so we can work in peace. This works because lock is actually
|
||||
// built around Monitor.
|
||||
lock(this) {
|
||||
|
||||
// Release our hold on the unit of control. Then tell everyone
|
||||
// waiting on this object that there is a unit available.
|
||||
++this.count;
|
||||
Monitor.Pulse(this);
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Resets the semaphore to the specified count. Should be used cautiously.
|
||||
/// </summary>
|
||||
public void Reset(int count) {
|
||||
lock(this) {
|
||||
this.count = count;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>The number of units alloted by this semaphore.</summary>
|
||||
private int count;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class Semaphore
|
||||
|
||||
#region class WaitingCallback
|
||||
|
||||
/// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
|
||||
private struct UserWorkItem {
|
||||
|
||||
/// <summary>Initialize the callback holding object.</summary>
|
||||
/// <param name="callback">Callback delegate for the callback.</param>
|
||||
/// <param name="state">State with which to call the callback delegate.</param>
|
||||
public UserWorkItem(WaitCallback callback, object state) {
|
||||
this.Callback = callback;
|
||||
this.State = state;
|
||||
}
|
||||
|
||||
/// <summary>Callback delegate for the callback.</summary>
|
||||
public WaitCallback Callback;
|
||||
/// <summary>State with which to call the callback delegate.</summary>
|
||||
public object State;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class WaitingCallback
|
||||
|
||||
/// <summary>Initializes the thread pool</summary>
|
||||
static AffineThreadPool() {
|
||||
|
||||
// Create our thread stores; we handle synchronization ourself
|
||||
// as we may run into situations where multiple operations need to be atomic.
|
||||
// We keep track of the threads we've created just for good measure; not actually
|
||||
// needed for any core functionality.
|
||||
waitingCallbacks = new Queue<UserWorkItem>(CpuCores * 4);
|
||||
workerThreads = new List<Thread>(CpuCores);
|
||||
inUseThreads = 0;
|
||||
|
||||
// We can only use these hardware thread indices on the XBox 360
|
||||
#if XBOX360
|
||||
XboxHardwareThreads = new Queue<int>(new int[] { 5, 4, 3, 1 });
|
||||
#endif
|
||||
|
||||
// Create our "thread needed" event
|
||||
workerThreadNeeded = new Semaphore(0);
|
||||
|
||||
// Create all of the worker threads
|
||||
for(int index = 0; index < CpuCores; index++) {
|
||||
|
||||
// Create a new thread and add it to the list of threads.
|
||||
Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
|
||||
workerThreads.Add(newThread);
|
||||
|
||||
// Configure the new thread and start it
|
||||
newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString();
|
||||
newThread.IsBackground = true;
|
||||
newThread.Start();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Queues a user work item to the thread pool</summary>
|
||||
/// <param name="callback">
|
||||
/// A WaitCallback representing the delegate to invoke when a thread in the
|
||||
/// thread pool picks up the work item
|
||||
/// </param>
|
||||
public static void QueueUserWorkItem(WaitCallback callback) {
|
||||
|
||||
// Queue the delegate with no state
|
||||
QueueUserWorkItem(callback, null);
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Queues a user work item to the thread pool.</summary>
|
||||
/// <param name="callback">
|
||||
/// A WaitCallback representing the delegate to invoke when a thread in the
|
||||
/// thread pool picks up the work item
|
||||
/// </param>
|
||||
/// <param name="state">
|
||||
/// The object that is passed to the delegate when serviced from the thread pool
|
||||
/// </param>
|
||||
public static void QueueUserWorkItem(WaitCallback callback, object state) {
|
||||
|
||||
// Create a waiting callback that contains the delegate and its state.
|
||||
// Add it to the processing queue, and signal that data is waiting.
|
||||
UserWorkItem waiting = new UserWorkItem(callback, state);
|
||||
lock(waitingCallbacks) {
|
||||
waitingCallbacks.Enqueue(waiting);
|
||||
}
|
||||
|
||||
// Decrement the semaphore into the negative range, so the worker threads will
|
||||
// be woken up until no more tasks are available.
|
||||
workerThreadNeeded.Release();
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Empties the work queue of any queued work items</summary>
|
||||
public static void EmptyQueue() {
|
||||
lock(waitingCallbacks) {
|
||||
try {
|
||||
while(waitingCallbacks.Count > 0) {
|
||||
UserWorkItem callback = waitingCallbacks.Dequeue();
|
||||
IDisposable disposableState = callback.State as IDisposable;
|
||||
if(disposableState != null) {
|
||||
disposableState.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(Exception) { // Make sure an error isn't thrown.
|
||||
AssertionHandler(
|
||||
false,
|
||||
"Unhandled exception disposing the state of a user work item",
|
||||
"The AffineThreadPool.EmptyQueue() method tried to dispose of any states" +
|
||||
"associated with waiting user work items. One of the states implementing" +
|
||||
"IDisposable threw an exception during Dispose()."
|
||||
);
|
||||
}
|
||||
|
||||
// Clear all waiting items and reset the number of worker threads currently needed
|
||||
// to be 0 (there is nothing for threads to do)
|
||||
waitingCallbacks.Clear();
|
||||
workerThreadNeeded.Reset(0);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Gets the number of threads at the disposal of the thread pool</summary>
|
||||
public static int MaxThreads { get { return CpuCores; } }
|
||||
|
||||
/// <summary>Gets the number of currently active threads in the thread pool</summary>
|
||||
public static int ActiveThreads { get { return inUseThreads; } }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the number of callback delegates currently waiting in the thread pool
|
||||
/// </summary>
|
||||
public static int WaitingCallbacks {
|
||||
get {
|
||||
lock(waitingCallbacks) {
|
||||
return waitingCallbacks.Count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>A thread worker function that processes items from the work queue</summary>
|
||||
private static void ProcessQueuedItems() {
|
||||
#if XBOX360
|
||||
// MSDN states that SetProcessorAffinity() should be called from the thread
|
||||
// whose affinity is being changed.
|
||||
int hardwareThreadIndex;
|
||||
lock(XboxHardwareThreads) {
|
||||
hardwareThreadIndex = XboxHardwareThreads.Dequeue();
|
||||
}
|
||||
Thread.CurrentThread.SetProcessorAffinity(new int[] { hardwareThreadIndex });
|
||||
#endif
|
||||
|
||||
// Keep processing tasks indefinitely
|
||||
for(; ; ) {
|
||||
UserWorkItem workItem = getNextWorkItem();
|
||||
|
||||
// Execute the work item we just picked up. Make sure to accurately
|
||||
// record how many callbacks are currently executing.
|
||||
try {
|
||||
Interlocked.Increment(ref inUseThreads);
|
||||
workItem.Callback(workItem.State);
|
||||
}
|
||||
catch(Exception) { // Make sure we don't throw here.
|
||||
AssertionHandler(
|
||||
false,
|
||||
"Unhandled exception in queued user work item",
|
||||
"An unhandled exception travelled up to the AffineThreadPool from" +
|
||||
"a queued user work item that was being executed"
|
||||
);
|
||||
}
|
||||
finally {
|
||||
Interlocked.Decrement(ref inUseThreads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Obtains the next work item from the queue</summary>
|
||||
/// <returns>The next work item in the queue</returns>
|
||||
/// <remarks>
|
||||
/// If the queue is empty, the call will block until an item is added to
|
||||
/// the queue and the calling thread was the one picking it up.
|
||||
/// </remarks>
|
||||
private static UserWorkItem getNextWorkItem() {
|
||||
|
||||
// Get the next item in the queue. If there is nothing there, go to sleep
|
||||
// for a while until we're woken up when a callback is waiting.
|
||||
for(; ; ) {
|
||||
|
||||
// Try to get the next callback available. We need to lock on the
|
||||
// queue in order to make our count check and retrieval atomic.
|
||||
lock(waitingCallbacks) {
|
||||
if(waitingCallbacks.Count > 0) {
|
||||
return waitingCallbacks.Dequeue();
|
||||
}
|
||||
}
|
||||
|
||||
// If we can't get one, go to sleep.
|
||||
workerThreadNeeded.WaitOne();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Default assertion handler for the affine thread pool</summary>
|
||||
/// <param name="condition">Condition which is being asserted</param>
|
||||
/// <param name="message">Message explaining what causes the assertion to fail</param>
|
||||
/// <param name="details">
|
||||
/// Detailed description of the exact cause of the assertion failure
|
||||
/// </param>
|
||||
public static void DefaultAssertionHandler(
|
||||
bool condition, string message, string details
|
||||
) {
|
||||
Trace.Assert(condition, message, details);
|
||||
}
|
||||
|
||||
/// <summary>Delegate used to handle assertion checks in the code</summary>
|
||||
public static AssertionDelegate AssertionHandler = DefaultAssertionHandler;
|
||||
|
||||
#if XBOX360
|
||||
/// <summary>XNA games on the XBox 360 can use only 4 of 6 hardware threads</summary>
|
||||
private static Queue<int> XboxHardwareThreads;
|
||||
#endif
|
||||
/// <summary>Queue of all the callbacks waiting to be executed.</summary>
|
||||
private static Queue<UserWorkItem> waitingCallbacks;
|
||||
/// <summary>
|
||||
/// Used to signal that a worker thread is needed for processing. Note that multiple
|
||||
/// threads may be needed simultaneously and as such we use a semaphore instead of
|
||||
/// an auto reset event.
|
||||
/// </summary>
|
||||
private static Semaphore workerThreadNeeded;
|
||||
/// <summary>List of all worker threads at the disposal of the thread pool.</summary>
|
||||
private static List<Thread> workerThreads;
|
||||
/// <summary>Number of threads currently active.</summary>
|
||||
private static int inUseThreads;
|
||||
|
||||
}
|
||||
|
||||
} // namespace Nuclex.Support
|
|
@ -25,7 +25,7 @@ using System;
|
|||
using NUnit.Framework;
|
||||
|
||||
// Decide:
|
||||
// - Move (part of) this to Nuclex.Support?
|
||||
// - Move (part of) this to Nuclex.Support? [done]
|
||||
// - Create new Assemblies Nuclex.NUnit.dll and Nuclex.NUnit.Xna.dll?
|
||||
|
||||
namespace Nuclex.Support {
|
||||
|
|
Loading…
Reference in New Issue
Block a user