#region CPL License
/*
Nuclex Framework
Copyright (C) 2002-2017 Nuclex Development Labs
This library is free software; you can redistribute it and/or
modify it under the terms of the IBM Common Public License as
published by the IBM Corporation; either version 1.0 of the
License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
IBM Common Public License for more details.
You should have received a copy of the IBM Common Public
License along with this library
*/
#endregion
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
namespace Nuclex.Support {
/// <summary>Alternative Thread pool providing one thread for each core</summary>
/// <remarks>
///   <para>
///     Unlike the normal thread pool, the affine thread pool provides only as many
///     threads as there are CPU cores available on the current platform. This makes
///     it more suitable for tasks you want to spread across all available cpu cores
///     explicitly.
///   </para>
///   <para>
///     However, it's not a good match if you want to run blocking or waiting tasks
///     inside the thread pool because the limited available threads will become
///     congested quickly. It is encouraged to use this class in parallel with
///     .NET's own thread pool, putting tasks that can block into the .NET thread
///     pool and tasks that perform pure processing into the affine thread pool.
///   </para>
///   <para>
///     Implementation based on original code provided by Stephen Toub
///     (stoub at microsoft ignorethis dot com)
///   </para>
/// </remarks>
public static class AffineThreadPool {

  /// <summary>Number of CPU cores available on the system</summary>
  public static readonly int Processors = Environment.ProcessorCount;

  /// <summary>Delegate used by the thread pool to report unhandled exceptions</summary>
  /// <param name="exception">Exception that has not been handled</param>
  public delegate void ExceptionDelegate(Exception exception);

  #region struct UserWorkItem

  /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
  private struct UserWorkItem {

    /// <summary>Initialize the callback holding object.</summary>
    /// <param name="callback">Callback delegate for the callback.</param>
    /// <param name="state">State with which to call the callback delegate.</param>
    public UserWorkItem(WaitCallback callback, object state) {
      this.Callback = callback;
      this.State = state;
    }

    /// <summary>Callback delegate for the callback.</summary>
    public WaitCallback Callback;
    /// <summary>State with which to call the callback delegate.</summary>
    public object State;

  }

  #endregion // struct UserWorkItem

  /// <summary>Initializes the thread pool</summary>
  static AffineThreadPool() {

    // Create our thread stores; we handle synchronization ourself
    // as we may run into situations where multiple operations need to be atomic.
    // We keep track of the threads we've created just for good measure; not actually
    // needed for any core functionality.
    workAvailable = new System.Threading.Semaphore(0, int.MaxValue);
    userWorkItems = new Queue<UserWorkItem>(Processors * 4);
    workerThreads = new List<Thread>(Processors);
    inUseThreads = 0;

    // We can use all cores on a PC. The indices handed out here end up in
    // ProcessThread.IdealProcessor, which is zero-based, so the valid range
    // is 0 .. Processors - 1 (highest index is handed out first).
    hardwareThreads = new Queue<int>(Processors);
    for(int core = Processors - 1; core >= 0; --core) {
      hardwareThreads.Enqueue(core);
    }

    // Create all of the worker threads
    for(int index = 0; index < Processors; index++) {

      // Create a new thread and add it to the list of threads.
      Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
      workerThreads.Add(newThread);

      // Configure the new thread and start it. Background threads do not keep
      // the process alive once all foreground threads have ended.
      newThread.Name = "Nuclex.Support.AffineThreadPool Thread #" + index.ToString();
      newThread.IsBackground = true;
      newThread.Start();

    }

  }

  /// <summary>Queues a user work item to the thread pool</summary>
  /// <param name="callback">
  ///   A WaitCallback representing the delegate to invoke when a thread in the
  ///   thread pool picks up the work item
  /// </param>
  /// <exception cref="ArgumentNullException">
  ///   If <paramref name="callback" /> is null
  /// </exception>
  public static void QueueUserWorkItem(WaitCallback callback) {

    // Queue the delegate with no state
    QueueUserWorkItem(callback, null);

  }

  /// <summary>Queues a user work item to the thread pool.</summary>
  /// <param name="callback">
  ///   A WaitCallback representing the delegate to invoke when a thread in the
  ///   thread pool picks up the work item
  /// </param>
  /// <param name="state">
  ///   The object that is passed to the delegate when serviced from the thread pool
  /// </param>
  /// <exception cref="ArgumentNullException">
  ///   If <paramref name="callback" /> is null
  /// </exception>
  public static void QueueUserWorkItem(WaitCallback callback, object state) {

    // Reject null callbacks right here, on the caller's thread. Otherwise the
    // failure would only surface later as a NullReferenceException inside
    // a worker thread, far away from the code that caused it.
    if(callback == null) {
      throw new ArgumentNullException("callback");
    }

    // Create a waiting callback that contains the delegate and its state.
    // Add it to the processing queue, and signal that data is waiting.
    UserWorkItem waiting = new UserWorkItem(callback, state);
    lock(userWorkItems) {
      userWorkItems.Enqueue(waiting);
    }

    // Wake up one of the worker threads so this task will be processed.
    // This must happen after the item is in the queue: the workers rely on
    // finding one queued item for each semaphore count they consume.
    workAvailable.Release();

  }

  /// <summary>Gets the number of threads at the disposal of the thread pool</summary>
  public static int MaxThreads { get { return Processors; } }

  /// <summary>Gets the number of currently active threads in the thread pool</summary>
  /// <remarks>
  ///   A thread counts as active while it is executing a work item's callback.
  /// </remarks>
  public static int ActiveThreads { get { return inUseThreads; } }

  /// <summary>
  ///   Gets the number of callback delegates currently waiting in the thread pool
  /// </summary>
  public static int WaitingWorkItems {
    get {
      lock(userWorkItems) {
        return userWorkItems.Count;
      }
    }
  }

  /// <summary>
  ///   Default handler used to respond to unhandled exceptions in ThreadPool threads
  /// </summary>
  /// <param name="exception">Exception that has occurred</param>
  /// <remarks>
  ///   Throws the exception again on the thread that invoked the handler. When called
  ///   from a worker thread, the exception therefore leaves the worker's processing
  ///   loop unhandled. Note that 'throw exception' restarts the stack trace at this
  ///   method; the exception object itself is passed through unchanged.
  /// </remarks>
  internal static void DefaultExceptionHandler(Exception exception) {
    throw exception;
  }

#if WINDOWS
  /// <summary>Retrieves the ProcessThread with the specified thread id</summary>
  /// <param name="threadId">Operating system id of the thread to look up</param>
  /// <returns>
  ///   The matching ProcessThread of the current process or null if the process
  ///   has no thread with the specified id
  /// </returns>
  internal static ProcessThread GetProcessThread(int threadId) {
    ProcessThreadCollection threads = Process.GetCurrentProcess().Threads;
    for(int index = 0; index < threads.Count; ++index) {
      if(threads[index].Id == threadId) {
        return threads[index];
      }
    }

    return null;
  }
#endif

  /// <summary>A thread worker function that processes items from the work queue</summary>
  private static void ProcessQueuedItems() {

    // Get the system/hardware thread index this thread is going to use. We hope that
    // the threads more or less start after each other, but there is no guarantee that
    // tasks will be handled by the CPU cores in the order the queue was filled with.
    // This could be added, though, by using a WaitHandle so the thread creator could
    // wait for each thread to take one entry out of the queue.
    int hardwareThreadIndex;
    lock(hardwareThreads) {
      hardwareThreadIndex = hardwareThreads.Dequeue();
    }

#if WINDOWS
    if(Environment.OSVersion.Platform == PlatformID.Win32NT) {
      // Prevent this managed thread from impersonating another system thread.
      // In .NET, managed threads can supposedly be moved to different system threads
      // and, more worryingly, even fibers. This should make sure we're sitting on
      // a normal system thread and stay with that thread during our lifetime.
      Thread.BeginThreadAffinity();

      // Assign the ideal processor, but don't force it. It's not a good idea to
      // circumvent the thread scheduler of a desktop machine, so we try to play nice.
      int threadId = GetCurrentThreadId();
      ProcessThread thread = GetProcessThread(threadId);
      if(thread != null) {
        thread.IdealProcessor = hardwareThreadIndex;
      }
    }
#endif

    // Keep processing tasks indefinitely
    for(; ; ) {
      UserWorkItem workItem = getNextWorkItem();

      // Execute the work item we just picked up. Make sure to accurately
      // record how many callbacks are currently executing.
      Interlocked.Increment(ref inUseThreads);
      try {
        workItem.Callback(workItem.State);
      }
      catch(Exception exception) {
        // Hand the exception to the configured handler. The field is copied to
        // a local first so it cannot be set to null between check and call.
        // If no handler is assigned, the exception is dropped.
        ExceptionDelegate exceptionHandler = ExceptionHandler;
        if(exceptionHandler != null) {
          exceptionHandler(exception);
        }
      }
      finally {
        Interlocked.Decrement(ref inUseThreads);
      }
    }

  }

  /// <summary>Obtains the next work item from the queue</summary>
  /// <returns>The next work item in the queue</returns>
  /// <remarks>
  ///   If the queue is empty, the call will block until an item is added to
  ///   the queue and the calling thread was the one picking it up.
  /// </remarks>
  private static UserWorkItem getNextWorkItem() {

    // Consume exactly one semaphore count per work item. QueueUserWorkItem()
    // enqueues an item before it releases the semaphore, so by the time this
    // wait is satisfied there is always at least one item left in the queue
    // for us. Pairing each Release() with one WaitOne() also keeps the semaphore
    // count equal to the number of unclaimed items instead of letting surplus
    // counts pile up while the workers are busy.
    workAvailable.WaitOne();

    // The queue itself is not thread-safe, so retrieval happens under the lock
    lock(userWorkItems) {
      return userWorkItems.Dequeue();
    }

  }

  /// <summary>
  ///   Handler invoked when a work item's callback throws an exception
  /// </summary>
  /// <remarks>
  ///   Invoked on the worker thread that executed the failing callback. Defaults
  ///   to <see cref="DefaultExceptionHandler" />, which throws the exception again.
  /// </remarks>
  public static volatile ExceptionDelegate ExceptionHandler = DefaultExceptionHandler;

#if WINDOWS
  /// <summary>Retrieves the calling thread's thread id</summary>
  /// <returns>The thread id of the calling thread</returns>
  [DllImport("kernel32.dll")]
  internal static extern int GetCurrentThreadId();
#endif

  /// <summary>Available hardware threads the thread pool threads pick from</summary>
  private static readonly Queue<int> hardwareThreads;
  /// <summary>Queue of all the callbacks waiting to be executed.</summary>
  private static readonly Queue<UserWorkItem> userWorkItems;
  /// <summary>
  ///   Used to let the threads in the thread pool wait for new work to appear.
  /// </summary>
  private static readonly System.Threading.Semaphore workAvailable;
  /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
  private static readonly List<Thread> workerThreads;
  /// <summary>Number of threads currently active.</summary>
  private static int inUseThreads;

}
} // namespace Nuclex.Support