Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an alternate method to enqueueAndDetach a task without requiring the capture of std::future<> #31

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
61 changes: 61 additions & 0 deletions ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Purpose: Thread pool

// Based on https://github.com/progschj/ThreadPool

#include "ThreadPool.hpp"

ThreadPool::ThreadPool (size_t threads)
{
workers.reserve (threads);

for (size_t count = 0; count < threads; ++count)
{
// Worker execution loop
workers.emplace_back ([this]()
{
for (;;)
{
// Task to execute
std::function<void ()> task;

// Wait for additional work signal
{ // Critical section
// Wait to be notified of work
Lock lock (this->queue_mutex);
this->condition.wait (lock, [this]()
{
return this->stop || !this->tasks.empty ();
});

// If stopping and no work remains, exit the work loop and thread
if (this->stop && this->tasks.empty ())
break;

// Dequeue the next task
task = std::move (this->tasks.front ());
this->tasks.pop ();
} // End critical section

// Execute
task ();
}
});
}
}

// Destructor joins all worker threads
ThreadPool::~ThreadPool ()
{
{ // Critical section
Lock lock (queue_mutex);
stop = true;
} // End critical section

condition.notify_all ();

// Wait for threads to complete work
for (std::thread &worker : workers)
{
worker.join();
}
}
98 changes: 0 additions & 98 deletions ThreadPool.h

This file was deleted.

102 changes: 102 additions & 0 deletions ThreadPool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Purpose: Thread pool

// Based on https://github.com/progschj/ThreadPool

#pragma once

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool final
{
public:
// Launches specified number of worker threads
ThreadPool (size_t threads = 1);
~ThreadPool ();

// Not copyable
ThreadPool (const ThreadPool &) = delete;
ThreadPool& operator= (const ThreadPool &) = delete;

// Not moveable
ThreadPool (ThreadPool &&) = delete;
ThreadPool& operator= (const ThreadPool &&) = delete;

// Enqueue task and return std::future<>
template<typename Callable, typename... Args>
auto enqueue (Callable&& callable, Args&&... args)
-> std::future<typename std::result_of<Callable (Args...)>::type>;

// Enqueue task without requiring capture of std::future<>
// Note: Best not to let exceptions escape the callable
template<typename Callable, typename... Args>
auto enqueueAndDetach (Callable&& callable, Args&&... args)
-> void;

private:
// Keep track of threads, so they can be joined
std::vector<std::thread> workers;
// Task queue
std::queue<std::function<void ()>> tasks;

// Synchronization
using Lock = std::unique_lock<std::mutex>;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
};

// Add a new work item to the pool
template<typename Callable, typename... Args>
auto ThreadPool::enqueue (Callable&& callable, Args&&... args)
-> std::future<typename std::result_of<Callable (Args...)>::type>
{
using return_t = typename std::result_of<Callable (Args...)>::type;
using task_t = std::packaged_task<return_t ()>;

auto task = std::make_shared<task_t> (std::bind (std::forward<Callable> (callable), std::forward<Args> (args)...));
std::future<return_t> result = task->get_future();

{ // Critical section
Lock lock (queue_mutex);

// Don't allow an enqueue after stopping
if (stop)
throw std::runtime_error ("enqueue on stopped ThreadPool");

// Push work back on the queue
tasks.emplace ([task](){ (*task)(); });
} // End critical section

// Notify a thread that there is new work to perform
condition.notify_one ();
return result;
}

// Add a new work item to the pool
template<typename Callable, typename... Args>
auto ThreadPool::enqueueAndDetach (Callable&& callable, Args&&... args)
-> void
{
{ // Critical section
Lock lock (queue_mutex);

// Don't allow an enqueue after stopping
if (stop)
throw std::runtime_error ("enqueue on stopped ThreadPool");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@progschj isn't this going to cause a throw from destructor potentially? that seems to the be only time when "stop" is called. IIRC you can never cleanly catch such an exception -- DTOR throw is undefined behavior.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I’m not sure I understand the question.

This would be throwing on a caller’s thread when the internal ‘stop’ flag is true, and the caller was trying to add a task. If the caller is calling member functions while the object is being destroyed… that would be another sort of issue.

From: peter karasev [mailto:[email protected]]
Sent: Thursday, June 16, 2016 11:56 AM
To: progschj/ThreadPool [email protected]
Cc: Steven LeMay [email protected]; Author [email protected]
Subject: Re: [progschj/ThreadPool] Add an alternate method to enqueueAndDetach a task without requiring the capture of std::future<> (#31)

In ThreadPool.hpp #31 (comment) :

  • // Notify a thread that there is new work to perform
  • condition.notify_one ();
  • return result;
    +}

+// Add a new work item to the pool
+template<typename Callable, typename... Args>
+auto ThreadPool::enqueueAndDetach (Callable&& callable, Args&&... args)

  • -> void
    +{
  • { // Critical section
  •  Lock lock (queue_mutex);
    
  •  // Don't allow an enqueue after stopping
    
  •  if (stop)
    
  •     throw std::runtime_error ("enqueue on stopped ThreadPool");
    

@progschj https://github.com/progschj isn't this going to cause a throw from destructor potentially? that seems to the be only time when "stop" is called. IIRC you can never cleanly catch such an exception -- DTOR throw is undefined behavior.


You are receiving this because you authored the thread.
Reply to this email directly, view it on GitHub https://github.com/progschj/ThreadPool/pull/31/files/f761d527a83a37003ecef53c94718818e60d8601#r67404012 , or mute the thread https://github.com/notifications/unsubscribe/AKJadATwrIy1WhowY4pCzd32mUBhU6JSks5qMZxagaJpZM4GfmQ6 . https://github.com/notifications/beacon/AKJadA15Aixeyyd70laKpBGr_6yKx0KRks5qMZxagaJpZM4GfmQ6.gif


// Push work back on the queue
tasks.emplace (std::bind (std::forward<Callable> (callable), std::forward<Args> (args)...));
} // End critical section

// Notify a thread that there is new work to perform
condition.notify_one ();
}