Changed license to Apache License 2.0

This commit is contained in:
Markus Ewald 2024-06-13 18:36:21 +02:00 committed by cygon
parent d3bf0be9d7
commit 9f36d71529
144 changed files with 32422 additions and 32544 deletions

View file

@ -1,356 +1,355 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if UNITTEST
using System;
using System.Collections.Generic;
using System.Threading;
using NUnit.Framework;
namespace Nuclex.Support.Threading {
/// <summary>Unit Test for the CPU core-affine thread pool</summary>
[TestFixture]
internal class AffineThreadPoolTest {
#region class TestTask
/// <summary>ThreadPool task that can be used for testing</summary>
private class TestTask : IDisposable {
/// <summary>Initializes a new test task</summary>
public TestTask() {
this.callbackEvent = new ManualResetEvent(false);
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.callbackEvent != null) {
this.callbackEvent.Close();
this.callbackEvent = null;
}
}
/// <summary>Callback that can be added to the thread pool as a task</summary>
/// <param name="state">User defined state</param>
public void Callback(object state) {
this.LastCallbackState = state;
this.callbackEvent.Set();
}
/// <summary>Event that will be set when the callback is executed</summary>
public ManualResetEvent CallbackEvent {
get { return this.callbackEvent; }
}
/// <summary>
/// State parameter that was provide when the callback was called
/// </summary>
public volatile object LastCallbackState;
/// <summary>Event that will be set when the callback is invoked</summary>
private ManualResetEvent callbackEvent;
}
#endregion // class TestTask
#region class WaitTask
/// <summary>ThreadPool task that can be used for testing</summary>
private class WaitTask : IDisposable {
/// <summary>Initializes a new test task</summary>
public WaitTask() {
this.startEvent = new ManualResetEvent(false);
this.finishEvent = new ManualResetEvent(false);
this.waitEvent = new ManualResetEvent(false);
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.waitEvent != null) {
this.waitEvent.Close();
this.waitEvent = null;
}
if(this.finishEvent != null) {
this.finishEvent.Close();
this.finishEvent = null;
}
if(this.startEvent != null) {
this.startEvent.Close();
this.startEvent = null;
}
}
/// <summary>Callback that can be added to the thread pool as a task</summary>
/// <param name="state">User defined state</param>
public void Callback(object state) {
this.LastCallbackState = state;
this.startEvent.Set();
this.waitEvent.WaitOne();
this.finishEvent.Set();
}
/// <summary>Event that will be set when the callback has started</summary>
public ManualResetEvent StartEvent {
get { return this.startEvent; }
}
/// <summary>Event that will be set when the callback has finished</summary>
public ManualResetEvent FinishEvent {
get { return this.finishEvent; }
}
/// <summary>Event that blocks the callback</summary>
public ManualResetEvent WaitEvent {
get { return this.waitEvent; }
}
/// <summary>
/// State parameter that was provide when the callback was called
/// </summary>
public volatile object LastCallbackState;
/// <summary>Event that will be set when the callback has started</summary>
private ManualResetEvent startEvent;
/// <summary>Event that will be set when the callback has finished</summary>
private ManualResetEvent finishEvent;
/// <summary>Event used to block the callback</summary>
private ManualResetEvent waitEvent;
}
#endregion // class WaitTask
#if false
#region class ThrowingDisposable
/// <summary>Throws an exception when it is disposed</summary>
private class ThrowingDisposable : IDisposable {
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
throw new ArithmeticException("Simulated exception for unit testing");
}
}
#endregion // class ThrowingDisposable
/// <summary>
/// Verifies that the Thread Pool's default assertion handler is working
/// </summary>
[Test]
public void TestDefaultAssertionHandler() {
// We can't test a failing assertion because our tests need to run
// unattended on a build server without blocking for user input.
AffineThreadPool.DefaultAssertionHandler(
true, "Unit test", "This should not fail"
);
}
#endif
/// <summary>Tests whether the QueueUserWorkItem() method is working</summary>
[Test]
public void TestQueueUserWorkItem() {
using(TestTask task = new TestTask()) {
AffineThreadPool.QueueUserWorkItem(task.Callback);
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
}
}
/// <summary>
/// Verifies that the QueueUserWorkItem() method is passing the state parameter
/// on to the callback
/// </summary>
[Test]
public void TestQueueUserWorkItemWithState() {
using(TestTask task = new TestTask()) {
object state = new object();
AffineThreadPool.QueueUserWorkItem(task.Callback, state);
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
Assert.AreSame(state, task.LastCallbackState);
}
}
/// <summary>
/// Tests whether the thread pool can handle an exception from a user work item
/// </summary>
[Test]
public void TestExceptionFromUserWorkItem() {
using(ManualResetEvent exceptionEvent = new ManualResetEvent(false)) {
AffineThreadPool.ExceptionDelegate oldExceptionHandler =
AffineThreadPool.ExceptionHandler;
AffineThreadPool.ExceptionHandler = delegate(Exception exception) {
exceptionEvent.Set();
};
try {
AffineThreadPool.QueueUserWorkItem(
delegate(object state) { throw new KeyNotFoundException(); }
);
Assert.IsTrue(exceptionEvent.WaitOne(1000));
}
finally {
AffineThreadPool.ExceptionHandler = oldExceptionHandler;
}
}
}
/// <summary>
/// Verifies that the affine thread pool's maximum thread count equals
/// the number of logical processors in the system
/// </summary>
[Test]
public void TestMaxThreadsProperty() {
Assert.AreEqual(Environment.ProcessorCount, AffineThreadPool.MaxThreads);
}
#if WINDOWS
/// <summary>
/// Verifies that the ProcessThread instance for a system thread id can
/// be determined using the GetProcessThread() method
/// </summary>
[Test]
public void CanGetProcessThreadForManagedThread() {
if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
Thread.BeginThreadAffinity();
try {
int threadId = AffineThreadPool.GetCurrentThreadId();
Assert.IsNotNull(AffineThreadPool.GetProcessThread(threadId));
Assert.IsNull(AffineThreadPool.GetProcessThread(0));
}
finally {
Thread.EndThreadAffinity();
}
}
}
#endif // WINDOWS
/// <summary>
/// Tests whether the afine thread pool's default exception handler works
/// as expected
/// </summary>
[Test]
public void TestDefaultExceptionHandler() {
Assert.Throws<ArrayTypeMismatchException>(
delegate() {
AffineThreadPool.ExceptionHandler(new ArrayTypeMismatchException("Test"));
}
);
}
/// <summary>
/// Verifies that the waiting work items count and active thread count are
/// updated by the thread pool.
/// </summary>
[Test]
public void TestWaitingWorkItemsProperty() {
int eventCount = AffineThreadPool.Processors;
WaitTask[] tasks = new WaitTask[eventCount];
int createdTasks = 0;
try {
// CHECK: Is there danger that the thread pool still has not finished
// queued items for other unit tests, thereby failing to meet
// our expected task counts?
// Create the tasks, counting up the created task counter. If an exception
// occurs, we will roll back from there.
for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
tasks[createdTasks] = new WaitTask();
}
// Schedule the blocking tasks in the thread pool so it will not be able
// to process the next task we add to the queue
for(int index = 0; index < eventCount; ++index) {
AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
}
// Wait for the tasks to start so they aren't preempted by the tasks we're
// going to add (which would finish immediately). The affine thread pool
// works on a first come first serve basis, but we don't want to rely on this
// implementation detail in the unit test.
for(int index = 0; index < eventCount; ++index) {
Assert.IsTrue(
tasks[index].StartEvent.WaitOne(10000),
"Task " + index.ToString() + " was started"
);
}
// All Thread should now be active and no work items should be waiting
Assert.AreEqual(
createdTasks, AffineThreadPool.ActiveThreads,
"ActiveThreads property equals number of tasks"
);
Assert.AreEqual(
0, AffineThreadPool.WaitingWorkItems,
"No waiting work items are in the queue"
);
// Add a task to the queue and make sure the waiting work item count goes up
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
Assert.AreEqual(
1, AffineThreadPool.WaitingWorkItems,
"Added work item is waiting in the queue"
);
// The same again. Now we should have 2 work items sitting in the queue
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
Assert.AreEqual(
2, AffineThreadPool.WaitingWorkItems,
"Both added work items are waiting in the queue"
);
// Let the WaitTasks finish so we're not blocking the thread pool any longer
for(int index = 0; index < eventCount; ++index) {
tasks[index].WaitEvent.Set();
}
// Wait for the tasks to end before we get rid of them
for(int index = 0; index < eventCount; ++index) {
Assert.IsTrue(
tasks[index].FinishEvent.WaitOne(1000),
"Task " + index.ToString() + " has finished"
);
}
}
finally {
for(--createdTasks; createdTasks >= 0; --createdTasks) {
tasks[createdTasks].Dispose();
}
}
}
}
} // namespace Nuclex.Support.Threading
#endif // UNITTEST
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
#if UNITTEST
using System;
using System.Collections.Generic;
using System.Threading;
using NUnit.Framework;
namespace Nuclex.Support.Threading {
/// <summary>Unit Test for the CPU core-affine thread pool</summary>
[TestFixture]
internal class AffineThreadPoolTest {
#region class TestTask
/// <summary>ThreadPool task that can be used for testing</summary>
private class TestTask : IDisposable {
/// <summary>Initializes a new test task</summary>
public TestTask() {
this.callbackEvent = new ManualResetEvent(false);
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.callbackEvent != null) {
this.callbackEvent.Close();
this.callbackEvent = null;
}
}
/// <summary>Callback that can be added to the thread pool as a task</summary>
/// <param name="state">User defined state</param>
public void Callback(object state) {
this.LastCallbackState = state;
this.callbackEvent.Set();
}
/// <summary>Event that will be set when the callback is executed</summary>
public ManualResetEvent CallbackEvent {
get { return this.callbackEvent; }
}
/// <summary>
/// State parameter that was provide when the callback was called
/// </summary>
public volatile object LastCallbackState;
/// <summary>Event that will be set when the callback is invoked</summary>
private ManualResetEvent callbackEvent;
}
#endregion // class TestTask
#region class WaitTask
/// <summary>ThreadPool task that can be used for testing</summary>
private class WaitTask : IDisposable {
/// <summary>Initializes a new test task</summary>
public WaitTask() {
this.startEvent = new ManualResetEvent(false);
this.finishEvent = new ManualResetEvent(false);
this.waitEvent = new ManualResetEvent(false);
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.waitEvent != null) {
this.waitEvent.Close();
this.waitEvent = null;
}
if(this.finishEvent != null) {
this.finishEvent.Close();
this.finishEvent = null;
}
if(this.startEvent != null) {
this.startEvent.Close();
this.startEvent = null;
}
}
/// <summary>Callback that can be added to the thread pool as a task</summary>
/// <param name="state">User defined state</param>
public void Callback(object state) {
this.LastCallbackState = state;
this.startEvent.Set();
this.waitEvent.WaitOne();
this.finishEvent.Set();
}
/// <summary>Event that will be set when the callback has started</summary>
public ManualResetEvent StartEvent {
get { return this.startEvent; }
}
/// <summary>Event that will be set when the callback has finished</summary>
public ManualResetEvent FinishEvent {
get { return this.finishEvent; }
}
/// <summary>Event that blocks the callback</summary>
public ManualResetEvent WaitEvent {
get { return this.waitEvent; }
}
/// <summary>
/// State parameter that was provide when the callback was called
/// </summary>
public volatile object LastCallbackState;
/// <summary>Event that will be set when the callback has started</summary>
private ManualResetEvent startEvent;
/// <summary>Event that will be set when the callback has finished</summary>
private ManualResetEvent finishEvent;
/// <summary>Event used to block the callback</summary>
private ManualResetEvent waitEvent;
}
#endregion // class WaitTask
#if false
#region class ThrowingDisposable
/// <summary>Throws an exception when it is disposed</summary>
private class ThrowingDisposable : IDisposable {
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
throw new ArithmeticException("Simulated exception for unit testing");
}
}
#endregion // class ThrowingDisposable
/// <summary>
/// Verifies that the Thread Pool's default assertion handler is working
/// </summary>
[Test]
public void TestDefaultAssertionHandler() {
// We can't test a failing assertion because our tests need to run
// unattended on a build server without blocking for user input.
AffineThreadPool.DefaultAssertionHandler(
true, "Unit test", "This should not fail"
);
}
#endif
/// <summary>Tests whether the QueueUserWorkItem() method is working</summary>
[Test]
public void TestQueueUserWorkItem() {
using(TestTask task = new TestTask()) {
AffineThreadPool.QueueUserWorkItem(task.Callback);
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
}
}
/// <summary>
/// Verifies that the QueueUserWorkItem() method is passing the state parameter
/// on to the callback
/// </summary>
[Test]
public void TestQueueUserWorkItemWithState() {
using(TestTask task = new TestTask()) {
object state = new object();
AffineThreadPool.QueueUserWorkItem(task.Callback, state);
Assert.IsTrue(task.CallbackEvent.WaitOne(1000));
Assert.AreSame(state, task.LastCallbackState);
}
}
/// <summary>
/// Tests whether the thread pool can handle an exception from a user work item
/// </summary>
[Test]
public void TestExceptionFromUserWorkItem() {
using(ManualResetEvent exceptionEvent = new ManualResetEvent(false)) {
AffineThreadPool.ExceptionDelegate oldExceptionHandler =
AffineThreadPool.ExceptionHandler;
AffineThreadPool.ExceptionHandler = delegate(Exception exception) {
exceptionEvent.Set();
};
try {
AffineThreadPool.QueueUserWorkItem(
delegate(object state) { throw new KeyNotFoundException(); }
);
Assert.IsTrue(exceptionEvent.WaitOne(1000));
}
finally {
AffineThreadPool.ExceptionHandler = oldExceptionHandler;
}
}
}
/// <summary>
/// Verifies that the affine thread pool's maximum thread count equals
/// the number of logical processors in the system
/// </summary>
[Test]
public void TestMaxThreadsProperty() {
Assert.AreEqual(Environment.ProcessorCount, AffineThreadPool.MaxThreads);
}
#if WINDOWS
/// <summary>
/// Verifies that the ProcessThread instance for a system thread id can
/// be determined using the GetProcessThread() method
/// </summary>
[Test]
public void CanGetProcessThreadForManagedThread() {
if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
Thread.BeginThreadAffinity();
try {
int threadId = AffineThreadPool.GetCurrentThreadId();
Assert.IsNotNull(AffineThreadPool.GetProcessThread(threadId));
Assert.IsNull(AffineThreadPool.GetProcessThread(0));
}
finally {
Thread.EndThreadAffinity();
}
}
}
#endif // WINDOWS
/// <summary>
/// Tests whether the afine thread pool's default exception handler works
/// as expected
/// </summary>
[Test]
public void TestDefaultExceptionHandler() {
Assert.Throws<ArrayTypeMismatchException>(
delegate() {
AffineThreadPool.ExceptionHandler(new ArrayTypeMismatchException("Test"));
}
);
}
/// <summary>
/// Verifies that the waiting work items count and active thread count are
/// updated by the thread pool.
/// </summary>
[Test]
public void TestWaitingWorkItemsProperty() {
int eventCount = AffineThreadPool.Processors;
WaitTask[] tasks = new WaitTask[eventCount];
int createdTasks = 0;
try {
// CHECK: Is there danger that the thread pool still has not finished
// queued items for other unit tests, thereby failing to meet
// our expected task counts?
// Create the tasks, counting up the created task counter. If an exception
// occurs, we will roll back from there.
for(createdTasks = 0; createdTasks < eventCount; ++createdTasks) {
tasks[createdTasks] = new WaitTask();
}
// Schedule the blocking tasks in the thread pool so it will not be able
// to process the next task we add to the queue
for(int index = 0; index < eventCount; ++index) {
AffineThreadPool.QueueUserWorkItem(tasks[index].Callback);
}
// Wait for the tasks to start so they aren't preempted by the tasks we're
// going to add (which would finish immediately). The affine thread pool
// works on a first come first serve basis, but we don't want to rely on this
// implementation detail in the unit test.
for(int index = 0; index < eventCount; ++index) {
Assert.IsTrue(
tasks[index].StartEvent.WaitOne(10000),
"Task " + index.ToString() + " was started"
);
}
// All Thread should now be active and no work items should be waiting
Assert.AreEqual(
createdTasks, AffineThreadPool.ActiveThreads,
"ActiveThreads property equals number of tasks"
);
Assert.AreEqual(
0, AffineThreadPool.WaitingWorkItems,
"No waiting work items are in the queue"
);
// Add a task to the queue and make sure the waiting work item count goes up
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
Assert.AreEqual(
1, AffineThreadPool.WaitingWorkItems,
"Added work item is waiting in the queue"
);
// The same again. Now we should have 2 work items sitting in the queue
AffineThreadPool.QueueUserWorkItem(delegate(object state) { });
Assert.AreEqual(
2, AffineThreadPool.WaitingWorkItems,
"Both added work items are waiting in the queue"
);
// Let the WaitTasks finish so we're not blocking the thread pool any longer
for(int index = 0; index < eventCount; ++index) {
tasks[index].WaitEvent.Set();
}
// Wait for the tasks to end before we get rid of them
for(int index = 0; index < eventCount; ++index) {
Assert.IsTrue(
tasks[index].FinishEvent.WaitOne(1000),
"Task " + index.ToString() + " has finished"
);
}
}
finally {
for(--createdTasks; createdTasks >= 0; --createdTasks) {
tasks[createdTasks].Dispose();
}
}
}
}
} // namespace Nuclex.Support.Threading
#endif // UNITTEST

View file

@ -1,296 +1,295 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
namespace Nuclex.Support.Threading {
/// <summary>Alternative Thread pool providing one thread for each core</summary>
/// <remarks>
/// <para>
/// Unlike the normal thread pool, the affine thread pool provides only as many
/// threads as there are CPU cores available on the current platform. This makes
/// it more suitable for tasks you want to spread across all available cpu cores
/// explicitly.
/// </para>
/// <para>
/// However, it's not a good match if you want to run blocking or waiting tasks
/// inside the thread pool because the limited available threads will become
/// congested quickly. It is encouraged to use this class in parallel with
/// .NET's own thread pool, putting tasks that can block into the .NET thread
/// pool and tasks that perform pure processing into the affine thread pool.
/// </para>
/// <para>
/// Implementation based on original code provided by Stephen Toub
/// (stoub at microsoft ignorethis dot com)
/// </para>
/// </remarks>
public static class AffineThreadPool {
/// <summary>Number of CPU cores available on the system</summary>
public static readonly int Processors = Environment.ProcessorCount;
/// <summary>Delegate used by the thread pool to report unhandled exceptions</summary>
/// <param name="exception">Exception that has not been handled</param>
public delegate void ExceptionDelegate(Exception exception);
#region class UserWorkItem
/// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
private struct UserWorkItem {
/// <summary>Initialize the callback holding object.</summary>
/// <param name="callback">Callback delegate for the callback.</param>
/// <param name="state">State with which to call the callback delegate.</param>
public UserWorkItem(WaitCallback callback, object state) {
this.Callback = callback;
this.State = state;
}
/// <summary>Callback delegate for the callback.</summary>
public WaitCallback Callback;
/// <summary>State with which to call the callback delegate.</summary>
public object State;
}
#endregion // class UserWorkItem
/// <summary>Initializes the thread pool</summary>
static AffineThreadPool() {
// Create our thread stores; we handle synchronization ourself
// as we may run into situations where multiple operations need to be atomic.
// We keep track of the threads we've created just for good measure; not actually
// needed for any core functionality.
workAvailable = new System.Threading.Semaphore(0, int.MaxValue);
userWorkItems = new Queue<UserWorkItem>(Processors * 4);
workerThreads = new List<Thread>(Processors);
inUseThreads = 0;
// We can use all cores on a PC, starting from index 1
hardwareThreads = new Queue<int>(Processors);
for(int core = Processors; core >= 1; --core) {
hardwareThreads.Enqueue(core);
}
// Create all of the worker threads
for(int index = 0; index < Processors; index++) {
// Create a new thread and add it to the list of threads.
Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
workerThreads.Add(newThread);
// Configure the new thread and start it
newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString();
newThread.IsBackground = true;
newThread.Start();
}
}
/// <summary>Queues a user work item to the thread pool</summary>
/// <param name="callback">
/// A WaitCallback representing the delegate to invoke when a thread in the
/// thread pool picks up the work item
/// </param>
public static void QueueUserWorkItem(WaitCallback callback) {
// Queue the delegate with no state
QueueUserWorkItem(callback, null);
}
/// <summary>Queues a user work item to the thread pool.</summary>
/// <param name="callback">
/// A WaitCallback representing the delegate to invoke when a thread in the
/// thread pool picks up the work item
/// </param>
/// <param name="state">
/// The object that is passed to the delegate when serviced from the thread pool
/// </param>
public static void QueueUserWorkItem(WaitCallback callback, object state) {
// Create a waiting callback that contains the delegate and its state.
// Add it to the processing queue, and signal that data is waiting.
UserWorkItem waiting = new UserWorkItem(callback, state);
lock(userWorkItems) {
userWorkItems.Enqueue(waiting);
}
// Wake up one of the worker threads so this task will be processed
workAvailable.Release();
}
/// <summary>Gets the number of threads at the disposal of the thread pool</summary>
public static int MaxThreads { get { return Processors; } }
/// <summary>Gets the number of currently active threads in the thread pool</summary>
public static int ActiveThreads { get { return inUseThreads; } }
/// <summary>
/// Gets the number of callback delegates currently waiting in the thread pool
/// </summary>
public static int WaitingWorkItems {
get {
lock(userWorkItems) {
return userWorkItems.Count;
}
}
}
/// <summary>
/// Default handler used to respond to unhandled exceptions in ThreadPool threads
/// </summary>
/// <param name="exception">Exception that has occurred</param>
internal static void DefaultExceptionHandler(Exception exception) {
throw exception;
}
#if WINDOWS
/// <summary>Retrieves the ProcessThread for the calling thread</summary>
/// <returns>The ProcessThread for the calling thread</returns>
internal static ProcessThread GetProcessThread(int threadId) {
ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
for(int index = 0; index < threads.Count; ++index) {
if(threads[index].Id == threadId) {
return threads[index];
}
}
return null;
}
#endif
/// <summary>A thread worker function that processes items from the work queue</summary>
private static void ProcessQueuedItems() {
// Get the system/hardware thread index this thread is going to use. We hope that
// the threads more or less start after each other, but there is no guarantee that
// tasks will be handled by the CPU cores in the order the queue was filled with.
// This could be added, though, by using a WaitHandle so the thread creator could
// wait for each thread to take one entry out of the queue.
int hardwareThreadIndex;
lock(hardwareThreads) {
hardwareThreadIndex = hardwareThreads.Dequeue();
}
#if WINDOWS
if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
// Prevent this managed thread from impersonating another system thread.
// In .NET, managed threads can supposedly be moved to different system threads
// and, more worryingly, even fibers. This should make sure we're sitting on
// a normal system thread and stay with that thread during our lifetime.
Thread.BeginThreadAffinity();
// Assign the ideal processor, but don't force it. It's not a good idea to
// circumvent the thread scheduler of a desktop machine, so we try to play nice.
int threadId = GetCurrentThreadId();
ProcessThread thread = GetProcessThread(threadId);
if(thread != null) {
thread.IdealProcessor = hardwareThreadIndex;
}
}
#endif
// Keep processing tasks indefinitely
for(; ; ) {
UserWorkItem workItem = getNextWorkItem();
// Execute the work item we just picked up. Make sure to accurately
// record how many callbacks are currently executing.
Interlocked.Increment(ref inUseThreads);
try {
workItem.Callback(workItem.State);
}
catch(Exception exception) { // Make sure we don't throw here.
ExceptionDelegate exceptionHandler = ExceptionHandler;
if(exceptionHandler != null) {
exceptionHandler(exception);
}
}
finally {
Interlocked.Decrement(ref inUseThreads);
}
}
}
/// <summary>Obtains the next work item from the queue</summary>
/// <returns>The next work item in the queue</returns>
/// <remarks>
/// If the queue is empty, the call will block until an item is added to
/// the queue and the calling thread was the one picking it up.
/// </remarks>
private static UserWorkItem getNextWorkItem() {
// Get the next item in the queue. If there is nothing there, go to sleep
// for a while until we're woken up when a callback is waiting.
for(; ; ) {
// Try to get the next callback available. We need to lock on the
// queue in order to make our count check and retrieval atomic.
lock(userWorkItems) {
if(userWorkItems.Count > 0) {
return userWorkItems.Dequeue();
}
}
// If we can't get one, go to sleep. The semaphore blocks until work
// becomes available (then acting like an AutoResetEvent that counts
// how often it has been triggered and letting that number of threads
// pass through.)
workAvailable.WaitOne();
}
}
/// <summary>Delegate used to handle assertion checks in the code</summary>
public static volatile ExceptionDelegate ExceptionHandler = DefaultExceptionHandler;
#if WINDOWS
/// <summary>Retrieves the calling thread's thread id</summary>
/// <returns>The thread is of the calling thread</returns>
[DllImport("kernel32.dll")]
internal static extern int GetCurrentThreadId();
#endif
/// <summary>Available hardware threads the thread pool threads pick from</summary>
private static Queue<int> hardwareThreads;
/// <summary>Queue of all the callbacks waiting to be executed.</summary>
private static Queue<UserWorkItem> userWorkItems;
/// <summary>
/// Used to let the threads in the thread pool wait for new work to appear.
/// </summary>
private static System.Threading.Semaphore workAvailable;
/// <summary>List of all worker threads at the disposal of the thread pool.</summary>
private static List<Thread> workerThreads;
/// <summary>Number of threads currently active.</summary>
private static int inUseThreads;
}
} // namespace Nuclex.Support.Threading
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
namespace Nuclex.Support.Threading {
/// <summary>Alternative Thread pool providing one thread for each core</summary>
/// <remarks>
/// <para>
/// Unlike the normal thread pool, the affine thread pool provides only as many
/// threads as there are CPU cores available on the current platform. This makes
/// it more suitable for tasks you want to spread across all available cpu cores
/// explicitly.
/// </para>
/// <para>
/// However, it's not a good match if you want to run blocking or waiting tasks
/// inside the thread pool because the limited available threads will become
/// congested quickly. It is encouraged to use this class in parallel with
/// .NET's own thread pool, putting tasks that can block into the .NET thread
/// pool and tasks that perform pure processing into the affine thread pool.
/// </para>
/// <para>
/// Implementation based on original code provided by Stephen Toub
/// (stoub at microsoft ignorethis dot com)
/// </para>
/// </remarks>
public static class AffineThreadPool {
/// <summary>Number of CPU cores available on the system</summary>
public static readonly int Processors = Environment.ProcessorCount;
/// <summary>Delegate used by the thread pool to report unhandled exceptions</summary>
/// <param name="exception">Exception that has not been handled</param>
public delegate void ExceptionDelegate(Exception exception);
#region class UserWorkItem
/// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
private struct UserWorkItem {
/// <summary>Initialize the callback holding object.</summary>
/// <param name="callback">Callback delegate for the callback.</param>
/// <param name="state">State with which to call the callback delegate.</param>
public UserWorkItem(WaitCallback callback, object state) {
this.Callback = callback;
this.State = state;
}
/// <summary>Callback delegate for the callback.</summary>
public WaitCallback Callback;
/// <summary>State with which to call the callback delegate.</summary>
public object State;
}
#endregion // class UserWorkItem
/// <summary>Initializes the thread pool</summary>
static AffineThreadPool() {
// Create our thread stores; we handle synchronization ourself
// as we may run into situations where multiple operations need to be atomic.
// We keep track of the threads we've created just for good measure; not actually
// needed for any core functionality.
workAvailable = new System.Threading.Semaphore(0, int.MaxValue);
userWorkItems = new Queue<UserWorkItem>(Processors * 4);
workerThreads = new List<Thread>(Processors);
inUseThreads = 0;
// We can use all cores on a PC, starting from index 1
hardwareThreads = new Queue<int>(Processors);
for(int core = Processors; core >= 1; --core) {
hardwareThreads.Enqueue(core);
}
// Create all of the worker threads
for(int index = 0; index < Processors; index++) {
// Create a new thread and add it to the list of threads.
Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
workerThreads.Add(newThread);
// Configure the new thread and start it
newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString();
newThread.IsBackground = true;
newThread.Start();
}
}
/// <summary>Queues a user work item to the thread pool</summary>
/// <param name="callback">
/// A WaitCallback representing the delegate to invoke when a thread in the
/// thread pool picks up the work item
/// </param>
public static void QueueUserWorkItem(WaitCallback callback) {
// Queue the delegate with no state
QueueUserWorkItem(callback, null);
}
/// <summary>Queues a user work item to the thread pool.</summary>
/// <param name="callback">
/// A WaitCallback representing the delegate to invoke when a thread in the
/// thread pool picks up the work item
/// </param>
/// <param name="state">
/// The object that is passed to the delegate when serviced from the thread pool
/// </param>
public static void QueueUserWorkItem(WaitCallback callback, object state) {
// Create a waiting callback that contains the delegate and its state.
// Add it to the processing queue, and signal that data is waiting.
UserWorkItem waiting = new UserWorkItem(callback, state);
lock(userWorkItems) {
userWorkItems.Enqueue(waiting);
}
// Wake up one of the worker threads so this task will be processed
workAvailable.Release();
}
/// <summary>Gets the number of threads at the disposal of the thread pool</summary>
public static int MaxThreads { get { return Processors; } }
/// <summary>Gets the number of currently active threads in the thread pool</summary>
public static int ActiveThreads { get { return inUseThreads; } }
/// <summary>
/// Gets the number of callback delegates currently waiting in the thread pool
/// </summary>
public static int WaitingWorkItems {
get {
lock(userWorkItems) {
return userWorkItems.Count;
}
}
}
/// <summary>
/// Default handler used to respond to unhandled exceptions in ThreadPool threads
/// </summary>
/// <param name="exception">Exception that has occurred</param>
internal static void DefaultExceptionHandler(Exception exception) {
throw exception;
}
#if WINDOWS
/// <summary>Retrieves the ProcessThread for the calling thread</summary>
/// <returns>The ProcessThread for the calling thread</returns>
internal static ProcessThread GetProcessThread(int threadId) {
ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
for(int index = 0; index < threads.Count; ++index) {
if(threads[index].Id == threadId) {
return threads[index];
}
}
return null;
}
#endif
/// <summary>A thread worker function that processes items from the work queue</summary>
private static void ProcessQueuedItems() {
// Get the system/hardware thread index this thread is going to use. We hope that
// the threads more or less start after each other, but there is no guarantee that
// tasks will be handled by the CPU cores in the order the queue was filled with.
// This could be added, though, by using a WaitHandle so the thread creator could
// wait for each thread to take one entry out of the queue.
int hardwareThreadIndex;
lock(hardwareThreads) {
hardwareThreadIndex = hardwareThreads.Dequeue();
}
#if WINDOWS
if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
// Prevent this managed thread from impersonating another system thread.
// In .NET, managed threads can supposedly be moved to different system threads
// and, more worryingly, even fibers. This should make sure we're sitting on
// a normal system thread and stay with that thread during our lifetime.
Thread.BeginThreadAffinity();
// Assign the ideal processor, but don't force it. It's not a good idea to
// circumvent the thread scheduler of a desktop machine, so we try to play nice.
int threadId = GetCurrentThreadId();
ProcessThread thread = GetProcessThread(threadId);
if(thread != null) {
thread.IdealProcessor = hardwareThreadIndex;
}
}
#endif
// Keep processing tasks indefinitely
for(; ; ) {
UserWorkItem workItem = getNextWorkItem();
// Execute the work item we just picked up. Make sure to accurately
// record how many callbacks are currently executing.
Interlocked.Increment(ref inUseThreads);
try {
workItem.Callback(workItem.State);
}
catch(Exception exception) { // Make sure we don't throw here.
ExceptionDelegate exceptionHandler = ExceptionHandler;
if(exceptionHandler != null) {
exceptionHandler(exception);
}
}
finally {
Interlocked.Decrement(ref inUseThreads);
}
}
}
/// <summary>Obtains the next work item from the queue</summary>
/// <returns>The next work item in the queue</returns>
/// <remarks>
/// If the queue is empty, the call will block until an item is added to
/// the queue and the calling thread was the one picking it up.
/// </remarks>
private static UserWorkItem getNextWorkItem() {
// Get the next item in the queue. If there is nothing there, go to sleep
// for a while until we're woken up when a callback is waiting.
for(; ; ) {
// Try to get the next callback available. We need to lock on the
// queue in order to make our count check and retrieval atomic.
lock(userWorkItems) {
if(userWorkItems.Count > 0) {
return userWorkItems.Dequeue();
}
}
// If we can't get one, go to sleep. The semaphore blocks until work
// becomes available (then acting like an AutoResetEvent that counts
// how often it has been triggered and letting that number of threads
// pass through.)
workAvailable.WaitOne();
}
}
/// <summary>Delegate used to handle assertion checks in the code</summary>
public static volatile ExceptionDelegate ExceptionHandler = DefaultExceptionHandler;
#if WINDOWS
/// <summary>Retrieves the calling thread's thread id</summary>
/// <returns>The thread is of the calling thread</returns>
[DllImport("kernel32.dll")]
internal static extern int GetCurrentThreadId();
#endif
/// <summary>Available hardware threads the thread pool threads pick from</summary>
private static Queue<int> hardwareThreads;
/// <summary>Queue of all the callbacks waiting to be executed.</summary>
private static Queue<UserWorkItem> userWorkItems;
/// <summary>
/// Used to let the threads in the thread pool wait for new work to appear.
/// </summary>
private static System.Threading.Semaphore workAvailable;
/// <summary>List of all worker threads at the disposal of the thread pool.</summary>
private static List<Thread> workerThreads;
/// <summary>Number of threads currently active.</summary>
private static int inUseThreads;
}
} // namespace Nuclex.Support.Threading

View file

@ -1,49 +1,48 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2019 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Threading;
namespace Nuclex.Support.Threading {
/// <summary>Action with no arguments that can be cancelled</summary>
/// <param name="cancellationToken">
/// Cancellation token by which the action can be cancelled
/// </param>
public delegate void CancellableAction(CancellationToken cancellationToken);
/// <summary>Action with no arguments that can be cancelled</summary>
/// <param name="cancellationToken">
/// Cancellation token by which the action can be cancelled
/// </param>
/// <param name="arg1">First argument for the action</param>
public delegate void CancellableAction<in T1>(T1 arg1, CancellationToken cancellationToken);
/// <summary>Action with no arguments that can be cancelled</summary>
/// <param name="cancellationToken">
/// Cancellation token by which the action can be cancelled
/// </param>
/// <param name="arg1">First argument for the action</param>
/// <param name="arg2">Second argument for the action</param>
public delegate void CancellableAction<in T1, in T2>(
T1 arg1, T2 arg2, CancellationToken cancellationToken
);
} // namespace Nuclex.Support.Threading
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
using System;
using System.Threading;
namespace Nuclex.Support.Threading {
/// <summary>Action with no arguments that can be cancelled</summary>
/// <param name="cancellationToken">
/// Cancellation token by which the action can be cancelled
/// </param>
public delegate void CancellableAction(CancellationToken cancellationToken);
/// <summary>Action with no arguments that can be cancelled</summary>
/// <param name="cancellationToken">
/// Cancellation token by which the action can be cancelled
/// </param>
/// <param name="arg1">First argument for the action</param>
public delegate void CancellableAction<in T1>(T1 arg1, CancellationToken cancellationToken);
/// <summary>Action with no arguments that can be cancelled</summary>
/// <param name="cancellationToken">
/// Cancellation token by which the action can be cancelled
/// </param>
/// <param name="arg1">First argument for the action</param>
/// <param name="arg2">Second argument for the action</param>
public delegate void CancellableAction<in T1, in T2>(
T1 arg1, T2 arg2, CancellationToken cancellationToken
);
} // namespace Nuclex.Support.Threading

View file

@ -1,258 +1,257 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Threading;
using System.Collections.Generic;
#if UNITTEST
using NUnit.Framework;
namespace Nuclex.Support.Threading {
/// <summary>Unit Test for the parallel background worker class</summary>
[TestFixture]
internal class ParallelBackgroundWorkerTest {
#region class TestWorker
/// <summary>Implementation of a background worker used for unit testing</summary>
#pragma warning disable 0618
private class TestWorker : ParallelBackgroundWorker<object> {
#pragma warning restore 0618
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
public TestWorker() : base() { }
/// <summary>
/// Initializes a new parallel background worker running the specified number
/// of tasks in parallel
/// </summary>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public TestWorker(int threadCount) : base(threadCount) { }
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
public TestWorker(string name) : base(name) { }
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads and running the specified number of tasks in parallel.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public TestWorker(string name, int threadCount) : base(name, threadCount) { }
/// <summary>Called in a thread to execute a single task</summary>
/// <param name="task">Task that should be executed</param>
/// <param name="cancellationToken">
/// Cancellation token through which the method can be signalled to cancel
/// </param>
protected override void Run(object task, CancellationToken cancellationToken) {
if(this.ThrowException) {
throw new Exception("Something went wrong");
}
if(this.WaitEvent != null) {
this.WaitEvent.WaitOne();
}
this.WasCancelled = cancellationToken.IsCancellationRequested;
if(this.Tasks != null) {
lock(this.Tasks) {
this.Tasks.Add(task);
}
}
}
/// <summary>Whether the work tasks should throw exceptions</summary>
public bool ThrowException;
/// <summary>Event that can be used to stop work tasks from completing</summary>
public ManualResetEvent WaitEvent;
/// <summary>Set by work tasks if they have been cancelled</summary>
public bool WasCancelled;
/// <summary>Work tasks that have reached execution</summary>
public ICollection<object> Tasks;
}
#endregion // class TestWorker
/// <summary>Verifies that the background worker has a default constructor</summary>
[Test]
public void CanBeDefaultConstructed() {
using(new TestWorker()) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that uses a fixed number
/// of threads
/// </summary>
[Test]
public void CanUseFixedNumberOfThreads() {
using(new TestWorker(4)) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that leaves free a fixed
/// number of CPU cores
/// </summary>
[Test]
public void CanPreserveFixedNumberOfCores() {
using(new TestWorker(-2)) { }
}
/// <summary>
/// Verifies that a background worker can be constructed using a specific name
/// for its worker threads
/// </summary>
[Test]
public void CanUseNamedThreads() {
using(new TestWorker("Test Task Thread")) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that uses a fixed number
/// of threads using a specific name
/// </summary>
[Test]
public void CanUseFixedNumberOfNamedThreads() {
using(new TestWorker("Test Task Thread", 4)) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that leaves free a fixed
/// number of CPU cores and uses a specific name for its worker threads.
/// </summary>
[Test]
public void CanPreserveFixedNumberOfCoresAndUseNamedThreads() {
using(new TestWorker("Test Task Thread", -2)) { }
}
/// <summary>
/// Verifies that exceptions happening inside the tasks are collected and re-thrown
/// in the Join() method.
/// </summary>
[Test]
public void ExceptionsAreReThrownInJoin() {
using(var testWorker = new TestWorker()) {
testWorker.ThrowException = true;
testWorker.AddTask(new object());
testWorker.AddTask(new object());
Assert.Throws<AggregateException>(
() => {
testWorker.Join();
}
);
try {
testWorker.Join();
Assert.Fail(
"Calling ParallelBackgroundWorker.Join() multiple times should re-throw " +
"exceptions multiple times"
);
}
catch(AggregateException aggregateException) {
Assert.AreEqual(2, aggregateException.InnerExceptions.Count);
}
}
}
/// <summary>
/// Verifies that tasks can be cancelled while they are running
/// </summary>
[Test]
public void TasksCanBeCancelled() {
using(var waitEvent = new ManualResetEvent(false)) {
using(var testWorker = new TestWorker()) {
testWorker.WaitEvent = waitEvent;
testWorker.AddTask(new object());
testWorker.CancelRunningTasks();
waitEvent.Set();
Assert.IsTrue(testWorker.Wait(1000));
Assert.IsTrue(testWorker.WasCancelled);
}
} // disposes waitEvent
}
/// <summary>Verifies that calling Join() waits for all queued tasks</summary>
[Test]
public void JoinWaitsForQueuedTasks() {
var tasks = new List<object>(100);
for(int index = 0; index < 100; ++index) {
tasks.Add(new object());
}
using(var waitEvent = new ManualResetEvent(false)) {
using(var testWorker = new TestWorker(2)) {
testWorker.WaitEvent = waitEvent;
testWorker.Tasks = new List<object>();
for(int index = 0; index < 100; ++index) {
testWorker.AddTask(tasks[index]);
}
CollectionAssert.IsEmpty(testWorker.Tasks);
waitEvent.Set();
testWorker.Join();
lock(testWorker.Tasks) {
CollectionAssert.AreEquivalent(tasks, testWorker.Tasks);
}
}
} // disposes waitEvent
}
}
} // namespace Nuclex.Support.Threading
#endif // UNITTEST
#endif // !NO_CONCURRENT_COLLECTIONS
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Threading;
using System.Collections.Generic;
#if UNITTEST
using NUnit.Framework;
namespace Nuclex.Support.Threading {
/// <summary>Unit Test for the parallel background worker class</summary>
[TestFixture]
internal class ParallelBackgroundWorkerTest {
#region class TestWorker
/// <summary>Implementation of a background worker used for unit testing</summary>
#pragma warning disable 0618
private class TestWorker : ParallelBackgroundWorker<object> {
#pragma warning restore 0618
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
public TestWorker() : base() { }
/// <summary>
/// Initializes a new parallel background worker running the specified number
/// of tasks in parallel
/// </summary>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public TestWorker(int threadCount) : base(threadCount) { }
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
public TestWorker(string name) : base(name) { }
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads and running the specified number of tasks in parallel.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public TestWorker(string name, int threadCount) : base(name, threadCount) { }
/// <summary>Called in a thread to execute a single task</summary>
/// <param name="task">Task that should be executed</param>
/// <param name="cancellationToken">
/// Cancellation token through which the method can be signalled to cancel
/// </param>
protected override void Run(object task, CancellationToken cancellationToken) {
if(this.ThrowException) {
throw new Exception("Something went wrong");
}
if(this.WaitEvent != null) {
this.WaitEvent.WaitOne();
}
this.WasCancelled = cancellationToken.IsCancellationRequested;
if(this.Tasks != null) {
lock(this.Tasks) {
this.Tasks.Add(task);
}
}
}
/// <summary>Whether the work tasks should throw exceptions</summary>
public bool ThrowException;
/// <summary>Event that can be used to stop work tasks from completing</summary>
public ManualResetEvent WaitEvent;
/// <summary>Set by work tasks if they have been cancelled</summary>
public bool WasCancelled;
/// <summary>Work tasks that have reached execution</summary>
public ICollection<object> Tasks;
}
#endregion // class TestWorker
/// <summary>Verifies that the background worker has a default constructor</summary>
[Test]
public void CanBeDefaultConstructed() {
using(new TestWorker()) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that uses a fixed number
/// of threads
/// </summary>
[Test]
public void CanUseFixedNumberOfThreads() {
using(new TestWorker(4)) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that leaves free a fixed
/// number of CPU cores
/// </summary>
[Test]
public void CanPreserveFixedNumberOfCores() {
using(new TestWorker(-2)) { }
}
/// <summary>
/// Verifies that a background worker can be constructed using a specific name
/// for its worker threads
/// </summary>
[Test]
public void CanUseNamedThreads() {
using(new TestWorker("Test Task Thread")) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that uses a fixed number
/// of threads using a specific name
/// </summary>
[Test]
public void CanUseFixedNumberOfNamedThreads() {
using(new TestWorker("Test Task Thread", 4)) { }
}
/// <summary>
/// Verifies that a background worker can be constructed that leaves free a fixed
/// number of CPU cores and uses a specific name for its worker threads.
/// </summary>
[Test]
public void CanPreserveFixedNumberOfCoresAndUseNamedThreads() {
using(new TestWorker("Test Task Thread", -2)) { }
}
/// <summary>
/// Verifies that exceptions happening inside the tasks are collected and re-thrown
/// in the Join() method.
/// </summary>
[Test]
public void ExceptionsAreReThrownInJoin() {
using(var testWorker = new TestWorker()) {
testWorker.ThrowException = true;
testWorker.AddTask(new object());
testWorker.AddTask(new object());
Assert.Throws<AggregateException>(
() => {
testWorker.Join();
}
);
try {
testWorker.Join();
Assert.Fail(
"Calling ParallelBackgroundWorker.Join() multiple times should re-throw " +
"exceptions multiple times"
);
}
catch(AggregateException aggregateException) {
Assert.AreEqual(2, aggregateException.InnerExceptions.Count);
}
}
}
/// <summary>
/// Verifies that tasks can be cancelled while they are running
/// </summary>
[Test]
public void TasksCanBeCancelled() {
using(var waitEvent = new ManualResetEvent(false)) {
using(var testWorker = new TestWorker()) {
testWorker.WaitEvent = waitEvent;
testWorker.AddTask(new object());
testWorker.CancelRunningTasks();
waitEvent.Set();
Assert.IsTrue(testWorker.Wait(1000));
Assert.IsTrue(testWorker.WasCancelled);
}
} // disposes waitEvent
}
/// <summary>Verifies that calling Join() waits for all queued tasks</summary>
[Test]
public void JoinWaitsForQueuedTasks() {
var tasks = new List<object>(100);
for(int index = 0; index < 100; ++index) {
tasks.Add(new object());
}
using(var waitEvent = new ManualResetEvent(false)) {
using(var testWorker = new TestWorker(2)) {
testWorker.WaitEvent = waitEvent;
testWorker.Tasks = new List<object>();
for(int index = 0; index < 100; ++index) {
testWorker.AddTask(tasks[index]);
}
CollectionAssert.IsEmpty(testWorker.Tasks);
waitEvent.Set();
testWorker.Join();
lock(testWorker.Tasks) {
CollectionAssert.AreEquivalent(tasks, testWorker.Tasks);
}
}
} // disposes waitEvent
}
}
} // namespace Nuclex.Support.Threading
#endif // UNITTEST
#endif // !NO_CONCURRENT_COLLECTIONS

View file

@ -1,344 +1,343 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Nuclex.Support.Threading {
/// <summary>Processes tasks in parallel using many threads</summary>
/// <typeparam name="TTask">Type of tasks the class will process</typeparam>
[Obsolete("Use ThreadRunner instead; UI view models should inherit ThreadedViewModel")]
public abstract class ParallelBackgroundWorker<TTask> : IDisposable {
/// <summary>Number of CPU cores available on the system</summary>
public static readonly int ProcessorCount = Environment.ProcessorCount;
/// <summary>
/// Timeout after which Dispose() will stop waiting for unfinished tasks and
/// free resources anyway
/// </summary>
private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
public ParallelBackgroundWorker() : this(int.MaxValue) { }
/// <summary>
/// Initializes a new parallel background worker running the specified number
/// of tasks in parallel
/// </summary>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public ParallelBackgroundWorker(int threadCount) {
if(threadCount > 0) {
this.threadCount = threadCount;
} else {
this.threadCount = Math.Max(1, ProcessorCount + threadCount);
}
this.queueSynchronizationRoot = new object();
this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
this.tasks = new Queue<TTask>();
this.threadTerminatedEvent = new AutoResetEvent(false);
this.cancellationTokenSource = new CancellationTokenSource();
this.exceptions = new ConcurrentBag<Exception>();
}
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
public ParallelBackgroundWorker(string name) :
this(int.MaxValue) {
this.threadName = name;
}
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads and running the specified number of tasks in parallel.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public ParallelBackgroundWorker(string name, int threadCount) :
this(threadCount) {
this.threadName = name;
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.threadTerminatedEvent != null) {
CancelPendingTasks();
CancelRunningTasks();
Wait(DefaultDisposeTimeoutMilliseconds);
this.threadTerminatedEvent.Dispose();
this.threadTerminatedEvent = null;
}
lock(this.queueSynchronizationRoot) {
if(this.cancellationTokenSource != null) {
this.cancellationTokenSource.Dispose();
this.cancellationTokenSource = null;
}
}
}
/// <summary>Adds a task for processing by the background worker threads</summary>
/// <param name="task">Task that will be processed in the background</param>
public void AddTask(TTask task) {
if(this.cancellationTokenSource.IsCancellationRequested) {
return;
}
bool needNewThread;
lock(this.queueSynchronizationRoot) {
this.tasks.Enqueue(task);
needNewThread = (this.runningThreadCount < this.threadCount);
if(needNewThread) {
++this.runningThreadCount;
}
}
if(needNewThread) {
Task newThread = new Task(
this.runQueuedTasksInThreadDelegate,
// this.cancellationTokenSource.Token, // DO NOT PASS THIS!
// Passing the cancellation token makes tasks that have been queued but
// not started yet cease to execute at all - meaning our runningThreadCount
// goes out of sync and Dispose() / Wait() / Join() sit around endlessly!
TaskCreationOptions.LongRunning
);
newThread.Start();
}
}
/// <summary>Cancels all tasks that are currently executing</summary>
/// <remarks>
/// It is valid to call this method after Dispose()
/// </remarks>
public void CancelRunningTasks() {
lock(this.queueSynchronizationRoot) {
if(this.cancellationTokenSource != null) {
this.cancellationTokenSource.Cancel();
}
}
}
/// <summary>Cancels all queued tasks waiting to be executed</summary>
/// <remarks>
/// It is valid to call this method after Dispose()
/// </remarks>
public void CancelPendingTasks() {
lock(this.queueSynchronizationRoot) {
this.tasks.Clear();
}
}
/// <summary>
/// Waits until all executing and queued tasks have been processed and throws an
/// exception if any errors have occurred
/// </summary>
public void Join() {
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
this.threadTerminatedEvent.WaitOne();
}
if(this.exceptions.Count > 0) {
throw new AggregateException(this.exceptions);
}
}
/// <summary>
/// Waits until all executing and queued tasks have been processed or
/// the timeout is reached
/// </summary>
/// <param name="timeoutMilliseconds">Milliseconds after which the wait times out</param>
/// <returns>
/// True if all tasks have been processed, false if the timeout was reached
/// </returns>
public bool Wait(int timeoutMilliseconds) {
// Wait until the task queue has become empty
while(queuedTaskCount > 0) {
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
return false;
}
}
// Now wait until all running tasks have finished
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
return false;
}
}
return true;
}
/// <summary>Called in a thread to execute a single task</summary>
/// <param name="task">Task that should be executed</param>
/// <param name="cancellationToken">
/// Cancellation token through which the method can be signalled to cancel
/// </param>
protected abstract void Run(TTask task, CancellationToken cancellationToken);
/// <summary>
/// Runs queued tasks of the parallel background worker until the queue is empty
/// </summary>
private void runQueuedTasksInThread() {
string previousThreadName = null;
if(!string.IsNullOrEmpty(this.threadName)) {
previousThreadName = Thread.CurrentThread.Name;
Thread.CurrentThread.Name = this.threadName;
}
try {
for(;;) {
TTask task;
lock(this.queueSynchronizationRoot) {
if(this.tasks.Count == 0) {
--this.runningThreadCount;
break;
}
task = this.tasks.Dequeue();
}
try {
Run(task, this.cancellationTokenSource.Token);
}
catch(Exception exception) {
this.exceptions.Add(exception);
}
}
this.threadTerminatedEvent.Set();
}
finally {
if(!string.IsNullOrEmpty(this.threadName)) {
Thread.CurrentThread.Name = previousThreadName;
}
}
}
/// <summary>Number of task still waiting to be executed</summary>
private int queuedTaskCount {
get {
lock(this.queueSynchronizationRoot) {
return this.tasks.Count;
}
}
}
/// <summary>
/// Name that will be assigned to the worker threads while they're processing tasks
/// for the parallel background worker
/// </summary>
private string threadName;
/// <summary>Number of threads to use simultaneously</summary>
private int threadCount;
/// <summary>Synchronization root used to access the task queue and thread list</summary>
/// <remarks>
/// <para>
/// Both lists are intentionally using a shared synchronization root because this will
/// guarantee that there's no race condition between adding work and a thread finishing:
/// </para>
/// <para>
/// Main thread
/// <code>
/// lock(this.queueSynchronizationRoot) {
/// this.tasks.Add(newTask);
///
/// if(this.runningThreads.Count &lt; maximumThreadCount) {
/// startAdditionalTaskThread();
/// }
/// }
/// </code>
/// </para>
/// <para>
/// Task thread
/// <code>
/// TTask myTask;
/// lock(this.queueSynchronizationRoot) {
/// this.tasks.TryGet(out myTask);
///
/// // RACE CONDITION WITHOUT LOCK: if this wasn't inside the same lock as the task
/// // checking, the main thread might check at this point, see that there are already
/// // enough threads running!
/// if(myTask == null) {
/// this.runningThreads.Remove(thisThread);
/// return;
/// }
/// }
/// </code>
/// </para>
/// <para>
/// The race condition mentioned above could completely stall the background worker by
/// leaving tasks in the queue but no threads active to execute them, so it is vital to
/// use this method of synchronization!
/// </para>
/// </remarks>
private object queueSynchronizationRoot;
/// <summary>Delegate for the runQueuedTasksInThread() method</summary>
private Action runQueuedTasksInThreadDelegate;
/// <summary>Tasks remaining to be processed</summary>
private Queue<TTask> tasks;
/// <summary>Threads that are currently executing tasks</summary>
private int runningThreadCount;
/// <summary>Exceptions that have occurred while executing tasks</summary>
private ConcurrentBag<Exception> exceptions;
/// <summary>Event that is triggered whenever a task gets completed</summary>
private AutoResetEvent threadTerminatedEvent;
/// <summary>
/// Provides a cancellation token that can be used to signal a thread to terminate
/// </summary>
private CancellationTokenSource cancellationTokenSource;
}
} // namespace Nuclex.Support.Threading
#endif // !NO_CONCURRENT_COLLECTIONS
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Nuclex.Support.Threading {
/// <summary>Processes tasks in parallel using many threads</summary>
/// <typeparam name="TTask">Type of tasks the class will process</typeparam>
[Obsolete("Use ThreadRunner instead; UI view models should inherit ThreadedViewModel")]
public abstract class ParallelBackgroundWorker<TTask> : IDisposable {
/// <summary>Number of CPU cores available on the system</summary>
public static readonly int ProcessorCount = Environment.ProcessorCount;
/// <summary>
/// Timeout after which Dispose() will stop waiting for unfinished tasks and
/// free resources anyway
/// </summary>
private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds
/// <summary>Initializes a new parallel background worker with unlimited threads</summary>
public ParallelBackgroundWorker() : this(int.MaxValue) { }
/// <summary>
/// Initializes a new parallel background worker running the specified number
/// of tasks in parallel
/// </summary>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public ParallelBackgroundWorker(int threadCount) {
if(threadCount > 0) {
this.threadCount = threadCount;
} else {
this.threadCount = Math.Max(1, ProcessorCount + threadCount);
}
this.queueSynchronizationRoot = new object();
this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
this.tasks = new Queue<TTask>();
this.threadTerminatedEvent = new AutoResetEvent(false);
this.cancellationTokenSource = new CancellationTokenSource();
this.exceptions = new ConcurrentBag<Exception>();
}
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
public ParallelBackgroundWorker(string name) :
this(int.MaxValue) {
this.threadName = name;
}
/// <summary>
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads and running the specified number of tasks in parallel.
/// </summary>
/// <param name="name">Name that will be assigned to the worker threads</param>
/// <param name="threadCount">
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
/// </param>
/// <remarks>
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
/// </remarks>
public ParallelBackgroundWorker(string name, int threadCount) :
this(threadCount) {
this.threadName = name;
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.threadTerminatedEvent != null) {
CancelPendingTasks();
CancelRunningTasks();
Wait(DefaultDisposeTimeoutMilliseconds);
this.threadTerminatedEvent.Dispose();
this.threadTerminatedEvent = null;
}
lock(this.queueSynchronizationRoot) {
if(this.cancellationTokenSource != null) {
this.cancellationTokenSource.Dispose();
this.cancellationTokenSource = null;
}
}
}
/// <summary>Adds a task for processing by the background worker threads</summary>
/// <param name="task">Task that will be processed in the background</param>
public void AddTask(TTask task) {
if(this.cancellationTokenSource.IsCancellationRequested) {
return;
}
bool needNewThread;
lock(this.queueSynchronizationRoot) {
this.tasks.Enqueue(task);
needNewThread = (this.runningThreadCount < this.threadCount);
if(needNewThread) {
++this.runningThreadCount;
}
}
if(needNewThread) {
Task newThread = new Task(
this.runQueuedTasksInThreadDelegate,
// this.cancellationTokenSource.Token, // DO NOT PASS THIS!
// Passing the cancellation token makes tasks that have been queued but
// not started yet cease to execute at all - meaning our runningThreadCount
// goes out of sync and Dispose() / Wait() / Join() sit around endlessly!
TaskCreationOptions.LongRunning
);
newThread.Start();
}
}
/// <summary>Cancels all tasks that are currently executing</summary>
/// <remarks>
/// It is valid to call this method after Dispose()
/// </remarks>
public void CancelRunningTasks() {
lock(this.queueSynchronizationRoot) {
if(this.cancellationTokenSource != null) {
this.cancellationTokenSource.Cancel();
}
}
}
/// <summary>Cancels all queued tasks waiting to be executed</summary>
/// <remarks>
/// It is valid to call this method after Dispose()
/// </remarks>
public void CancelPendingTasks() {
lock(this.queueSynchronizationRoot) {
this.tasks.Clear();
}
}
/// <summary>
/// Waits until all executing and queued tasks have been processed and throws an
/// exception if any errors have occurred
/// </summary>
public void Join() {
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
this.threadTerminatedEvent.WaitOne();
}
if(this.exceptions.Count > 0) {
throw new AggregateException(this.exceptions);
}
}
/// <summary>
/// Waits until all executing and queued tasks have been processed or
/// the timeout is reached
/// </summary>
/// <param name="timeoutMilliseconds">Milliseconds after which the wait times out</param>
/// <returns>
/// True if all tasks have been processed, false if the timeout was reached
/// </returns>
public bool Wait(int timeoutMilliseconds) {
// Wait until the task queue has become empty
while(queuedTaskCount > 0) {
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
return false;
}
}
// Now wait until all running tasks have finished
while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
return false;
}
}
return true;
}
/// <summary>Called in a thread to execute a single task</summary>
/// <param name="task">Task that should be executed</param>
/// <param name="cancellationToken">
/// Cancellation token through which the method can be signalled to cancel
/// </param>
protected abstract void Run(TTask task, CancellationToken cancellationToken);
/// <summary>
/// Runs queued tasks of the parallel background worker until the queue is empty
/// </summary>
private void runQueuedTasksInThread() {
string previousThreadName = null;
if(!string.IsNullOrEmpty(this.threadName)) {
previousThreadName = Thread.CurrentThread.Name;
Thread.CurrentThread.Name = this.threadName;
}
try {
for(;;) {
TTask task;
lock(this.queueSynchronizationRoot) {
if(this.tasks.Count == 0) {
--this.runningThreadCount;
break;
}
task = this.tasks.Dequeue();
}
try {
Run(task, this.cancellationTokenSource.Token);
}
catch(Exception exception) {
this.exceptions.Add(exception);
}
}
this.threadTerminatedEvent.Set();
}
finally {
if(!string.IsNullOrEmpty(this.threadName)) {
Thread.CurrentThread.Name = previousThreadName;
}
}
}
/// <summary>Number of task still waiting to be executed</summary>
private int queuedTaskCount {
get {
lock(this.queueSynchronizationRoot) {
return this.tasks.Count;
}
}
}
/// <summary>
/// Name that will be assigned to the worker threads while they're processing tasks
/// for the parallel background worker
/// </summary>
private string threadName;
/// <summary>Number of threads to use simultaneously</summary>
private int threadCount;
/// <summary>Synchronization root used to access the task queue and thread list</summary>
/// <remarks>
/// <para>
/// Both lists are intentionally using a shared synchronization root because this will
/// guarantee that there's no race condition between adding work and a thread finishing:
/// </para>
/// <para>
/// Main thread
/// <code>
/// lock(this.queueSynchronizationRoot) {
/// this.tasks.Add(newTask);
///
/// if(this.runningThreads.Count &lt; maximumThreadCount) {
/// startAdditionalTaskThread();
/// }
/// }
/// </code>
/// </para>
/// <para>
/// Task thread
/// <code>
/// TTask myTask;
/// lock(this.queueSynchronizationRoot) {
/// this.tasks.TryGet(out myTask);
///
/// // RACE CONDITION WITHOUT LOCK: if this wasn't inside the same lock as the task
/// // checking, the main thread might check at this point, see that there are already
/// // enough threads running!
/// if(myTask == null) {
/// this.runningThreads.Remove(thisThread);
/// return;
/// }
/// }
/// </code>
/// </para>
/// <para>
/// The race condition mentioned above could completely stall the background worker by
/// leaving tasks in the queue but no threads active to execute them, so it is vital to
/// use this method of synchronization!
/// </para>
/// </remarks>
private object queueSynchronizationRoot;
/// <summary>Delegate for the runQueuedTasksInThread() method</summary>
private Action runQueuedTasksInThreadDelegate;
/// <summary>Tasks remaining to be processed</summary>
private Queue<TTask> tasks;
/// <summary>Threads that are currently executing tasks</summary>
private int runningThreadCount;
/// <summary>Exceptions that have occurred while executing tasks</summary>
private ConcurrentBag<Exception> exceptions;
/// <summary>Event that is triggered whenever a task gets completed</summary>
private AutoResetEvent threadTerminatedEvent;
/// <summary>
/// Provides a cancellation token that can be used to signal a thread to terminate
/// </summary>
private CancellationTokenSource cancellationTokenSource;
}
} // namespace Nuclex.Support.Threading
#endif // !NO_CONCURRENT_COLLECTIONS

View file

@ -1,460 +1,459 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Threading;
using System.Collections.Generic;
#if UNITTEST
using NUnit.Framework;
namespace Nuclex.Support.Threading {
/// <summary>Unit Test for the thread runner class</summary>
[TestFixture]
internal class ThreadRunnerTest {
#region class DefaultDisposeRunner
/// <summary>Implementation of a thread runner to check default dispose behavior</summary>
private class DefaultDisposeRunner : ThreadRunner {
/// <summary>Reports an error</summary>
/// <param name="exception">Error that will be reported</param>
protected override void ReportError(Exception exception) { }
/// <summary>Called when the status of the busy flag changes</summary>
protected override void BusyChanged() { }
}
#endregion // class DefaultDisposeRunner
#region class DummyRunner
/// <summary>Implementation of a thread runner used for unit testing</summary>
private class DummyRunner : ThreadRunner {
/// <summary>Initializes a new dummy thread runner</summary>
public DummyRunner() : base() {
this.completionGate = new ManualResetEvent(initialState: false);
}
/// <summary>Immediately frees all resources used by the instance</summary>
public new void Dispose() {
base.Dispose(100);
if(this.completionGate != null) {
this.completionGate.Dispose();
this.completionGate = null;
}
}
/// <summary>Waits for the task for complete (all of 100 milliseconds)</summary>
/// <returns>True if the task completed, false if it continues running</returns>
public bool WaitForCompletion() {
return this.completionGate.WaitOne(100);
}
/// <summary>How often the status of the busy flag has changed</summary>
public int BusyChangeCount {
get { return this.busyChangeCount; }
}
/// <summary>Error that has been reported the last time a task was run</summary>
public Exception ReportedError {
get { return this.reportedError; }
}
/// <summary>Reports an error</summary>
/// <param name="exception">Error that will be reported</param>
protected override void ReportError(Exception exception) {
this.reportedError = exception;
}
/// <summary>Called when the status of the busy flag changes</summary>
protected override void BusyChanged() {
++busyChangeCount;
if((busyChangeCount >= 2) && (base.IsBusy == false)) {
this.completionGate.Set();
}
}
/// <summary>Last error that was reported in the thread</summary>
private Exception reportedError;
/// <summary>Number of times the busy state of the runner has changed</summary>
private int busyChangeCount;
/// <summary>Triggered when the busy event has performed a double flank</summary>
private ManualResetEvent completionGate;
}
#endregion // class DummyRunner
#region class DummyTask
/// <summary>Dummy task that can be executed by a thread runner</summary>
private class DummyTask : IDisposable {
/// <summary>Initializes a new dummy task</summary>
/// <param name="delayMilliseconds">How long the task shoudl take to execute</param>
public DummyTask(int delayMilliseconds) {
this.startGate = new ManualResetEvent(initialState: false);
this.delayMilliseconds = delayMilliseconds;
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.startGate != null) {
this.startGate.Dispose();
this.startGate = null;
}
}
/// <summary>Waits for the task to start (all of 100 milliseconds)</summary>
/// <returns>True if the start started, false if it didn't</returns>
public bool WaitForStart() {
return this.startGate.WaitOne(100);
}
/// <summary>Sets the task up to fail with the specified error</summary>
/// <param name="error">Error the task will fail with</param>
public void FailWith(Exception error) {
this.error = error;
}
/// <summary>Runs the task with no arguments</summary>
public void Run() {
this.startGate.Set();
++this.executionCount;
Thread.Sleep(this.delayMilliseconds);
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with one argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
public void Run(float firstArgument) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
Thread.Sleep(this.delayMilliseconds);
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with two argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
/// <param name="secondArgument">Second argument passed from the runner</param>
public void Run(float firstArgument, string secondArgument) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
this.secondArgument = secondArgument;
Thread.Sleep(this.delayMilliseconds);
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with no arguments</summary>
/// <param name="cancellationToken">Token by which cancellation can be signalled</param>
public void RunCancellable(CancellationToken cancellationToken) {
this.startGate.Set();
++this.executionCount;
if(delayMilliseconds == 0) {
Thread.Sleep(0);
} else {
if(cancellationToken.WaitHandle.WaitOne(delayMilliseconds)) {
this.wasCancelled = cancellationToken.IsCancellationRequested;
cancellationToken.ThrowIfCancellationRequested();
}
}
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with one argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
/// <param name="cancellationToken">Token by which cancellation can be signalled</param>
public void RunCancellable(float firstArgument, CancellationToken cancellationToken) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
if(delayMilliseconds == 0) {
Thread.Sleep(0);
} else {
if(cancellationToken.WaitHandle.WaitOne(delayMilliseconds)) {
this.wasCancelled = cancellationToken.IsCancellationRequested;
cancellationToken.ThrowIfCancellationRequested();
}
}
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with two argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
/// <param name="secondArgument">Second argument passed from the runner</param>
/// <param name="cancellationToken">Token by which cancellation can be signalled</param>
public void RunCancellable(
float firstArgument, string secondArgument, CancellationToken cancellationToken
) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
this.secondArgument = secondArgument;
if(delayMilliseconds == 0) {
Thread.Sleep(0);
} else {
if(cancellationToken.WaitHandle.WaitOne(delayMilliseconds)) {
this.wasCancelled = cancellationToken.IsCancellationRequested;
cancellationToken.ThrowIfCancellationRequested();
}
}
if(this.error != null) {
throw this.error;
}
}
/// <summary>How many times the task was run</summary>
public int ExecutionCount {
get { return this.executionCount; }
}
/// <summary>Whether the task was cancelled by the runner itself</summary>
public bool WasCancelled {
get { return this.wasCancelled; }
}
/// <summary>What the first argument was during the last call</summary>
public float FirstArgument {
get { return this.firstArgument; }
}
/// <summary>What the second argument was during the last call</summary>
public string SecondArgument {
get { return this.secondArgument; }
}
/// <summary>Last error that was reported in the thread</summary>
private Exception error;
/// <summary>Triggered when the task has started</summary>
private ManualResetEvent startGate;
/// <summary>How long the task should take to execute in milliseconds</summary>
private int delayMilliseconds;
/// <summary>How many times the task has been executed</summary>
private volatile int executionCount;
/// <summary>Whether the task has been cancelled</summary>
private volatile bool wasCancelled;
/// <summary>First argument that was passed to the task</summary>
private volatile float firstArgument;
/// <summary>Second argument that was passed to the task</summary>
private volatile string secondArgument;
}
#endregion // class DummyRunner
/// <summary>Verifies that the thread runner has a default constructor</summary>
[Test]
public void CanBeDefaultConstructed() {
using(new DummyRunner()) { }
}
/// <summary>Checks that the runner sets and unsets its busy flag</summary>
[Test]
public void BusyFlagIsToggled() {
using(var runner = new DummyRunner()) {
int busyFlagChangeCount = runner.BusyChangeCount;
Assert.IsFalse(runner.IsBusy);
runner.RunInBackground((Action)delegate() { });
Assert.IsTrue(runner.WaitForCompletion());
Assert.GreaterOrEqual(busyFlagChangeCount + 2, runner.BusyChangeCount);
Assert.IsFalse(runner.IsBusy);
}
}
/// <summary>Lets the thread runner run a simple task in the background</summary>
[Test]
public void CanRunSimpleTaskInBackground() {
using(var task = new DummyTask(0)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action(task.Run));
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass a single argument to a task
/// </summary>
[Test]
public void CanPassSingleArgumentToSimpleTask() {
using(var task = new DummyTask(0)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action<float>(task.Run), 12.43f);
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(12.43f, task.FirstArgument);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass two arguments to a task
/// </summary>
[Test]
public void CanPassTwoArgumentsToSimpleTask() {
using(var task = new DummyTask(0)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action<float, string>(task.Run), 98.67f, "Hello");
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(98.67f, task.FirstArgument);
Assert.AreEqual("Hello", task.SecondArgument);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>
/// Verifies that an error happening in a simple task is reported correctly
/// </summary>
[Test]
public void SimpleTaskErrorIsReported() {
using(var task = new DummyTask(0)) {
var error = new InvalidOperationException("Mooh!");
task.FailWith(error);
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action(task.Run));
Assert.IsTrue(runner.WaitForCompletion());
Assert.AreSame(error, runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>Lets the thread runner run a cancellable task in the background</summary>
[Test]
public void CanRunCancellableTaskInBackground() {
using(var task = new DummyTask(100)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new CancellableAction(task.RunCancellable));
Assert.IsTrue(task.WaitForStart());
runner.CancelAllBackgroundOperations();
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.IsTrue(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass a single argument to a task
/// that can be cancelled
/// </summary>
[Test]
public void CanPassSingleArgumentToCancellableTask() {
using(var task = new DummyTask(100)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new CancellableAction<float>(task.RunCancellable), 12.43f);
Assert.IsTrue(task.WaitForStart());
runner.CancelAllBackgroundOperations();
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(12.43f, task.FirstArgument);
Assert.IsTrue(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass two arguments to a task
/// that can be cancelled
/// </summary>
[Test]
public void CanPassTwoArgumentsToCancellableTask() {
using(var task = new DummyTask(100)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(
new CancellableAction<float, string>(task.RunCancellable), 98.67f, "Hello"
);
Assert.IsTrue(task.WaitForStart());
runner.CancelAllBackgroundOperations();
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(98.67f, task.FirstArgument);
Assert.AreEqual("Hello", task.SecondArgument);
Assert.IsTrue(task.WasCancelled);
}
}
}
} // namespace Nuclex.Support.Threading
#endif // UNITTEST
#endif // !NO_CONCURRENT_COLLECTIONS
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Threading;
using System.Collections.Generic;
#if UNITTEST
using NUnit.Framework;
namespace Nuclex.Support.Threading {
/// <summary>Unit Test for the thread runner class</summary>
[TestFixture]
internal class ThreadRunnerTest {
#region class DefaultDisposeRunner
/// <summary>Implementation of a thread runner to check default dispose behavior</summary>
private class DefaultDisposeRunner : ThreadRunner {
/// <summary>Reports an error</summary>
/// <param name="exception">Error that will be reported</param>
protected override void ReportError(Exception exception) { }
/// <summary>Called when the status of the busy flag changes</summary>
protected override void BusyChanged() { }
}
#endregion // class DefaultDisposeRunner
#region class DummyRunner
/// <summary>Implementation of a thread runner used for unit testing</summary>
private class DummyRunner : ThreadRunner {
/// <summary>Initializes a new dummy thread runner</summary>
public DummyRunner() : base() {
this.completionGate = new ManualResetEvent(initialState: false);
}
/// <summary>Immediately frees all resources used by the instance</summary>
public new void Dispose() {
base.Dispose(100);
if(this.completionGate != null) {
this.completionGate.Dispose();
this.completionGate = null;
}
}
/// <summary>Waits for the task for complete (all of 100 milliseconds)</summary>
/// <returns>True if the task completed, false if it continues running</returns>
public bool WaitForCompletion() {
return this.completionGate.WaitOne(100);
}
/// <summary>How often the status of the busy flag has changed</summary>
public int BusyChangeCount {
get { return this.busyChangeCount; }
}
/// <summary>Error that has been reported the last time a task was run</summary>
public Exception ReportedError {
get { return this.reportedError; }
}
/// <summary>Reports an error</summary>
/// <param name="exception">Error that will be reported</param>
protected override void ReportError(Exception exception) {
this.reportedError = exception;
}
/// <summary>Called when the status of the busy flag changes</summary>
protected override void BusyChanged() {
++busyChangeCount;
if((busyChangeCount >= 2) && (base.IsBusy == false)) {
this.completionGate.Set();
}
}
/// <summary>Last error that was reported in the thread</summary>
private Exception reportedError;
/// <summary>Number of times the busy state of the runner has changed</summary>
private int busyChangeCount;
/// <summary>Triggered when the busy event has performed a double flank</summary>
private ManualResetEvent completionGate;
}
#endregion // class DummyRunner
#region class DummyTask
/// <summary>Dummy task that can be executed by a thread runner</summary>
private class DummyTask : IDisposable {
/// <summary>Initializes a new dummy task</summary>
/// <param name="delayMilliseconds">How long the task shoudl take to execute</param>
public DummyTask(int delayMilliseconds) {
this.startGate = new ManualResetEvent(initialState: false);
this.delayMilliseconds = delayMilliseconds;
}
/// <summary>Immediately releases all resources owned by the instance</summary>
public void Dispose() {
if(this.startGate != null) {
this.startGate.Dispose();
this.startGate = null;
}
}
/// <summary>Waits for the task to start (all of 100 milliseconds)</summary>
/// <returns>True if the start started, false if it didn't</returns>
public bool WaitForStart() {
return this.startGate.WaitOne(100);
}
/// <summary>Sets the task up to fail with the specified error</summary>
/// <param name="error">Error the task will fail with</param>
public void FailWith(Exception error) {
this.error = error;
}
/// <summary>Runs the task with no arguments</summary>
public void Run() {
this.startGate.Set();
++this.executionCount;
Thread.Sleep(this.delayMilliseconds);
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with one argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
public void Run(float firstArgument) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
Thread.Sleep(this.delayMilliseconds);
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with two argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
/// <param name="secondArgument">Second argument passed from the runner</param>
public void Run(float firstArgument, string secondArgument) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
this.secondArgument = secondArgument;
Thread.Sleep(this.delayMilliseconds);
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with no arguments</summary>
/// <param name="cancellationToken">Token by which cancellation can be signalled</param>
public void RunCancellable(CancellationToken cancellationToken) {
this.startGate.Set();
++this.executionCount;
if(delayMilliseconds == 0) {
Thread.Sleep(0);
} else {
if(cancellationToken.WaitHandle.WaitOne(delayMilliseconds)) {
this.wasCancelled = cancellationToken.IsCancellationRequested;
cancellationToken.ThrowIfCancellationRequested();
}
}
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with one argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
/// <param name="cancellationToken">Token by which cancellation can be signalled</param>
public void RunCancellable(float firstArgument, CancellationToken cancellationToken) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
if(delayMilliseconds == 0) {
Thread.Sleep(0);
} else {
if(cancellationToken.WaitHandle.WaitOne(delayMilliseconds)) {
this.wasCancelled = cancellationToken.IsCancellationRequested;
cancellationToken.ThrowIfCancellationRequested();
}
}
if(this.error != null) {
throw this.error;
}
}
/// <summary>Runs the task with two argument</summary>
/// <param name="firstArgument">First argument passed from the runner</param>
/// <param name="secondArgument">Second argument passed from the runner</param>
/// <param name="cancellationToken">Token by which cancellation can be signalled</param>
public void RunCancellable(
float firstArgument, string secondArgument, CancellationToken cancellationToken
) {
this.startGate.Set();
++this.executionCount;
this.firstArgument = firstArgument;
this.secondArgument = secondArgument;
if(delayMilliseconds == 0) {
Thread.Sleep(0);
} else {
if(cancellationToken.WaitHandle.WaitOne(delayMilliseconds)) {
this.wasCancelled = cancellationToken.IsCancellationRequested;
cancellationToken.ThrowIfCancellationRequested();
}
}
if(this.error != null) {
throw this.error;
}
}
/// <summary>How many times the task was run</summary>
public int ExecutionCount {
get { return this.executionCount; }
}
/// <summary>Whether the task was cancelled by the runner itself</summary>
public bool WasCancelled {
get { return this.wasCancelled; }
}
/// <summary>What the first argument was during the last call</summary>
public float FirstArgument {
get { return this.firstArgument; }
}
/// <summary>What the second argument was during the last call</summary>
public string SecondArgument {
get { return this.secondArgument; }
}
/// <summary>Last error that was reported in the thread</summary>
private Exception error;
/// <summary>Triggered when the task has started</summary>
private ManualResetEvent startGate;
/// <summary>How long the task should take to execute in milliseconds</summary>
private int delayMilliseconds;
/// <summary>How many times the task has been executed</summary>
private volatile int executionCount;
/// <summary>Whether the task has been cancelled</summary>
private volatile bool wasCancelled;
/// <summary>First argument that was passed to the task</summary>
private volatile float firstArgument;
/// <summary>Second argument that was passed to the task</summary>
private volatile string secondArgument;
}
#endregion // class DummyRunner
/// <summary>Verifies that the thread runner has a default constructor</summary>
[Test]
public void CanBeDefaultConstructed() {
using(new DummyRunner()) { }
}
/// <summary>Checks that the runner sets and unsets its busy flag</summary>
[Test]
public void BusyFlagIsToggled() {
using(var runner = new DummyRunner()) {
int busyFlagChangeCount = runner.BusyChangeCount;
Assert.IsFalse(runner.IsBusy);
runner.RunInBackground((Action)delegate() { });
Assert.IsTrue(runner.WaitForCompletion());
Assert.GreaterOrEqual(busyFlagChangeCount + 2, runner.BusyChangeCount);
Assert.IsFalse(runner.IsBusy);
}
}
/// <summary>Lets the thread runner run a simple task in the background</summary>
[Test]
public void CanRunSimpleTaskInBackground() {
using(var task = new DummyTask(0)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action(task.Run));
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass a single argument to a task
/// </summary>
[Test]
public void CanPassSingleArgumentToSimpleTask() {
using(var task = new DummyTask(0)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action<float>(task.Run), 12.43f);
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(12.43f, task.FirstArgument);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass two arguments to a task
/// </summary>
[Test]
public void CanPassTwoArgumentsToSimpleTask() {
using(var task = new DummyTask(0)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action<float, string>(task.Run), 98.67f, "Hello");
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(98.67f, task.FirstArgument);
Assert.AreEqual("Hello", task.SecondArgument);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>
/// Verifies that an error happening in a simple task is reported correctly
/// </summary>
[Test]
public void SimpleTaskErrorIsReported() {
using(var task = new DummyTask(0)) {
var error = new InvalidOperationException("Mooh!");
task.FailWith(error);
using(var runner = new DummyRunner()) {
runner.RunInBackground(new Action(task.Run));
Assert.IsTrue(runner.WaitForCompletion());
Assert.AreSame(error, runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.IsFalse(task.WasCancelled);
}
}
/// <summary>Lets the thread runner run a cancellable task in the background</summary>
[Test]
public void CanRunCancellableTaskInBackground() {
using(var task = new DummyTask(100)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new CancellableAction(task.RunCancellable));
Assert.IsTrue(task.WaitForStart());
runner.CancelAllBackgroundOperations();
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.IsTrue(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass a single argument to a task
/// that can be cancelled
/// </summary>
[Test]
public void CanPassSingleArgumentToCancellableTask() {
using(var task = new DummyTask(100)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(new CancellableAction<float>(task.RunCancellable), 12.43f);
Assert.IsTrue(task.WaitForStart());
runner.CancelAllBackgroundOperations();
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(12.43f, task.FirstArgument);
Assert.IsTrue(task.WasCancelled);
}
}
/// <summary>
/// Checks that the thread runner is able to pass two arguments to a task
/// that can be cancelled
/// </summary>
[Test]
public void CanPassTwoArgumentsToCancellableTask() {
using(var task = new DummyTask(100)) {
using(var runner = new DummyRunner()) {
runner.RunInBackground(
new CancellableAction<float, string>(task.RunCancellable), 98.67f, "Hello"
);
Assert.IsTrue(task.WaitForStart());
runner.CancelAllBackgroundOperations();
Assert.IsTrue(runner.WaitForCompletion());
Assert.IsNull(runner.ReportedError);
}
Assert.AreEqual(1, task.ExecutionCount);
Assert.AreEqual(98.67f, task.FirstArgument);
Assert.AreEqual("Hello", task.SecondArgument);
Assert.IsTrue(task.WasCancelled);
}
}
}
} // namespace Nuclex.Support.Threading
#endif // UNITTEST
#endif // !NO_CONCURRENT_COLLECTIONS

View file

@ -1,464 +1,463 @@
#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2019 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Nuclex.Support.Threading {
/// <summary>Executes actions in a threads</summary>
public abstract class ThreadRunner : IDisposable {
#region interface IRunner
/// <summary>Interface for a background task runner</summary>
private interface IRunner {
/// <summary>Runs the background task</summary>
void Run();
/// <summary>The runner's cancellation token source</summary>
CancellationTokenSource CancellationTokenSource { get; }
}
#endregion // interface IRunner
#region struct Runner
/// <summary>Runs a background task with no parameters</summary>
private struct Runner : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
public Runner(Action action) {
this.action = action;
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action();
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return null; }
}
/// <summary>Action the runner will execute</summary>
private Action action;
}
#endregion // struct Runner
#region struct CancellableRunner
/// <summary>Runs a background task with no parameters</summary>
private struct CancellableRunner : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
public CancellableRunner(CancellableAction action) {
this.action = action;
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.cancellationTokenSource.Token);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return this.cancellationTokenSource; }
}
/// <summary>The runner's cancellation token source</summary>
private CancellationTokenSource cancellationTokenSource;
/// <summary>Action the runner will execute</summary>
private CancellableAction action;
}
#endregion // struct CancellableRunner
#region struct Runner<P1>
/// <summary>Runs a background task with one parameter</summary>
private struct Runner<P1> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public Runner(Action<P1> action, P1 parameter1) {
this.action = action;
this.parameter1 = parameter1;
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return null; }
}
/// <summary>Action the runner will execute</summary>
private Action<P1> action;
/// <summary>Parameter that will be passed to the action</summary>
private P1 parameter1;
}
#endregion // struct Runner<P1>
#region struct CancellableRunner<P1>
/// <summary>Runs a background task with one parameter</summary>
private struct CancellableRunner<P1> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public CancellableRunner(CancellableAction<P1> action, P1 parameter1) {
this.action = action;
this.parameter1 = parameter1;
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1, this.cancellationTokenSource.Token);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return this.cancellationTokenSource; }
}
/// <summary>The runner's cancellation token source</summary>
private CancellationTokenSource cancellationTokenSource;
/// <summary>Action the runner will execute</summary>
private CancellableAction<P1> action;
/// <summary>Parameter that will be passed to the action</summary>
private P1 parameter1;
}
#endregion // struct CancellableRunner<P1>
#region struct Runner<P1, P2>
/// <summary>Runs a background task with one parameter</summary>
private struct Runner<P1, P2> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public Runner(Action<P1, P2> action, P1 parameter1, P2 parameter2) {
this.action = action;
this.parameter1 = parameter1;
this.parameter2 = parameter2;
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1, this.parameter2);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return null; }
}
/// <summary>Action the runner will execute</summary>
private Action<P1, P2> action;
/// <summary>First parameter that will be passed to the action</summary>
private P1 parameter1;
/// <summary>Second parameter that will be passed to the action</summary>
private P2 parameter2;
}
#endregion // struct Runner<P1, P2>
#region struct CancellableRunner<P1, P2>
/// <summary>Runs a background task with one parameter</summary>
private struct CancellableRunner<P1, P2> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public CancellableRunner(CancellableAction<P1, P2> action, P1 parameter1, P2 parameter2) {
this.action = action;
this.parameter1 = parameter1;
this.parameter2 = parameter2;
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1, this.parameter2, this.cancellationTokenSource.Token);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return this.cancellationTokenSource; }
}
/// <summary>The runner's cancellation token source</summary>
private CancellationTokenSource cancellationTokenSource;
/// <summary>Action the runner will execute</summary>
private CancellableAction<P1, P2> action;
/// <summary>First parameter that will be passed to the action</summary>
private P1 parameter1;
/// <summary>Second parameter that will be passed to the action</summary>
private P2 parameter2;
}
#endregion // struct CancellableRunner<P1, P2>
/// <summary>Initializes a new background processing handler</summary>
public ThreadRunner() {
this.executeQueuedRunnersInThreadDelegate = new Action(executeQueuedRunnersInThread);
this.queuedRunners = new ConcurrentQueue<IRunner>();
}
/// <summary>
/// Immediately cancels all operations and releases the resources used by the instance
/// </summary>
public void Dispose() {
Dispose(timeoutMilliseconds: 2500);
}
/// <summary>
/// Immediately cancels all operations and releases the resources used by the instance
/// </summary>
/// <param name="timeoutMilliseconds">
/// Time to wait for the background tasks before dropping the tasks unfinished
/// </param>
public void Dispose(int timeoutMilliseconds) {
CancelAllBackgroundOperations();
Task currentTask;
lock(this) {
currentTask = this.currentTask;
}
if(currentTask != null) {
if(!currentTask.Wait(timeoutMilliseconds)) {
Debug.Assert(false, "Task does not support cancellation or did not cancel in time");
}
lock(this) {
this.currentTask = null;
IsBusy = false;
}
}
}
/// <summary>Whether the view model is currently busy executing a task</summary>
public bool IsBusy {
get { return this.isBusy; }
private set {
if(value != this.isBusy) {
this.isBusy = value;
BusyChanged();
}
}
}
/// <summary>Reports an error</summary>
/// <param name="exception">Error that will be reported</param>
protected abstract void ReportError(Exception exception);
/// <summary>Called when the status of the busy flag changes</summary>
protected abstract void BusyChanged();
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
public void RunInBackground(Action action) {
this.queuedRunners.Enqueue(new Runner(action));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
public void RunInBackground(CancellableAction action) {
this.queuedRunners.Enqueue(new CancellableRunner(action));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public void RunInBackground<P1>(Action<P1> action, P1 parameter1) {
this.queuedRunners.Enqueue(new Runner<P1>(action, parameter1));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public void RunInBackground<P1>(CancellableAction<P1> action, P1 parameter1) {
this.queuedRunners.Enqueue(new CancellableRunner<P1>(action, parameter1));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public void RunInBackground<P1, P2>(Action<P1, P2> action, P1 parameter1, P2 parameter2) {
this.queuedRunners.Enqueue(new Runner<P1, P2>(action, parameter1, parameter2));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public void RunInBackground<P1, P2>(
CancellableAction<P1, P2> action, P1 parameter1, P2 parameter2
) {
this.queuedRunners.Enqueue(new CancellableRunner<P1, P2>(action, parameter1, parameter2));
startBackgroundProcessingIfNecessary();
}
/// <summary>Cancels the currently running background operation</summary>
public void CancelBackgroundOperation() {
IRunner currentRunner = this.currentRunner;
if(currentRunner != null) {
CancellationTokenSource cancellationTokenSource = currentRunner.CancellationTokenSource;
if(cancellationTokenSource != null) {
cancellationTokenSource.Cancel();
}
}
}
/// <summary>Cancels all queued and the currently running background operation</summary>
public void CancelAllBackgroundOperations() {
IRunner runner;
while(this.queuedRunners.TryDequeue(out runner)) {
CancellationTokenSource cancellationTokenSource = runner.CancellationTokenSource;
if(cancellationTokenSource != null) {
cancellationTokenSource.Cancel();
}
}
CancelBackgroundOperation();
}
/// <summary>Whether the background operation has been cancelled</summary>
//[Obsolete("Please use a method accepting a cancellation token instead of using this")]
public bool IsBackgroundOperationCancelled {
get {
IRunner currentRunner = this.currentRunner;
if(currentRunner != null) {
return currentRunner.CancellationTokenSource.IsCancellationRequested;
} else {
return false;
}
}
}
/// <summary>Throws an exception if the background operation was cancelled</summary>
//[Obsolete("Please use a method accepting a cancellation token instead of using this")]
public void ThrowIfBackgroundOperationCancelled() {
IRunner currentRunner = this.currentRunner;
if(currentRunner != null) {
CancellationTokenSource source = currentRunner.CancellationTokenSource;
if(source != null) {
source.Token.ThrowIfCancellationRequested();
}
}
}
/// <summary>Executes the queued runners in the background</summary>
private void executeQueuedRunnersInThread() {
IsBusy = true;
IRunner runner;
while(this.queuedRunners.TryDequeue(out runner)) {
try {
this.currentRunner = runner;
runner.Run();
}
catch(OperationCanceledException) {
// Ignore
}
catch(Exception exception) {
this.currentRunner = null; // When the error is reported this should already be null
ReportError(exception);
}
this.currentRunner = null;
}
lock(this) {
this.currentTask = null;
IsBusy = false;
}
}
/// <summary>Starts the background processing thread, if needed</summary>
private void startBackgroundProcessingIfNecessary() {
Task currentTask;
lock(this) {
if(this.currentTask == null) {
currentTask = new Task(this.executeQueuedRunnersInThreadDelegate);
this.currentTask = currentTask;
} else {
return; // Task is already running
}
}
// Start the task outside of the lock statement so that when the thread starts to run,
// it is guaranteed to read the currentTask variable as the task we just created.
currentTask.Start();
}
/// <summary>Whether the view model is currently busy executing a task</summary>
private volatile bool isBusy;
/// <summary>Delegate for the executedQueuedRunnersInThread() method</summary>
private Action executeQueuedRunnersInThreadDelegate;
/// <summary>Queued background operations</summary>
private ConcurrentQueue<IRunner> queuedRunners;
/// <summary>Runner currently executing in the background</summary>
private volatile IRunner currentRunner;
/// <summary>Task that is currently executing the runners</summary>
private Task currentTask;
}
} // namespace Nuclex.Support.Threading
#endif // !NO_CONCURRENT_COLLECTIONS
#region Apache License 2.0
/*
Nuclex .NET Framework
Copyright (C) 2002-2024 Markus Ewald / Nuclex Development Labs
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#endregion // Apache License 2.0
#if !NO_CONCURRENT_COLLECTIONS
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace Nuclex.Support.Threading {
/// <summary>Executes actions in a threads</summary>
public abstract class ThreadRunner : IDisposable {
#region interface IRunner
/// <summary>Interface for a background task runner</summary>
private interface IRunner {
/// <summary>Runs the background task</summary>
void Run();
/// <summary>The runner's cancellation token source</summary>
CancellationTokenSource CancellationTokenSource { get; }
}
#endregion // interface IRunner
#region struct Runner
/// <summary>Runs a background task with no parameters</summary>
private struct Runner : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
public Runner(Action action) {
this.action = action;
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action();
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return null; }
}
/// <summary>Action the runner will execute</summary>
private Action action;
}
#endregion // struct Runner
#region struct CancellableRunner
/// <summary>Runs a background task with no parameters</summary>
private struct CancellableRunner : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
public CancellableRunner(CancellableAction action) {
this.action = action;
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.cancellationTokenSource.Token);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return this.cancellationTokenSource; }
}
/// <summary>The runner's cancellation token source</summary>
private CancellationTokenSource cancellationTokenSource;
/// <summary>Action the runner will execute</summary>
private CancellableAction action;
}
#endregion // struct CancellableRunner
#region struct Runner<P1>
/// <summary>Runs a background task with one parameter</summary>
private struct Runner<P1> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public Runner(Action<P1> action, P1 parameter1) {
this.action = action;
this.parameter1 = parameter1;
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return null; }
}
/// <summary>Action the runner will execute</summary>
private Action<P1> action;
/// <summary>Parameter that will be passed to the action</summary>
private P1 parameter1;
}
#endregion // struct Runner<P1>
#region struct CancellableRunner<P1>
/// <summary>Runs a background task with one parameter</summary>
private struct CancellableRunner<P1> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public CancellableRunner(CancellableAction<P1> action, P1 parameter1) {
this.action = action;
this.parameter1 = parameter1;
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1, this.cancellationTokenSource.Token);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return this.cancellationTokenSource; }
}
/// <summary>The runner's cancellation token source</summary>
private CancellationTokenSource cancellationTokenSource;
/// <summary>Action the runner will execute</summary>
private CancellableAction<P1> action;
/// <summary>Parameter that will be passed to the action</summary>
private P1 parameter1;
}
#endregion // struct CancellableRunner<P1>
#region struct Runner<P1, P2>
/// <summary>Runs a background task with one parameter</summary>
private struct Runner<P1, P2> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public Runner(Action<P1, P2> action, P1 parameter1, P2 parameter2) {
this.action = action;
this.parameter1 = parameter1;
this.parameter2 = parameter2;
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1, this.parameter2);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return null; }
}
/// <summary>Action the runner will execute</summary>
private Action<P1, P2> action;
/// <summary>First parameter that will be passed to the action</summary>
private P1 parameter1;
/// <summary>Second parameter that will be passed to the action</summary>
private P2 parameter2;
}
#endregion // struct Runner<P1, P2>
#region struct CancellableRunner<P1, P2>
/// <summary>Runs a background task with one parameter</summary>
private struct CancellableRunner<P1, P2> : IRunner {
/// <summary>Initializes a new runner</summary>
/// <param name="action">Action the runner will execute</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public CancellableRunner(CancellableAction<P1, P2> action, P1 parameter1, P2 parameter2) {
this.action = action;
this.parameter1 = parameter1;
this.parameter2 = parameter2;
this.cancellationTokenSource = new CancellationTokenSource();
}
/// <summary>Executes the runner's action</summary>
public void Run() {
this.action(this.parameter1, this.parameter2, this.cancellationTokenSource.Token);
}
/// <summary>The runner's cancellation token source</summary>
public CancellationTokenSource CancellationTokenSource {
get { return this.cancellationTokenSource; }
}
/// <summary>The runner's cancellation token source</summary>
private CancellationTokenSource cancellationTokenSource;
/// <summary>Action the runner will execute</summary>
private CancellableAction<P1, P2> action;
/// <summary>First parameter that will be passed to the action</summary>
private P1 parameter1;
/// <summary>Second parameter that will be passed to the action</summary>
private P2 parameter2;
}
#endregion // struct CancellableRunner<P1, P2>
/// <summary>Initializes a new background processing handler</summary>
public ThreadRunner() {
this.executeQueuedRunnersInThreadDelegate = new Action(executeQueuedRunnersInThread);
this.queuedRunners = new ConcurrentQueue<IRunner>();
}
/// <summary>
/// Immediately cancels all operations and releases the resources used by the instance
/// </summary>
public void Dispose() {
Dispose(timeoutMilliseconds: 2500);
}
/// <summary>
/// Immediately cancels all operations and releases the resources used by the instance
/// </summary>
/// <param name="timeoutMilliseconds">
/// Time to wait for the background tasks before dropping the tasks unfinished
/// </param>
public void Dispose(int timeoutMilliseconds) {
CancelAllBackgroundOperations();
Task currentTask;
lock(this) {
currentTask = this.currentTask;
}
if(currentTask != null) {
if(!currentTask.Wait(timeoutMilliseconds)) {
Debug.Assert(false, "Task does not support cancellation or did not cancel in time");
}
lock(this) {
this.currentTask = null;
IsBusy = false;
}
}
}
/// <summary>Whether the view model is currently busy executing a task</summary>
public bool IsBusy {
get { return this.isBusy; }
private set {
if(value != this.isBusy) {
this.isBusy = value;
BusyChanged();
}
}
}
/// <summary>Reports an error</summary>
/// <param name="exception">Error that will be reported</param>
protected abstract void ReportError(Exception exception);
/// <summary>Called when the status of the busy flag changes</summary>
protected abstract void BusyChanged();
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
public void RunInBackground(Action action) {
this.queuedRunners.Enqueue(new Runner(action));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
public void RunInBackground(CancellableAction action) {
this.queuedRunners.Enqueue(new CancellableRunner(action));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public void RunInBackground<P1>(Action<P1> action, P1 parameter1) {
this.queuedRunners.Enqueue(new Runner<P1>(action, parameter1));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">Parameter that will be passed to the action</param>
public void RunInBackground<P1>(CancellableAction<P1> action, P1 parameter1) {
this.queuedRunners.Enqueue(new CancellableRunner<P1>(action, parameter1));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public void RunInBackground<P1, P2>(Action<P1, P2> action, P1 parameter1, P2 parameter2) {
this.queuedRunners.Enqueue(new Runner<P1, P2>(action, parameter1, parameter2));
startBackgroundProcessingIfNecessary();
}
/// <summary>Executes the specified operation in the background</summary>
/// <param name="action">Action that will be executed in the background</param>
/// <param name="parameter1">First parameter that will be passed to the action</param>
/// <param name="parameter2">Second parameter that will be passed to the action</param>
public void RunInBackground<P1, P2>(
CancellableAction<P1, P2> action, P1 parameter1, P2 parameter2
) {
this.queuedRunners.Enqueue(new CancellableRunner<P1, P2>(action, parameter1, parameter2));
startBackgroundProcessingIfNecessary();
}
/// <summary>Cancels the currently running background operation</summary>
public void CancelBackgroundOperation() {
IRunner currentRunner = this.currentRunner;
if(currentRunner != null) {
CancellationTokenSource cancellationTokenSource = currentRunner.CancellationTokenSource;
if(cancellationTokenSource != null) {
cancellationTokenSource.Cancel();
}
}
}
/// <summary>Cancels all queued and the currently running background operation</summary>
public void CancelAllBackgroundOperations() {
IRunner runner;
while(this.queuedRunners.TryDequeue(out runner)) {
CancellationTokenSource cancellationTokenSource = runner.CancellationTokenSource;
if(cancellationTokenSource != null) {
cancellationTokenSource.Cancel();
}
}
CancelBackgroundOperation();
}
/// <summary>Whether the background operation has been cancelled</summary>
//[Obsolete("Please use a method accepting a cancellation token instead of using this")]
public bool IsBackgroundOperationCancelled {
get {
IRunner currentRunner = this.currentRunner;
if(currentRunner != null) {
return currentRunner.CancellationTokenSource.IsCancellationRequested;
} else {
return false;
}
}
}
/// <summary>Throws an exception if the background operation was cancelled</summary>
//[Obsolete("Please use a method accepting a cancellation token instead of using this")]
public void ThrowIfBackgroundOperationCancelled() {
IRunner currentRunner = this.currentRunner;
if(currentRunner != null) {
CancellationTokenSource source = currentRunner.CancellationTokenSource;
if(source != null) {
source.Token.ThrowIfCancellationRequested();
}
}
}
/// <summary>Executes the queued runners in the background</summary>
private void executeQueuedRunnersInThread() {
IsBusy = true;
IRunner runner;
while(this.queuedRunners.TryDequeue(out runner)) {
try {
this.currentRunner = runner;
runner.Run();
}
catch(OperationCanceledException) {
// Ignore
}
catch(Exception exception) {
this.currentRunner = null; // When the error is reported this should already be null
ReportError(exception);
}
this.currentRunner = null;
}
lock(this) {
this.currentTask = null;
IsBusy = false;
}
}
/// <summary>Starts the background processing thread, if needed</summary>
private void startBackgroundProcessingIfNecessary() {
Task currentTask;
lock(this) {
if(this.currentTask == null) {
currentTask = new Task(this.executeQueuedRunnersInThreadDelegate);
this.currentTask = currentTask;
} else {
return; // Task is already running
}
}
// Start the task outside of the lock statement so that when the thread starts to run,
// it is guaranteed to read the currentTask variable as the task we just created.
currentTask.Start();
}
/// <summary>Whether the view model is currently busy executing a task</summary>
private volatile bool isBusy;
/// <summary>Delegate for the executedQueuedRunnersInThread() method</summary>
private Action executeQueuedRunnersInThreadDelegate;
/// <summary>Queued background operations</summary>
private ConcurrentQueue<IRunner> queuedRunners;
/// <summary>Runner currently executing in the background</summary>
private volatile IRunner currentRunner;
/// <summary>Task that is currently executing the runners</summary>
private Task currentTask;
}
} // namespace Nuclex.Support.Threading
#endif // !NO_CONCURRENT_COLLECTIONS