diff --git a/Nuclex.Support (Xbox 360).csproj b/Nuclex.Support (Xbox 360).csproj
index 26b1dec..89e660f 100644
--- a/Nuclex.Support (Xbox 360).csproj
+++ b/Nuclex.Support (Xbox 360).csproj
@@ -151,6 +151,10 @@
WindowsTimeSource.cs
+
+
+ Semaphore.cs
+
AppDomainTypeLister.cs
diff --git a/Nuclex.Support.csproj b/Nuclex.Support.csproj
index d35843c..d5a896c 100644
--- a/Nuclex.Support.csproj
+++ b/Nuclex.Support.csproj
@@ -137,6 +137,10 @@
WindowsTimeSource.cs
+
+
+ Semaphore.cs
+
AppDomainTypeLister.cs
diff --git a/Source/AffineThreadPool.Test.cs b/Source/AffineThreadPool.Test.cs
index 9267cc6..3c3964c 100644
--- a/Source/AffineThreadPool.Test.cs
+++ b/Source/AffineThreadPool.Test.cs
@@ -281,20 +281,35 @@ namespace Nuclex.Support {
// works on a first come first serve basis, but we don't want to rely on this
// implementation detail in the unit test.
for(int index = 0; index < eventCount; ++index) {
- Assert.IsTrue(tasks[index].StartEvent.WaitOne(1000));
+ Assert.IsTrue(
+ tasks[index].StartEvent.WaitOne(10000),
+ "Task " + index.ToString() + " was started"
+ );
}
// All Thread should now be active and no work items should be waiting
- Assert.AreEqual(createdTasks, AffineThreadPool.ActiveThreads);
- Assert.AreEqual(0, AffineThreadPool.WaitingWorkItems);
+ Assert.AreEqual(
+ createdTasks, AffineThreadPool.ActiveThreads,
+ "ActiveThreads property equals number of tasks"
+ );
+ Assert.AreEqual(
+ 0, AffineThreadPool.WaitingWorkItems,
+ "No waiting work items are in the queue"
+ );
// Add a task to the queue and make sure the waiting work item count goes up
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
- Assert.AreEqual(1, AffineThreadPool.WaitingWorkItems);
+ Assert.AreEqual(
+ 1, AffineThreadPool.WaitingWorkItems,
+ "Added work item is waiting in the queue"
+ );
// The same again. Now we should have 2 work items sitting in the queue
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
- Assert.AreEqual(2, AffineThreadPool.WaitingWorkItems);
+ Assert.AreEqual(
+ 2, AffineThreadPool.WaitingWorkItems,
+ "Both added work items are waiting in the queue"
+ );
// Let the WaitTasks finish so we're not blocking the thread pool any longer
for(int index = 0; index < eventCount; ++index) {
@@ -303,7 +318,10 @@ namespace Nuclex.Support {
// Wait for the tasks to end before we get rid of them
for(int index = 0; index < eventCount; ++index) {
- Assert.IsTrue(tasks[index].FinishEvent.WaitOne(1000));
+ Assert.IsTrue(
+ tasks[index].FinishEvent.WaitOne(1000),
+ "Task " + index.ToString() + " has finished"
+ );
}
}
finally {
@@ -311,7 +329,6 @@ namespace Nuclex.Support {
tasks[createdTasks].Dispose();
}
}
-
}
}
diff --git a/Source/AffineThreadPool.cs b/Source/AffineThreadPool.cs
index 923926e..79cfd48 100644
--- a/Source/AffineThreadPool.cs
+++ b/Source/AffineThreadPool.cs
@@ -82,7 +82,7 @@ namespace Nuclex.Support {
// as we may run into situations where multiple operations need to be atomic.
// We keep track of the threads we've created just for good measure; not actually
// needed for any core functionality.
- workAvailable = new AutoResetEvent(false);
+ workAvailable = new Semaphore();
userWorkItems = new Queue(CpuCores * 4);
workerThreads = new List(CpuCores);
inUseThreads = 0;
@@ -144,7 +144,7 @@ namespace Nuclex.Support {
}
// Wake up one of the worker threads so this task will be processed
- workAvailable.Set();
+ workAvailable.Release();
}
@@ -280,7 +280,7 @@ namespace Nuclex.Support {
///
/// Used to let the threads in the thread pool wait for new work to appear.
///
- private static AutoResetEvent workAvailable;
+ private static Semaphore workAvailable;
/// List of all worker threads at the disposal of the thread pool.
private static List workerThreads;
/// Number of threads currently active.
diff --git a/Source/Semaphore.Test.cs b/Source/Semaphore.Test.cs
new file mode 100644
index 0000000..dfe5528
--- /dev/null
+++ b/Source/Semaphore.Test.cs
@@ -0,0 +1,124 @@
+#region CPL License
+/*
+Nuclex Framework
+Copyright (C) 2002-2009 Nuclex Development Labs
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the IBM Common Public License as
+published by the IBM Corporation; either version 1.0 of the
+License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+IBM Common Public License for more details.
+
+You should have received a copy of the IBM Common Public
+License along with this library
+*/
+#endregion
+
+#if UNITTEST
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+using NUnit.Framework;
+
+namespace Nuclex.Support {
+
+ /// Unit Test for the Semaphore class
+ [TestFixture]
+ public class SemaphoreTest {
+
+ ///
+ /// Test whether a semaphore can be initialized with reverse counting
+ ///
+ [Test]
+ public void TestReverseCountingConstructor() {
+ using(Semaphore semaphore = new Semaphore()) {
+ Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning
+ }
+ }
+
+ ///
+ /// Test whether a semaphore can be initialized with a maximum user count
+ ///
+ [Test]
+ public void TestLimitConstructor() {
+ using(Semaphore semaphore = new Semaphore(16)) {
+ Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning
+ }
+ }
+
+ ///
+ /// Test whether a semaphore can be initialized with an initial user
+ /// count and a maximum user count
+ ///
+ [Test]
+ public void TestFullConstructor() {
+ using(Semaphore semaphore = new Semaphore(8, 16)) {
+ Assert.IsNotNull(semaphore); // nonsense, avoids compiler warning
+ }
+ }
+
+ ///
+ /// Verifies that the right exception is thrown if a semaphore is initialized
+ /// with a larger number of initial users than the maximum number of users.
+ ///
+ [Test]
+ public void TestThrowOnMoreInitialUsersThanMaximumUsers() {
+ Assert.Throws(
+ delegate() {
+ Semaphore semaphore = new Semaphore(2, 1);
+ semaphore.Close();
+ }
+ );
+ }
+
+ ///
+ /// Verifies that the semaphore can time out if the resource does not become
+ /// available within the time limit specified by the user
+ ///
+ [Test]
+ public void TestWaitTimeout() {
+ using(Semaphore semaphore = new Semaphore(1)) {
+ Assert.IsTrue(semaphore.WaitOne(1000));
+ Assert.IsFalse(semaphore.WaitOne(0));
+ }
+ }
+
+ ///
+ /// Verifies that the semaphore can time out if the resource does not become
+ /// available within the time limit specified by the user, if the time limit
+ /// is specified using the TimeSpan class
+ ///
+ [Test]
+ public void TestWaitTimeoutWithTimeSpan() {
+ using(Semaphore semaphore = new Semaphore(1)) {
+ Assert.IsTrue(semaphore.WaitOne(TimeSpan.FromSeconds(1)));
+ Assert.IsFalse(semaphore.WaitOne(TimeSpan.FromSeconds(0)));
+ }
+ }
+
+ ///
+ /// Tests whether an exception is thrown if the WaitOne() method is called
+ /// with a time span that is too large for the underlying synchronization API
+ ///
+ [Test]
+ public void TestThrowOnWaitWithTooLargeTimeSpan() {
+ using(Semaphore semaphore = new Semaphore(1)) {
+ Assert.Throws(
+ delegate() {
+ semaphore.WaitOne(TimeSpan.FromMilliseconds(1L << 32));
+ }
+ );
+ }
+ }
+
+ }
+
+} // namespace Nuclex.Support
+
+#endif // UNITTEST
diff --git a/Source/Semaphore.cs b/Source/Semaphore.cs
new file mode 100644
index 0000000..fa355c3
--- /dev/null
+++ b/Source/Semaphore.cs
@@ -0,0 +1,222 @@
+#region CPL License
+/*
+Nuclex Framework
+Copyright (C) 2002-2009 Nuclex Development Labs
+
+This library is free software; you can redistribute it and/or
+modify it under the terms of the IBM Common Public License as
+published by the IBM Corporation; either version 1.0 of the
+License, or (at your option) any later version.
+
+This library is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+IBM Common Public License for more details.
+
+You should have received a copy of the IBM Common Public
+License along with this library
+*/
+#endregion
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+
+namespace Nuclex.Support {
+
+ /// A reverse counting semaphore
+ ///
+ ///
+ /// This semaphore counts in reverse, which means you can Release() the semaphore
+ /// as often as you'd like a thread calling WaitOne() to be let through. You
+ /// can use it in the traditional sense and have any Thread calling WaitOne()
+ /// make sure to call Release() afterwards, or you can, for example, Release() it
+ /// whenever work becomes available and let threads take work from the Semaphore
+ /// by calling WaitOne() alone.
+ ///
+ ///
+ /// Implementation notes (ignore this if you just want to use the Semaphore)
+ ///
+ ///
+ /// We could design a semaphore that uses an auto reset event, where the thread
+ /// that gets to pass immediately sets the event again if the semaphore isn't full
+ /// yet to let another thread pass.
+ ///
+ ///
+ /// However, this would mean that when a semaphore receives a large number of
+ /// wait requests, assuming it would allow, for example, 25 users at once, the
+ /// thread scheduler would see only 1 thread become eligible for execution. Then
+ /// that thread would unlock the next and so on. In short, we wait 25 times
+ /// for the thread scheduler to wake up a thread until all users get through.
+ ///
+ ///
+ /// So we chose a ManualResetEvent, which will wake up more threads than
+ /// neccessary and possibly cause a period of intense competition for getting
+ /// a lock on the resource, but will make the thread scheduler see all threads
+ /// become eligible for execution.
+ ///
+ ///
+ public class Semaphore : WaitHandle {
+
+ /// Initializes a new semaphore
+ public Semaphore() {
+ createEvent();
+ }
+
+ /// Initializes a new semaphore
+ ///
+ /// Number of users that can access the resource at the same time
+ ///
+ public Semaphore(int count) {
+ this.users = -count;
+
+ createEvent();
+ }
+
+ /// Initializes a new semaphore
+ ///
+ /// Initial number of users accessing the resource
+ ///
+ ///
+ /// Maximum numbr of users that can access the resource at the same time
+ ///
+ public Semaphore(int initialCount, int maximumCount)
+ : this() {
+ if(initialCount > maximumCount) {
+ throw new ArgumentOutOfRangeException(
+ "initialCount", "Initial count must not be larger than the maximum count"
+ );
+ }
+
+ this.users = initialCount - maximumCount; // should be negative!
+ createEvent();
+ }
+
+ /// Immediately releases all resources owned by the instance
+ ///
+ /// Whether Dispose() has been called explictly
+ ///
+ protected override void Dispose(bool explicitDisposing) {
+ if(this.manualResetEvent != null) {
+#if XBOX360
+ base.Handle = IntPtr.Zero;
+#else
+ base.SafeWaitHandle = null;
+#endif
+
+ this.manualResetEvent.Close();
+ this.manualResetEvent = null;
+ }
+
+ base.Dispose(explicitDisposing);
+ }
+
+ ///
+ /// Waits for the resource to become available and locks it
+ ///
+ ///
+ /// Number of milliseconds to wait at most before giving up
+ ///
+ ///
+ /// True to exit the synchronization domain for the context before the wait (if
+ /// in a synchronized context), and reacquire it afterward; otherwise, false.
+ ///
+ ///
+ /// True if the resource was available and is now locked, false if
+ /// the timeout has been reached.
+ ///
+ public override bool WaitOne(int millisecondsTimeout, bool exitContext) {
+ for(; ; ) {
+
+ // Lock the resource - even if it is full. We will correct out mistake later
+ // if we overcomitted the resource.
+ int newUsers = Interlocked.Increment(ref this.users);
+
+ // If we got the resource, let the thread pass without further processing.
+ if(newUsers <= 0) {
+ if(newUsers < 0) {
+ this.manualResetEvent.Set();
+ }
+
+ return true;
+ }
+
+ // We overcomitted the resource, count it down again. We know that, at least
+ // moments ago, the resource was busy, so block the event.
+ this.manualResetEvent.Reset();
+ Thread.MemoryBarrier();
+ newUsers = Interlocked.Decrement(ref this.users);
+
+ // Unless we have been preempted by a Release(), we now have to wait for the
+ // resource to become available.
+ if(newUsers >= 0) {
+ if(!this.manualResetEvent.WaitOne(millisecondsTimeout, exitContext)) {
+ return false;
+ }
+ }
+ }
+ }
+
+#if !XBOX360
+ ///
+ /// Waits for the resource to become available and locks it
+ ///
+ ///
+ /// Time span to wait for the lock before giving up
+ ///
+ ///
+ /// True to exit the synchronization domain for the context before the wait (if
+ /// in a synchronized context), and reacquire it afterward; otherwise, false.
+ ///
+ ///
+ /// True if the resource was available and is now locked, false if
+ /// the timeout has been reached.
+ ///
+ public override bool WaitOne(TimeSpan timeout, bool exitContext) {
+ long totalMilliseconds = (long)timeout.TotalMilliseconds;
+ if((totalMilliseconds < -1) || (totalMilliseconds > int.MaxValue)) {
+ throw new ArgumentOutOfRangeException(
+ "timeout", "Timeout must be either -1 or positive and less than 2^31"
+ );
+ }
+
+ return WaitOne((int)totalMilliseconds, exitContext);
+ }
+#endif
+
+ ///
+ /// Releases a lock on the resource. Note that for a reverse counting semaphore,
+ /// it is legal to Release() the resource before before locking it.
+ ///
+ public void Release() {
+
+ // Release one lock on the resource
+ int newUsers = Interlocked.Decrement(ref this.users);
+
+ // Wake up any threads waiting for the resource to become available
+ this.manualResetEvent.Set();
+
+ }
+
+ /// Creates the event used to make threads wait for the resource
+ private void createEvent() {
+ this.manualResetEvent = new ManualResetEvent(false);
+#if XBOX360
+ base.Handle = this.manualResetEvent.Handle;
+#else
+ base.SafeWaitHandle = this.manualResetEvent.SafeWaitHandle;
+#endif
+ }
+
+ /// Event used to make threads wait if the semaphore is full
+ private ManualResetEvent manualResetEvent;
+ /// Number of users currently accessing the resource
+ ///
+ /// Since this is a reverse counting semaphore, it will be negative if
+ /// the resource is available and 0 if the semaphore is full.
+ ///
+ private int users;
+
+ }
+
+} // namespace Nuclex.Support