From 1eb681208ce9a82a91496cadbd2a055a975490b0 Mon Sep 17 00:00:00 2001 From: Calthron Date: Tue, 10 Nov 2015 09:14:45 -0800 Subject: [PATCH 01/10] Add ability to enqueue a task that does not require monitoring a return value Add the ability to enqueue a task that does not require monitoring a return value. std::future on destruction is expected to wait() for a thread to complete before completing destruction (btw, MSVS2103 std::future/std::ansync does not behave according to C++ Standard). If a temporary future is created (not captured), the calling thread should block waiting for the thread to exit and complete the promise. An alternate enqueueAndDetach method was added to allow for invocations that don't require the capturing of the std::future. --- ThreadPool.h | 205 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 130 insertions(+), 75 deletions(-) diff --git a/ThreadPool.h b/ThreadPool.h index 4183203..4fd288a 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -1,5 +1,8 @@ -#ifndef THREAD_POOL_H -#define THREAD_POOL_H +// Purpose: Thread pool + +// Based on https://github.com/progschj/ThreadPool + +#pragma once #include #include @@ -11,88 +14,140 @@ #include #include -class ThreadPool { -public: - ThreadPool(size_t); - template - auto enqueue(F&& f, Args&&... args) - -> std::future::type>; - ~ThreadPool(); -private: - // need to keep track of threads so we can join them - std::vector< std::thread > workers; - // the task queue - std::queue< std::function > tasks; - - // synchronization - std::mutex queue_mutex; - std::condition_variable condition; - bool stop; +class ThreadPool final +{ + public: + ThreadPool (size_t threads); + ~ThreadPool (); + + // Enqueue task and return std::future<> + template + auto enqueue (F&& f, Args&&... args) + -> std::future::type>; + + // Enqueue task with checked return value + template + auto enqueueAndDetach (F&& f, Args&&... args) + -> void; + + private: + // Keep track of threads, so they can be joined + std::vector workers; + // Task queue + std::queue> tasks; + + // Synchronization + using Lock = std::unique_lock; + std::mutex queue_mutex; + std::condition_variable condition; + bool stop = false; }; - + // the constructor just launches some amount of workers -inline ThreadPool::ThreadPool(size_t threads) - : stop(false) +ThreadPool::ThreadPool (size_t threads) { - for(size_t i = 0;i task; - - { - std::unique_lock lock(this->queue_mutex); - this->condition.wait(lock, - [this]{ return this->stop || !this->tasks.empty(); }); - if(this->stop && this->tasks.empty()) - return; - task = std::move(this->tasks.front()); - this->tasks.pop(); - } - - task(); - } - } - ); + for (size_t i = 0; i < threads; ++i) + { + // Worker execution loop + workers.emplace_back + ( + [this] + { + for(;;) + { + // Task to execute + std::function task; + + // Wait for additional work signal + { // Critical section + // Wait to be notified of work + Lock lock (this->queue_mutex); + this->condition.wait + ( + lock, + [this](){ return this->stop || !this->tasks.empty (); } + ); + + // Exit the thread if stopping and no work remains + if(this->stop && this->tasks.empty ()) + return; + + // Dequeue the next task + task = std::move (this->tasks.front ()); + this->tasks.pop (); + } // End critical section + + // Execute + task (); + } + } + ); + } } -// add new work item to the pool +// Add a new work item to the pool template -auto ThreadPool::enqueue(F&& f, Args&&... args) - -> std::future::type> +auto ThreadPool::enqueue (F&& f, Args&&... args) + -> std::future::type> { - using return_type = typename std::result_of::type; - - auto task = std::make_shared< std::packaged_task >( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - std::unique_lock lock(queue_mutex); - - // don't allow enqueueing after stopping the pool - if(stop) - throw std::runtime_error("enqueue on stopped ThreadPool"); - - tasks.emplace([task](){ (*task)(); }); - } - condition.notify_one(); - return res; + using return_type = typename std::result_of::type; + + auto task = std::make_shared> + ( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future result = task->get_future(); + + { // Critical section + Lock lock (queue_mutex); + + // Don't allow an enqueue after stopping + if (stop) + throw std::runtime_error ("enqueue on stopped ThreadPool"); + + // Push work back on the queue + tasks.emplace ([task](){ (*task)(); }); + } // End critical section + + // Notify a thread that there is new work to perform + condition.notify_one (); + return result; } -// the destructor joins all threads -inline ThreadPool::~ThreadPool() +// Add a new work item to the pool +template +auto ThreadPool::enqueueAndDetach (F&& f, Args&&... args) + -> void { - { - std::unique_lock lock(queue_mutex); - stop = true; - } - condition.notify_all(); - for(std::thread &worker: workers) - worker.join(); + { // Critical section + Lock lock (queue_mutex); + + // Don't allow an enqueue after stopping + if (stop) + throw std::runtime_error ("enqueue on stopped ThreadPool"); + + // Push work back on the queue + tasks.emplace (std::bind(std::forward(f), std::forward(args)...)); + } // End critical section + + // Notify a thread that there is new work to perform + condition.notify_one (); } -#endif +// Destructor joins all worker threads +ThreadPool::~ThreadPool () +{ + { // Critical section + Lock lock (queue_mutex); + stop = true; + } // End critical section + + condition.notify_all (); + + // Wait for threads to complete work + for (std::thread &worker : workers) + { + if (worker.joinable ()) { worker.join(); } + } +} From a37e6136010ca1cb486510144b0431fdfe44bcf5 Mon Sep 17 00:00:00 2001 From: Calthron Date: Tue, 10 Nov 2015 09:18:15 -0800 Subject: [PATCH 02/10] Typo in comment Typo in comment --- ThreadPool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ThreadPool.h b/ThreadPool.h index 4fd288a..482d32a 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -25,7 +25,7 @@ class ThreadPool final auto enqueue (F&& f, Args&&... args) -> std::future::type>; - // Enqueue task with checked return value + // Enqueue task without requiring a return value template auto enqueueAndDetach (F&& f, Args&&... args) -> void; From e463df9f059f151291235b0f2d77f635be2bb714 Mon Sep 17 00:00:00 2001 From: Steven LeMay Date: Wed, 11 Nov 2015 08:52:55 -0800 Subject: [PATCH 03/10] Break into header and implementation Break the source into header and implementation, make the template definition more readable by changing F to Callable --- ThreadPool.cpp | 61 +++++++++++++++++ ThreadPool.h | 182 ++++++++++++++++++------------------------------- 2 files changed, 126 insertions(+), 117 deletions(-) create mode 100644 ThreadPool.cpp diff --git a/ThreadPool.cpp b/ThreadPool.cpp new file mode 100644 index 0000000..7923ac6 --- /dev/null +++ b/ThreadPool.cpp @@ -0,0 +1,61 @@ +// Purpose: Thread pool + +// Based on https://github.com/progschj/ThreadPool + +#include "ThreadPool.h" + +ThreadPool::ThreadPool (size_t threads) +{ + workers.reserve (threads); + + for (size_t count = 0; count < threads; ++count) + { + // Worker execution loop + workers.emplace_back ([this]() + { + for (;;) + { + // Task to execute + std::function task; + + // Wait for additional work signal + { // Critical section + // Wait to be notified of work + Lock lock (this->queue_mutex); + this->condition.wait (lock, [this]() + { + return this->stop || !this->tasks.empty (); + }); + + // Exit the thread if stopping and no work remains + if (this->stop && this->tasks.empty ()) + return; + + // Dequeue the next task + task = std::move (this->tasks.front ()); + this->tasks.pop (); + } // End critical section + + // Execute + task (); + } + }); + } +} + +// Destructor joins all worker threads +ThreadPool::~ThreadPool () +{ + { // Critical section + Lock lock (queue_mutex); + stop = true; + } // End critical section + + condition.notify_all (); + + // Wait for threads to complete work + for (std::thread &worker : workers) + { + if (worker.joinable ()) { worker.join(); } + } +} diff --git a/ThreadPool.h b/ThreadPool.h index 482d32a..f6fdfb2 100644 --- a/ThreadPool.h +++ b/ThreadPool.h @@ -16,138 +16,86 @@ class ThreadPool final { - public: - ThreadPool (size_t threads); - ~ThreadPool (); - - // Enqueue task and return std::future<> - template - auto enqueue (F&& f, Args&&... args) - -> std::future::type>; - - // Enqueue task without requiring a return value - template - auto enqueueAndDetach (F&& f, Args&&... args) - -> void; - - private: - // Keep track of threads, so they can be joined - std::vector workers; - // Task queue - std::queue> tasks; - - // Synchronization - using Lock = std::unique_lock; - std::mutex queue_mutex; - std::condition_variable condition; - bool stop = false; + public: + // Launches specified number of worker threads + ThreadPool (size_t threads); + ~ThreadPool (); + + // Not copyable + ThreadPool (const ThreadPool &) = delete; + ThreadPool& operator= (const ThreadPool &) = delete; + + // Not moveable + ThreadPool (ThreadPool &&) = delete; + ThreadPool& operator= (const ThreadPool &&) = delete; + + // Enqueue task and return std::future<> + template + auto enqueue (Callable&& callable, Args&&... args) + -> std::future::type>; + + // Enqueue task without requiring an std::future<> + template + auto enqueueAndDetach (Callable&& callable, Args&&... args) + -> void; + + private: + // Keep track of threads, so they can be joined + std::vector workers; + // Task queue + std::queue> tasks; + + // Synchronization + using Lock = std::unique_lock; + std::mutex queue_mutex; + std::condition_variable condition; + bool stop = false; }; -// the constructor just launches some amount of workers -ThreadPool::ThreadPool (size_t threads) -{ - for (size_t i = 0; i < threads; ++i) - { - // Worker execution loop - workers.emplace_back - ( - [this] - { - for(;;) - { - // Task to execute - std::function task; - - // Wait for additional work signal - { // Critical section - // Wait to be notified of work - Lock lock (this->queue_mutex); - this->condition.wait - ( - lock, - [this](){ return this->stop || !this->tasks.empty (); } - ); - - // Exit the thread if stopping and no work remains - if(this->stop && this->tasks.empty ()) - return; - - // Dequeue the next task - task = std::move (this->tasks.front ()); - this->tasks.pop (); - } // End critical section - - // Execute - task (); - } - } - ); - } -} - // Add a new work item to the pool -template -auto ThreadPool::enqueue (F&& f, Args&&... args) - -> std::future::type> +template +auto ThreadPool::enqueue (Callable&& callable, Args&&... args) + -> std::future::type> { - using return_type = typename std::result_of::type; + using return_t = typename std::result_of::type; + using task_t = std::packaged_task; - auto task = std::make_shared> - ( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future result = task->get_future(); + auto task = std::make_shared (std::bind (std::forward (callable), std::forward (args)...)); + std::future result = task->get_future(); - { // Critical section - Lock lock (queue_mutex); + { // Critical section + Lock lock (queue_mutex); - // Don't allow an enqueue after stopping - if (stop) - throw std::runtime_error ("enqueue on stopped ThreadPool"); + // Don't allow an enqueue after stopping + if (stop) + throw std::runtime_error ("enqueue on stopped ThreadPool"); - // Push work back on the queue - tasks.emplace ([task](){ (*task)(); }); - } // End critical section + // Push work back on the queue + tasks.emplace ([task](){ (*task)(); }); + } // End critical section - // Notify a thread that there is new work to perform - condition.notify_one (); - return result; + // Notify a thread that there is new work to perform + condition.notify_one (); + return result; } // Add a new work item to the pool -template -auto ThreadPool::enqueueAndDetach (F&& f, Args&&... args) - -> void +template +auto ThreadPool::enqueueAndDetach (Callable&& callable, Args&&... args) + -> void { - { // Critical section - Lock lock (queue_mutex); + { // Critical section + Lock lock (queue_mutex); - // Don't allow an enqueue after stopping - if (stop) - throw std::runtime_error ("enqueue on stopped ThreadPool"); + // Don't allow an enqueue after stopping + if (stop) + throw std::runtime_error ("enqueue on stopped ThreadPool"); - // Push work back on the queue - tasks.emplace (std::bind(std::forward(f), std::forward(args)...)); - } // End critical section + // Push work back on the queue + tasks.emplace (std::bind (std::forward (callable), std::forward (args)...)); + } // End critical section - // Notify a thread that there is new work to perform - condition.notify_one (); + // Notify a thread that there is new work to perform + condition.notify_one (); } -// Destructor joins all worker threads -ThreadPool::~ThreadPool () -{ - { // Critical section - Lock lock (queue_mutex); - stop = true; - } // End critical section - - condition.notify_all (); - - // Wait for threads to complete work - for (std::thread &worker : workers) - { - if (worker.joinable ()) { worker.join(); } - } -} From 16e0dbf9dce14944db95849889df044ebdf84a3e Mon Sep 17 00:00:00 2001 From: Steven LeMay Date: Wed, 11 Nov 2015 08:54:07 -0800 Subject: [PATCH 04/10] Rename ThreadPool.h to ThreadPool.hpp Rename ThreadPool.h to ThreadPool.hpp --- ThreadPool.cpp | 2 +- ThreadPool.h => ThreadPool.hpp | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename ThreadPool.h => ThreadPool.hpp (100%) diff --git a/ThreadPool.cpp b/ThreadPool.cpp index 7923ac6..e3c7379 100644 --- a/ThreadPool.cpp +++ b/ThreadPool.cpp @@ -2,7 +2,7 @@ // Based on https://github.com/progschj/ThreadPool -#include "ThreadPool.h" +#include "ThreadPool.hpp" ThreadPool::ThreadPool (size_t threads) { diff --git a/ThreadPool.h b/ThreadPool.hpp similarity index 100% rename from ThreadPool.h rename to ThreadPool.hpp From 78b440b6bf379e830c6411a98dad0b87847cc6db Mon Sep 17 00:00:00 2001 From: Steven LeMay Date: Wed, 11 Nov 2015 09:01:40 -0800 Subject: [PATCH 05/10] Comment update Comment update --- ThreadPool.hpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index f6fdfb2..c358149 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -34,7 +34,8 @@ class ThreadPool final auto enqueue (Callable&& callable, Args&&... args) -> std::future::type>; - // Enqueue task without requiring an std::future<> + // Enqueue task without requiring capture of std::future<> + // Note: Best not to let exceptions escape the callable template auto enqueueAndDetach (Callable&& callable, Args&&... args) -> void; From 9db68ccc69413f92bb0470d224693df5bfd945e2 Mon Sep 17 00:00:00 2001 From: Steven LeMay Date: Wed, 11 Nov 2015 09:37:02 -0800 Subject: [PATCH 06/10] Use typename instead of class Use typename instead of class --- ThreadPool.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ThreadPool.hpp b/ThreadPool.hpp index c358149..2a6e894 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -30,13 +30,13 @@ class ThreadPool final ThreadPool& operator= (const ThreadPool &&) = delete; // Enqueue task and return std::future<> - template + template auto enqueue (Callable&& callable, Args&&... args) -> std::future::type>; // Enqueue task without requiring capture of std::future<> // Note: Best not to let exceptions escape the callable - template + template auto enqueueAndDetach (Callable&& callable, Args&&... args) -> void; @@ -54,7 +54,7 @@ class ThreadPool final }; // Add a new work item to the pool -template +template auto ThreadPool::enqueue (Callable&& callable, Args&&... args) -> std::future::type> { @@ -81,7 +81,7 @@ auto ThreadPool::enqueue (Callable&& callable, Args&&... args) } // Add a new work item to the pool -template +template auto ThreadPool::enqueueAndDetach (Callable&& callable, Args&&... args) -> void { From 834fc0c2b7fe9caefc398592a4026ec8a4e3f60d Mon Sep 17 00:00:00 2001 From: Calthron Date: Thu, 12 Nov 2015 17:00:45 -0800 Subject: [PATCH 07/10] Use break instead of return Provide a single exit point for the worker thread --- ThreadPool.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ThreadPool.cpp b/ThreadPool.cpp index e3c7379..6cf2781 100644 --- a/ThreadPool.cpp +++ b/ThreadPool.cpp @@ -27,9 +27,9 @@ ThreadPool::ThreadPool (size_t threads) return this->stop || !this->tasks.empty (); }); - // Exit the thread if stopping and no work remains + // If stopping and no work remains, exit the work loop and thread if (this->stop && this->tasks.empty ()) - return; + break; // Dequeue the next task task = std::move (this->tasks.front ()); From f761d527a83a37003ecef53c94718818e60d8601 Mon Sep 17 00:00:00 2001 From: Calthron Date: Fri, 13 Nov 2015 07:40:30 -0800 Subject: [PATCH 08/10] Set default thread pool size to 1 --- ThreadPool.cpp | 2 +- ThreadPool.hpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ThreadPool.cpp b/ThreadPool.cpp index 6cf2781..1f1ff72 100644 --- a/ThreadPool.cpp +++ b/ThreadPool.cpp @@ -56,6 +56,6 @@ ThreadPool::~ThreadPool () // Wait for threads to complete work for (std::thread &worker : workers) { - if (worker.joinable ()) { worker.join(); } + worker.join(); } } diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 2a6e894..f190b8b 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -18,7 +18,7 @@ class ThreadPool final { public: // Launches specified number of worker threads - ThreadPool (size_t threads); + ThreadPool (size_t threads = 1); ~ThreadPool (); // Not copyable From 3a5d8a8a28886157ddbeed3ed906733faff8ef17 Mon Sep 17 00:00:00 2001 From: Calthron Date: Fri, 13 Nov 2015 15:19:44 -0800 Subject: [PATCH 09/10] Use lock_t instead of Lock Use lock_t instead of Lock --- ThreadPool.cpp | 6 +++--- ThreadPool.hpp | 8 ++++---- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/ThreadPool.cpp b/ThreadPool.cpp index 1f1ff72..109b9e0 100644 --- a/ThreadPool.cpp +++ b/ThreadPool.cpp @@ -1,4 +1,4 @@ -// Purpose: Thread pool +// Purpose: Simple thread pool // Based on https://github.com/progschj/ThreadPool @@ -21,7 +21,7 @@ ThreadPool::ThreadPool (size_t threads) // Wait for additional work signal { // Critical section // Wait to be notified of work - Lock lock (this->queue_mutex); + lock_t lock (this->queue_mutex); this->condition.wait (lock, [this]() { return this->stop || !this->tasks.empty (); @@ -47,7 +47,7 @@ ThreadPool::ThreadPool (size_t threads) ThreadPool::~ThreadPool () { { // Critical section - Lock lock (queue_mutex); + lock_t lock (queue_mutex); stop = true; } // End critical section diff --git a/ThreadPool.hpp b/ThreadPool.hpp index f190b8b..94a96a8 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -1,4 +1,4 @@ -// Purpose: Thread pool +// Purpose: Simple thread pool // Based on https://github.com/progschj/ThreadPool @@ -47,7 +47,7 @@ class ThreadPool final std::queue> tasks; // Synchronization - using Lock = std::unique_lock; + using lock_t = std::unique_lock; std::mutex queue_mutex; std::condition_variable condition; bool stop = false; @@ -65,7 +65,7 @@ auto ThreadPool::enqueue (Callable&& callable, Args&&... args) std::future result = task->get_future(); { // Critical section - Lock lock (queue_mutex); + lock_t lock (queue_mutex); // Don't allow an enqueue after stopping if (stop) @@ -86,7 +86,7 @@ auto ThreadPool::enqueueAndDetach (Callable&& callable, Args&&... args) -> void { { // Critical section - Lock lock (queue_mutex); + lock_t lock (queue_mutex); // Don't allow an enqueue after stopping if (stop) From cbb46e0a1e79107e7559bdade16b5af5ebdc82cd Mon Sep 17 00:00:00 2001 From: Calthron Date: Wed, 25 Jan 2017 08:56:01 -0800 Subject: [PATCH 10/10] Remove enqueAndDetach enqueAndDetach is not needed as std::future wait is not an issues on it's destruction if it is not created from std::async. --- ThreadPool.cpp | 23 ++++++++++++----------- ThreadPool.hpp | 48 +++++++++++------------------------------------- 2 files changed, 23 insertions(+), 48 deletions(-) diff --git a/ThreadPool.cpp b/ThreadPool.cpp index 109b9e0..393c722 100644 --- a/ThreadPool.cpp +++ b/ThreadPool.cpp @@ -1,6 +1,7 @@ +// // Purpose: Simple thread pool - -// Based on https://github.com/progschj/ThreadPool +// +// Based on https://github.com/progschj/ThreadPool changes provided as https://github.com/calthron/ThreadPool #include "ThreadPool.hpp" @@ -19,22 +20,22 @@ ThreadPool::ThreadPool (size_t threads) std::function task; // Wait for additional work signal - { // Critical section + { // CRITICAL SECTION // Wait to be notified of work - lock_t lock (this->queue_mutex); - this->condition.wait (lock, [this]() + lock_t lock (queue_mutex); + condition.wait (lock, [this]() { - return this->stop || !this->tasks.empty (); + return stop || !tasks.empty (); }); - // If stopping and no work remains, exit the work loop and thread - if (this->stop && this->tasks.empty ()) + // If stopping and no work remains, exit the work loop + if (stop && tasks.empty ()) break; // Dequeue the next task - task = std::move (this->tasks.front ()); - this->tasks.pop (); - } // End critical section + task.swap (tasks.front ()); + tasks.pop (); + } // END CRITICAL SECTION // Execute task (); diff --git a/ThreadPool.hpp b/ThreadPool.hpp index 94a96a8..74693fa 100644 --- a/ThreadPool.hpp +++ b/ThreadPool.hpp @@ -1,18 +1,19 @@ +// // Purpose: Simple thread pool - -// Based on https://github.com/progschj/ThreadPool +// +// Based on https://github.com/progschj/ThreadPool changes provided as https://github.com/calthron/ThreadPool #pragma once -#include -#include -#include -#include -#include #include -#include #include +#include +#include +#include +#include #include +#include +#include class ThreadPool final { @@ -34,18 +35,12 @@ class ThreadPool final auto enqueue (Callable&& callable, Args&&... args) -> std::future::type>; - // Enqueue task without requiring capture of std::future<> - // Note: Best not to let exceptions escape the callable - template - auto enqueueAndDetach (Callable&& callable, Args&&... args) - -> void; - private: // Keep track of threads, so they can be joined std::vector workers; // Task queue std::queue> tasks; - + // Synchronization using lock_t = std::unique_lock; std::mutex queue_mutex; @@ -53,7 +48,7 @@ class ThreadPool final bool stop = false; }; -// Add a new work item to the pool +// Add a new work item to the pool, return std::future of return type template auto ThreadPool::enqueue (Callable&& callable, Args&&... args) -> std::future::type> @@ -79,24 +74,3 @@ auto ThreadPool::enqueue (Callable&& callable, Args&&... args) condition.notify_one (); return result; } - -// Add a new work item to the pool -template -auto ThreadPool::enqueueAndDetach (Callable&& callable, Args&&... args) - -> void -{ - { // Critical section - lock_t lock (queue_mutex); - - // Don't allow an enqueue after stopping - if (stop) - throw std::runtime_error ("enqueue on stopped ThreadPool"); - - // Push work back on the queue - tasks.emplace (std::bind (std::forward (callable), std::forward (args)...)); - } // End critical section - - // Notify a thread that there is new work to perform - condition.notify_one (); -} -