message: port advanced rpc compression from scylla-enterprise.git to scylladb.git #22032
Conversation
Adds utilities for "advanced" methods of compression with lz4 and zstd -- with streaming (a history buffer persisted across messages) and/or precomputed dictionaries. This patch is mostly just glue needed to use the underlying libraries with discontiguous input and output buffers, and for reusing the same compressor context objects across messages. It doesn't contain any innovations of its own.

There is one "design decision" in the patch. The block format of LZ4 doesn't contain the length of the compressed blocks. At decompression time, that length must be delivered to the decompressor through a channel separate from the compressed block itself. In `lz4_cstream`, we deal with that by prepending a variable-length integer containing the compressed size to each compressed block. This is suboptimal for single-fragment messages, since the user of `lz4_cstream` is likely to remember the length of the whole message anyway, which makes the length prepended to the block redundant. But a loss of one byte per message is probably acceptable for most uses.
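The length-prefix framing described above can be sketched as follows. This is an illustrative Python sketch with hypothetical helper names, not the patch's actual C++ code: each (pretend) compressed block is prefixed with its size as a LEB128-style varint, so the decompressor knows how many bytes the block occupies.

```python
# Hypothetical sketch of the varint length-prefix framing used by
# lz4_cstream (names are illustrative, not from the patch).

def put_varint(out: bytearray, v: int) -> None:
    # LEB128-style encoding: 7 bits per byte, high bit = "more bytes follow"
    while v >= 0x80:
        out.append((v & 0x7F) | 0x80)
        v >>= 7
    out.append(v)

def get_varint(buf: bytes, pos: int) -> tuple[int, int]:
    v = shift = 0
    while True:
        b = buf[pos]
        pos += 1
        v |= (b & 0x7F) << shift
        if not (b & 0x80):
            return v, pos
        shift += 7

def frame_block(compressed: bytes) -> bytes:
    # Prepend the compressed size, since LZ4's block format doesn't carry it.
    out = bytearray()
    put_varint(out, len(compressed))
    out += compressed
    return bytes(out)

def unframe_block(buf: bytes, pos: int = 0) -> tuple[bytes, int]:
    size, pos = get_varint(buf, pos)
    return buf[pos:pos + size], pos + size

block = b"\xab" * 300            # stand-in for an LZ4-compressed block
framed = frame_block(block)
assert len(framed) == 302        # a 300-byte block costs a 2-byte prefix
assert unframe_block(framed)[0] == block
```

Blocks shorter than 128 bytes pay only a single prefix byte, which is the "loss of one byte" the commit message refers to.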
Introduces a utility which launches a new OS thread and accepts callables for concurrent execution. It is meant to be created once at startup and used until shutdown, for running nonpreemptible, third-party, non-interactive code. Note: this new utility is almost identical to `wasm::alien_thread_runner`. Maybe we should unify them.
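The core of such a worker is a long-lived thread pulling submitted callables off a queue and fulfilling futures. A minimal Python sketch of the idea (all names hypothetical; the real `alien_worker` additionally bridges results back into Seastar's reactor):

```python
# Minimal sketch of a single-thread "alien worker": one OS thread,
# a queue of callables, futures returned to the submitter.
import queue
import threading
from concurrent.futures import Future

class AlienWorker:
    def __init__(self):
        self._q = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._q.get()
            if item is None:          # shutdown sentinel
                return
            fut, fn = item
            try:
                fut.set_result(fn())  # run nonpreemptible work here
            except Exception as e:
                fut.set_exception(e)

    def submit(self, fn) -> Future:
        fut = Future()
        self._q.put((fut, fn))
        return fut

    def stop(self):
        self._q.put(None)
        self._thread.join()

worker = AlienWorker()
# e.g. run an expensive, blocking job off the main thread:
assert worker.submit(lambda: sum(range(1000))).result() == 499500
worker.stop()
```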
We are planning to improve some usages of compression in Scylla (in which we compress small blocks of data) by pre-training compression dictionaries on similar data seen so far. For example, many RPC messages have similar structure (and likely similar data), so the similarity could be exploited for better compression. This can be achieved e.g. by training a dictionary on the RPC traffic, and compressing subsequent RPC messages against that dictionary.

To work well, the training should be fed a representative sample of the compressible data. Such a sample can be obtained by taking a random subset (of some given reasonable size) of the data, with uniform probability. For our purposes, we need an online algorithm for this -- one which can select a random k-subset from a stream of arbitrary size (e.g. all RPC traffic over an hour), while requiring only the necessary minimum of memory. This is a known problem, called "reservoir sampling".

This PR introduces `reservoir_sampler`, which implements an optimal algorithm for reservoir sampling. Additionally, it introduces `page_sampler` -- a wrapper for `reservoir_sampler`, which uses it to select a random sample of pages from a stream of bytes.
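For reference, here is a sketch of the simplest uniform k-out-of-n reservoir sampler ("Algorithm R"). The PR's `reservoir_sampler` implements an optimal variant (which skips ahead instead of rolling a die per item), but the invariant is the same: after n items have been seen, each item is in the reservoir with probability k/n.

```python
# Sketch of Algorithm R reservoir sampling (illustrative; the PR's
# reservoir_sampler uses an optimal skip-based variant).
import random

class ReservoirSampler:
    def __init__(self, k: int, rng=random):
        self.k = k          # reservoir capacity
        self.seen = 0       # items consumed from the stream so far
        self.sample = []    # the current uniform k-sample
        self.rng = rng

    def update(self, item) -> None:
        self.seen += 1
        if len(self.sample) < self.k:
            # Fill the reservoir with the first k items.
            self.sample.append(item)
        else:
            # Keep the new item with probability k/seen, evicting
            # a uniformly random resident if so.
            j = self.rng.randrange(self.seen)
            if j < self.k:
                self.sample[j] = item

s = ReservoirSampler(k=8)
for x in range(10_000):
    s.update(x)
assert len(s.sample) == 8
assert all(0 <= x < 10_000 for x in s.sample)
```

`page_sampler` would then feed fixed-size pages of the byte stream as the "items" here, so the trainer sees a uniform sample of pages rather than of individual bytes.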
Adds glue needed to pass lz4 and zstd with streaming and/or dictionaries as the network traffic compressors for Seastar's RPC servers. The main jobs of this glue are:

1. Implementing the API expected by Seastar from RPC compressors.
2. Exposing metrics about the effectiveness of the compression.
3. Allowing algorithms and dictionaries to be switched dynamically on a running connection, without any extra waits.

The biggest design decision here is that the choice of algorithm and dictionary is negotiated by both sides of the connection, not dictated unilaterally by the sender. The negotiation algorithm is fairly complicated (a TLA+ model validating it is included in the commit). A unilateral compression choice would be much simpler. However, negotiation avoids re-sending the same dictionary over every connection in the cluster after dictionary updates (with one-way communication, that's the only reliable way to ensure that the receiver possesses the dictionary we are about to start using), lets receivers ask for a cheaper compression mode if they want, and lets them refuse to update a dictionary if they don't think they have enough free memory for it. In hindsight, those properties probably weren't worth the extra complexity and extra development effort.

Zstd can be quite expensive, so this patch also includes a mechanism which temporarily downgrades the compressor from zstd to lz4 if zstd has been using too much CPU in a given slice of time. But it should be noted that this can't be treated as a reliable "protection" from the negative performance effects of zstd, since a downgrade can happen on the sender side, and receivers are at the mercy of senders.
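The CPU-based downgrade can be sketched as a simple per-slice budget: meter zstd's measured CPU time within the current time slice and fall back to lz4 once the budget is exhausted. All names and numbers below are illustrative assumptions, not the patch's actual parameters or API.

```python
# Hedged sketch of "downgrade zstd to lz4 when zstd overspends its CPU
# budget in the current time slice" (hypothetical names and numbers).
class CpuLimitedChoice:
    def __init__(self, budget_per_slice: float, slice_len: float):
        self.budget = budget_per_slice  # e.g. 0.02 s of zstd CPU per slice
        self.slice_len = slice_len      # e.g. 0.1 s slices
        self.slice_start = 0.0
        self.spent = 0.0

    def pick(self, now: float) -> str:
        if now - self.slice_start >= self.slice_len:
            self.slice_start = now      # new slice: reset the meter
            self.spent = 0.0
        return "zstd" if self.spent < self.budget else "lz4"

    def charge(self, cpu_seconds: float) -> None:
        self.spent += cpu_seconds       # bill zstd's measured CPU time

c = CpuLimitedChoice(budget_per_slice=0.02, slice_len=0.1)
assert c.pick(now=0.00) == "zstd"
c.charge(0.03)                          # zstd overspent its budget...
assert c.pick(now=0.05) == "lz4"        # ...so downgrade within this slice
assert c.pick(now=0.15) == "zstd"       # a new slice restores zstd
```

Note that, as the commit message says, this only limits the sender's own CPU use; a receiver still has to decompress whatever the sender chose to send.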
Adds a new system table which will act as the medium for distributing compression dictionaries over the cluster. This table will be managed by Raft (group 0). It will be hooked up to it in follow-up commits.
Adds glue which causes the contents of system.dicts to be sent in group 0 snapshots, and causes a callback to be called when system.dicts is updated locally. The callback is currently empty and will be hooked up to the RPC compressor tracker in one of the next commits.
This "service" is a bag for code responsible for dictionary training, created to unclutter main() from dictionary-specific logic. It starts the RPC dictionary training loop when the relevant cluster feature is enabled, pauses and unpauses it appropriately whenever the relevant config or leadership status is updated, and publishes new dictionaries whenever the training fiber produces them.
Probably we failed to port some changes from 76f1ee6785ad997c4f3c4d58cfd827c712e0df1a.
🟢 CI State: SUCCESS ✅
This patch sets up an `alien_worker`, `advanced_rpc_compression::tracker`, `dict_sampler` and `dictionary_service` in `main()`, and wires them to each other and to `messaging_service`. `messaging_service` compresses its network traffic with compressors managed by the `advanced_rpc_compression::tracker`. All this traffic is passed as a single merged "stream" through `dict_sampler`. `dictionary_service` has access to `dict_sampler`. On chosen nodes (by default: the Raft leader), it uses the sampler to maintain a random multi-megabyte sample of the sampler's stream. Every several minutes, it copies the sample, trains a compression dictionary on it (by calling zstd's training library via the `alien_worker` thread) and publishes the new dictionary to `system.dicts` via Raft. This update triggers a callback into `advanced_rpc_compression::tracker` on all nodes, which updates the dictionary used by the compressors it manages.
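The end-to-end dictionary flow described above can be condensed into a toy sketch. Everything here is a stand-in: the real trainer is zstd's ZDICT library run on the `alien_worker` thread, the real publication goes through Raft and `system.dicts`, and all names below are hypothetical.

```python
# Toy sketch of the dictionary pipeline: sample -> train -> publish ->
# callback -> tracker switches compressors (all names hypothetical).
def train_dictionary(sample_pages: list[bytes]) -> bytes:
    # Stand-in for zstd's dictionary training, which would run on the
    # alien_worker thread; here we just concatenate page prefixes.
    return b"".join(p[:4] for p in sample_pages)

class Tracker:
    """Stand-in for advanced_rpc_compression::tracker."""
    def __init__(self):
        self.dictionary = b""

    def on_dict_update(self, d: bytes) -> None:
        # Compressors managed by the tracker start negotiating a switch
        # to the new dictionary from here.
        self.dictionary = d

published = {}        # stand-in for system.dicts (replicated via Raft)
tracker = Tracker()

def publish(name: str, d: bytes) -> None:
    published[name] = d
    tracker.on_dict_update(d)   # the system.dicts update callback

sample = [b"page-one", b"page-two"]   # pages drawn from dict_sampler
publish("rpc", train_dictionary(sample))
assert tracker.dictionary == published["rpc"] == b"pagepage"
```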
Force-pushed from 6d48bf5 to fdb2d22.
v2: Addressed Kefu's comments.
@avikivity Ping for review/merge.
Ugh, the typo fixes are unfortunate; they will complicate the master->enterprise merge for me.
Shall I revert them?
…git to scylladb.git' from Michał Chojnowski

This is a forward port (from scylla-enterprise) of additional compression options (zstd, dictionaries shared across messages) for inter-node network traffic. It works as follows:

After the patch, messaging_service (Scylla's interface for all inter-node communication) compresses its network traffic with compressors managed by the new advanced_rpc_compression::tracker. Those compressors compress with lz4, but can also be configured to use zstd as long as a CPU usage limit isn't crossed. A precomputed compression dictionary can be fed to the tracker. Each connection handled by the tracker will then start a negotiation with the other end to switch to this dictionary, and when it succeeds, the connection will start being compressed using that dictionary.

All traffic going through the tracker is passed as a single merged "stream" through dict_sampler. dictionary_service has access to the dict_sampler. On chosen nodes (in the "usual" configuration: the Raft leader), it uses the sampler to maintain a random multi-megabyte sample of the sampler's stream. Every several minutes, it copies the sample, trains a compression dictionary on it (by calling zstd's training library via the alien_worker thread) and publishes the new dictionary to system.dicts via Raft's write_mutation command. This update triggers (eventually) a callback on all nodes, which feeds the new dictionary to advanced_rpc_compression::tracker, and this switches (eventually) all inter-node connections to this dictionary.

Closes #22032

* github.com:scylladb/scylladb:
  messaging_service: use advanced_rpc_compression::tracker for compression
  message/dictionary_service: introduce dictionary_service
  service: make Raft group 0 aware of system.dicts
  db/system_keyspace: add system.dicts
  utils: add advanced_rpc_compressor
  utils: add dict_trainer
  utils: introduce reservoir_sampling
  utils: introduce alien_worker
  utils: add stream_compressor
No, I managed to resolve the problems. I saw that scylladb/scylla-enterprise@e0800d1421031bc6540de6dfa50c6986678b11d4 missed the boat and has to be forward-ported separately.