From a18cb63fc5dbdb07b5dec7e23f89fa663efe7239 Mon Sep 17 00:00:00 2001 From: Markus Ewald Date: Thu, 20 Feb 2014 11:49:11 +0000 Subject: [PATCH] Began work on a parallel background worker class that will be like the BackgroundWorker, but without the tight coupling to WinForms and with support for running multiple tasks in parallel git-svn-id: file:///srv/devel/repo-conversion/nusu@290 d2e56fa2-650e-0410-a79f-9358c0239efd --- Nuclex.Support (net-4.0).csproj | 1 + Source/ParallelBackgroundWorker.cs | 265 +++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 Source/ParallelBackgroundWorker.cs diff --git a/Nuclex.Support (net-4.0).csproj b/Nuclex.Support (net-4.0).csproj index 65e37d4..c5593b1 100644 --- a/Nuclex.Support (net-4.0).csproj +++ b/Nuclex.Support (net-4.0).csproj @@ -214,6 +214,7 @@ LicenseKey.cs + CommandLine.cs diff --git a/Source/ParallelBackgroundWorker.cs b/Source/ParallelBackgroundWorker.cs new file mode 100644 index 0000000..6dbed19 --- /dev/null +++ b/Source/ParallelBackgroundWorker.cs @@ -0,0 +1,265 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace Nuclex.Support { + + /// Processes tasks in parallel using many threads + /// Type of tasks the class will process + public abstract class ParallelBackgroundWorker : IDisposable { + + /// Number of CPU cores available on the system +#if XBOX360 + public static readonly int Processors = 4; +#else + public static readonly int Processors = Environment.ProcessorCount; +#endif + + /// Initializes a new parallel background worker with unlimited threads + public ParallelBackgroundWorker() : this(int.MaxValue) { } + + /// + /// Initializes a new parallel background worker running the specified number + /// of tasks in parallel + /// + /// + /// Number of tasks to run in parallel (if positive) or number of CPU cores to leave + /// unused (if negative). + /// + /// + /// If a negative number of threads is used, at least one thread will be always + /// be created, so specifying -2 on a single-core system will still occupy + /// the only core. + /// + public ParallelBackgroundWorker(int threadCount) { + if(threadCount > 0) { + this.threadCount = threadCount; + } else { + threadCount = Math.Max(1, Processors + threadCount); + } + + this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread); + this.runningThreads = new List(); + this.tasks = new Queue(); + this.threadTerminatedEvent = new AutoResetEvent(false); + this.cancellationTokenSource = new CancellationTokenSource(); + } + + /// + /// Initializes a new parallel background worker that uses the specified name for + /// its worker threads. + /// + /// Name that will be assigned to the worker threads + public ParallelBackgroundWorker(string name) : + this(int.MaxValue) { + threadName = name; + } + + /// + /// Initializes a new parallel background worker that uses the specified name for + /// its worker threads and running the specified number of tasks in parallel. + /// + /// Name that will be assigned to the worker threads + /// + /// Number of tasks to run in parallel (if positive) or number of CPU cores to leave + /// unused (if negative). + /// + /// + /// If a negative number of threads is used, at least one thread will be always + /// be created, so specifying -2 on a single-core system will still occupy + /// the only core. + /// + public ParallelBackgroundWorker(string name, int threadCount) : + this(threadCount) { + threadName = name; + } + + /// Immediately releases all resources owned by the instance + public void Dispose() { + if(this.threadTerminatedEvent != null) { + CancelPendingTasks(); + CancelRunningTasks(); + Join(); + + this.threadTerminatedEvent.Dispose(); + this.threadTerminatedEvent = null; + } + if(this.cancellationTokenSource != null) { + this.cancellationTokenSource.Dispose(); + this.cancellationTokenSource = null; + } + } + + + /// Adds a task for processing by the background worker threads + /// Task that will be processed in the background + public void AddTask(TTask task) { + if(this.cancellationTokenSource.IsCancellationRequested) { + return; + } + + lock(this.queueSynchronizationRoot) { + this.tasks.Enqueue(task); + + if(this.runningThreads.Count < this.threadCount) { + //Task newThread = new Task(this.runQueuedTasksInThreadDelegate, ); + } + } + + // Thread 1: + // lock() { + // - take task + // - or deregister and exit + // } + // + // Thread 2: + // lock() { + // - put task + // - if too few threads, register and add + // } + + } + + /// Cancels all tasks that are currently executing + public void CancelRunningTasks() { + this.cancellationTokenSource.Cancel(); + } + + /// Cancels all queued tasks waiting to be executed + public void CancelPendingTasks() { + lock(this.queueSynchronizationRoot) { + this.tasks.Clear(); + } + } + + /// Waits until all executing and queued tasks have been processed + public void Join() { + while(this.runningThreads.Count > 0) { + this.threadTerminatedEvent.WaitOne(); + } + } + + /// Called in a thread to execute a single task + /// Task that should be executed + /// + /// Cancellation token through which the method can be signalled to cancel + /// + protected abstract void Run(TTask task, CancellationToken cancellationToken); + + /// + /// Runs queued tasks of the parallel background worker until the queue is empty + /// + /// Threading task in which this worker is running + private void runQueuedTasksInThread(object thisTaskAsObject) { + string previousThreadName = null; + if(!string.IsNullOrEmpty(this.threadName)) { + previousThreadName = Thread.CurrentThread.Name; + Thread.CurrentThread.Name = this.threadName; + } + try { +#if false + TTask task; + while(this.tasks.TryDequeue(out task)) { + try { + Run(task, this.cancellationTokenSource.Token); + } + catch(Exception exception) { + this.exceptions.Add(exception); + } + } + + lock(this.runningThreads) { + this.runningThreads.Remove((Task)thisTaskAsObject); + } + this.threadTerminatedEvent.Set(); +#endif + } + finally { + if(!string.IsNullOrEmpty(this.threadName)) { + Thread.CurrentThread.Name = previousThreadName; + } + } + } + + /// + /// Name that will be assigned to the worker threads while they're processing tasks + /// for the parallel background worker + /// + private string threadName; + /// Number of threads to use simultaneously + private int threadCount; + + /// Synchronization root used to access the task queue and thread list + /// + /// + /// Both lists are intentionally using a shared synchronization root because this will + /// guarantee that there's no race condition between adding work and a thread finishing: + /// + /// + /// Main thread + /// + /// lock(this.queueSynchronizationRoot) { + /// this.tasks.Add(newTask); + /// + /// if(this.runningThreads.Count < maximumThreadCount) { + /// startAdditionalTaskThread(); + /// } + /// } + /// + /// + /// + /// Task thread + /// + /// TTask myTask; + /// lock(this.queueSynchronizationRoot) { + /// this.tasks.TryGet(out myTask); + /// + /// // RACE CONDITION WITHOUT LOCK: if this wasn't inside the same lock as the task + /// // checking, the main thread might check at this point, see that there are already + /// // enough threads running! + /// if(myTask == null) { + /// this.runningThreads.Remove(thisThread); + /// return; + /// } + /// } + /// + /// + /// + /// The race condition mentioned above could completely stall the background worker by + /// leaving tasks in the queue but no threads active to execute them, so it is vital to + /// use this method of synchronization! + /// + /// + /// + private object queueSynchronizationRoot; + + /// Delegate for the runQueuedTasksInThread() method + private Action runQueuedTasksInThreadDelegate; + /// Tasks remaining to be processed + private Queue tasks; + /// Threads that are currently executing tasks + private IList runningThreads; + + // Idea: + // private int runningThreadCount; + // Before the task looks for new work, it will decrement this + // if the task gets new work, it will increment this again. + // - if it would be above threadCount now, put work back in the queue + // AddTask() increments this after pushing new work + // - if it would be above threadCount, do not create a new thread + + /// Exceptions that have occurred while executing tasks + private ConcurrentBag exceptions; + /// Event that is triggered whenever a task gets completed + private AutoResetEvent threadTerminatedEvent; + + /// + /// Provides a cancellation token that can be used to signal a thread to terminate + /// + private CancellationTokenSource cancellationTokenSource; + + } + +} // namespace Nuclex.Support