From 874fe0a9e49f730656e7aa6e66ada8d67108017c Mon Sep 17 00:00:00 2001 From: Markus Ewald Date: Fri, 18 Sep 2009 20:06:09 +0000 Subject: [PATCH] Found the synchronization problem in the affine thread pool - the auto reset event was being set twice before even a single thread pool thread made it through the WaitOne() - fixed by using a semaphore; implemented a semaphore that should work in the XBox 360; wrote unit tests for said semaphore; added messages to the affine thread pool's unit tests to make debugging easier git-svn-id: file:///srv/devel/repo-conversion/nusu@177 d2e56fa2-650e-0410-a79f-9358c0239efd --- Nuclex.Support (Xbox 360).csproj | 4 + Nuclex.Support.csproj | 4 + Source/AffineThreadPool.Test.cs | 31 ++++- Source/AffineThreadPool.cs | 6 +- Source/Semaphore.Test.cs | 124 +++++++++++++++++ Source/Semaphore.cs | 222 +++++++++++++++++++++++++++++++ 6 files changed, 381 insertions(+), 10 deletions(-) create mode 100644 Source/Semaphore.Test.cs create mode 100644 Source/Semaphore.cs diff --git a/Nuclex.Support (Xbox 360).csproj b/Nuclex.Support (Xbox 360).csproj index 26b1dec..89e660f 100644 --- a/Nuclex.Support (Xbox 360).csproj +++ b/Nuclex.Support (Xbox 360).csproj @@ -151,6 +151,10 @@ WindowsTimeSource.cs + + + Semaphore.cs + AppDomainTypeLister.cs diff --git a/Nuclex.Support.csproj b/Nuclex.Support.csproj index d35843c..d5a896c 100644 --- a/Nuclex.Support.csproj +++ b/Nuclex.Support.csproj @@ -137,6 +137,10 @@ WindowsTimeSource.cs + + + Semaphore.cs + AppDomainTypeLister.cs diff --git a/Source/AffineThreadPool.Test.cs b/Source/AffineThreadPool.Test.cs index 9267cc6..3c3964c 100644 --- a/Source/AffineThreadPool.Test.cs +++ b/Source/AffineThreadPool.Test.cs @@ -281,20 +281,35 @@ namespace Nuclex.Support { // works on a first come first serve basis, but we don't want to rely on this // implementation detail in the unit test. for(int index = 0; index < eventCount; ++index) { - Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000)); + Assert.IsTrue( + tasks[index].StartEvent.WaitOne(10000), + "Task " + index.ToString() + " was started" + ); } // All Thread should now be active and no work items should be waiting - Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); - Assert.AreEqual(0, AffineThreadPool.WaitingWorkItems); + Assert.AreEqual( + createdTasks, AffineThreadPool.ActiveThreads, + "ActiveThreads property equals number of tasks" + ); + Assert.AreEqual( + 0, AffineThreadPool.WaitingWorkItems, + "No waiting work items are in the queue" + ); // Add a task to the queue and make sure the waiting work item count goes up AffineThreadPool.QueueUserWorkItem(delegate(object state) { }); - Assert.AreEqual(1, AffineThreadPool.WaitingWorkItems); + Assert.AreEqual( + 1, AffineThreadPool.WaitingWorkItems, + "Added work item is waiting in the queue" + ); // The same again. Now we should have 2 work items sitting in the queue AffineThreadPool.QueueUserWorkItem(delegate(object state) { }); - Assert.AreEqual(2, AffineThreadPool.WaitingWorkItems); + Assert.AreEqual( + 2, AffineThreadPool.WaitingWorkItems, + "Both added work items are waiting in the queue" + ); // Let the WaitTasks finish so we're not blocking the thread pool any longer for(int index = 0; index < eventCount; ++index) { @@ -303,7 +318,10 @@ namespace Nuclex.Support { // Wait for the tasks to end before we get rid of them for(int index = 0; index < eventCount; ++index) { - Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000)); + Assert.IsTrue( + tasks[index].FinishEvent.WaitOne(1000), + "Task " + index.ToString() + " has finished" + ); } } finally { @@ -311,7 +329,6 @@ namespace Nuclex.Support { tasks[createdTasks].Dispose(); } } - } } diff --git a/Source/AffineThreadPool.cs b/Source/AffineThreadPool.cs index 923926e..79cfd48 100644 --- a/Source/AffineThreadPool.cs +++ b/Source/AffineThreadPool.cs @@ -82,7 +82,7 @@ namespace Nuclex.Support { // as we may run into situations where multiple operations need to be atomic. // We keep track of the threads we've created just for good measure; not actually // needed for any core functionality. - workAvailable = new AutoResetEvent(false); + workAvailable = new Semaphore(); userWorkItems = new Queue(CpuCores * 4); workerThreads = new List(CpuCores); inUseThreads = 0; @@ -144,7 +144,7 @@ namespace Nuclex.Support { } // Wake up one of the worker threads so this task will be processed - workAvailable.Set(); + workAvailable.Release(); } @@ -280,7 +280,7 @@ namespace Nuclex.Support { /// /// Used to let the threads in the thread pool wait for new work to appear. /// - private static AutoResetEvent workAvailable; + private static Semaphore workAvailable; /// List of all worker threads at the disposal of the thread pool. private static List workerThreads; /// Number of threads currently active. diff --git a/Source/Semaphore.Test.cs b/Source/Semaphore.Test.cs new file mode 100644 index 0000000..dfe5528 --- /dev/null +++ b/Source/Semaphore.Test.cs @@ -0,0 +1,124 @@ +#region CPL License +/* +Nuclex Framework +Copyright (C) 2002-2009 Nuclex Development Labs + +This library is free software; you can redistribute it and/or +modify it under the terms of the IBM Common Public License as +published by the IBM Corporation; either version 1.0 of the +License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +IBM Common Public License for more details. + +You should have received a copy of the IBM Common Public +License along with this library +*/ +#endregion + +#if UNITTEST + +using System; +using System.Collections.Generic; +using System.Threading; + +using NUnit.Framework; + +namespace Nuclex.Support { + + /// Unit Test for the Semaphore class + [TestFixture] + public class SemaphoreTest { + + /// + /// Test whether a semaphore can be initialized with reverse counting + /// + [Test] + public void TestReverseCountingConstructor() { + using(Semaphore semaphore = new Semaphore()) { + Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning + } + } + + /// + /// Test whether a semaphore can be initialized with a maximum user count + /// + [Test] + public void TestLimitConstructor() { + using(Semaphore semaphore = new Semaphore(16)) { + Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning + } + } + + /// + /// Test whether a semaphore can be initialized with an initial user + /// count and a maximum user count + /// + [Test] + public void TestFullConstructor() { + using(Semaphore semaphore = new Semaphore(8, 16)) { + Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning + } + } + + /// + /// Verifies that the right exception is thrown if a semaphore is initialized + /// with a larger number of initial users than the maximum number of users. + /// + [Test] + public void TestThrowOnMoreInitialUsersThanMaximumUsers() { + Assert.Throws( + delegate() { + Semaphore semaphore = new Semaphore(2, 1); + semaphore.Close(); + } + ); + } + + /// + /// Verifies that the semaphore can time out if the resource does not become + /// available within the time limit specified by the user + /// + [Test] + public void TestWaitTimeout() { + using(Semaphore semaphore = new Semaphore(1)) { + Assert.IsTrue(semaphore.WaitOne(1000)); + Assert.IsFalse(semaphore.WaitOne(0)); + } + } + + /// + /// Verifies that the semaphore can time out if the resource does not become + /// available within the time limit specified by the user, if the time limit + /// is specified using the TimeSpan class + /// + [Test] + public void TestWaitTimeoutWithTimeSpan() { + using(Semaphore semaphore = new Semaphore(1)) { + Assert.IsTrue(semaphore.WaitOne(TimeSpan.FromSeconds(1))); + Assert.IsFalse(semaphore.WaitOne(TimeSpan.FromSeconds(0))); + } + } + + /// + /// Tests whether an exception is thrown if the WaitOne() method is called + /// with a time span that is too large for the underlying synchronization API + /// + [Test] + public void TestThrowOnWaitWithTooLargeTimeSpan() { + using(Semaphore semaphore = new Semaphore(1)) { + Assert.Throws( + delegate() { + semaphore.WaitOne(TimeSpan.FromMilliseconds(1L << 32)); + } + ); + } + } + + } + +} // namespace Nuclex.Support + +#endif // UNITTEST diff --git a/Source/Semaphore.cs b/Source/Semaphore.cs new file mode 100644 index 0000000..fa355c3 --- /dev/null +++ b/Source/Semaphore.cs @@ -0,0 +1,222 @@ +#region CPL License +/* +Nuclex Framework +Copyright (C) 2002-2009 Nuclex Development Labs + +This library is free software; you can redistribute it and/or +modify it under the terms of the IBM Common Public License as +published by the IBM Corporation; either version 1.0 of the +License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +IBM Common Public License for more details. + +You should have received a copy of the IBM Common Public +License along with this library +*/ +#endregion + +using System; +using System.Collections.Generic; +using System.Threading; + +namespace Nuclex.Support { + + /// A reverse counting semaphore + /// + /// + /// This semaphore counts in reverse, which means you can Release() the semaphore + /// as often as you'd like a thread calling WaitOne() to be let through. You + /// can use it in the traditional sense and have any Thread calling WaitOne() + /// make sure to call Release() afterwards, or you can, for example, Release() it + /// whenever work becomes available and let threads take work from the Semaphore + /// by calling WaitOne() alone. + /// + /// + /// Implementation notes (ignore this if you just want to use the Semaphore) + /// + /// + /// We could design a semaphore that uses an auto reset event, where the thread + /// that gets to pass immediately sets the event again if the semaphore isn't full + /// yet to let another thread pass. + /// + /// + /// However, this would mean that when a semaphore receives a large number of + /// wait requests, assuming it would allow, for example, 25 users at once, the + /// thread scheduler would see only 1 thread become eligible for execution. Then + /// that thread would unlock the next and so on. In short, we wait 25 times + /// for the thread scheduler to wake up a thread until all users get through. + /// + /// + /// So we chose a ManualResetEvent, which will wake up more threads than + /// neccessary and possibly cause a period of intense competition for getting + /// a lock on the resource, but will make the thread scheduler see all threads + /// become eligible for execution. + /// + /// + public class Semaphore : WaitHandle { + + /// Initializes a new semaphore + public Semaphore() { + createEvent(); + } + + /// Initializes a new semaphore + /// + /// Number of users that can access the resource at the same time + /// + public Semaphore(int count) { + this.users = -count; + + createEvent(); + } + + /// Initializes a new semaphore + /// + /// Initial number of users accessing the resource + /// + /// + /// Maximum numbr of users that can access the resource at the same time + /// + public Semaphore(int initialCount, int maximumCount) + : this() { + if(initialCount > maximumCount) { + throw new ArgumentOutOfRangeException( + "initialCount", "Initial count must not be larger than the maximum count" + ); + } + + this.users = initialCount - maximumCount; // should be negative! + createEvent(); + } + + /// Immediately releases all resources owned by the instance + /// + /// Whether Dispose() has been called explictly + /// + protected override void Dispose(bool explicitDisposing) { + if(this.manualResetEvent != null) { +#if XBOX360 + base.Handle = IntPtr.Zero; +#else + base.SafeWaitHandle = null; +#endif + + this.manualResetEvent.Close(); + this.manualResetEvent = null; + } + + base.Dispose(explicitDisposing); + } + + /// + /// Waits for the resource to become available and locks it + /// + /// + /// Number of milliseconds to wait at most before giving up + /// + /// + /// True to exit the synchronization domain for the context before the wait (if + /// in a synchronized context), and reacquire it afterward; otherwise, false. + /// + /// + /// True if the resource was available and is now locked, false if + /// the timeout has been reached. + /// + public override bool WaitOne(int millisecondsTimeout, bool exitContext) { + for(; ; ) { + + // Lock the resource - even if it is full. We will correct out mistake later + // if we overcomitted the resource. + int newUsers = Interlocked.Increment(ref this.users); + + // If we got the resource, let the thread pass without further processing. + if(newUsers <= 0) { + if(newUsers < 0) { + this.manualResetEvent.Set(); + } + + return true; + } + + // We overcomitted the resource, count it down again. We know that, at least + // moments ago, the resource was busy, so block the event. + this.manualResetEvent.Reset(); + Thread.MemoryBarrier(); + newUsers = Interlocked.Decrement(ref this.users); + + // Unless we have been preempted by a Release(), we now have to wait for the + // resource to become available. + if(newUsers >= 0) { + if(!this.manualResetEvent.WaitOne(millisecondsTimeout, exitContext)) { + return false; + } + } + } + } + +#if !XBOX360 + /// + /// Waits for the resource to become available and locks it + /// + /// + /// Time span to wait for the lock before giving up + /// + /// + /// True to exit the synchronization domain for the context before the wait (if + /// in a synchronized context), and reacquire it afterward; otherwise, false. + /// + /// + /// True if the resource was available and is now locked, false if + /// the timeout has been reached. + /// + public override bool WaitOne(TimeSpan timeout, bool exitContext) { + long totalMilliseconds = (long)timeout.TotalMilliseconds; + if((totalMilliseconds < -1) || (totalMilliseconds > int.MaxValue)) { + throw new ArgumentOutOfRangeException( + "timeout", "Timeout must be either -1 or positive and less than 2^31" + ); + } + + return WaitOne((int)totalMilliseconds, exitContext); + } +#endif + + /// + /// Releases a lock on the resource. Note that for a reverse counting semaphore, + /// it is legal to Release() the resource before before locking it. + /// + public void Release() { + + // Release one lock on the resource + int newUsers = Interlocked.Decrement(ref this.users); + + // Wake up any threads waiting for the resource to become available + this.manualResetEvent.Set(); + + } + + /// Creates the event used to make threads wait for the resource + private void createEvent() { + this.manualResetEvent = new ManualResetEvent(false); +#if XBOX360 + base.Handle = this.manualResetEvent.Handle; +#else + base.SafeWaitHandle = this.manualResetEvent.SafeWaitHandle; +#endif + } + + /// Event used to make threads wait if the semaphore is full + private ManualResetEvent manualResetEvent; + /// Number of users currently accessing the resource + /// + /// Since this is a reverse counting semaphore, it will be negative if + /// the resource is available and 0 if the semaphore is full. + /// + private int users; + + } + +} // namespace Nuclex.Support