From 05e4aebaacabd93a2d1e7cca82fc7692e74361f8 Mon Sep 17 00:00:00 2001 From: Markus Ewald Date: Tue, 15 Sep 2009 20:03:19 +0000 Subject: [PATCH] Replaced the affine thread pool with a less elegant implementation that works on the XBox 360 git-svn-id: file:///srv/devel/repo-conversion/nusu@173 d2e56fa2-650e-0410-a79f-9358c0239efd --- Source/AffineThreadPool.cs | 146 ++++++------------------------------- 1 file changed, 24 insertions(+), 122 deletions(-) diff --git a/Source/AffineThreadPool.cs b/Source/AffineThreadPool.cs index 125ace5..d169f75 100644 --- a/Source/AffineThreadPool.cs +++ b/Source/AffineThreadPool.cs @@ -58,100 +58,7 @@ namespace Nuclex.Support { bool condition, string message, string details ); - #region class Semaphore - - /// - /// Lightweight variant of Dijkstra's PV Semaphore semaphore based on - /// the Monitor class. - /// - /// - /// Based on code by Stephen Toub (stoub at microsoft ignorethis dot com). - /// - private class Semaphore { - -#if false // The Thread Pool doesn't use this. - /// Initializes the semaphore as a binary semaphore (mutex) - public Semaphore() : this(1) { } -#endif - - /// Initializes the semaphore as a counting semaphore - /// - /// Initial number of threads that can take out units from this semaphore - /// - /// - /// Thrown if the count argument is less than 1 - /// - public Semaphore(int count) { -#if false // The Thread Pool doesn't make this mistake. - if(count < 0) { - throw new ArgumentException( - "Semaphore must have a count of at least 0.", "count" - ); - } -#endif - this.count = count; - } - - /// V the semaphore (add 1 unit to it). - public void Release() { v(); } - - /// P the semaphore (take out 1 unit from it). - public void WaitOne() { p(); } - - /// P the semaphore (take out 1 unit from it). - private void p() { - - // Lock so we can work in peace. This works because lock is actually - // built around Monitor. - lock(this) { - - // Wait until a unit becomes available. We need to wait in a loop in case - // someone else wakes up before us. This could happen if the Monitor.Pulse - // statements were changed to Monitor.PulseAll statements in order to - // introduce some randomness into the order in which threads are woken. - while(this.count <= 0) { - Monitor.Wait(this, Timeout.Infinite); - } - - --this.count; - - } - - } - - /// V the semaphore (add 1 unit to it). - private void v() { - - // Lock so we can work in peace. This works because lock is actually - // built around Monitor. - lock(this) { - - // Release our hold on the unit of control. Then tell everyone - // waiting on this object that there is a unit available. - ++this.count; - Monitor.Pulse(this); - - } - - } - - /// - /// Resets the semaphore to the specified count. Should be used cautiously. - /// - public void Reset(int count) { - lock(this) { - this.count = count; - } - } - - /// The number of units alloted by this semaphore. - private int count; - - } - - #endregion // class Semaphore - - #region class WaitingCallback + #region class UserWorkItem /// Used to hold a callback delegate and the state for that delegate. private struct UserWorkItem { @@ -171,7 +78,7 @@ namespace Nuclex.Support { } - #endregion // class WaitingCallback + #endregion // class UserWorkItem /// Initializes the thread pool static AffineThreadPool() { @@ -180,18 +87,16 @@ namespace Nuclex.Support { // as we may run into situations where multiple operations need to be atomic. // We keep track of the threads we've created just for good measure; not actually // needed for any core functionality. - waitingCallbacks = new Queue(CpuCores * 4); + workAvailable = new AutoResetEvent(false); + userWorkItems = new Queue(CpuCores * 4); workerThreads = new List(CpuCores); inUseThreads = 0; - + // We can only use these hardware thread indices on the XBox 360 #if XBOX360 XboxHardwareThreads = new Queue(new int[] { 5, 4, 3, 1 }); #endif - // Create our "thread needed" event - workerThreadNeeded = new Semaphore(0); - // Create all of the worker threads for(int index = 0; index < CpuCores; index++) { @@ -203,6 +108,7 @@ namespace Nuclex.Support { newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString(); newThread.IsBackground = true; newThread.Start(); + } } @@ -232,22 +138,21 @@ namespace Nuclex.Support { // Create a waiting callback that contains the delegate and its state. // Add it to the processing queue, and signal that data is waiting. UserWorkItem waiting = new UserWorkItem(callback, state); - lock(waitingCallbacks) { - waitingCallbacks.Enqueue(waiting); + lock(userWorkItems) { + userWorkItems.Enqueue(waiting); } - // Decrement the semaphore into the negative range, so the worker threads will - // be woken up until no more tasks are available. - workerThreadNeeded.Release(); + // Wake up one of the worker threads so this task will be processed + workAvailable.Set(); } - + /// Empties the work queue of any queued work items public static void EmptyQueue() { - lock(waitingCallbacks) { + lock(userWorkItems) { try { - while(waitingCallbacks.Count > 0) { - UserWorkItem callback = waitingCallbacks.Dequeue(); + while(userWorkItems.Count > 0) { + UserWorkItem callback = userWorkItems.Dequeue(); IDisposable disposableState = callback.State as IDisposable; if(disposableState != null) { disposableState.Dispose(); @@ -266,8 +171,7 @@ namespace Nuclex.Support { // Clear all waiting items and reset the number of worker threads currently needed // to be 0 (there is nothing for threads to do) - waitingCallbacks.Clear(); - workerThreadNeeded.Reset(0); + userWorkItems.Clear(); } } @@ -282,8 +186,8 @@ namespace Nuclex.Support { /// public static int WaitingCallbacks { get { - lock(waitingCallbacks) { - return waitingCallbacks.Count; + lock(userWorkItems) { + return userWorkItems.Count; } } } @@ -338,14 +242,14 @@ namespace Nuclex.Support { // Try to get the next callback available. We need to lock on the // queue in order to make our count check and retrieval atomic. - lock(waitingCallbacks) { - if(waitingCallbacks.Count > 0) { - return waitingCallbacks.Dequeue(); + lock(userWorkItems) { + if(userWorkItems.Count > 0) { + return userWorkItems.Dequeue(); } } // If we can't get one, go to sleep. - workerThreadNeeded.WaitOne(); + workAvailable.WaitOne(); } @@ -371,13 +275,11 @@ namespace Nuclex.Support { private static Queue XboxHardwareThreads; #endif /// Queue of all the callbacks waiting to be executed. - private static Queue waitingCallbacks; + private static Queue userWorkItems; /// - /// Used to signal that a worker thread is needed for processing. Note that multiple - /// threads may be needed simultaneously and as such we use a semaphore instead of - /// an auto reset event. + /// Used to let the threads in the thread pool wait for new work to appear. /// - private static Semaphore workerThreadNeeded; + private static AutoResetEvent workAvailable; /// List of all worker threads at the disposal of the thread pool. private static List workerThreads; /// Number of threads currently active.