From feac2b9c89196c5e2b3983e626de5024568d0f9b Mon Sep 17 00:00:00 2001 From: Markus Ewald Date: Thu, 20 Feb 2014 14:33:40 +0000 Subject: [PATCH] Added unit tests for parallel background worker; fixed some issues with the parallel background worker; added failing unit test for almost equal checks with doubles; fixed a typo git-svn-id: file:///srv/devel/repo-conversion/nusu@291 d2e56fa2-650e-0410-a79f-9358c0239efd --- Nuclex.Support (net-4.0).csproj | 1 + Source/FloatHelper.Test.cs | 27 ++- Source/IntegerHelper.cs | 2 +- Source/ParallelBackgroundWorker.Test.cs | 239 ++++++++++++++++++++++++ Source/ParallelBackgroundWorker.cs | 106 +++++++---- 5 files changed, 334 insertions(+), 41 deletions(-) create mode 100644 Source/ParallelBackgroundWorker.Test.cs diff --git a/Nuclex.Support (net-4.0).csproj b/Nuclex.Support (net-4.0).csproj index c5593b1..9378f3d 100644 --- a/Nuclex.Support (net-4.0).csproj +++ b/Nuclex.Support (net-4.0).csproj @@ -215,6 +215,7 @@ LicenseKey.cs + CommandLine.cs diff --git a/Source/FloatHelper.Test.cs b/Source/FloatHelper.Test.cs index accf4d4..15a27c5 100644 --- a/Source/FloatHelper.Test.cs +++ b/Source/FloatHelper.Test.cs @@ -148,7 +148,12 @@ namespace Nuclex.Support { // If both are negative -> fine // If both are positive -> fine // If different -> Measure both distances to zero in ulps and sum them - public void NegativeZeroEqualsPositiveZero() { + /// + /// Verifies that the negative floating point zero is within one ulp of the positive + /// floating point zero and vice versa + /// + [Test] + public void NegativeZeroFloatEqualsPositiveZero() { float zero = 0.0f; float zeroPlusOneUlp = FloatHelper.ReinterpretAsFloat( FloatHelper.ReinterpretAsInt(zero) + 1 @@ -163,6 +168,26 @@ namespace Nuclex.Support { Assert.IsTrue(FloatHelper.AreAlmostEqual(zero, zeroMinusOneUlp, 1)); } + /// + /// Verifies that the negative double precision floating point zero is within one ulp + /// of the positive double precision floating point zero and vice versa + /// + [Test] + public void NegativeZeroDoubleEqualsPositiveZero() { + double zero = 0.0; + double zeroPlusOneUlp = FloatHelper.ReinterpretAsDouble( + FloatHelper.ReinterpretAsLong(zero) + 1 + ); + double zeroMinusOneUlp = -zeroPlusOneUlp; + + bool test = FloatHelper.AreAlmostEqual(zeroMinusOneUlp, zeroPlusOneUlp, 1); + + Assert.IsFalse(FloatHelper.AreAlmostEqual(zero, zeroPlusOneUlp, 0)); + Assert.IsTrue(FloatHelper.AreAlmostEqual(zero, zeroPlusOneUlp, 1)); + Assert.IsFalse(FloatHelper.AreAlmostEqual(zero, zeroMinusOneUlp, 0)); + Assert.IsTrue(FloatHelper.AreAlmostEqual(zero, zeroMinusOneUlp, 1)); + } + } } // namespace Nuclex.Support diff --git a/Source/IntegerHelper.cs b/Source/IntegerHelper.cs index e06f7c5..4ce1256 100644 --- a/Source/IntegerHelper.cs +++ b/Source/IntegerHelper.cs @@ -77,7 +77,7 @@ namespace Nuclex.Support { return value; } - /// Returns the number of bits set in an + /// Returns the number of bits set in an integer /// Value whose bits will be counted /// The number of bits set in the integer public static int CountBits(this int value) { diff --git a/Source/ParallelBackgroundWorker.Test.cs b/Source/ParallelBackgroundWorker.Test.cs new file mode 100644 index 0000000..f8835e1 --- /dev/null +++ b/Source/ParallelBackgroundWorker.Test.cs @@ -0,0 +1,239 @@ +#region CPL License +/* +Nuclex Framework +Copyright (C) 2002-2013 Nuclex Development Labs + +This library is free software; you can redistribute it and/or +modify it under the terms of the IBM Common Public License as +published by the IBM Corporation; either version 1.0 of the +License, or (at your option) any later version. + +This library is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +IBM Common Public License for more details. + +You should have received a copy of the IBM Common Public +License along with this library +*/ +#endregion + +using System; +using System.IO; + +#if UNITTEST + +using NUnit.Framework; +using System.Threading; +using System.Collections.Generic; + +namespace Nuclex.Support { + + /// Unit Test for the parallel background worker class + [TestFixture] + public class ParallelBackgroundWorkerTest { + + private class TestWorker : ParallelBackgroundWorker { + + /// Initializes a new parallel background worker with unlimited threads + public TestWorker() : base() { } + + /// + /// Initializes a new parallel background worker running the specified number + /// of tasks in parallel + /// + /// + /// Number of tasks to run in parallel (if positive) or number of CPU cores to leave + /// unused (if negative). + /// + /// + /// If a negative number of threads is used, at least one thread will be always + /// be created, so specifying -2 on a single-core system will still occupy + /// the only core. + /// + public TestWorker(int threadCount) : base(threadCount) {} + + /// + /// Initializes a new parallel background worker that uses the specified name for + /// its worker threads. + /// + /// Name that will be assigned to the worker threads + public TestWorker(string name) : base(name) {} + + /// + /// Initializes a new parallel background worker that uses the specified name for + /// its worker threads and running the specified number of tasks in parallel. + /// + /// Name that will be assigned to the worker threads + /// + /// Number of tasks to run in parallel (if positive) or number of CPU cores to leave + /// unused (if negative). + /// + /// + /// If a negative number of threads is used, at least one thread will be always + /// be created, so specifying -2 on a single-core system will still occupy + /// the only core. + /// + public TestWorker(string name, int threadCount) : base(name, threadCount) { } + + /// Called in a thread to execute a single task + /// Task that should be executed + /// + /// Cancellation token through which the method can be signalled to cancel + /// + protected override void Run(object task, CancellationToken cancellationToken) { + if(this.ThrowException) { + throw new Exception("Something went wrong"); + } + + if(this.WaitEvent != null) { + this.WaitEvent.WaitOne(); + } + + this.WasCancelled = cancellationToken.IsCancellationRequested; + + if(this.Tasks != null) { + this.Tasks.Add(task); + } + } + + public bool ThrowException; + public ManualResetEvent WaitEvent; + public bool WasCancelled; + public ICollection Tasks; + + } + + /// Verifies that the background worker has a default constructor + [Test] + public void CanBeDefaultConstructed() { + using(new TestWorker()) { } + } + + /// + /// Verifies that a background worker can be constructed that uses a fixed number + /// of threads + /// + [Test] + public void CanUseFixedNumberOfThreads() { + using(new TestWorker(4)) { } + } + + /// + /// Verifies that a background worker can be constructed that leaves free a fixed + /// number of CPU cores + /// + [Test] + public void CanPreserveFixedNumberOfCores() { + using(new TestWorker(-2)) { } + } + + /// + /// Verifies that a background worker can be constructed using a specific name + /// for its worker threads + /// + [Test] + public void CanUseNamedThreads() { + using(new TestWorker("Test Task Thread")) { } + } + + /// + /// Verifies that a background worker can be constructed that uses a fixed number + /// of threads using a specific name + /// + [Test] + public void CanUseFixedNumberOfNamedThreads() { + using(new TestWorker("Test Task Thread", 4)) { } + } + + /// + /// Verifies that a background worker can be constructed that leaves free a fixed + /// number of CPU cores and uses a specific name for its worker threads. + /// + [Test] + public void CanPreserveFixedNumberOfCoresAndUseNamedThreads() { + using(new TestWorker("Test Task Thread", -2)) { } + } + + /// + /// Verifies that exceptions happening inside the tasks are collected and re-thrown + /// in the Join() method. + /// + [Test] + public void ExceptionsAreReThrownInJoin() { + using(var testWorker = new TestWorker()) { + testWorker.ThrowException = true; + testWorker.AddTask(new object()); + testWorker.AddTask(new object()); + + Assert.Throws( + () => { + testWorker.Join(); + } + ); + + try { + testWorker.Join(); + Assert.Fail( + "Calling ParallelBackgroundWorker.Join() multiple times should re-throw " + + "exceptions multiple times" + ); + } + catch(AggregateException aggregateException) { + Assert.AreEqual(2, aggregateException.InnerExceptions.Count); + } + } + } + + /// + /// Verifies that tasks can be cancelled while they are running + /// + [Test] + public void TasksCanBeCancelled() { + using(var waitEvent = new ManualResetEvent(false)) { + using(var testWorker = new TestWorker()) { + testWorker.WaitEvent = waitEvent; + + testWorker.AddTask(new object()); + testWorker.CancelRunningTasks(); + + waitEvent.Set(); + + Assert.IsTrue(testWorker.Wait(1000)); + + Assert.IsTrue(testWorker.WasCancelled); + } + } // disposes waitEvent + } + + /// Verifies that calling Join() waits for all queued tasks + [Test] + public void JoinWaitsForQueuedTasks() { + var tasks = new List(100); + for(int index = 0; index < 100; ++index) { + tasks.Add(new object()); + } + + using(var waitEvent = new ManualResetEvent(false)) { + using(var testWorker = new TestWorker(2)) { + testWorker.WaitEvent = waitEvent; + testWorker.Tasks = new List(); + for(int index = 0; index < 100; ++index) { + testWorker.AddTask(tasks[index]); + } + + CollectionAssert.IsEmpty(testWorker.Tasks); + + waitEvent.Set(); + testWorker.Join(); + + CollectionAssert.AreEquivalent(tasks, testWorker.Tasks); + } + } // disposes waitEvent + } + + } + +} // namespace Nuclex.Support + +#endif // UNITTEST diff --git a/Source/ParallelBackgroundWorker.cs b/Source/ParallelBackgroundWorker.cs index 6dbed19..3572b69 100644 --- a/Source/ParallelBackgroundWorker.cs +++ b/Source/ParallelBackgroundWorker.cs @@ -17,6 +17,12 @@ namespace Nuclex.Support { public static readonly int Processors = Environment.ProcessorCount; #endif + /// + /// Timeout after which Dispose() will stop waiting for unfinished tasks and + /// free resources anyway + /// + private static readonly int DefaultDisposeTimeoutMilliseconds = 1500; // milliseconds + /// Initializes a new parallel background worker with unlimited threads public ParallelBackgroundWorker() : this(int.MaxValue) { } @@ -40,11 +46,12 @@ namespace Nuclex.Support { threadCount = Math.Max(1, Processors + threadCount); } - this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread); - this.runningThreads = new List(); + this.queueSynchronizationRoot = new object(); + this.runQueuedTasksInThreadDelegate = new Action(runQueuedTasksInThread); this.tasks = new Queue(); this.threadTerminatedEvent = new AutoResetEvent(false); this.cancellationTokenSource = new CancellationTokenSource(); + this.exceptions = new ConcurrentBag(); } /// @@ -81,7 +88,8 @@ namespace Nuclex.Support { if(this.threadTerminatedEvent != null) { CancelPendingTasks(); CancelRunningTasks(); - Join(); + + Wait(DefaultDisposeTimeoutMilliseconds); this.threadTerminatedEvent.Dispose(); this.threadTerminatedEvent = null; @@ -92,7 +100,6 @@ namespace Nuclex.Support { } } - /// Adds a task for processing by the background worker threads /// Task that will be processed in the background public void AddTask(TTask task) { @@ -100,26 +107,28 @@ namespace Nuclex.Support { return; } + bool needNewThread; + lock(this.queueSynchronizationRoot) { this.tasks.Enqueue(task); - if(this.runningThreads.Count < this.threadCount) { - //Task newThread = new Task(this.runQueuedTasksInThreadDelegate, ); + needNewThread = (this.runningThreadCount < this.threadCount); + if(needNewThread) { + ++this.runningThreadCount; } } - // Thread 1: - // lock() { - // - take task - // - or deregister and exit - // } - // - // Thread 2: - // lock() { - // - put task - // - if too few threads, register and add - // } - + if(needNewThread) { + Task newThread = new Task( + this.runQueuedTasksInThreadDelegate, + // this.cancellationTokenSource.Token, // DO NOT PASS THIS! + // Passing the cancellation token makes tasks that have been queued but + // not started yet cease to execute at all - meaning our runningThreadCount + // goes out of sync and Dispose() / Wait() / Join() sit around endlessly! + TaskCreationOptions.LongRunning + ); + newThread.Start(); + } } /// Cancels all tasks that are currently executing @@ -134,11 +143,36 @@ namespace Nuclex.Support { } } - /// Waits until all executing and queued tasks have been processed + /// + /// Waits until all executing and queued tasks have been processed and throws an + /// exception if any errors have occurred + /// public void Join() { - while(this.runningThreads.Count > 0) { + while(Thread.VolatileRead(ref this.runningThreadCount) > 0) { this.threadTerminatedEvent.WaitOne(); } + + if(this.exceptions.Count > 0) { + throw new AggregateException(this.exceptions); + } + } + + /// + /// Waits until all executing and queued tasks have been processed or + /// the timeout is reached + /// + /// Milliseconds after which the wait times out + /// + /// True if all tasks have been processed, false if the timeout was reached + /// + public bool Wait(int timeoutMilliseconds) { + while(Thread.VolatileRead(ref this.runningThreadCount) > 0) { + if(this.threadTerminatedEvent.WaitOne(timeoutMilliseconds) == false) { + return false; + } + } + + return true; } /// Called in a thread to execute a single task @@ -151,17 +185,23 @@ namespace Nuclex.Support { /// /// Runs queued tasks of the parallel background worker until the queue is empty /// - /// Threading task in which this worker is running - private void runQueuedTasksInThread(object thisTaskAsObject) { + private void runQueuedTasksInThread() { string previousThreadName = null; if(!string.IsNullOrEmpty(this.threadName)) { previousThreadName = Thread.CurrentThread.Name; Thread.CurrentThread.Name = this.threadName; } try { -#if false - TTask task; - while(this.tasks.TryDequeue(out task)) { + for(;;) { + TTask task; + lock(this.queueSynchronizationRoot) { + if(this.tasks.Count == 0) { + --this.runningThreadCount; + break; + } + task = this.tasks.Dequeue(); + } + try { Run(task, this.cancellationTokenSource.Token); } @@ -170,11 +210,7 @@ namespace Nuclex.Support { } } - lock(this.runningThreads) { - this.runningThreads.Remove((Task)thisTaskAsObject); - } this.threadTerminatedEvent.Set(); -#endif } finally { if(!string.IsNullOrEmpty(this.threadName)) { @@ -236,19 +272,11 @@ namespace Nuclex.Support { private object queueSynchronizationRoot; /// Delegate for the runQueuedTasksInThread() method - private Action runQueuedTasksInThreadDelegate; + private Action runQueuedTasksInThreadDelegate; /// Tasks remaining to be processed private Queue tasks; /// Threads that are currently executing tasks - private IList runningThreads; - - // Idea: - // private int runningThreadCount; - // Before the task looks for new work, it will decrement this - // if the task gets new work, it will increment this again. - // - if it would be above threadCount now, put work back in the queue - // AddTask() increments this after pushing new work - // - if it would be above threadCount, do not create a new thread + private int runningThreadCount; /// Exceptions that have occurred while executing tasks private ConcurrentBag exceptions;