#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs

This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.

You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion

#if !NO_CONCURRENT_COLLECTIONS

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Nuclex.Support {

  /// <summary>Processes tasks in parallel using many threads</summary>
  /// <typeparam name="TTask">Type of tasks the class will process</typeparam>
  /// <remarks>
  ///   Derived classes implement <see cref="Run" /> to process a single task. Worker
  ///   threads are started on demand by <see cref="AddTask" /> and terminate once
  ///   the task queue has run empty.
  /// </remarks>
  public abstract class ParallelBackgroundWorker<TTask> : IDisposable {

    /// <summary>Number of CPU cores available on the system</summary>
    public static readonly int ProcessorCount = Environment.ProcessorCount;

    /// <summary>
    ///   Timeout after which Dispose() will stop waiting for unfinished tasks and
    ///   free resources anyway
    /// </summary>
    private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds

    /// <summary>Initializes a new parallel background worker with unlimited threads</summary>
    public ParallelBackgroundWorker() : this(int.MaxValue) { }

    /// <summary>
    ///   Initializes a new parallel background worker running the specified number
    ///   of tasks in parallel
    /// </summary>
    /// <param name="threadCount">
    ///   Number of tasks to run in parallel (if positive) or number of CPU cores to leave
    ///   unused (if zero or negative).
    /// </param>
    /// <remarks>
    ///   If a non-positive number of threads is used, at least one thread will always
    ///   be created, so specifying -2 on a single-core system will still occupy
    ///   the only core.
    /// </remarks>
    public ParallelBackgroundWorker(int threadCount) {
      if(threadCount > 0) {
        this.threadCount = threadCount;
      } else {
        // The result must be stored in the field, not the parameter: a thread limit
        // of zero would prevent AddTask() from ever starting a worker thread.
        this.threadCount = Math.Max(1, ProcessorCount + threadCount);
      }

      this.queueSynchronizationRoot = new object();
      this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread);
      this.tasks = new Queue<TTask>();
      this.threadTerminatedEvent = new AutoResetEvent(false);
      this.cancellationTokenSource = new CancellationTokenSource();
      this.exceptions = new ConcurrentBag<Exception>();
    }

    /// <summary>
    ///   Initializes a new parallel background worker that uses the specified name for
    ///   its worker threads.
    /// </summary>
    /// <param name="name">Name that will be assigned to the worker threads</param>
    public ParallelBackgroundWorker(string name) : this(int.MaxValue) {
      this.threadName = name;
    }

    /// <summary>
    ///   Initializes a new parallel background worker that uses the specified name for
    ///   its worker threads and running the specified number of tasks in parallel.
    /// </summary>
    /// <param name="name">Name that will be assigned to the worker threads</param>
    /// <param name="threadCount">
    ///   Number of tasks to run in parallel (if positive) or number of CPU cores to leave
    ///   unused (if zero or negative).
    /// </param>
    /// <remarks>
    ///   If a non-positive number of threads is used, at least one thread will always
    ///   be created, so specifying -2 on a single-core system will still occupy
    ///   the only core.
    /// </remarks>
    public ParallelBackgroundWorker(string name, int threadCount) : this(threadCount) {
      this.threadName = name;
    }

    /// <summary>Immediately releases all resources owned by the instance</summary>
    /// <remarks>
    ///   Clears the queue, requests cancellation of running tasks and then waits for
    ///   the worker threads via <see cref="Wait" /> before the wait handle and
    ///   the cancellation token source are disposed.
    /// </remarks>
    public void Dispose() {
      if(this.threadTerminatedEvent != null) {
        CancelPendingTasks();
        CancelRunningTasks();

        Wait(DefaultDisposeTimeoutMilliseconds);

        this.threadTerminatedEvent.Dispose();
        this.threadTerminatedEvent = null;
      }

      // Same lock as CancelRunningTasks() so the token source cannot be disposed
      // while another thread is in the middle of cancelling it.
      lock(this.queueSynchronizationRoot) {
        if(this.cancellationTokenSource != null) {
          this.cancellationTokenSource.Dispose();
          this.cancellationTokenSource = null;
        }
      }
    }

    /// <summary>Adds a task for processing by the background worker threads</summary>
    /// <param name="task">Task that will be processed in the background</param>
    /// <remarks>
    ///   If cancellation has already been requested through
    ///   <see cref="CancelRunningTasks" />, the task is dropped without being queued.
    /// </remarks>
    /// <exception cref="ObjectDisposedException">
    ///   The background worker has already been disposed
    /// </exception>
    public void AddTask(TTask task) {
      CancellationTokenSource tokenSource = this.cancellationTokenSource;
      if(tokenSource == null) {
        throw new ObjectDisposedException(GetType().Name);
      }
      if(tokenSource.IsCancellationRequested) {
        return;
      }

      bool needNewThread;

      lock(this.queueSynchronizationRoot) {
        this.tasks.Enqueue(task);

        // The thread is counted as running before it is started so that a concurrent
        // AddTask() call cannot start more threads than the limit allows.
        needNewThread = (this.runningThreadCount < this.threadCount);
        if(needNewThread) {
          ++this.runningThreadCount;
        }
      }

      if(needNewThread) {
        Task newThread = new Task(
          this.runQueuedTasksInThreadDelegate,
          // this.cancellationTokenSource.Token, // DO NOT PASS THIS!
          // Passing the cancellation token makes tasks that have been queued but
          // not started yet cease to execute at all - meaning our runningThreadCount
          // goes out of sync and Dispose() / Wait() / Join() sit around endlessly!
          TaskCreationOptions.LongRunning
        );
        newThread.Start();
      }
    }

    /// <summary>Cancels all tasks that are currently executing</summary>
    /// <remarks>
    ///   It is valid to call this method after Dispose()
    /// </remarks>
    public void CancelRunningTasks() {
      lock(this.queueSynchronizationRoot) {
        if(this.cancellationTokenSource != null) {
          this.cancellationTokenSource.Cancel();
        }
      }
    }

    /// <summary>Cancels all queued tasks waiting to be executed</summary>
    /// <remarks>
    ///   It is valid to call this method after Dispose()
    /// </remarks>
    public void CancelPendingTasks() {
      lock(this.queueSynchronizationRoot) {
        this.tasks.Clear();
      }
    }

    /// <summary>
    ///   Waits until all executing and queued tasks have been processed and throws an
    ///   exception if any errors have occurred
    /// </summary>
    /// <exception cref="AggregateException">
    ///   One or more tasks have thrown an exception while being processed
    /// </exception>
    public void Join() {
      // The event is auto-resetting and signalled once per terminating thread,
      // so the thread count is checked again after each wake-up.
      while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
        this.threadTerminatedEvent.WaitOne();
      }

      if(this.exceptions.Count > 0) {
        throw new AggregateException(this.exceptions);
      }
    }

    /// <summary>
    ///   Waits until all executing and queued tasks have been processed or
    ///   the timeout is reached
    /// </summary>
    /// <param name="timeoutMilliseconds">
    ///   Milliseconds after which the wait times out. The timeout is applied to each
    ///   individual wait for a worker thread to terminate, so the total time spent in
    ///   this method can exceed it when several threads terminate one after another.
    /// </param>
    /// <returns>
    ///   True if all tasks have been processed, false if the timeout was reached
    /// </returns>
    public bool Wait(int timeoutMilliseconds) {

      // Wait until the task queue has become empty
      while(queuedTaskCount > 0) {
        if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
          return false;
        }
      }

      // Now wait until all running tasks have finished
      while(Thread.VolatileRead(ref this.runningThreadCount) > 0) {
        if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) {
          return false;
        }
      }

      return true;

    }

    /// <summary>Called in a thread to execute a single task</summary>
    /// <param name="task">Task that should be executed</param>
    /// <param name="cancellationToken">
    ///   Cancellation token through which the method can be signalled to cancel
    /// </param>
    protected abstract void Run(TTask task, CancellationToken cancellationToken);

    /// <summary>
    ///   Runs queued tasks of the parallel background worker until the queue is empty
    /// </summary>
    private void runQueuedTasksInThread() {
      string previousThreadName = null;
      if(!string.IsNullOrEmpty(this.threadName)) {
        previousThreadName = Thread.CurrentThread.Name;
        Thread.CurrentThread.Name = this.threadName;
      }
      try {
        for(;;) {
          TTask task;
          lock(this.queueSynchronizationRoot) {
            // Checking for work and giving up the thread slot must happen under
            // the same lock, otherwise AddTask() could queue a task in between and
            // rely on this thread to pick it up.
            if(this.tasks.Count == 0) {
              --this.runningThreadCount;
              break;
            }
            task = this.tasks.Dequeue();
          }

          try {
            Run(task, this.cancellationTokenSource.Token);
          }
          catch(Exception exception) {
            // Collected here and reported to the caller by Join()
            this.exceptions.Add(exception);
          }
        }

        this.threadTerminatedEvent.Set();
      }
      finally {
        if(!string.IsNullOrEmpty(this.threadName)) {
          Thread.CurrentThread.Name = previousThreadName;
        }
      }
    }

    /// <summary>Number of tasks still waiting to be executed</summary>
    private int queuedTaskCount {
      get {
        lock(this.queueSynchronizationRoot) {
          return this.tasks.Count;
        }
      }
    }

    /// <summary>
    ///   Name that will be assigned to the worker threads while they're processing tasks
    ///   for the parallel background worker
    /// </summary>
    private readonly string threadName;
    /// <summary>Number of threads to use simultaneously</summary>
    private readonly int threadCount;

    /// <summary>Synchronization root used to access the task queue and thread list</summary>
    /// <remarks>
    ///   <para>
    ///     Both lists are intentionally using a shared synchronization root because this will
    ///     guarantee that there's no race condition between adding work and a thread finishing:
    ///   </para>
    ///   <para>
    ///     Main thread
    ///     <code>
    ///       lock(this.queueSynchronizationRoot) {
    ///         this.tasks.Add(newTask);
    ///
    ///         if(this.runningThreads.Count &lt; maximumThreadCount) {
    ///           startAdditionalTaskThread();
    ///         }
    ///       }
    ///     </code>
    ///   </para>
    ///   <para>
    ///     Task thread
    ///     <code>
    ///       TTask myTask;
    ///       lock(this.queueSynchronizationRoot) {
    ///         this.tasks.TryGet(out myTask);
    ///
    ///         // RACE CONDITION WITHOUT LOCK: if this wasn't inside the same lock as the task
    ///         // checking, the main thread might check at this point and see that there are
    ///         // already enough threads running!
    ///         if(myTask == null) {
    ///           this.runningThreads.Remove(thisThread);
    ///           return;
    ///         }
    ///       }
    ///     </code>
    ///   </para>
    ///   <para>
    ///     The race condition mentioned above could completely stall the background worker by
    ///     leaving tasks in the queue but no threads active to execute them, so it is vital to
    ///     use this method of synchronization!
    ///   </para>
    /// </remarks>
    private readonly object queueSynchronizationRoot;

    /// <summary>Delegate for the runQueuedTasksInThread() method</summary>
    private readonly Action runQueuedTasksInThreadDelegate;
    /// <summary>Tasks remaining to be processed</summary>
    private readonly Queue<TTask> tasks;
    /// <summary>Threads that are currently executing tasks</summary>
    private int runningThreadCount;

    /// <summary>Exceptions that have occurred while executing tasks</summary>
    private readonly ConcurrentBag<Exception> exceptions;
    /// <summary>Event that is triggered whenever a worker thread terminates</summary>
    private AutoResetEvent threadTerminatedEvent;

    /// <summary>
    ///   Provides a cancellation token that can be used to signal a thread to terminate
    /// </summary>
    private CancellationTokenSource cancellationTokenSource;

  }

} // namespace Nuclex.Support

#endif // !NO_CONCURRENT_COLLECTIONS