Skip to content

Commit

Permalink
lib: Implement thread pool and use it for BVH building
Browse files Browse the repository at this point in the history
I was intending on simplifying the main renderer thread management with
this as well, but on second thought, this isn't really a good fit for
that. The current approach gives us load balancing for free, since the
fixed render threads just pick up new tiles to render as soon as they
finish the previous one.
Using this thread pool for rendering will require a bit more planning.
  • Loading branch information
vkoskiv committed Jan 4, 2024
1 parent 284c616 commit 238f075
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 45 deletions.
15 changes: 1 addition & 14 deletions src/common/platform/mutex.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,6 @@
#include "mutex.h"
#include <stdlib.h>

#ifdef WINDOWS
#include <Windows.h>
#else
#include <pthread.h>
#endif

struct cr_mutex {
#ifdef WINDOWS
CRITICAL_SECTION lock;
#else
pthread_mutex_t lock; // = PTHREAD_MUTEX_INITIALIZER;
#endif
};

struct cr_mutex *mutex_create() {
struct cr_mutex *new = calloc(1, sizeof(*new));
#ifdef WINDOWS
Expand All @@ -40,6 +26,7 @@ void mutex_destroy(struct cr_mutex *m) {
#else
pthread_mutex_destroy(&m->lock);
#endif
free(m);
}

void mutex_lock(struct cr_mutex *m) {
Expand Down
15 changes: 14 additions & 1 deletion src/common/platform/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,20 @@

//Platform-agnostic mutexes

struct cr_mutex;
#ifdef WINDOWS
#include <Windows.h>
#else
#include <pthread.h>
#endif

struct cr_mutex {
#ifdef WINDOWS
LPCRITICAL_SECTION lock;
#else
pthread_mutex_t lock; // = PTHREAD_MUTEX_INITIALIZER;
#endif
};


struct cr_mutex *mutex_create(void);

Expand Down
94 changes: 92 additions & 2 deletions src/common/platform/thread.c
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//
// thread.c
// C-ray
// c-ray
//
// Created by Valtteri on 29.3.2020.
// Copyright © 2020 Valtteri Koskivuori. All rights reserved.
// Copyright © 2020-2024 Valtteri Koskivuori. All rights reserved.
//

#include <stdbool.h>
Expand All @@ -12,6 +12,11 @@
#include "thread.h"
#include "../logging.h"

/*
cond stuff is based on:
https://nachtimwald.com/2019/04/05/cross-platform-thread-wrapper/
*/

void *thread_user_data(void *arg) {
struct cr_thread *thread = (struct cr_thread *)arg;
return thread->user_data;
Expand All @@ -36,6 +41,19 @@ void thread_wait(struct cr_thread *t) {
#endif
}

int thread_create_detach(struct cr_thread *t) {
if (!t) return -1;
#ifdef WINDOWS
t->thread_handle = CreateThread(NULL, 0, thread_stub, t, 0, &t->thread_id);
CloseHandle(t->thread_handle);
return 0;
#else
pthread_create(&t->thread_id, NULL, thread_stub, t);
pthread_detach(t->thread_id);
return 0;
#endif
}

int thread_start(struct cr_thread *t) {
#ifdef WINDOWS
t->thread_handle = CreateThread(NULL, 0, thread_stub, t, 0, &t->thread_id);
Expand All @@ -50,3 +68,75 @@ int thread_start(struct cr_thread *t) {
return ret;
#endif
}

int thread_cond_init(struct cr_cond *cond) {
if (!cond) return -1;
#ifdef WINDOWS
InitializeConditionVariable(&cond->cond);
#else
pthread_cond_init(&cond->cond, NULL);
#endif
return 0;
}

int thread_cond_destroy(struct cr_cond *cond) {
if (!cond) return -1;
#ifndef WINDOWS
pthread_cond_destroy(&cond->cond);
#endif
return 0;
}

int thread_cond_wait(struct cr_cond *cond, struct cr_mutex *mutex) {
if (!cond || !mutex) return -1;
#ifdef WINDOWS
return thread_cond_timed_wait(cond, mutex, NULL);
#else
return pthread_cond_wait(&cond->cond, &mutex->lock);
#endif
}

#ifdef WINDOWS
static DWORD timespec_to_ms(const struct timespec *absolute_time) {
if (!absolute_time) return INFINITE;
DWORD t = ((absolute_time->tv_sec - time(NULL)) * 1000) + (absolute_time->tv_nsec / 1000000);
return t < 0 ? 1 : t;
}
#endif

void ms_to_timespec(struct timespec *ts, unsigned int ms) {
if (!ts) return;
ts->tv_sec = (ms / 1000) + time(NULL);
ts->tv_nsec = (ms % 1000) * 1000000;
}

int thread_cond_timed_wait(struct cr_cond *cond, struct cr_mutex *mutex, const struct timespec *absolute_time) {
if (!cond || !mutex) return -1;
#ifdef WINDOWS
if (!SleepConditionVariableCS(&cond->cond, &mutex->lock, timespec_to_ms(absolute_time)))
return -1;
#else
return pthread_cond_timedwait(&cond->cond, &mutex->lock, absolute_time);
#endif
return 0;
}

int thread_cond_signal(struct cr_cond *cond) {
if (!cond) return -1;
#ifdef WINDOWS
return WakeConditionVariable(&cond->cond);
#else
return pthread_cond_signal(&cond->cond);
#endif
return 0;
}

int thread_cond_broadcast(struct cr_cond *cond) {
if (!cond) return -1;
#ifdef WINDOWS
return WakeAllConditionVariable(&cond->cond);
#else
return pthread_cond_broadcast(&cond->cond);
#endif
return 0;
}
32 changes: 29 additions & 3 deletions src/common/platform/thread.h
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
//
// thread.h
// C-ray
// c-ray
//
// Created by Valtteri on 29.3.2020.
// Copyright © 2020-2021 Valtteri Koskivuori. All rights reserved.
// Copyright © 2020-2024 Valtteri Koskivuori. All rights reserved.
//

#pragma once

#ifdef WINDOWS
#include <stdbool.h>
#include <Windows.h>
#else
#include <pthread.h>
#endif

#include "../dyn_array.h"
#include "mutex.h"

//Multi-platform threading

Expand All @@ -32,16 +34,40 @@ struct cr_thread {
void *(*thread_fn)(void *); // Code you want to run.
};

struct cr_cond {
#ifdef WINDOWS
PCONDITION_VARIABLE cond;
#else
pthread_cond_t cond;
#endif
};

typedef struct cr_thread cr_thread;
dyn_array_def(cr_thread);

// Fetch the user data pointer from args parameter
void *thread_user_data(void *arg);

/// Start a new C-ray platform abstracted thread
// Create & detach a thread. Used by thread pool.
int thread_create_detach(struct cr_thread *t);
/// Start a new c-ray platform abstracted thread
/// @param t Pointer to the thread to be started
int thread_start(struct cr_thread *t);

/// Block until the given thread has terminated.
/// @param t Pointer to the thread to be checked.
void thread_wait(struct cr_thread *t);

int thread_cond_init(struct cr_cond *cond);

int thread_cond_destroy(struct cr_cond *cond);

int thread_cond_wait(struct cr_cond *cond, struct cr_mutex *mutex);

void ms_to_timespec(struct timespec *ts, unsigned int ms);

int thread_cond_timed_wait(struct cr_cond *cond, struct cr_mutex *mutex, const struct timespec *absolute_time);

int thread_cond_signal(struct cr_cond *cond);

int thread_cond_broadcast(struct cr_cond *cond);
158 changes: 158 additions & 0 deletions src/common/platform/thread_pool.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
//
// thread_pool.c
// c-ray
//
// Created by Valtteri on 04.1.2024.
// Copyright © 2024 Valtteri Koskivuori. All rights reserved.
//

#include "thread_pool.h"
#include "mutex.h"
#include "thread.h"
#include "../logging.h"

struct cr_task {
void (*fn)(void *arg);
void *arg;
struct cr_task *next;
};

struct cr_thread_pool {
struct cr_task *first;
struct cr_task *last;
struct cr_mutex *mutex;
struct cr_cond work_available;
struct cr_cond work_ongoing;
size_t active_workers;
struct cr_thread *threads;
size_t thread_count;
bool stop_flag;
};

static struct cr_task *task_create(void (*fn)(void *arg), void *arg) {
if (!fn) return NULL;
struct cr_task *task = malloc(sizeof(*task));
*task = (struct cr_task){
.fn = fn,
.arg = arg,
.next = NULL
};
return task;
}

static struct cr_task *thread_pool_get_task(struct cr_thread_pool *pool) {
if (!pool) return NULL;
struct cr_task *task = pool->first;
if (!task) return NULL;
if (!task->next) {
pool->first = NULL;
pool->last = NULL;
} else {
pool->first = task->next;
}
return task;
}

static void *cr_worker(void *arg) {
struct cr_thread_pool *pool = thread_user_data(arg);
while (true) {
mutex_lock(pool->mutex);
while (!pool->first && !pool->stop_flag)
thread_cond_wait(&pool->work_available, pool->mutex);
if (pool->stop_flag) break;
struct cr_task *task = thread_pool_get_task(pool);
pool->active_workers++;
mutex_release(pool->mutex);
if (task) {
task->fn(task->arg);
free(task);
}
mutex_lock(pool->mutex);
pool->active_workers--;
if (!pool->stop_flag && pool->active_workers == 0 && !pool->first)
thread_cond_signal(&pool->work_ongoing);
mutex_release(pool->mutex);
}
pool->thread_count--;
thread_cond_signal(&pool->work_ongoing);
mutex_release(pool->mutex);
return NULL;
}

struct cr_thread_pool *thread_pool_create(size_t threads) {
if (!threads) threads = 2;
struct cr_thread_pool *pool = calloc(1, sizeof(*pool));
logr(debug, "Spawning thread pool (%lut, %p)\n", threads, (void *)pool);
pool->thread_count = threads;
pool->threads = calloc(pool->thread_count, sizeof(*pool->threads));

pool->mutex = mutex_create();
thread_cond_init(&pool->work_available);
thread_cond_init(&pool->work_ongoing);

for (size_t i = 0; i < pool->thread_count; ++i) {
pool->threads[i] = (struct cr_thread){
.thread_fn = cr_worker,
.user_data = pool
};
thread_create_detach(&pool->threads[i]);
}
return pool;
}

void thread_pool_destroy(struct cr_thread_pool *pool) {
if (!pool) return;
logr(debug, "Closing thread pool (%lut, %p)\n", pool->thread_count, (void *)pool);
mutex_lock(pool->mutex);
// Clear work queue
struct cr_task *head = pool->first;
struct cr_task *next = NULL;
while (head) {
next = head->next;
free(head);
head = next;
}
// Tell the workers to stop
pool->stop_flag = true;
thread_cond_broadcast(&pool->work_available);
mutex_release(pool->mutex);

// Wait for them to actually stop
thread_pool_wait(pool);

mutex_destroy(pool->mutex);
thread_cond_destroy(&pool->work_available);
thread_cond_destroy(&pool->work_ongoing);
free(pool->threads);
free(pool);
}

bool thread_pool_enqueue(struct cr_thread_pool *pool, void (*fn)(void *arg), void *arg) {
if (!pool) return false;
struct cr_task *task = task_create(fn, arg);
if (!task) return false;
mutex_lock(pool->mutex);
if (!pool->first) {
pool->first = task;
pool->last = pool->first;
} else {
pool->last->next = task;
pool->last = task;
}
thread_cond_broadcast(&pool->work_available);
mutex_release(pool->mutex);
return true;
}

void thread_pool_wait(struct cr_thread_pool *pool) {
if (!pool) return;
mutex_lock(pool->mutex);
while (true) {
if ((!pool->stop_flag && pool->active_workers != 0) || (pool->stop_flag && pool->thread_count != 0)) {
thread_cond_wait(&pool->work_ongoing, pool->mutex);
} else {
break;
}
}
mutex_release(pool->mutex);
}
Loading

0 comments on commit 238f075

Please sign in to comment.