Skip to content

Commit

Permalink
Merge branch 'gpimm/bump_write_buffer_size' into 'master'
Browse files Browse the repository at this point in the history
Add writer options for direct io

See merge request minknow/pod5-file-format!352
  • Loading branch information
0x55555555 committed Aug 6, 2024
2 parents 44281d5 + 5abd929 commit be8c086
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 21 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@ All notable changes, updates, and fixes to pod5 will be documented here
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.3.14]
## [0.3.15]

## Added

- Added new end reasons "api_request" and "device_data_error" to allow for new read end reasons future minknow versions will generate.
- Allow directio to specify the chunk size directly.

## [0.3.14]

## Added

- gcc8 builds

## [0.3.13]

Expand Down
37 changes: 25 additions & 12 deletions c++/pod5_format/file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,19 @@

namespace {
/// Open a file using the specified path and return it
arrow::Result<std::shared_ptr<arrow::io::OutputStream>>
open_file_output_stream(std::string const & path, bool append, bool use_directio = true)
arrow::Result<std::shared_ptr<arrow::io::OutputStream>> open_file_output_stream(
std::string const & path,
bool append,
bool use_directio = true,
bool use_sync_io = false)
{
#ifdef __linux__
auto flags = use_directio ? O_RDWR | O_DIRECT : O_RDWR;

flags |= (append == true ? O_APPEND : O_CREAT);
if (use_sync_io) {
flags |= O_SYNC;
}

int fd = open(path.c_str(), flags, 0644);

Expand All @@ -55,11 +61,13 @@ open_file_output_stream(std::string const & path, bool append, bool use_directio
std::shared_ptr<arrow::io::OutputStream> make_async_stream(
std::shared_ptr<arrow::io::OutputStream> const & io_stream,
std::shared_ptr<pod5::ThreadPool> thread_pool,
bool use_directio = true)
bool use_directio,
std::size_t directio_chunk_size)
{
#ifdef __linux__
if (use_directio) {
return std::make_shared<pod5::AsyncOutputStreamDirectIO>(io_stream, thread_pool);
return std::make_shared<pod5::AsyncOutputStreamDirectIO>(
io_stream, thread_pool, directio_chunk_size);
} else {
return std::make_shared<pod5::AsyncOutputStream>(io_stream, thread_pool);
}
Expand All @@ -79,6 +87,8 @@ FileWriterOptions::FileWriterOptions()
, m_read_table_batch_size(DEFAULT_READ_TABLE_BATCH_SIZE)
, m_run_info_table_batch_size(DEFAULT_RUN_INFO_TABLE_BATCH_SIZE)
, m_use_directio{DEFAULT_USE_DIRECTIO}
, m_directio_chunk_size(DEFAULT_DIRECTIO_CHUNK_SIZE)
, m_use_sync_io(DEFAULT_USE_SYNC_IO)
{
}

Expand Down Expand Up @@ -552,13 +562,14 @@ pod5::Result<std::unique_ptr<FileWriter>> create_file_writer(
auto run_info_tmp_path = make_run_info_tmp_path(arrow_path, file_identifier);

bool const use_directio = options.use_directio();
bool const use_sync_io = options.use_sync_io();

// Prepare the temporary reads file:
ARROW_ASSIGN_OR_RAISE(
auto read_table_file_stream,
::open_file_output_stream(reads_tmp_path, false, use_directio));
auto read_table_file_async =
::make_async_stream(read_table_file_stream, thread_pool, use_directio);
::open_file_output_stream(reads_tmp_path, false, use_directio, use_sync_io));
auto read_table_file_async = ::make_async_stream(
read_table_file_stream, thread_pool, use_directio, options.directio_chunk_size());
ARROW_ASSIGN_OR_RAISE(
auto read_table_tmp_writer,
make_read_table_writer(
Expand All @@ -573,9 +584,9 @@ pod5::Result<std::unique_ptr<FileWriter>> create_file_writer(
// Prepare the temporary run_info file:
ARROW_ASSIGN_OR_RAISE(
auto run_info_table_file_stream,
::open_file_output_stream(run_info_tmp_path, false, use_directio));
auto run_info_table_file_async =
::make_async_stream(run_info_table_file_stream, thread_pool, use_directio);
::open_file_output_stream(run_info_tmp_path, false, use_directio, use_sync_io));
auto run_info_table_file_async = ::make_async_stream(
run_info_table_file_stream, thread_pool, use_directio, options.directio_chunk_size());

ARROW_ASSIGN_OR_RAISE(
auto run_info_table_tmp_writer,
Expand All @@ -587,8 +598,10 @@ pod5::Result<std::unique_ptr<FileWriter>> create_file_writer(

// Prepare the main file - and set up the signal table to write here:
ARROW_ASSIGN_OR_RAISE(
auto signal_table_file_stream, ::open_file_output_stream(path, false, use_directio));
auto signal_file = ::make_async_stream(signal_table_file_stream, thread_pool, use_directio);
auto signal_table_file_stream,
::open_file_output_stream(path, false, use_directio, use_sync_io));
auto signal_file = ::make_async_stream(
signal_table_file_stream, thread_pool, use_directio, options.directio_chunk_size());

// Write the initial header to the combined file:
ARROW_RETURN_NOT_OK(combined_file_utils::write_combined_header(signal_file, section_marker));
Expand Down
12 changes: 12 additions & 0 deletions c++/pod5_format/file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ class POD5_FORMAT_EXPORT FileWriterOptions {
static constexpr std::uint32_t DEFAULT_RUN_INFO_TABLE_BATCH_SIZE = 1;
static constexpr SignalType DEFAULT_SIGNAL_TYPE = SignalType::VbzSignal;
static constexpr bool DEFAULT_USE_DIRECTIO = false;
static constexpr bool DEFAULT_USE_SYNC_IO = false;
static constexpr std::size_t DEFAULT_DIRECTIO_CHUNK_SIZE = 2 * 1024 * 1024;

FileWriterOptions();

Expand Down Expand Up @@ -73,6 +75,14 @@ class POD5_FORMAT_EXPORT FileWriterOptions {

bool use_directio() const { return m_use_directio; }

void set_directio_chunk_size(std::size_t chunk_size) { m_directio_chunk_size = chunk_size; }

std::size_t directio_chunk_size() const { return m_directio_chunk_size; }

void set_use_sync_io(bool use_sync_io) { m_use_sync_io = use_sync_io; }

bool use_sync_io() const { return m_use_sync_io; }

private:
std::shared_ptr<ThreadPool> m_writer_thread_pool;
std::uint32_t m_max_signal_chunk_size;
Expand All @@ -82,6 +92,8 @@ class POD5_FORMAT_EXPORT FileWriterOptions {
std::size_t m_read_table_batch_size;
std::size_t m_run_info_table_batch_size;
bool m_use_directio;
std::size_t m_directio_chunk_size;
bool m_use_sync_io;
};

class FileWriterImpl;
Expand Down
11 changes: 6 additions & 5 deletions c++/pod5_format/internal/async_output_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ namespace {
constexpr size_t alignment = 4096; // buffer alignment (for block devices)
constexpr size_t megabyte = 256 * alignment; // 1MB
constexpr size_t fallocate_chunk = 50 * megabyte; // 50MB
constexpr size_t write_buffer_size = megabyte; // Arbitrary limit. Seems a good trade-off
// between memory usage and disk activities

} // namespace

Expand Down Expand Up @@ -182,10 +180,12 @@ class AsyncOutputStreamDirectIO : public AsyncOutputStream {
public:
AsyncOutputStreamDirectIO(
std::shared_ptr<OutputStream> const & main_stream,
std::shared_ptr<ThreadPool> const & thread_pool)
std::shared_ptr<ThreadPool> const & thread_pool,
std::size_t write_chunk_size)
: AsyncOutputStream(main_stream, thread_pool)
, m_write_chunk_size(write_chunk_size)
, m_fallocate_offset{0}
, m_buffer(write_buffer_size, alignment)
, m_buffer(m_write_chunk_size, alignment)
, m_flushed_buffer_copy(alignment, 0)
, m_buffer_offset{0}
, m_num_blocks_written{0}
Expand Down Expand Up @@ -240,7 +240,7 @@ class AsyncOutputStreamDirectIO : public AsyncOutputStream {
ARROW_RETURN_NOT_OK(write_cache());

// adjust accounting
m_num_blocks_written += (write_buffer_size / alignment);
m_num_blocks_written += (m_write_chunk_size / alignment);
}
}
return arrow::Status::OK();
Expand Down Expand Up @@ -421,6 +421,7 @@ class AsyncOutputStreamDirectIO : public AsyncOutputStream {
return arrow::Status::OK();
}

std::size_t const m_write_chunk_size;
std::size_t m_fallocate_offset;
AlignedBuffer m_buffer;
// copy of buffer (unaligned) which has been flushed to the output stream already,
Expand Down
2 changes: 1 addition & 1 deletion docs/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ sphinx-rtd-theme
sphinx==v5.3.0
myst-parser
# Paths are relative to project root for ReadTheDocs and docs/Makefile
pod5==0.3.14
pod5==0.3.15
2 changes: 1 addition & 1 deletion python/pod5/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ classifiers=[
]

dependencies = [
"lib_pod5 == 0.3.14",
"lib_pod5 == 0.3.15",
"iso8601",
'importlib-metadata; python_version<"3.8"',
"more_itertools",
Expand Down
2 changes: 1 addition & 1 deletion python/pod5/src/pod5/tools/pod5_convert_to_fast5.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"signal_positive": 5,
"signal_negative": 6,
"api_request": 7,
"signal_negative": 6,
"device_data_error": 8,
}

# Fast5 types
Expand Down

0 comments on commit be8c086

Please sign in to comment.