Skip to content

Commit

Permalink
Introduce a ConcurrentQueue to reduce the number of raw mutexes used …
Browse files Browse the repository at this point in the history
…throughout the code to simplify the code reduce the chance of (multithreaded) deadlocks.

For now it's just a queue with a mutex.

this might help with #863 and #864 in the future.
  • Loading branch information
IntegratedQuantum committed Dec 29, 2024
1 parent 19b1e7e commit de90b4e
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 84 deletions.
2 changes: 1 addition & 1 deletion src/entity.zig
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ pub const ClientEntityManager = struct {
var modelTexture: main.graphics.Texture = undefined;
var shader: graphics.Shader = undefined; // Entities are sometimes small and sometimes big. Therefor it would mean a lot of work to still use smooth lighting. Therefor the non-smooth shader is used for those.
pub var entities: main.VirtualList(ClientEntity, 1 << 20) = undefined;
pub var mutex: std.Thread.Mutex = std.Thread.Mutex{};
pub var mutex: std.Thread.Mutex = .{};

pub fn init() void {
entities = .init();
Expand Down
2 changes: 1 addition & 1 deletion src/game.zig
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pub const Player = struct { // MARK: Player
pub var isFlying: Atomic(bool) = .init(false);
pub var isGhost: Atomic(bool) = .init(false);
pub var hyperSpeed: Atomic(bool) = .init(false);
pub var mutex: std.Thread.Mutex = std.Thread.Mutex{};
pub var mutex: std.Thread.Mutex = .{};
pub var inventory: Inventory = undefined;
pub var selectedSlot: u32 = 0;

Expand Down
20 changes: 4 additions & 16 deletions src/gui/gui.zig
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,22 @@ const GuiCommandQueue = struct { // MARK: GuiCommandQueue
action: Action,
};

var commands: List(Command) = undefined;
var mutex: std.Thread.Mutex = .{};
var commands: main.utils.ConcurrentQueue(Command) = undefined;

fn init() void {
mutex.lock();
defer mutex.unlock();
commands = .init(main.globalAllocator);
commands = .init(main.globalAllocator, 16);
}

fn deinit() void {
mutex.lock();
defer mutex.unlock();
commands.deinit();
}

fn scheduleCommand(command: Command) void {
mutex.lock();
defer mutex.unlock();
commands.append(command);
commands.enqueue(command);
}

fn executeCommands() void {
mutex.lock();
const commands_ = main.stackAllocator.dupe(Command, commands.items);
defer main.stackAllocator.free(commands_);
commands.clearAndFree();
mutex.unlock();
for(commands_) |command| {
while(commands.dequeue()) |command| {
switch(command.action) {
.open => {
executeOpenWindowCommand(command.window);
Expand Down
31 changes: 12 additions & 19 deletions src/gui/windows/chat.zig
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ const messageTimeout: i32 = 10000;
const messageFade = 1000;

var history: main.List(*Label) = undefined;
var mutex: std.Thread.Mutex = .{};
var messageQueue: main.List([]const u8) = undefined;
var messageQueue: main.utils.ConcurrentQueue([]const u8) = undefined;
var expirationTime: main.List(i32) = undefined;
var historyStart: u32 = 0;
var fadeOutEnd: u32 = 0;
Expand Down Expand Up @@ -72,7 +71,7 @@ fn refresh() void {
pub fn onOpen() void {
history = .init(main.globalAllocator);
expirationTime = .init(main.globalAllocator);
messageQueue = .init(main.globalAllocator);
messageQueue = .init(main.globalAllocator, 16);
historyStart = 0;
fadeOutEnd = 0;
input = TextInput.init(.{0, 0}, 256, 32, "", .{.callback = &sendMessage});
Expand All @@ -84,7 +83,7 @@ pub fn onClose() void {
label.deinit();
}
history.deinit();
for(messageQueue.items) |msg| {
while(messageQueue.dequeue()) |msg| {
main.globalAllocator.free(msg);
}
messageQueue.deinit();
Expand All @@ -96,20 +95,16 @@ pub fn onClose() void {
}

pub fn update() void {
{
mutex.lock();
defer mutex.unlock();
if(messageQueue.items.len != 0) {
const currentTime: i32 = @truncate(std.time.milliTimestamp());
for(messageQueue.items) |msg| {
history.append(Label.init(.{0, 0}, 256, msg, .left));
main.globalAllocator.free(msg);
expirationTime.append(currentTime +% messageTimeout);
}
refresh();
messageQueue.clearRetainingCapacity();
if(!messageQueue.empty()) {
const currentTime: i32 = @truncate(std.time.milliTimestamp());
while(messageQueue.dequeue()) |msg| {
history.append(Label.init(.{0, 0}, 256, msg, .left));
main.globalAllocator.free(msg);
expirationTime.append(currentTime +% messageTimeout);
}
refresh();
}

const currentTime: i32 = @truncate(std.time.milliTimestamp());
while(fadeOutEnd < history.items.len and currentTime -% expirationTime.items[fadeOutEnd] >= 0) {
fadeOutEnd += 1;
Expand Down Expand Up @@ -139,9 +134,7 @@ pub fn render() void {
}

pub fn addMessage(msg: []const u8) void {
mutex.lock();
defer mutex.unlock();
messageQueue.append(main.globalAllocator.dupe(u8, msg));
messageQueue.enqueue(main.globalAllocator.dupe(u8, msg));
}

pub fn sendMessage(_: usize) void {
Expand Down
2 changes: 1 addition & 1 deletion src/itemdrop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub const ItemDropManager = struct { // MARK: ItemDropManager

allocator: NeverFailingAllocator,

mutex: std.Thread.Mutex = std.Thread.Mutex{},
mutex: std.Thread.Mutex = .{},

list: std.MultiArrayList(ItemDrop),

Expand Down
4 changes: 2 additions & 2 deletions src/network.zig
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ pub const ConnectionManager = struct { // MARK: ConnectionManager
connections: main.List(*Connection) = undefined,
requests: main.List(*Request) = undefined,

mutex: std.Thread.Mutex = std.Thread.Mutex{},
mutex: std.Thread.Mutex = .{},
waitingToFinishReceive: std.Thread.Condition = std.Thread.Condition{},

receiveBuffer: [Connection.maxPacketSize]u8 = undefined,
Expand Down Expand Up @@ -1262,7 +1262,7 @@ pub const Connection = struct { // MARK: Connection
handShakeWaiting: std.Thread.Condition = std.Thread.Condition{},
lastConnection: i64,

mutex: std.Thread.Mutex = std.Thread.Mutex{},
mutex: std.Thread.Mutex = .{},

pub fn init(manager: *ConnectionManager, ipPort: []const u8, user: ?*main.server.User) !*Connection {
const result: *Connection = main.globalAllocator.create(Connection);
Expand Down
57 changes: 25 additions & 32 deletions src/renderer/mesh_storage.zig
Original file line number Diff line number Diff line change
Expand Up @@ -36,30 +36,29 @@ const storageMask = storageSize - 1;
var storageLists: [settings.highestSupportedLod + 1]*[storageSize*storageSize*storageSize]ChunkMeshNode = undefined;
var mapStorageLists: [settings.highestSupportedLod + 1]*[storageSize*storageSize]?*LightMap.LightMapFragment = undefined;
var meshList = main.List(*chunk_meshing.ChunkMesh).init(main.globalAllocator);
var priorityMeshUpdateList = main.List(*chunk_meshing.ChunkMesh).init(main.globalAllocator);
var priorityMeshUpdateList: main.utils.ConcurrentQueue(*chunk_meshing.ChunkMesh) = undefined;
pub var updatableList = main.List(*chunk_meshing.ChunkMesh).init(main.globalAllocator);
var mapUpdatableList = main.List(*LightMap.LightMapFragment).init(main.globalAllocator);
var mapUpdatableList: main.utils.ConcurrentQueue(*LightMap.LightMapFragment) = undefined;
var clearList = main.List(*chunk_meshing.ChunkMesh).init(main.globalAllocator);
var lastPx: i32 = 0;
var lastPy: i32 = 0;
var lastPz: i32 = 0;
var lastRD: u16 = 0;
var mutex = std.Thread.Mutex{};
var blockUpdateMutex = std.Thread.Mutex{};
var mutex: std.Thread.Mutex = .{};
const BlockUpdate = struct {
x: i32,
y: i32,
z: i32,
newBlock: blocks.Block,
};
var blockUpdateList: main.List(BlockUpdate) = undefined;
var blockUpdateList: main.utils.ConcurrentQueue(BlockUpdate) = undefined;

var meshMemoryPool: std.heap.MemoryPoolAligned(chunk_meshing.ChunkMesh, @alignOf(chunk_meshing.ChunkMesh)) = undefined;
var meshMemoryPoolMutex: std.Thread.Mutex = .{};

pub fn init() void { // MARK: init()
lastRD = 0;
blockUpdateList = .init(main.globalAllocator);
blockUpdateList = .init(main.globalAllocator, 16);
meshMemoryPool = .init(main.globalAllocator.allocator);
for(&storageLists) |*storageList| {
storageList.* = main.globalAllocator.create([storageSize*storageSize*storageSize]ChunkMeshNode);
Expand All @@ -71,6 +70,8 @@ pub fn init() void { // MARK: init()
mapStorageList.* = main.globalAllocator.create([storageSize*storageSize]?*LightMap.LightMapFragment);
@memset(mapStorageList.*, null);
}
priorityMeshUpdateList = .init(main.globalAllocator, 16);
mapUpdatableList = .init(main.globalAllocator, 16);
}

pub fn deinit() void {
Expand All @@ -94,15 +95,15 @@ pub fn deinit() void {
mesh.decreaseRefCount();
}
updatableList.clearAndFree();
for(mapUpdatableList.items) |map| {
while(mapUpdatableList.dequeue()) |map| {
map.decreaseRefCount();
}
mapUpdatableList.clearAndFree();
for(priorityMeshUpdateList.items) |mesh| {
mapUpdatableList.deinit();
while(priorityMeshUpdateList.dequeue()) |mesh| {
mesh.decreaseRefCount();
}
priorityMeshUpdateList.clearAndFree();
blockUpdateList.clearAndFree();
priorityMeshUpdateList.deinit();
blockUpdateList.deinit();
meshList.clearAndFree();
for(clearList.items) |mesh| {
mesh.deinit();
Expand Down Expand Up @@ -732,18 +733,15 @@ pub noinline fn updateAndGetRenderChunks(conn: *network.Connection, frustum: *co
}

pub fn updateMeshes(targetTime: i64) void { // MARK: updateMeshes()
{ // First of all process all the block updates:
blockUpdateMutex.lock();
defer blockUpdateMutex.unlock();
for(blockUpdateList.items) |blockUpdate| {
const pos = chunk.ChunkPosition{.wx=blockUpdate.x, .wy=blockUpdate.y, .wz=blockUpdate.z, .voxelSize=1};
if(getMeshAndIncreaseRefCount(pos)) |mesh| {
defer mesh.decreaseRefCount();
mesh.updateBlock(blockUpdate.x, blockUpdate.y, blockUpdate.z, blockUpdate.newBlock);
} // TODO: It seems like we simply ignore the block update if we don't have the mesh yet.
}
blockUpdateList.clearRetainingCapacity();
// First of all process all the block updates:
while(blockUpdateList.dequeue()) |blockUpdate| {
const pos = chunk.ChunkPosition{.wx=blockUpdate.x, .wy=blockUpdate.y, .wz=blockUpdate.z, .voxelSize=1};
if(getMeshAndIncreaseRefCount(pos)) |mesh| {
defer mesh.decreaseRefCount();
mesh.updateBlock(blockUpdate.x, blockUpdate.y, blockUpdate.z, blockUpdate.newBlock);
} // TODO: It seems like we simply ignore the block update if we don't have the mesh yet.
}

mutex.lock();
defer mutex.unlock();
for(clearList.items) |mesh| {
Expand All @@ -753,8 +751,7 @@ pub fn updateMeshes(targetTime: i64) void { // MARK: updateMeshes()
meshMemoryPoolMutex.unlock();
}
clearList.clearRetainingCapacity();
while (priorityMeshUpdateList.items.len != 0) {
const mesh = priorityMeshUpdateList.orderedRemove(0);
while(priorityMeshUpdateList.dequeue()) |mesh| {
if(!mesh.needsMeshUpdate) {
mutex.unlock();
defer mutex.lock();
Expand All @@ -778,7 +775,7 @@ pub fn updateMeshes(targetTime: i64) void { // MARK: updateMeshes()
mesh.uploadData();
if(std.time.milliTimestamp() >= targetTime) break; // Update at least one mesh.
}
while(mapUpdatableList.popOrNull()) |map| {
while(mapUpdatableList.dequeue()) |map| {
if(!isMapInRenderDistance(map.pos)) {
map.decreaseRefCount();
} else {
Expand Down Expand Up @@ -852,7 +849,7 @@ pub fn addToUpdateListAndDecreaseRefCount(mesh: *chunk_meshing.ChunkMesh) void {
mutex.lock();
defer mutex.unlock();
if(mesh.finishedMeshing) {
priorityMeshUpdateList.append(mesh);
priorityMeshUpdateList.enqueue(mesh);
mesh.needsMeshUpdate = true;
} else {
mutex.unlock();
Expand Down Expand Up @@ -935,17 +932,13 @@ pub const MeshGenerationTask = struct { // MARK: MeshGenerationTask
// MARK: updaters

pub fn updateBlock(x: i32, y: i32, z: i32, newBlock: blocks.Block) void {
blockUpdateMutex.lock();
defer blockUpdateMutex.unlock();
blockUpdateList.append(BlockUpdate{.x=x, .y=y, .z=z, .newBlock=newBlock});
blockUpdateList.enqueue(.{.x=x, .y=y, .z=z, .newBlock=newBlock});
}

pub fn updateChunkMesh(mesh: *chunk.Chunk) void {
MeshGenerationTask.schedule(mesh);
}

pub fn updateLightMap(map: *LightMap.LightMapFragment) void {
mutex.lock();
defer mutex.unlock();
mapUpdatableList.append(map);
mapUpdatableList.enqueue(map);
}
17 changes: 8 additions & 9 deletions src/server/server.zig
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ const updateNanoTime: u32 = 1000000000/20;

pub var world: ?*ServerWorld = null;
pub var users: main.List(*User) = undefined;
pub var userDeinitList: main.List(*User) = undefined;
pub var userDeinitList: main.utils.ConcurrentQueue(*User) = undefined;

pub var connectionManager: *ConnectionManager = undefined;

Expand All @@ -214,7 +214,7 @@ fn init(name: []const u8, singlePlayerPort: ?u16) void { // MARK: init()
std.debug.assert(world == null); // There can only be one world.
command.init();
users = .init(main.globalAllocator);
userDeinitList = .init(main.globalAllocator);
userDeinitList = .init(main.globalAllocator, 16);
lastTime = std.time.nanoTimestamp();
connectionManager = ConnectionManager.init(main.settings.defaultPort, false) catch |err| {
std.log.err("Couldn't create socket: {s}", .{@errorName(err)});
Expand Down Expand Up @@ -245,10 +245,10 @@ fn init(name: []const u8, singlePlayerPort: ?u16) void { // MARK: init()

fn deinit() void {
users.clearAndFree();
for(userDeinitList.items) |user| {
while(userDeinitList.dequeue()) |user| {
user.deinit();
}
userDeinitList.clearAndFree();
userDeinitList.deinit();
for(connectionManager.connections.items) |conn| {
conn.user.?.decreaseRefCount();
}
Expand Down Expand Up @@ -345,12 +345,11 @@ fn update() void { // MARK: update()
main.network.Protocols.entityPosition.send(user.conn, data, itemData);
}

while(userDeinitList.popOrNull()) |user| {
mutex.unlock();
mutex.unlock();

while(userDeinitList.dequeue()) |user| {
user.decreaseRefCount();
mutex.lock();
}
mutex.unlock();
}

pub fn start(name: []const u8, port: ?u16) void {
Expand Down Expand Up @@ -382,7 +381,7 @@ pub fn stop() void {
pub fn disconnect(user: *User) void { // MARK: disconnect()
if(!user.connected.load(.unordered)) return;
removePlayer(user);
userDeinitList.append(user);
userDeinitList.enqueue(user);
user.connected.store(false, .unordered);
}

Expand Down
Loading

0 comments on commit de90b4e

Please sign in to comment.