using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Nuclex.Support {
/// Processes tasks in parallel using many threads
/// Type of tasks the class will process
public abstract class ParallelBackgroundWorker : IDisposable {
/// Number of CPU cores available on the system
#if XBOX360
public static readonly int Processors = 4;
#else
public static readonly int Processors = Environment.ProcessorCount;
#endif
/// Initializes a new parallel background worker with unlimited threads
public ParallelBackgroundWorker() : this(int.MaxValue) { }
///
/// Initializes a new parallel background worker running the specified number
/// of tasks in parallel
///
///
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
///
///
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
///
public ParallelBackgroundWorker(int threadCount) {
if(threadCount > 0) {
this.threadCount = threadCount;
} else {
threadCount = Math.Max(1, Processors + threadCount);
}
this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
this.runningThreads = new List();
this.tasks = new Queue();
this.threadTerminatedEvent = new AutoResetEvent(false);
this.cancellationTokenSource = new CancellationTokenSource();
}
///
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads.
///
/// Name that will be assigned to the worker threads
public ParallelBackgroundWorker(string name) :
this(int.MaxValue) {
threadName = name;
}
///
/// Initializes a new parallel background worker that uses the specified name for
/// its worker threads and running the specified number of tasks in parallel.
///
/// Name that will be assigned to the worker threads
///
/// Number of tasks to run in parallel (if positive) or number of CPU cores to leave
/// unused (if negative).
///
///
/// If a negative number of threads is used, at least one thread will be always
/// be created, so specifying -2 on a single-core system will still occupy
/// the only core.
///
public ParallelBackgroundWorker(string name, int threadCount) :
this(threadCount) {
threadName = name;
}
/// Immediately releases all resources owned by the instance
public void Dispose() {
if(this.threadTerminatedEvent != null) {
CancelPendingTasks();
CancelRunningTasks();
Join();
this.threadTerminatedEvent.Dispose();
this.threadTerminatedEvent = null;
}
if(this.cancellationTokenSource != null) {
this.cancellationTokenSource.Dispose();
this.cancellationTokenSource = null;
}
}
/// Adds a task for processing by the background worker threads
/// Task that will be processed in the background
public void AddTask(TTask task) {
if(this.cancellationTokenSource.IsCancellationRequested) {
return;
}
lock(this.queueSynchronizationRoot) {
this.tasks.Enqueue(task);
if(this.runningThreads.Count < this.threadCount) {
//Task newThread = new Task(this.runQueuedTasksInThreadDelegate, );
}
}
// Thread 1:
// lock() {
// - take task
// - or deregister and exit
// }
//
// Thread 2:
// lock() {
// - put task
// - if too few threads, register and add
// }
}
/// Cancels all tasks that are currently executing
public void CancelRunningTasks() {
this.cancellationTokenSource.Cancel();
}
/// Cancels all queued tasks waiting to be executed
public void CancelPendingTasks() {
lock(this.queueSynchronizationRoot) {
this.tasks.Clear();
}
}
/// Waits until all executing and queued tasks have been processed
public void Join() {
while(this.runningThreads.Count > 0) {
this.threadTerminatedEvent.WaitOne();
}
}
/// Called in a thread to execute a single task
/// Task that should be executed
///
/// Cancellation token through which the method can be signalled to cancel
///
protected abstract void Run(TTask task, CancellationToken cancellationToken);
///
/// Runs queued tasks of the parallel background worker until the queue is empty
///
/// Threading task in which this worker is running
private void runQueuedTasksInThread(object thisTaskAsObject) {
string previousThreadName = null;
if(!string.IsNullOrEmpty(this.threadName)) {
previousThreadName = Thread.CurrentThread.Name;
Thread.CurrentThread.Name = this.threadName;
}
try {
#if false
TTask task;
while(this.tasks.TryDequeue(out task)) {
try {
Run(task, this.cancellationTokenSource.Token);
}
catch(Exception exception) {
this.exceptions.Add(exception);
}
}
lock(this.runningThreads) {
this.runningThreads.Remove((Task)thisTaskAsObject);
}
this.threadTerminatedEvent.Set();
#endif
}
finally {
if(!string.IsNullOrEmpty(this.threadName)) {
Thread.CurrentThread.Name = previousThreadName;
}
}
}
///
/// Name that will be assigned to the worker threads while they're processing tasks
/// for the parallel background worker
///
private string threadName;
/// Number of threads to use simultaneously
private int threadCount;
/// Synchronization root used to access the task queue and thread list
///
///
/// Both lists are intentionally using a shared synchronization root because this will
/// guarantee that there's no race condition between adding work and a thread finishing:
///
///
/// Main thread
///
/// lock(this.queueSynchronizationRoot) {
/// this.tasks.Add(newTask);
///
/// if(this.runningThreads.Count < maximumThreadCount) {
/// startAdditionalTaskThread();
/// }
/// }
///
///
///
/// Task thread
///
/// TTask myTask;
/// lock(this.queueSynchronizationRoot) {
/// this.tasks.TryGet(out myTask);
///
/// // RACE CONDITION WITHOUT LOCK: if this wasn't inside the same lock as the task
/// // checking, the main thread might check at this point, see that there are already
/// // enough threads running!
/// if(myTask == null) {
/// this.runningThreads.Remove(thisThread);
/// return;
/// }
/// }
///
///
///
/// The race condition mentioned above could completely stall the background worker by
/// leaving tasks in the queue but no threads active to execute them, so it is vital to
/// use this method of synchronization!
///
///
///
private object queueSynchronizationRoot;
/// Delegate for the runQueuedTasksInThread() method
private Action runQueuedTasksInThreadDelegate;
/// Tasks remaining to be processed
private Queue tasks;
/// Threads that are currently executing tasks
private IList runningThreads;
// Idea:
// private int runningThreadCount;
// Before the task looks for new work, it will decrement this
// if the task gets new work, it will increment this again.
// - if it would be above threadCount now, put work back in the queue
// AddTask() increments this after pushing new work
// - if it would be above threadCount, do not create a new thread
/// Exceptions that have occurred while executing tasks
private ConcurrentBag exceptions;
/// Event that is triggered whenever a task gets completed
private AutoResetEvent threadTerminatedEvent;
///
/// Provides a cancellation token that can be used to signal a thread to terminate
///
private CancellationTokenSource cancellationTokenSource;
}
} // namespace Nuclex.Support