Used pragmas to avoid warnings in unit tests designed to test obsolete methods; method thread-related classes into a new namespace, 'Threading'
git-svn-id: file:///srv/devel/repo-conversion/nusu@331 d2e56fa2-650e-0410-a79f-9358c0239efd
This commit is contained in:
parent
a934fb155e
commit
e6e0220fb3
7 changed files with 32 additions and 22 deletions
356
Source/Threading/AffineThreadPool.Test.cs
Normal file
356
Source/Threading/AffineThreadPool.Test.cs
Normal file
|
@ -0,0 +1,356 @@
|
|||
#region CPL License
|
||||
/*
|
||||
Nuclex Framework
|
||||
Copyright (C) 2002-2017 Nuclex Development Labs
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the IBM Common Public License as
|
||||
published by the IBM Corporation; either version 1.0 of the
|
||||
License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
IBM Common Public License for more details.
|
||||
|
||||
You should have received a copy of the IBM Common Public
|
||||
License along with this library
|
||||
*/
|
||||
#endregion
|
||||
|
||||
#if UNITTEST
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace Nuclex.Support.Threading {
|
||||
|
||||
/// <summary>Unit Test for the CPU core-affine thread pool</summary>
|
||||
[TestFixture]
|
||||
internal class AffineThreadPoolTest {
|
||||
|
||||
#region class TestTask
|
||||
|
||||
/// <summary>ThreadPool task that can be used for testing</summary>
|
||||
private class TestTask : IDisposable {
|
||||
|
||||
/// <summary>Initializes a new test task</summary>
|
||||
public TestTask() {
|
||||
this.callbackEvent = new ManualResetEvent(false);
|
||||
}
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
if(this.callbackEvent != null) {
|
||||
this.callbackEvent.Close();
|
||||
this.callbackEvent = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Callback that can be added to the thread pool as a task</summary>
|
||||
/// <param name="state">User defined state</param>
|
||||
public void Callback(object state) {
|
||||
this.LastCallbackState = state;
|
||||
this.callbackEvent.Set();
|
||||
}
|
||||
|
||||
/// <summary>Event that will be set when the callback is executed</summary>
|
||||
public ManualResetEvent CallbackEvent {
|
||||
get { return this.callbackEvent; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// State parameter that was provide when the callback was called
|
||||
/// </summary>
|
||||
public volatile object LastCallbackState;
|
||||
|
||||
/// <summary>Event that will be set when the callback is invoked</summary>
|
||||
private ManualResetEvent callbackEvent;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class TestTask
|
||||
|
||||
#region class WaitTask
|
||||
|
||||
/// <summary>ThreadPool task that can be used for testing</summary>
|
||||
private class WaitTask : IDisposable {
|
||||
|
||||
/// <summary>Initializes a new test task</summary>
|
||||
public WaitTask() {
|
||||
this.startEvent = new ManualResetEvent(false);
|
||||
this.finishEvent = new ManualResetEvent(false);
|
||||
this.waitEvent = new ManualResetEvent(false);
|
||||
}
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
if(this.waitEvent != null) {
|
||||
this.waitEvent.Close();
|
||||
this.waitEvent = null;
|
||||
}
|
||||
if(this.finishEvent != null) {
|
||||
this.finishEvent.Close();
|
||||
this.finishEvent = null;
|
||||
}
|
||||
if(this.startEvent != null) {
|
||||
this.startEvent.Close();
|
||||
this.startEvent = null;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Callback that can be added to the thread pool as a task</summary>
|
||||
/// <param name="state">User defined state</param>
|
||||
public void Callback(object state) {
|
||||
this.LastCallbackState = state;
|
||||
this.startEvent.Set();
|
||||
this.waitEvent.WaitOne();
|
||||
this.finishEvent.Set();
|
||||
}
|
||||
|
||||
/// <summary>Event that will be set when the callback has started</summary>
|
||||
public ManualResetEvent StartEvent {
|
||||
get { return this.startEvent; }
|
||||
}
|
||||
|
||||
/// <summary>Event that will be set when the callback has finished</summary>
|
||||
public ManualResetEvent FinishEvent {
|
||||
get { return this.finishEvent; }
|
||||
}
|
||||
|
||||
/// <summary>Event that blocks the callback</summary>
|
||||
public ManualResetEvent WaitEvent {
|
||||
get { return this.waitEvent; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// State parameter that was provide when the callback was called
|
||||
/// </summary>
|
||||
public volatile object LastCallbackState;
|
||||
|
||||
/// <summary>Event that will be set when the callback has started</summary>
|
||||
private ManualResetEvent startEvent;
|
||||
/// <summary>Event that will be set when the callback has finished</summary>
|
||||
private ManualResetEvent finishEvent;
|
||||
/// <summary>Event used to block the callback</summary>
|
||||
private ManualResetEvent waitEvent;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class WaitTask
|
||||
|
||||
#if false
|
||||
#region class ThrowingDisposable
|
||||
|
||||
/// <summary>Throws an exception when it is disposed</summary>
|
||||
private class ThrowingDisposable : IDisposable {
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
throw new ArithmeticException("Simulated exception for unit testing");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endregion // class ThrowingDisposable
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the Thread Pool's default assertion handler is working
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestDefaultAssertionHandler() {
|
||||
|
||||
// We can't test a failing assertion because our tests need to run
|
||||
// unattended on a build server without blocking for user input.
|
||||
AffineThreadPool.DefaultAssertionHandler(
|
||||
true, "Unit test", "This should not fail"
|
||||
);
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
/// <summary>Tests whether the QueueUserWorkItem() method is working</summary>
|
||||
[Test]
|
||||
public void TestQueueUserWorkItem() {
|
||||
using(TestTask task = new TestTask()) {
|
||||
AffineThreadPool.QueueUserWorkItem(task.Callback);
|
||||
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the QueueUserWorkItem() method is passing the state parameter
|
||||
/// on to the callback
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestQueueUserWorkItemWithState() {
|
||||
using(TestTask task = new TestTask()) {
|
||||
object state = new object();
|
||||
|
||||
AffineThreadPool.QueueUserWorkItem(task.Callback, state);
|
||||
|
||||
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
|
||||
Assert.AreSame(state, task.LastCallbackState);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Tests whether the thread pool can handle an exception from a user work item
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestExceptionFromUserWorkItem() {
|
||||
using(ManualResetEvent exceptionEvent = new ManualResetEvent(false)) {
|
||||
AffineThreadPool.ExceptionDelegate oldExceptionHandler =
|
||||
AffineThreadPool.ExceptionHandler;
|
||||
|
||||
AffineThreadPool.ExceptionHandler = delegate(Exception exception) {
|
||||
exceptionEvent.Set();
|
||||
};
|
||||
try {
|
||||
AffineThreadPool.QueueUserWorkItem(
|
||||
delegate(object state) { throw new KeyNotFoundException(); }
|
||||
);
|
||||
Assert.IsTrue(exceptionEvent.WaitOne(1000));
|
||||
}
|
||||
finally {
|
||||
AffineThreadPool.ExceptionHandler = oldExceptionHandler;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the affine thread pool's maximum thread count equals
|
||||
/// the number of logical processors in the system
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestMaxThreadsProperty() {
|
||||
Assert.AreEqual(Environment.ProcessorCount, AffineThreadPool.MaxThreads);
|
||||
}
|
||||
|
||||
#if WINDOWS
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the ProcessThread instance for a system thread id can
|
||||
/// be determined using the GetProcessThread() method
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void CanGetProcessThreadForManagedThread() {
|
||||
if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
|
||||
Thread.BeginThreadAffinity();
|
||||
try {
|
||||
int threadId = AffineThreadPool.GetCurrentThreadId();
|
||||
|
||||
Assert.IsNotNull(AffineThreadPool.GetProcessThread(threadId));
|
||||
Assert.IsNull(AffineThreadPool.GetProcessThread(0));
|
||||
}
|
||||
finally {
|
||||
Thread.EndThreadAffinity();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#endif // WINDOWS
|
||||
|
||||
/// <summary>
|
||||
/// Tests whether the afine thread pool's default exception handler works
|
||||
/// as expected
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestDefaultExceptionHandler() {
|
||||
Assert.Throws<ArrayTypeMismatchException>(
|
||||
delegate() {
|
||||
AffineThreadPool.ExceptionHandler(new ArrayTypeMismatchException("Test"));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that the waiting work items count and active thread count are
|
||||
/// updated by the thread pool.
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TestWaitingWorkItemsProperty() {
|
||||
int eventCount = AffineThreadPool.Processors;
|
||||
WaitTask[] tasks = new WaitTask[eventCount];
|
||||
|
||||
int createdTasks = 0;
|
||||
try {
|
||||
// CHECK: Is there danger that the thread pool still has not finished
|
||||
// queued items for other unit tests, thereby failing to meet
|
||||
// our expected task counts?
|
||||
|
||||
// Create the tasks, counting up the created task counter. If an exception
|
||||
// occurs, we will roll back from there.
|
||||
for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
|
||||
tasks[createdTasks] = new WaitTask();
|
||||
}
|
||||
|
||||
// Schedule the blocking tasks in the thread pool so it will not be able
|
||||
// to process the next task we add to the queue
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
|
||||
}
|
||||
|
||||
// Wait for the tasks to start so they aren't preempted by the tasks we're
|
||||
// going to add (which would finish immediately). The affine thread pool
|
||||
// works on a first come first serve basis, but we don't want to rely on this
|
||||
// implementation detail in the unit test.
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
Assert.IsTrue(
|
||||
tasks[index].StartEvent.WaitOne(10000),
|
||||
"Task " + index.ToString() + " was started"
|
||||
);
|
||||
}
|
||||
|
||||
// All Thread should now be active and no work items should be waiting
|
||||
Assert.AreEqual(
|
||||
createdTasks, AffineThreadPool.ActiveThreads,
|
||||
"ActiveThreads property equals number of tasks"
|
||||
);
|
||||
Assert.AreEqual(
|
||||
0, AffineThreadPool.WaitingWorkItems,
|
||||
"No waiting work items are in the queue"
|
||||
);
|
||||
|
||||
// Add a task to the queue and make sure the waiting work item count goes up
|
||||
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
|
||||
Assert.AreEqual(
|
||||
1, AffineThreadPool.WaitingWorkItems,
|
||||
"Added work item is waiting in the queue"
|
||||
);
|
||||
|
||||
// The same again. Now we should have 2 work items sitting in the queue
|
||||
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
|
||||
Assert.AreEqual(
|
||||
2, AffineThreadPool.WaitingWorkItems,
|
||||
"Both added work items are waiting in the queue"
|
||||
);
|
||||
|
||||
// Let the WaitTasks finish so we're not blocking the thread pool any longer
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
tasks[index].WaitEvent.Set();
|
||||
}
|
||||
|
||||
// Wait for the tasks to end before we get rid of them
|
||||
for(int index = 0; index < eventCount; ++index) {
|
||||
Assert.IsTrue(
|
||||
tasks[index].FinishEvent.WaitOne(1000),
|
||||
"Task " + index.ToString() + " has finished"
|
||||
);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
for(--createdTasks; createdTasks >= 0; --createdTasks) {
|
||||
tasks[createdTasks].Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} // namespace Nuclex.Support.Threading
|
||||
|
||||
#endif // UNITTEST
|
296
Source/Threading/AffineThreadPool.cs
Normal file
296
Source/Threading/AffineThreadPool.cs
Normal file
|
@ -0,0 +1,296 @@
|
|||
#region CPL License
|
||||
/*
|
||||
Nuclex Framework
|
||||
Copyright (C) 2002-2017 Nuclex Development Labs
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the IBM Common Public License as
|
||||
published by the IBM Corporation; either version 1.0 of the
|
||||
License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
IBM Common Public License for more details.
|
||||
|
||||
You should have received a copy of the IBM Common Public
|
||||
License along with this library
|
||||
*/
|
||||
#endregion
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Diagnostics;
|
||||
using System.Runtime.InteropServices;
|
||||
using System.Threading;
|
||||
|
||||
namespace Nuclex.Support.Threading {
|
||||
|
||||
/// <summary>Alternative Thread pool providing one thread for each core</summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Unlike the normal thread pool, the affine thread pool provides only as many
|
||||
/// threads as there are CPU cores available on the current platform. This makes
|
||||
/// it more suitable for tasks you want to spread across all available cpu cores
|
||||
/// explicitly.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// However, it's not a good match if you want to run blocking or waiting tasks
|
||||
/// inside the thread pool because the limited available threads will become
|
||||
/// congested quickly. It is encouraged to use this class in parallel with
|
||||
/// .NET's own thread pool, putting tasks that can block into the .NET thread
|
||||
/// pool and tasks that perform pure processing into the affine thread pool.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Implementation based on original code provided by Stephen Toub
|
||||
/// (stoub at microsoft ignorethis dot com)
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public static class AffineThreadPool {
|
||||
|
||||
/// <summary>Number of CPU cores available on the system</summary>
|
||||
public static readonly int Processors = Environment.ProcessorCount;
|
||||
|
||||
/// <summary>Delegate used by the thread pool to report unhandled exceptions</summary>
|
||||
/// <param name="exception">Exception that has not been handled</param>
|
||||
public delegate void ExceptionDelegate(Exception exception);
|
||||
|
||||
#region class UserWorkItem
|
||||
|
||||
/// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
|
||||
private struct UserWorkItem {
|
||||
|
||||
/// <summary>Initialize the callback holding object.</summary>
|
||||
/// <param name="callback">Callback delegate for the callback.</param>
|
||||
/// <param name="state">State with which to call the callback delegate.</param>
|
||||
public UserWorkItem(WaitCallback callback, object state) {
|
||||
this.Callback = callback;
|
||||
this.State = state;
|
||||
}
|
||||
|
||||
/// <summary>Callback delegate for the callback.</summary>
|
||||
public WaitCallback Callback;
|
||||
/// <summary>State with which to call the callback delegate.</summary>
|
||||
public object State;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class UserWorkItem
|
||||
|
||||
/// <summary>Initializes the thread pool</summary>
|
||||
static AffineThreadPool() {
|
||||
|
||||
// Create our thread stores; we handle synchronization ourself
|
||||
// as we may run into situations where multiple operations need to be atomic.
|
||||
// We keep track of the threads we've created just for good measure; not actually
|
||||
// needed for any core functionality.
|
||||
workAvailable = new System.Threading.Semaphore(0, int.MaxValue);
|
||||
userWorkItems = new Queue<UserWorkItem>(Processors * 4);
|
||||
workerThreads = new List<Thread>(Processors);
|
||||
inUseThreads = 0;
|
||||
|
||||
// We can use all cores on a PC, starting from index 1
|
||||
hardwareThreads = new Queue<int>(Processors);
|
||||
for(int core = Processors; core >= 1; --core) {
|
||||
hardwareThreads.Enqueue(core);
|
||||
}
|
||||
|
||||
// Create all of the worker threads
|
||||
for(int index = 0; index < Processors; index++) {
|
||||
|
||||
// Create a new thread and add it to the list of threads.
|
||||
Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
|
||||
workerThreads.Add(newThread);
|
||||
|
||||
// Configure the new thread and start it
|
||||
newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString();
|
||||
newThread.IsBackground = true;
|
||||
newThread.Start();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Queues a user work item to the thread pool</summary>
|
||||
/// <param name="callback">
|
||||
/// A WaitCallback representing the delegate to invoke when a thread in the
|
||||
/// thread pool picks up the work item
|
||||
/// </param>
|
||||
public static void QueueUserWorkItem(WaitCallback callback) {
|
||||
|
||||
// Queue the delegate with no state
|
||||
QueueUserWorkItem(callback, null);
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Queues a user work item to the thread pool.</summary>
|
||||
/// <param name="callback">
|
||||
/// A WaitCallback representing the delegate to invoke when a thread in the
|
||||
/// thread pool picks up the work item
|
||||
/// </param>
|
||||
/// <param name="state">
|
||||
/// The object that is passed to the delegate when serviced from the thread pool
|
||||
/// </param>
|
||||
public static void QueueUserWorkItem(WaitCallback callback, object state) {
|
||||
|
||||
// Create a waiting callback that contains the delegate and its state.
|
||||
// Add it to the processing queue, and signal that data is waiting.
|
||||
UserWorkItem waiting = new UserWorkItem(callback, state);
|
||||
lock(userWorkItems) {
|
||||
userWorkItems.Enqueue(waiting);
|
||||
}
|
||||
|
||||
// Wake up one of the worker threads so this task will be processed
|
||||
workAvailable.Release();
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Gets the number of threads at the disposal of the thread pool</summary>
|
||||
public static int MaxThreads { get { return Processors; } }
|
||||
|
||||
/// <summary>Gets the number of currently active threads in the thread pool</summary>
|
||||
public static int ActiveThreads { get { return inUseThreads; } }
|
||||
|
||||
/// <summary>
|
||||
/// Gets the number of callback delegates currently waiting in the thread pool
|
||||
/// </summary>
|
||||
public static int WaitingWorkItems {
|
||||
get {
|
||||
lock(userWorkItems) {
|
||||
return userWorkItems.Count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Default handler used to respond to unhandled exceptions in ThreadPool threads
|
||||
/// </summary>
|
||||
/// <param name="exception">Exception that has occurred</param>
|
||||
internal static void DefaultExceptionHandler(Exception exception) {
|
||||
throw exception;
|
||||
}
|
||||
|
||||
#if WINDOWS
|
||||
/// <summary>Retrieves the ProcessThread for the calling thread</summary>
|
||||
/// <returns>The ProcessThread for the calling thread</returns>
|
||||
internal static ProcessThread GetProcessThread(int threadId) {
|
||||
ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
|
||||
for(int index = 0; index < threads.Count; ++index) {
|
||||
if(threads[index].Id == threadId) {
|
||||
return threads[index];
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
#endif
|
||||
|
||||
/// <summary>A thread worker function that processes items from the work queue</summary>
|
||||
private static void ProcessQueuedItems() {
|
||||
|
||||
// Get the system/hardware thread index this thread is going to use. We hope that
|
||||
// the threads more or less start after each other, but there is no guarantee that
|
||||
// tasks will be handled by the CPU cores in the order the queue was filled with.
|
||||
// This could be added, though, by using a WaitHandle so the thread creator could
|
||||
// wait for each thread to take one entry out of the queue.
|
||||
int hardwareThreadIndex;
|
||||
lock(hardwareThreads) {
|
||||
hardwareThreadIndex = hardwareThreads.Dequeue();
|
||||
}
|
||||
|
||||
#if WINDOWS
|
||||
if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
|
||||
// Prevent this managed thread from impersonating another system thread.
|
||||
// In .NET, managed threads can supposedly be moved to different system threads
|
||||
// and, more worryingly, even fibers. This should make sure we're sitting on
|
||||
// a normal system thread and stay with that thread during our lifetime.
|
||||
Thread.BeginThreadAffinity();
|
||||
|
||||
// Assign the ideal processor, but don't force it. It's not a good idea to
|
||||
// circumvent the thread scheduler of a desktop machine, so we try to play nice.
|
||||
int threadId = GetCurrentThreadId();
|
||||
ProcessThread thread = GetProcessThread(threadId);
|
||||
if(thread != null) {
|
||||
thread.IdealProcessor = hardwareThreadIndex;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
// Keep processing tasks indefinitely
|
||||
for(; ; ) {
|
||||
UserWorkItem workItem = getNextWorkItem();
|
||||
|
||||
// Execute the work item we just picked up. Make sure to accurately
|
||||
// record how many callbacks are currently executing.
|
||||
Interlocked.Increment(ref inUseThreads);
|
||||
try {
|
||||
workItem.Callback(workItem.State);
|
||||
}
|
||||
catch(Exception exception) { // Make sure we don't throw here.
|
||||
ExceptionDelegate exceptionHandler = ExceptionHandler;
|
||||
if(exceptionHandler != null) {
|
||||
exceptionHandler(exception);
|
||||
}
|
||||
}
|
||||
finally {
|
||||
Interlocked.Decrement(ref inUseThreads);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Obtains the next work item from the queue</summary>
|
||||
/// <returns>The next work item in the queue</returns>
|
||||
/// <remarks>
|
||||
/// If the queue is empty, the call will block until an item is added to
|
||||
/// the queue and the calling thread was the one picking it up.
|
||||
/// </remarks>
|
||||
private static UserWorkItem getNextWorkItem() {
|
||||
|
||||
// Get the next item in the queue. If there is nothing there, go to sleep
|
||||
// for a while until we're woken up when a callback is waiting.
|
||||
for(; ; ) {
|
||||
|
||||
// Try to get the next callback available. We need to lock on the
|
||||
// queue in order to make our count check and retrieval atomic.
|
||||
lock(userWorkItems) {
|
||||
if(userWorkItems.Count > 0) {
|
||||
return userWorkItems.Dequeue();
|
||||
}
|
||||
}
|
||||
|
||||
// If we can't get one, go to sleep. The semaphore blocks until work
|
||||
// becomes available (then acting like an AutoResetEvent that counts
|
||||
// how often it has been triggered and letting that number of threads
|
||||
// pass through.)
|
||||
workAvailable.WaitOne();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Delegate used to handle assertion checks in the code</summary>
|
||||
public static volatile ExceptionDelegate ExceptionHandler = DefaultExceptionHandler;
|
||||
|
||||
#if WINDOWS
|
||||
/// <summary>Retrieves the calling thread's thread id</summary>
|
||||
/// <returns>The thread is of the calling thread</returns>
|
||||
[DllImport("kernel32.dll")]
|
||||
internal static extern int GetCurrentThreadId();
|
||||
#endif
|
||||
|
||||
/// <summary>Available hardware threads the thread pool threads pick from</summary>
|
||||
private static Queue<int> hardwareThreads;
|
||||
/// <summary>Queue of all the callbacks waiting to be executed.</summary>
|
||||
private static Queue<UserWorkItem> userWorkItems;
|
||||
/// <summary>
|
||||
/// Used to let the threads in the thread pool wait for new work to appear.
|
||||
/// </summary>
|
||||
private static System.Threading.Semaphore workAvailable;
|
||||
/// <summary>List of all worker threads at the disposal of the thread pool.</summary>
|
||||
private static List<Thread> workerThreads;
|
||||
/// <summary>Number of threads currently active.</summary>
|
||||
private static int inUseThreads;
|
||||
|
||||
}
|
||||
|
||||
} // namespace Nuclex.Support.Threading
|
256
Source/Threading/ParallelBackgroundWorker.Test.cs
Normal file
256
Source/Threading/ParallelBackgroundWorker.Test.cs
Normal file
|
@ -0,0 +1,256 @@
|
|||
#region CPL License
|
||||
/*
|
||||
Nuclex Framework
|
||||
Copyright (C) 2002-2017 Nuclex Development Labs
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the IBM Common Public License as
|
||||
published by the IBM Corporation; either version 1.0 of the
|
||||
License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
IBM Common Public License for more details.
|
||||
|
||||
You should have received a copy of the IBM Common Public
|
||||
License along with this library
|
||||
*/
|
||||
#endregion
|
||||
|
||||
#if !NO_CONCURRENT_COLLECTIONS
|
||||
|
||||
using System;
|
||||
using System.Threading;
|
||||
using System.Collections.Generic;
|
||||
|
||||
#if UNITTEST
|
||||
|
||||
using NUnit.Framework;
|
||||
|
||||
namespace Nuclex.Support.Threading {
|
||||
|
||||
/// <summary>Unit Test for the parallel background worker class</summary>
|
||||
[TestFixture]
|
||||
internal class ParallelBackgroundWorkerTest {
|
||||
|
||||
#region class TestWorker
|
||||
|
||||
/// <summary>Implementation of a background worker used for unit testing</summary>
|
||||
private class TestWorker : ParallelBackgroundWorker<object> {
|
||||
|
||||
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
|
||||
public TestWorker() : base() { }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new parallel background worker running the specified number
|
||||
/// of tasks in parallel
|
||||
/// </summary>
|
||||
/// <param name="threadCount">
|
||||
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
|
||||
/// unused (if negative).
|
||||
/// </param>
|
||||
/// <remarks>
|
||||
/// If a negative number of threads is used, at least one thread will be always
|
||||
/// be created, so specifying -2 on a single-core system will still occupy
|
||||
/// the only core.
|
||||
/// </remarks>
|
||||
public TestWorker(int threadCount) : base(threadCount) { }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new parallel background worker that uses the specified name for
|
||||
/// its worker threads.
|
||||
/// </summary>
|
||||
/// <param name="name">Name that will be assigned to the worker threads</param>
|
||||
public TestWorker(string name) : base(name) { }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new parallel background worker that uses the specified name for
|
||||
/// its worker threads and running the specified number of tasks in parallel.
|
||||
/// </summary>
|
||||
/// <param name="name">Name that will be assigned to the worker threads</param>
|
||||
/// <param name="threadCount">
|
||||
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
|
||||
/// unused (if negative).
|
||||
/// </param>
|
||||
/// <remarks>
|
||||
/// If a negative number of threads is used, at least one thread will be always
|
||||
/// be created, so specifying -2 on a single-core system will still occupy
|
||||
/// the only core.
|
||||
/// </remarks>
|
||||
public TestWorker(string name, int threadCount) : base(name, threadCount) { }
|
||||
|
||||
/// <summary>Called in a thread to execute a single task</summary>
|
||||
/// <param name="task">Task that should be executed</param>
|
||||
/// <param name="cancellationToken">
|
||||
/// Cancellation token through which the method can be signalled to cancel
|
||||
/// </param>
|
||||
protected override void Run(object task, CancellationToken cancellationToken) {
|
||||
if(this.ThrowException) {
|
||||
throw new Exception("Something went wrong");
|
||||
}
|
||||
|
||||
if(this.WaitEvent != null) {
|
||||
this.WaitEvent.WaitOne();
|
||||
}
|
||||
|
||||
this.WasCancelled = cancellationToken.IsCancellationRequested;
|
||||
|
||||
if(this.Tasks != null) {
|
||||
lock(this.Tasks) {
|
||||
this.Tasks.Add(task);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Whether the work tasks should throw exceptions</summary>
|
||||
public bool ThrowException;
|
||||
/// <summary>Event that can be used to stop work tasks from completing</summary>
|
||||
public ManualResetEvent WaitEvent;
|
||||
|
||||
/// <summary>Set by work tasks if they have been cancelled</summary>
|
||||
public bool WasCancelled;
|
||||
/// <summary>Work tasks that have reached execution</summary>
|
||||
public ICollection<object> Tasks;
|
||||
|
||||
}
|
||||
|
||||
#endregion // class TestWorker
|
||||
|
||||
/// <summary>Verifies that the background worker has a default constructor</summary>
|
||||
[Test]
|
||||
public void CanBeDefaultConstructed() {
|
||||
using(new TestWorker()) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a background worker can be constructed that uses a fixed number
|
||||
/// of threads
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void CanUseFixedNumberOfThreads() {
|
||||
using(new TestWorker(4)) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a background worker can be constructed that leaves free a fixed
|
||||
/// number of CPU cores
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void CanPreserveFixedNumberOfCores() {
|
||||
using(new TestWorker(-2)) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a background worker can be constructed using a specific name
|
||||
/// for its worker threads
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void CanUseNamedThreads() {
|
||||
using(new TestWorker("Test Task Thread")) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a background worker can be constructed that uses a fixed number
|
||||
/// of threads using a specific name
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void CanUseFixedNumberOfNamedThreads() {
|
||||
using(new TestWorker("Test Task Thread", 4)) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that a background worker can be constructed that leaves free a fixed
|
||||
/// number of CPU cores and uses a specific name for its worker threads.
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void CanPreserveFixedNumberOfCoresAndUseNamedThreads() {
|
||||
using(new TestWorker("Test Task Thread", -2)) { }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that exceptions happening inside the tasks are collected and re-thrown
|
||||
/// in the Join() method.
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void ExceptionsAreReThrownInJoin() {
|
||||
using(var testWorker = new TestWorker()) {
|
||||
testWorker.ThrowException = true;
|
||||
testWorker.AddTask(new object());
|
||||
testWorker.AddTask(new object());
|
||||
|
||||
Assert.Throws<AggregateException>(
|
||||
() => {
|
||||
testWorker.Join();
|
||||
}
|
||||
);
|
||||
|
||||
try {
|
||||
testWorker.Join();
|
||||
Assert.Fail(
|
||||
"Calling ParallelBackgroundWorker.Join() multiple times should re-throw " +
|
||||
"exceptions multiple times"
|
||||
);
|
||||
}
|
||||
catch(AggregateException aggregateException) {
|
||||
Assert.AreEqual(2, aggregateException.InnerExceptions.Count);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Verifies that tasks can be cancelled while they are running
|
||||
/// </summary>
|
||||
[Test]
|
||||
public void TasksCanBeCancelled() {
|
||||
using(var waitEvent = new ManualResetEvent(false)) {
|
||||
using(var testWorker = new TestWorker()) {
|
||||
testWorker.WaitEvent = waitEvent;
|
||||
|
||||
testWorker.AddTask(new object());
|
||||
testWorker.CancelRunningTasks();
|
||||
|
||||
waitEvent.Set();
|
||||
|
||||
Assert.IsTrue(testWorker.Wait(1000));
|
||||
|
||||
Assert.IsTrue(testWorker.WasCancelled);
|
||||
}
|
||||
} // disposes waitEvent
|
||||
}
|
||||
|
||||
/// <summary>Verifies that calling Join() waits for all queued tasks</summary>
|
||||
[Test]
|
||||
public void JoinWaitsForQueuedTasks() {
|
||||
var tasks = new List<object>(100);
|
||||
for(int index = 0; index < 100; ++index) {
|
||||
tasks.Add(new object());
|
||||
}
|
||||
|
||||
using(var waitEvent = new ManualResetEvent(false)) {
|
||||
using(var testWorker = new TestWorker(2)) {
|
||||
testWorker.WaitEvent = waitEvent;
|
||||
testWorker.Tasks = new List<object>();
|
||||
for(int index = 0; index < 100; ++index) {
|
||||
testWorker.AddTask(tasks[index]);
|
||||
}
|
||||
|
||||
CollectionAssert.IsEmpty(testWorker.Tasks);
|
||||
|
||||
waitEvent.Set();
|
||||
testWorker.Join();
|
||||
|
||||
lock(testWorker.Tasks) {
|
||||
CollectionAssert.AreEquivalent(tasks, testWorker.Tasks);
|
||||
}
|
||||
}
|
||||
} // disposes waitEvent
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
} // namespace Nuclex.Support.Threading
|
||||
|
||||
#endif // UNITTEST
|
||||
|
||||
#endif // !NO_CONCURRENT_COLLECTIONS
|
344
Source/Threading/ParallelBackgroundWorker.cs
Normal file
344
Source/Threading/ParallelBackgroundWorker.cs
Normal file
|
@ -0,0 +1,344 @@
|
|||
#region CPL License
|
||||
/*
|
||||
Nuclex Framework
|
||||
Copyright (C) 2002-2017 Nuclex Development Labs
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the IBM Common Public License as
|
||||
published by the IBM Corporation; either version 1.0 of the
|
||||
License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
IBM Common Public License for more details.
|
||||
|
||||
You should have received a copy of the IBM Common Public
|
||||
License along with this library
|
||||
*/
|
||||
#endregion
|
||||
|
||||
#if !NO_CONCURRENT_COLLECTIONS
|
||||
|
||||
using System;
|
||||
using System.Collections.Concurrent;
|
||||
using System.Collections.Generic;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace Nuclex.Support.Threading {
|
||||
|
||||
/// <summary>Processes tasks in parallel using many threads</summary>
|
||||
/// <typeparam name="TTask">Type of tasks the class will process</typeparam>
|
||||
public abstract class ParallelBackgroundWorker<TTask> : IDisposable {
|
||||
|
||||
/// <summary>Number of CPU cores available on the system</summary>
|
||||
public static readonly int ProcessorCount = Environment.ProcessorCount;
|
||||
|
||||
/// <summary>
|
||||
/// Timeout after which Dispose() will stop waiting for unfinished tasks and
|
||||
/// free resources anyway
|
||||
/// </summary>
|
||||
private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds
|
||||
|
||||
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
|
||||
public ParallelBackgroundWorker() : this(int.MaxValue) { }
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new parallel background worker running the specified number
|
||||
/// of tasks in parallel
|
||||
/// </summary>
|
||||
/// <param name="threadCount">
|
||||
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
|
||||
/// unused (if negative).
|
||||
/// </param>
|
||||
/// <remarks>
|
||||
/// If a negative number of threads is used, at least one thread will be always
|
||||
/// be created, so specifying -2 on a single-core system will still occupy
|
||||
/// the only core.
|
||||
/// </remarks>
|
||||
public ParallelBackgroundWorker(int threadCount) {
|
||||
if(threadCount > 0) {
|
||||
this.threadCount = threadCount;
|
||||
} else {
|
||||
threadCount = Math.Max(1, ProcessorCount + threadCount);
|
||||
}
|
||||
|
||||
this.queueSynchronizationRoot = new object();
|
||||
this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
|
||||
this.tasks = new Queue<TTask>();
|
||||
this.threadTerminatedEvent = new AutoResetEvent(false);
|
||||
this.cancellationTokenSource = new CancellationTokenSource();
|
||||
this.exceptions = new ConcurrentBag<Exception>();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new parallel background worker that uses the specified name for
|
||||
/// its worker threads.
|
||||
/// </summary>
|
||||
/// <param name="name">Name that will be assigned to the worker threads</param>
|
||||
public ParallelBackgroundWorker(string name) :
|
||||
this(int.MaxValue) {
|
||||
threadName = name;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Initializes a new parallel background worker that uses the specified name for
|
||||
/// its worker threads and running the specified number of tasks in parallel.
|
||||
/// </summary>
|
||||
/// <param name="name">Name that will be assigned to the worker threads</param>
|
||||
/// <param name="threadCount">
|
||||
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
|
||||
/// unused (if negative).
|
||||
/// </param>
|
||||
/// <remarks>
|
||||
/// If a negative number of threads is used, at least one thread will be always
|
||||
/// be created, so specifying -2 on a single-core system will still occupy
|
||||
/// the only core.
|
||||
/// </remarks>
|
||||
public ParallelBackgroundWorker(string name, int threadCount) :
|
||||
this(threadCount) {
|
||||
threadName = name;
|
||||
}
|
||||
|
||||
/// <summary>Immediately releases all resources owned by the instance</summary>
|
||||
public void Dispose() {
|
||||
if(this.threadTerminatedEvent != null) {
|
||||
CancelPendingTasks();
|
||||
CancelRunningTasks();
|
||||
|
||||
Wait(DefaultDisposeTimeoutMilliseconds);
|
||||
|
||||
this.threadTerminatedEvent.Dispose();
|
||||
this.threadTerminatedEvent = null;
|
||||
}
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
if(this.cancellationTokenSource != null) {
|
||||
this.cancellationTokenSource.Dispose();
|
||||
this.cancellationTokenSource = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Adds a task for processing by the background worker threads</summary>
|
||||
/// <param name="task">Task that will be processed in the background</param>
|
||||
public void AddTask(TTask task) {
|
||||
if(this.cancellationTokenSource.IsCancellationRequested) {
|
||||
return;
|
||||
}
|
||||
|
||||
bool needNewThread;
|
||||
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
this.tasks.Enqueue(task);
|
||||
|
||||
needNewThread = (this.runningThreadCount < this.threadCount);
|
||||
if(needNewThread) {
|
||||
++this.runningThreadCount;
|
||||
}
|
||||
}
|
||||
|
||||
if(needNewThread) {
|
||||
Task newThread = new Task(
|
||||
this.runQueuedTasksInThreadDelegate,
|
||||
// this.cancellationTokenSource.Token, // DO NOT PASS THIS!
|
||||
// Passing the cancellation token makes tasks that have been queued but
|
||||
// not started yet cease to execute at all - meaning our runningThreadCount
|
||||
// goes out of sync and Dispose() / Wait() / Join() sit around endlessly!
|
||||
TaskCreationOptions.LongRunning
|
||||
);
|
||||
newThread.Start();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Cancels all tasks that are currently executing</summary>
|
||||
/// <remarks>
|
||||
/// It is valid to call this method after Dispose()
|
||||
/// </remarks>
|
||||
public void CancelRunningTasks() {
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
if(this.cancellationTokenSource != null) {
|
||||
this.cancellationTokenSource.Cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Cancels all queued tasks waiting to be executed</summary>
|
||||
/// <remarks>
|
||||
/// It is valid to call this method after Dispose()
|
||||
/// </remarks>
|
||||
public void CancelPendingTasks() {
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
this.tasks.Clear();
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits until all executing and queued tasks have been processed and throws an
|
||||
/// exception if any errors have occurred
|
||||
/// </summary>
|
||||
public void Join() {
|
||||
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
|
||||
this.threadTerminatedEvent.WaitOne();
|
||||
}
|
||||
|
||||
if(this.exceptions.Count > 0) {
|
||||
throw new AggregateException(this.exceptions);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Waits until all executing and queued tasks have been processed or
|
||||
/// the timeout is reached
|
||||
/// </summary>
|
||||
/// <param name="timeoutMilliseconds">Milliseconds after which the wait times out</param>
|
||||
/// <returns>
|
||||
/// True if all tasks have been processed, false if the timeout was reached
|
||||
/// </returns>
|
||||
public bool Wait(int timeoutMilliseconds) {
|
||||
|
||||
// Wait until the task queue has become empty
|
||||
while(queuedTaskCount > 0) {
|
||||
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Now wait until all running tasks have finished
|
||||
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
|
||||
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
|
||||
}
|
||||
|
||||
/// <summary>Called in a thread to execute a single task</summary>
|
||||
/// <param name="task">Task that should be executed</param>
|
||||
/// <param name="cancellationToken">
|
||||
/// Cancellation token through which the method can be signalled to cancel
|
||||
/// </param>
|
||||
protected abstract void Run(TTask task, CancellationToken cancellationToken);
|
||||
|
||||
/// <summary>
|
||||
/// Runs queued tasks of the parallel background worker until the queue is empty
|
||||
/// </summary>
|
||||
private void runQueuedTasksInThread() {
|
||||
string previousThreadName = null;
|
||||
if(!string.IsNullOrEmpty(this.threadName)) {
|
||||
previousThreadName = Thread.CurrentThread.Name;
|
||||
Thread.CurrentThread.Name = this.threadName;
|
||||
}
|
||||
try {
|
||||
for(;;) {
|
||||
TTask task;
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
if(this.tasks.Count == 0) {
|
||||
--this.runningThreadCount;
|
||||
break;
|
||||
}
|
||||
task = this.tasks.Dequeue();
|
||||
}
|
||||
|
||||
try {
|
||||
Run(task, this.cancellationTokenSource.Token);
|
||||
}
|
||||
catch(Exception exception) {
|
||||
this.exceptions.Add(exception);
|
||||
}
|
||||
}
|
||||
|
||||
this.threadTerminatedEvent.Set();
|
||||
}
|
||||
finally {
|
||||
if(!string.IsNullOrEmpty(this.threadName)) {
|
||||
Thread.CurrentThread.Name = previousThreadName;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>Number of task still waiting to be executed</summary>
|
||||
private int queuedTaskCount {
|
||||
get {
|
||||
lock(this.queueSynchronizationRoot) {
|
||||
return this.tasks.Count;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Name that will be assigned to the worker threads while they're processing tasks
|
||||
/// for the parallel background worker
|
||||
/// </summary>
|
||||
private string threadName;
|
||||
/// <summary>Number of threads to use simultaneously</summary>
|
||||
private int threadCount;
|
||||
|
||||
/// <summary>Synchronization root used to access the task queue and thread list</summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// Both lists are intentionally using a shared synchronization root because this will
|
||||
/// guarantee that there's no race condition between adding work and a thread finishing:
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Main thread
|
||||
/// <code>
|
||||
/// lock(this.queueSynchronizationRoot) {
|
||||
/// this.tasks.Add(newTask);
|
||||
///
|
||||
/// if(this.runningThreads.Count < maximumThreadCount) {
|
||||
/// startAdditionalTaskThread();
|
||||
/// }
|
||||
/// }
|
||||
/// </code>
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Task thread
|
||||
/// <code>
|
||||
/// TTask myTask;
|
||||
/// lock(this.queueSynchronizationRoot) {
|
||||
/// this.tasks.TryGet(out myTask);
|
||||
///
|
||||
/// // RACE CONDITION WITHOUT LOCK: if this wasn't inside the same lock as the task
|
||||
/// // checking, the main thread might check at this point, see that there are already
|
||||
/// // enough threads running!
|
||||
/// if(myTask == null) {
|
||||
/// this.runningThreads.Remove(thisThread);
|
||||
/// return;
|
||||
/// }
|
||||
/// }
|
||||
/// </code>
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The race condition mentioned above could completely stall the background worker by
|
||||
/// leaving tasks in the queue but no threads active to execute them, so it is vital to
|
||||
/// use this method of synchronization!
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
///
|
||||
private object queueSynchronizationRoot;
|
||||
|
||||
/// <summary>Delegate for the runQueuedTasksInThread() method</summary>
|
||||
private Action runQueuedTasksInThreadDelegate;
|
||||
/// <summary>Tasks remaining to be processed</summary>
|
||||
private Queue<TTask> tasks;
|
||||
/// <summary>Threads that are currently executing tasks</summary>
|
||||
private int runningThreadCount;
|
||||
|
||||
/// <summary>Exceptions that have occurred while executing tasks</summary>
|
||||
private ConcurrentBag<Exception> exceptions;
|
||||
/// <summary>Event that is triggered whenever a task gets completed</summary>
|
||||
private AutoResetEvent threadTerminatedEvent;
|
||||
|
||||
/// <summary>
|
||||
/// Provides a cancellation token that can be used to signal a thread to terminate
|
||||
/// </summary>
|
||||
private CancellationTokenSource cancellationTokenSource;
|
||||
|
||||
}
|
||||
|
||||
} // namespace Nuclex.Support.Threading
|
||||
|
||||
#endif // !NO_CONCURRENT_COLLECTIONS
|
Loading…
Add table
Add a link
Reference in a new issue