Found the synchronization problem in the affine thread pool - the auto reset event was being set twice before even a single thread pool thread made it through the WaitOne() - fixed by using a semaphore; implemented a semaphore that should work in the XBox 360; wrote unit tests for said semaphore; added messages to the affine thread pool's unit tests to make debugging easier

git-svn-id: file:///srv/devel/repo-conversion/nusu@177 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
Markus Ewald 2009-09-18 20:06:09 +00:00
parent 09247541f2
commit 874fe0a9e4
6 changed files with 381 additions and 10 deletions

View File

@ -151,6 +151,10 @@
<Compile Include="Source\Scheduling\WindowsTimeSource.Test.cs"> <Compile Include="Source\Scheduling\WindowsTimeSource.Test.cs">
<DependentUpon>WindowsTimeSource.cs</DependentUpon> <DependentUpon>WindowsTimeSource.cs</DependentUpon>
</Compile> </Compile>
<Compile Include="Source\Semaphore.cs" />
<Compile Include="Source\Semaphore.Test.cs">
<DependentUpon>Semaphore.cs</DependentUpon>
</Compile>
<Compile Include="Source\Services\AppDomainTypeLister.cs" /> <Compile Include="Source\Services\AppDomainTypeLister.cs" />
<Compile Include="Source\Services\AppDomainTypeLister.Test.cs"> <Compile Include="Source\Services\AppDomainTypeLister.Test.cs">
<DependentUpon>AppDomainTypeLister.cs</DependentUpon> <DependentUpon>AppDomainTypeLister.cs</DependentUpon>

View File

@ -137,6 +137,10 @@
<Compile Include="Source\Scheduling\WindowsTimeSource.Test.cs"> <Compile Include="Source\Scheduling\WindowsTimeSource.Test.cs">
<DependentUpon>WindowsTimeSource.cs</DependentUpon> <DependentUpon>WindowsTimeSource.cs</DependentUpon>
</Compile> </Compile>
<Compile Include="Source\Semaphore.cs" />
<Compile Include="Source\Semaphore.Test.cs">
<DependentUpon>Semaphore.cs</DependentUpon>
</Compile>
<Compile Include="Source\Services\AppDomainTypeLister.cs" /> <Compile Include="Source\Services\AppDomainTypeLister.cs" />
<Compile Include="Source\Services\AppDomainTypeLister.Test.cs"> <Compile Include="Source\Services\AppDomainTypeLister.Test.cs">
<DependentUpon>AppDomainTypeLister.cs</DependentUpon> <DependentUpon>AppDomainTypeLister.cs</DependentUpon>

View File

@ -281,20 +281,35 @@ namespace Nuclex.Support {
// works on a first come first serve basis, but we don't want to rely on this // works on a first come first serve basis, but we don't want to rely on this
// implementation detail in the unit test. // implementation detail in the unit test.
for(int index = 0; index < eventCount; ++index) { for(int index = 0; index < eventCount; ++index) {
Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000)); Assert.IsTrue(
tasks[index].StartEvent.WaitOne(10000),
"Task " + index.ToString() + " was started"
);
} }
// All Thread should now be active and no work items should be waiting // All Thread should now be active and no work items should be waiting
Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads); Assert.AreEqual(
Assert.AreEqual(0, AffineThreadPool.WaitingWorkItems); createdTasks, AffineThreadPool.ActiveThreads,
"ActiveThreads property equals number of tasks"
);
Assert.AreEqual(
0, AffineThreadPool.WaitingWorkItems,
"No waiting work items are in the queue"
);
// Add a task to the queue and make sure the waiting work item count goes up // Add a task to the queue and make sure the waiting work item count goes up
AffineThreadPool.QueueUserWorkItem(delegate(object state) { }); AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
Assert.AreEqual(1, AffineThreadPool.WaitingWorkItems); Assert.AreEqual(
1, AffineThreadPool.WaitingWorkItems,
"Added work item is waiting in the queue"
);
// The same again. Now we should have 2 work items sitting in the queue // The same again. Now we should have 2 work items sitting in the queue
AffineThreadPool.QueueUserWorkItem(delegate(object state) { }); AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
Assert.AreEqual(2, AffineThreadPool.WaitingWorkItems); Assert.AreEqual(
2, AffineThreadPool.WaitingWorkItems,
"Both added work items are waiting in the queue"
);
// Let the WaitTasks finish so we're not blocking the thread pool any longer // Let the WaitTasks finish so we're not blocking the thread pool any longer
for(int index = 0; index < eventCount; ++index) { for(int index = 0; index < eventCount; ++index) {
@ -303,7 +318,10 @@ namespace Nuclex.Support {
// Wait for the tasks to end before we get rid of them // Wait for the tasks to end before we get rid of them
for(int index = 0; index < eventCount; ++index) { for(int index = 0; index < eventCount; ++index) {
Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000)); Assert.IsTrue(
tasks[index].FinishEvent.WaitOne(1000),
"Task " + index.ToString() + " has finished"
);
} }
} }
finally { finally {
@ -311,7 +329,6 @@ namespace Nuclex.Support {
tasks[createdTasks].Dispose(); tasks[createdTasks].Dispose();
} }
} }
} }
} }

View File

@ -82,7 +82,7 @@ namespace Nuclex.Support {
// as we may run into situations where multiple operations need to be atomic. // as we may run into situations where multiple operations need to be atomic.
// We keep track of the threads we've created just for good measure; not actually // We keep track of the threads we've created just for good measure; not actually
// needed for any core functionality. // needed for any core functionality.
workAvailable = new AutoResetEvent(false); workAvailable = new Semaphore();
userWorkItems = new Queue<UserWorkItem>(CpuCores * 4); userWorkItems = new Queue<UserWorkItem>(CpuCores * 4);
workerThreads = new List<Thread>(CpuCores); workerThreads = new List<Thread>(CpuCores);
inUseThreads = 0; inUseThreads = 0;
@ -144,7 +144,7 @@ namespace Nuclex.Support {
} }
// Wake up one of the worker threads so this task will be processed // Wake up one of the worker threads so this task will be processed
workAvailable.Set(); workAvailable.Release();
} }
@ -280,7 +280,7 @@ namespace Nuclex.Support {
/// <summary> /// <summary>
/// Used to let the threads in the thread pool wait for new work to appear. /// Used to let the threads in the thread pool wait for new work to appear.
/// </summary> /// </summary>
private static AutoResetEvent workAvailable; private static Semaphore workAvailable;
/// <summary>List of all worker threads at the disposal of the thread pool.</summary> /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
private static List<Thread> workerThreads; private static List<Thread> workerThreads;
/// <summary>Number of threads currently active.</summary> /// <summary>Number of threads currently active.</summary>

124
Source/Semaphore.Test.cs Normal file
View File

@ -0,0 +1,124 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2009 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if UNITTEST
using System;
using System.Collections.Generic;
using System.Threading;
using NUnit.Framework;
namespace Nuclex.Support {
/// <summary>Unit Test for the Semaphore class</summary>
[TestFixture]
public class SemaphoreTest {
/// <summary>
/// Test whether a semaphore can be initialized with reverse counting
/// </summary>
[Test]
public void TestReverseCountingConstructor() {
using(Semaphore semaphore = new Semaphore()) {
Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning
}
}
/// <summary>
/// Test whether a semaphore can be initialized with a maximum user count
/// </summary>
[Test]
public void TestLimitConstructor() {
using(Semaphore semaphore = new Semaphore(16)) {
Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning
}
}
/// <summary>
/// Test whether a semaphore can be initialized with an initial user
/// count and a maximum user count
/// </summary>
[Test]
public void TestFullConstructor() {
using(Semaphore semaphore = new Semaphore(8, 16)) {
Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning
}
}
/// <summary>
/// Verifies that the right exception is thrown if a semaphore is initialized
/// with a larger number of initial users than the maximum number of users.
/// </summary>
[Test]
public void TestThrowOnMoreInitialUsersThanMaximumUsers() {
Assert.Throws<ArgumentOutOfRangeException>(
delegate() {
Semaphore semaphore = new Semaphore(2, 1);
semaphore.Close();
}
);
}
/// <summary>
/// Verifies that the semaphore can time out if the resource does not become
/// available within the time limit specified by the user
/// </summary>
[Test]
public void TestWaitTimeout() {
using(Semaphore semaphore = new Semaphore(1)) {
Assert.IsTrue(semaphore.WaitOne(1000));
Assert.IsFalse(semaphore.WaitOne(0));
}
}
/// <summary>
/// Verifies that the semaphore can time out if the resource does not become
/// available within the time limit specified by the user, if the time limit
/// is specified using the TimeSpan class
/// </summary>
[Test]
public void TestWaitTimeoutWithTimeSpan() {
using(Semaphore semaphore = new Semaphore(1)) {
Assert.IsTrue(semaphore.WaitOne(TimeSpan.FromSeconds(1)));
Assert.IsFalse(semaphore.WaitOne(TimeSpan.FromSeconds(0)));
}
}
/// <summary>
/// Tests whether an exception is thrown if the WaitOne() method is called
/// with a time span that is too large for the underlying synchronization API
/// </summary>
[Test]
public void TestThrowOnWaitWithTooLargeTimeSpan() {
using(Semaphore semaphore = new Semaphore(1)) {
Assert.Throws<ArgumentOutOfRangeException>(
delegate() {
semaphore.WaitOne(TimeSpan.FromMilliseconds(1L << 32));
}
);
}
}
}
} // namespace Nuclex.Support
#endif // UNITTEST

222
Source/Semaphore.cs Normal file
View File

@ -0,0 +1,222 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2009 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Collections.Generic;
using System.Threading;
namespace Nuclex.Support {
/// <summary>A reverse counting semaphore</summary>
/// <remarks>
/// <para>
/// This semaphore counts in reverse, which means you can Release() the semaphore
/// as often as you'd like a thread calling WaitOne() to be let through. You
/// can use it in the traditional sense and have any Thread calling WaitOne()
/// make sure to call Release() afterwards, or you can, for example, Release() it
/// whenever work becomes available and let threads take work from the Semaphore
/// by calling WaitOne() alone.
/// </para>
/// <para>
/// Implementation notes (ignore this if you just want to use the Semaphore)
/// </para>
/// <para>
/// We could design a semaphore that uses an auto reset event, where the thread
/// that gets to pass immediately sets the event again if the semaphore isn't full
/// yet to let another thread pass.
/// </para>
/// <para>
/// However, this would mean that when a semaphore receives a large number of
/// wait requests, assuming it would allow, for example, 25 users at once, the
/// thread scheduler would see only 1 thread become eligible for execution. Then
/// that thread would unlock the next and so on. In short, we wait 25 times
/// for the thread scheduler to wake up a thread until all users get through.
/// </para>
/// <para>
/// So we chose a ManualResetEvent, which will wake up more threads than
/// neccessary and possibly cause a period of intense competition for getting
/// a lock on the resource, but will make the thread scheduler see all threads
/// become eligible for execution.
/// </para>
/// </remarks>
public class Semaphore : WaitHandle {
/// <summary>Initializes a new semaphore</summary>
public Semaphore() {
createEvent();
}
/// <summary>Initializes a new semaphore</summary>
/// <param name="count">
/// Number of users that can access the resource at the same time
/// </param>
public Semaphore(int count) {
this.users = -count;
createEvent();
}
/// <summary>Initializes a new semaphore</summary>
/// <param name="initialCount">
/// Initial number of users accessing the resource
/// </param>
/// <param name="maximumCount">
/// Maximum numbr of users that can access the resource at the same time
/// </param>
public Semaphore(int initialCount, int maximumCount)
: this() {
if(initialCount > maximumCount) {
throw new ArgumentOutOfRangeException(
"initialCount", "Initial count must not be larger than the maximum count"
);
}
this.users = initialCount - maximumCount; // should be negative!
createEvent();
}
/// <summary>Immediately releases all resources owned by the instance</summary>
/// <param name="explicitDisposing">
/// Whether Dispose() has been called explictly
/// </param>
protected override void Dispose(bool explicitDisposing) {
if(this.manualResetEvent != null) {
#if XBOX360
base.Handle = IntPtr.Zero;
#else
base.SafeWaitHandle = null;
#endif
this.manualResetEvent.Close();
this.manualResetEvent = null;
}
base.Dispose(explicitDisposing);
}
/// <summary>
/// Waits for the resource to become available and locks it
/// </summary>
/// <param name="millisecondsTimeout">
/// Number of milliseconds to wait at most before giving up
/// </param>
/// <param name="exitContext">
/// True to exit the synchronization domain for the context before the wait (if
/// in a synchronized context), and reacquire it afterward; otherwise, false.
/// </param>
/// <returns>
/// True if the resource was available and is now locked, false if
/// the timeout has been reached.
/// </returns>
public override bool WaitOne(int millisecondsTimeout, bool exitContext) {
for(; ; ) {
// Lock the resource - even if it is full. We will correct out mistake later
// if we overcomitted the resource.
int newUsers = Interlocked.Increment(ref this.users);
// If we got the resource, let the thread pass without further processing.
if(newUsers <= 0) {
if(newUsers < 0) {
this.manualResetEvent.Set();
}
return true;
}
// We overcomitted the resource, count it down again. We know that, at least
// moments ago, the resource was busy, so block the event.
this.manualResetEvent.Reset();
Thread.MemoryBarrier();
newUsers = Interlocked.Decrement(ref this.users);
// Unless we have been preempted by a Release(), we now have to wait for the
// resource to become available.
if(newUsers >= 0) {
if(!this.manualResetEvent.WaitOne(millisecondsTimeout, exitContext)) {
return false;
}
}
}
}
#if !XBOX360
/// <summary>
/// Waits for the resource to become available and locks it
/// </summary>
/// <param name="timeout">
/// Time span to wait for the lock before giving up
/// </param>
/// <param name="exitContext">
/// True to exit the synchronization domain for the context before the wait (if
/// in a synchronized context), and reacquire it afterward; otherwise, false.
/// </param>
/// <returns>
/// True if the resource was available and is now locked, false if
/// the timeout has been reached.
/// </returns>
public override bool WaitOne(TimeSpan timeout, bool exitContext) {
long totalMilliseconds = (long)timeout.TotalMilliseconds;
if((totalMilliseconds < -1) || (totalMilliseconds > int.MaxValue)) {
throw new ArgumentOutOfRangeException(
"timeout", "Timeout must be either -1 or positive and less than 2^31"
);
}
return WaitOne((int)totalMilliseconds, exitContext);
}
#endif
/// <summary>
/// Releases a lock on the resource. Note that for a reverse counting semaphore,
/// it is legal to Release() the resource before before locking it.
/// </summary>
public void Release() {
// Release one lock on the resource
int newUsers = Interlocked.Decrement(ref this.users);
// Wake up any threads waiting for the resource to become available
this.manualResetEvent.Set();
}
/// <summary>Creates the event used to make threads wait for the resource</summary>
private void createEvent() {
this.manualResetEvent = new ManualResetEvent(false);
#if XBOX360
base.Handle = this.manualResetEvent.Handle;
#else
base.SafeWaitHandle = this.manualResetEvent.SafeWaitHandle;
#endif
}
/// <summary>Event used to make threads wait if the semaphore is full</summary>
private ManualResetEvent manualResetEvent;
/// <summary>Number of users currently accessing the resource</summary>
/// <remarks>
/// Since this is a reverse counting semaphore, it will be negative if
/// the resource is available and 0 if the semaphore is full.
/// </remarks>
private int users;
}
} // namespace Nuclex.Support