Skip to content

Commit

Permalink
semaphore plus:
Browse files Browse the repository at this point in the history
(1) add an option for in-order resume of threads (by default not);
(2) avoid thundering herd of threads whose needs are not met;
  • Loading branch information
lihuiba committed Dec 18, 2024
1 parent 7346d38 commit 2283b3a
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 27 deletions.
45 changes: 24 additions & 21 deletions thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1749,44 +1749,47 @@ R"(
{
if (count == 0) return 0;
splock.lock();
DEFER(splock.unlock());
CURRENT->semaphore_count = count;
int ret = 0;
while (!try_subtract(count)) {
ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
ERRNO err;
splock.lock();
if (ret < 0) {
int ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
splock.lock(); // assuming errno NOT changed
if (unlikely(ret < 0)) { // got interrupted
CURRENT->semaphore_count = 0;
// when timeout, we need to try to resume next thread(s) in q
if (err.no == ETIMEDOUT) try_resume();
splock.unlock();
errno = err.no;
if (!m_ooo_resume)
try_resume(m_count.load());
return ret;
}
}
try_resume();
splock.unlock();
return 0;
}
void semaphore::try_resume()
{
auto cnt = m_count.load();
while(true)
{
void semaphore::try_resume(uint64_t cnt) {
if (cnt == 0) return;
while(true) {
ScopedLockHead h(this);
if (!h) return;
if (!h) break;
auto th = (thread*)h;
auto& qfcount = th->semaphore_count;
if (qfcount > cnt) break;
cnt -= qfcount;
qfcount = 0;
prelocked_thread_interrupt(th, -1);
}
if (!q.th || !cnt || !m_ooo_resume)
return;
for (auto th = q.th->next();
th!= q.th && cnt;
th = th->next()) {
auto& qfcount = th->semaphore_count;
if (qfcount <= cnt) {
cnt -= qfcount;
qfcount = 0;
prelocked_thread_interrupt(th, -1);
}
}
}
bool semaphore::try_subtract(uint64_t count)
{
while(true)
{
inline bool semaphore::try_subtract(uint64_t count) {
while(true) {
auto mc = m_count.load();
if (mc < count)
return false;
Expand Down
13 changes: 7 additions & 6 deletions thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,8 @@ namespace photon
class semaphore : protected waitq
{
public:
explicit semaphore(uint64_t count = 0) : m_count(count) { }
explicit semaphore(uint64_t count = 0, bool in_order_resume = true)
: m_count(count), m_ooo_resume(!in_order_resume) { }
int wait(uint64_t count, Timeout timeout = {}) {
int ret = 0;
do {
Expand All @@ -441,12 +442,11 @@ namespace photon
return ret;
}
int wait_interruptible(uint64_t count, Timeout timeout = {});
int signal(uint64_t count)
{
int signal(uint64_t count) {
if (count == 0) return 0;
SCOPED_LOCK(splock);
m_count.fetch_add(count);
resume_one();
auto cnt = m_count.fetch_add(count) + count;
try_resume(cnt);
return 0;
}
uint64_t count() const {
Expand All @@ -455,9 +455,10 @@ namespace photon

protected:
std::atomic<uint64_t> m_count;
bool m_ooo_resume;
spinlock splock;
bool try_subtract(uint64_t count);
void try_resume();
void try_resume(uint64_t count);
};

// to be different to timer flags
Expand Down

0 comments on commit 2283b3a

Please sign in to comment.