Replaced the affine thread pool with a less elegant implementation that works on the XBox 360

git-svn-id: file:///srv/devel/repo-conversion/nusu@173 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
Markus Ewald 2009-09-15 20:03:19 +00:00
parent 316e2c379a
commit 05e4aebaac

View File

@ -58,100 +58,7 @@ namespace Nuclex.Support {
bool condition, string message, string details bool condition, string message, string details
); );
#region class Semaphore #region class UserWorkItem
/// <summary>
/// Lightweight variant of Dijkstra's PV Semaphore semaphore based on
/// the Monitor class.
/// </summary>
/// <remarks>
/// Based on code by Stephen Toub (stoub at microsoft ignorethis dot com).
/// </remarks>
private class Semaphore {
#if false // The Thread Pool doesn't use this.
/// <summary>Initializes the semaphore as a binary semaphore (mutex)</summary>
public Semaphore() : this(1) { }
#endif
/// <summary>Initializes the semaphore as a counting semaphore</summary>
/// <param name="count">
/// Initial number of threads that can take out units from this semaphore
/// </param>
/// <exception cref="ArgumentException">
/// Thrown if the count argument is less than 1
/// </exception>
public Semaphore(int count) {
#if false // The Thread Pool doesn't make this mistake.
if(count < 0) {
throw new ArgumentException(
"Semaphore must have a count of at least 0.", "count"
);
}
#endif
this.count = count;
}
/// <summary>V the semaphore (add 1 unit to it).</summary>
public void Release() { v(); }
/// <summary>P the semaphore (take out 1 unit from it).</summary>
public void WaitOne() { p(); }
/// <summary>P the semaphore (take out 1 unit from it).</summary>
private void p() {
// Lock so we can work in peace. This works because lock is actually
// built around Monitor.
lock(this) {
// Wait until a unit becomes available. We need to wait in a loop in case
// someone else wakes up before us. This could happen if the Monitor.Pulse
// statements were changed to Monitor.PulseAll statements in order to
// introduce some randomness into the order in which threads are woken.
while(this.count <= 0) {
Monitor.Wait(this, Timeout.Infinite);
}
--this.count;
}
}
/// <summary>V the semaphore (add 1 unit to it).</summary>
private void v() {
// Lock so we can work in peace. This works because lock is actually
// built around Monitor.
lock(this) {
// Release our hold on the unit of control. Then tell everyone
// waiting on this object that there is a unit available.
++this.count;
Monitor.Pulse(this);
}
}
/// <summary>
/// Resets the semaphore to the specified count. Should be used cautiously.
/// </summary>
public void Reset(int count) {
lock(this) {
this.count = count;
}
}
/// <summary>The number of units alloted by this semaphore.</summary>
private int count;
}
#endregion // class Semaphore
#region class WaitingCallback
/// <summary>Used to hold a callback delegate and the state for that delegate.</summary> /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
private struct UserWorkItem { private struct UserWorkItem {
@ -171,7 +78,7 @@ namespace Nuclex.Support {
} }
#endregion // class WaitingCallback #endregion // class UserWorkItem
/// <summary>Initializes the thread pool</summary> /// <summary>Initializes the thread pool</summary>
static AffineThreadPool() { static AffineThreadPool() {
@ -180,7 +87,8 @@ namespace Nuclex.Support {
// as we may run into situations where multiple operations need to be atomic. // as we may run into situations where multiple operations need to be atomic.
// We keep track of the threads we've created just for good measure; not actually // We keep track of the threads we've created just for good measure; not actually
// needed for any core functionality. // needed for any core functionality.
waitingCallbacks = new Queue<UserWorkItem>(CpuCores * 4); workAvailable = new AutoResetEvent(false);
userWorkItems = new Queue<UserWorkItem>(CpuCores * 4);
workerThreads = new List<Thread>(CpuCores); workerThreads = new List<Thread>(CpuCores);
inUseThreads = 0; inUseThreads = 0;
@ -189,9 +97,6 @@ namespace Nuclex.Support {
XboxHardwareThreads = new Queue<int>(new int[] { 5, 4, 3, 1 }); XboxHardwareThreads = new Queue<int>(new int[] { 5, 4, 3, 1 });
#endif #endif
// Create our "thread needed" event
workerThreadNeeded = new Semaphore(0);
// Create all of the worker threads // Create all of the worker threads
for(int index = 0; index < CpuCores; index++) { for(int index = 0; index < CpuCores; index++) {
@ -203,6 +108,7 @@ namespace Nuclex.Support {
newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString(); newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString();
newThread.IsBackground = true; newThread.IsBackground = true;
newThread.Start(); newThread.Start();
} }
} }
@ -232,22 +138,21 @@ namespace Nuclex.Support {
// Create a waiting callback that contains the delegate and its state. // Create a waiting callback that contains the delegate and its state.
// Add it to the processing queue, and signal that data is waiting. // Add it to the processing queue, and signal that data is waiting.
UserWorkItem waiting = new UserWorkItem(callback, state); UserWorkItem waiting = new UserWorkItem(callback, state);
lock(waitingCallbacks) { lock(userWorkItems) {
waitingCallbacks.Enqueue(waiting); userWorkItems.Enqueue(waiting);
} }
// Decrement the semaphore into the negative range, so the worker threads will // Wake up one of the worker threads so this task will be processed
// be woken up until no more tasks are available. workAvailable.Set();
workerThreadNeeded.Release();
} }
/// <summary>Empties the work queue of any queued work items</summary> /// <summary>Empties the work queue of any queued work items</summary>
public static void EmptyQueue() { public static void EmptyQueue() {
lock(waitingCallbacks) { lock(userWorkItems) {
try { try {
while(waitingCallbacks.Count > 0) { while(userWorkItems.Count > 0) {
UserWorkItem callback = waitingCallbacks.Dequeue(); UserWorkItem callback = userWorkItems.Dequeue();
IDisposable disposableState = callback.State as IDisposable; IDisposable disposableState = callback.State as IDisposable;
if(disposableState != null) { if(disposableState != null) {
disposableState.Dispose(); disposableState.Dispose();
@ -266,8 +171,7 @@ namespace Nuclex.Support {
// Clear all waiting items and reset the number of worker threads currently needed // Clear all waiting items and reset the number of worker threads currently needed
// to be 0 (there is nothing for threads to do) // to be 0 (there is nothing for threads to do)
waitingCallbacks.Clear(); userWorkItems.Clear();
workerThreadNeeded.Reset(0);
} }
} }
@ -282,8 +186,8 @@ namespace Nuclex.Support {
/// </summary> /// </summary>
public static int WaitingCallbacks { public static int WaitingCallbacks {
get { get {
lock(waitingCallbacks) { lock(userWorkItems) {
return waitingCallbacks.Count; return userWorkItems.Count;
} }
} }
} }
@ -338,14 +242,14 @@ namespace Nuclex.Support {
// Try to get the next callback available. We need to lock on the // Try to get the next callback available. We need to lock on the
// queue in order to make our count check and retrieval atomic. // queue in order to make our count check and retrieval atomic.
lock(waitingCallbacks) { lock(userWorkItems) {
if(waitingCallbacks.Count > 0) { if(userWorkItems.Count > 0) {
return waitingCallbacks.Dequeue(); return userWorkItems.Dequeue();
} }
} }
// If we can't get one, go to sleep. // If we can't get one, go to sleep.
workerThreadNeeded.WaitOne(); workAvailable.WaitOne();
} }
@ -371,13 +275,11 @@ namespace Nuclex.Support {
private static Queue<int> XboxHardwareThreads; private static Queue<int> XboxHardwareThreads;
#endif #endif
/// <summary>Queue of all the callbacks waiting to be executed.</summary> /// <summary>Queue of all the callbacks waiting to be executed.</summary>
private static Queue<UserWorkItem> waitingCallbacks; private static Queue<UserWorkItem> userWorkItems;
/// <summary> /// <summary>
/// Used to signal that a worker thread is needed for processing. Note that multiple /// Used to let the threads in the thread pool wait for new work to appear.
/// threads may be needed simultaneously and as such we use a semaphore instead of
/// an auto reset event.
/// </summary> /// </summary>
private static Semaphore workerThreadNeeded; private static AutoResetEvent workAvailable;
/// <summary>List of all worker threads at the disposal of the thread pool.</summary> /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
private static List<Thread> workerThreads; private static List<Thread> workerThreads;
/// <summary>Number of threads currently active.</summary> /// <summary>Number of threads currently active.</summary>