Skip to content

Commit

Permalink
pageserver: revert flush backpressure (#8550) (#10135)
Browse files Browse the repository at this point in the history
## Problem

In #8550, we made the flush loop wait for uploads after every layer.
This was to avoid unbounded buildup of uploads, and to reduce compaction
debt. However, the approach has several problems:

* It prevents upload parallelism.
* It prevents flush and upload pipelining.
* It slows down ingestion even when there is no need to backpressure.
* It does not directly backpressure WAL ingestion (only via
`disk_consistent_lsn`), and will build up in-memory layers.
* It does not directly backpressure based on compaction debt and read
amplification.

An alternative solution to these problems is proposed in #8390.

In the meanwhile, we revert the change to reduce the impact on ingest
throughput. This does reintroduce some risk of unbounded
upload/compaction buildup. Until
#8390, this can be addressed
in other ways:

* Use `max_replication_apply_lag` (aka `remote_consistent_lsn`), which
will more directly limit upload debt.
* Shard the tenant, which will spread the flush/upload work across more
Pageservers and move the bottleneck to Safekeeper.

Touches #10095.

## Summary of changes

Remove waiting on the upload queue in the flush loop.
  • Loading branch information
erikgrinaker authored Dec 15, 2024
1 parent cf161e1 commit f3ecd5d
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 112 deletions.
25 changes: 1 addition & 24 deletions pageserver/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use metrics::{
register_counter_vec, register_gauge_vec, register_histogram, register_histogram_vec,
register_int_counter, register_int_counter_pair_vec, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec, register_uint_gauge, register_uint_gauge_vec,
Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
Counter, CounterVec, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterPair,
IntCounterPairVec, IntCounterVec, IntGauge, IntGaugeVec, UIntGauge, UIntGaugeVec,
};
use once_cell::sync::Lazy;
Expand Down Expand Up @@ -445,15 +445,6 @@ pub(crate) static WAIT_LSN_TIME: Lazy<Histogram> = Lazy::new(|| {
.expect("failed to define a metric")
});

static FLUSH_WAIT_UPLOAD_TIME: Lazy<GaugeVec> = Lazy::new(|| {
register_gauge_vec!(
"pageserver_flush_wait_upload_seconds",
"Time spent waiting for preceding uploads during layer flush",
&["tenant_id", "shard_id", "timeline_id"]
)
.expect("failed to define a metric")
});

static LAST_RECORD_LSN: Lazy<IntGaugeVec> = Lazy::new(|| {
register_int_gauge_vec!(
"pageserver_last_record_lsn",
Expand Down Expand Up @@ -2586,7 +2577,6 @@ pub(crate) struct TimelineMetrics {
shard_id: String,
timeline_id: String,
pub flush_time_histo: StorageTimeMetrics,
pub flush_wait_upload_time_gauge: Gauge,
pub compact_time_histo: StorageTimeMetrics,
pub create_images_time_histo: StorageTimeMetrics,
pub logical_size_histo: StorageTimeMetrics,
Expand Down Expand Up @@ -2632,9 +2622,6 @@ impl TimelineMetrics {
&shard_id,
&timeline_id,
);
let flush_wait_upload_time_gauge = FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&tenant_id, &shard_id, &timeline_id])
.unwrap();
let compact_time_histo = StorageTimeMetrics::new(
StorageTimeOperation::Compact,
&tenant_id,
Expand Down Expand Up @@ -2780,7 +2767,6 @@ impl TimelineMetrics {
shard_id,
timeline_id,
flush_time_histo,
flush_wait_upload_time_gauge,
compact_time_histo,
create_images_time_histo,
logical_size_histo,
Expand Down Expand Up @@ -2830,14 +2816,6 @@ impl TimelineMetrics {
self.resident_physical_size_gauge.get()
}

pub(crate) fn flush_wait_upload_time_gauge_add(&self, duration: f64) {
self.flush_wait_upload_time_gauge.add(duration);
crate::metrics::FLUSH_WAIT_UPLOAD_TIME
.get_metric_with_label_values(&[&self.tenant_id, &self.shard_id, &self.timeline_id])
.unwrap()
.add(duration);
}

pub(crate) fn shutdown(&self) {
let was_shutdown = self
.shutdown
Expand All @@ -2855,7 +2833,6 @@ impl TimelineMetrics {
let shard_id = &self.shard_id;
let _ = LAST_RECORD_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = DISK_CONSISTENT_LSN.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = FLUSH_WAIT_UPLOAD_TIME.remove_label_values(&[tenant_id, shard_id, timeline_id]);
let _ = STANDBY_HORIZON.remove_label_values(&[tenant_id, shard_id, timeline_id]);
{
RESIDENT_PHYSICAL_SIZE_GLOBAL.sub(self.resident_physical_size_get());
Expand Down
38 changes: 8 additions & 30 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,19 +144,15 @@ use self::layer_manager::LayerManager;
use self::logical_size::LogicalSize;
use self::walreceiver::{WalReceiver, WalReceiverConf};

use super::config::TenantConf;
use super::remote_timeline_client::index::IndexPart;
use super::remote_timeline_client::RemoteTimelineClient;
use super::secondary::heatmap::{HeatMapLayer, HeatMapTimeline};
use super::storage_layer::{LayerFringe, LayerVisibilityHint, ReadableLayer};
use super::upload_queue::NotInitialized;
use super::GcError;
use super::{
config::TenantConf, storage_layer::LayerVisibilityHint, upload_queue::NotInitialized,
MaybeOffloaded,
};
use super::{debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf};
use super::{remote_timeline_client::index::IndexPart, storage_layer::LayerFringe};
use super::{
remote_timeline_client::RemoteTimelineClient, remote_timeline_client::WaitCompletionError,
storage_layer::ReadableLayer,
};
use super::{
secondary::heatmap::{HeatMapLayer, HeatMapTimeline},
GcError,
debug_assert_current_span_has_tenant_and_timeline_id, AttachedTenantConf, MaybeOffloaded,
};

#[cfg(test)]
Expand Down Expand Up @@ -3897,24 +3893,6 @@ impl Timeline {
// release lock on 'layers'
};

// Backpressure mechanism: wait with continuation of the flush loop until we have uploaded all layer files.
// This makes us refuse ingest until the new layers have been persisted to the remote
let start = Instant::now();
self.remote_client
.wait_completion()
.await
.map_err(|e| match e {
WaitCompletionError::UploadQueueShutDownOrStopped
| WaitCompletionError::NotInitialized(
NotInitialized::ShuttingDown | NotInitialized::Stopped,
) => FlushLayerError::Cancelled,
WaitCompletionError::NotInitialized(NotInitialized::Uninitialized) => {
FlushLayerError::Other(anyhow!(e).into())
}
})?;
let duration = start.elapsed().as_secs_f64();
self.metrics.flush_wait_upload_time_gauge_add(duration);

// FIXME: between create_delta_layer and the scheduling of the upload in `update_metadata_file`,
// a compaction can delete the file and then it won't be available for uploads any more.
// We still schedule the upload, resulting in an error, but ideally we'd somehow avoid this
Expand Down
1 change: 0 additions & 1 deletion test_runner/fixtures/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ def counter(name: str) -> str:
"pageserver_evictions_with_low_residence_duration_total",
"pageserver_aux_file_estimated_size",
"pageserver_valid_lsn_lease_count",
"pageserver_flush_wait_upload_seconds",
counter("pageserver_tenant_throttling_count_accounted_start"),
counter("pageserver_tenant_throttling_count_accounted_finish"),
counter("pageserver_tenant_throttling_wait_usecs_sum"),
Expand Down
13 changes: 4 additions & 9 deletions test_runner/regress/test_branching.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from fixtures.utils import query_scalar
from performance.test_perf_pgbench import get_scales_matrix
from requests import RequestException
from requests.exceptions import RetryError


# Test branch creation
Expand Down Expand Up @@ -176,11 +177,8 @@ def start_creating_timeline():

env.neon_cli.mappings_map_branch(initial_branch, env.initial_tenant, env.initial_timeline)

with pytest.raises(RuntimeError, match="ERROR: Not found: Timeline"):
env.endpoints.create_start(
initial_branch, tenant_id=env.initial_tenant, basebackup_request_tries=2
)
ps_http.configure_failpoints(("before-upload-index-pausable", "off"))
with pytest.raises(RuntimeError, match="is not active, state: Loading"):
env.endpoints.create_start(initial_branch, tenant_id=env.initial_tenant)
finally:
env.pageserver.stop(immediate=True)

Expand Down Expand Up @@ -221,10 +219,7 @@ def start_creating_timeline():

branch_id = TimelineId.generate()

with pytest.raises(
PageserverApiException,
match="Cannot branch off the timeline that's not present in pageserver",
):
with pytest.raises(RetryError, match="too many 503 error responses"):
ps_http.timeline_create(
env.pg_version,
env.initial_tenant,
Expand Down
48 changes: 0 additions & 48 deletions test_runner/regress/test_remote_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,54 +784,6 @@ def create_in_background():
create_thread.join()


def test_paused_upload_stalls_checkpoint(
neon_env_builder: NeonEnvBuilder,
):
"""
This test checks that checkpoints block on uploads to remote storage.
"""
neon_env_builder.enable_pageserver_remote_storage(RemoteStorageKind.LOCAL_FS)

env = neon_env_builder.init_start(
initial_tenant_conf={
# Set a small compaction threshold
"compaction_threshold": "3",
# Disable GC
"gc_period": "0s",
# disable PITR
"pitr_interval": "0s",
}
)

env.pageserver.allowed_errors.append(
f".*PUT.* path=/v1/tenant/{env.initial_tenant}/timeline.* request was dropped before completing"
)

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

client = env.pageserver.http_client()
layers_at_creation = client.layer_map_info(tenant_id, timeline_id)
deltas_at_creation = len(layers_at_creation.delta_layers())
assert (
deltas_at_creation == 1
), "are you fixing #5863? make sure we end up with 2 deltas at the end of endpoint lifecycle"

# Make new layer uploads get stuck.
# Note that timeline creation waits for the initial layers to reach remote storage.
# So at this point, the `layers_at_creation` are in remote storage.
client.configure_failpoints(("before-upload-layer-pausable", "pause"))

with env.endpoints.create_start("main", tenant_id=tenant_id) as endpoint:
# Build two tables with some data inside
endpoint.safe_psql("CREATE TABLE foo AS SELECT x FROM generate_series(1, 10000) g(x)")
wait_for_last_flush_lsn(env, endpoint, tenant_id, timeline_id)

with pytest.raises(ReadTimeout):
client.timeline_checkpoint(tenant_id, timeline_id, timeout=5)
client.configure_failpoints(("before-upload-layer-pausable", "off"))


def wait_upload_queue_empty(
client: PageserverHttpClient, tenant_id: TenantId, timeline_id: TimelineId
):
Expand Down

1 comment on commit f3ecd5d

@github-actions
Copy link

@github-actions github-actions bot commented on f3ecd5d Dec 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7286 tests run: 6978 passed, 1 failed, 307 skipped (full report)


Failures on Postgres 16

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release-x86-64
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg16-github-actions-selfhosted]"
Flaky tests (6)

Postgres 17

Postgres 14

  • test_pgdata_import_smoke[None-1024-RelBlockSize.MULTIPLE_RELATION_SEGMENTS]: release-arm64

Code coverage* (full report)

  • functions: 31.4% (8396 of 26775 functions)
  • lines: 48.0% (66605 of 138627 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
f3ecd5d at 2024-12-15T23:39:38.716Z :recycle:

Please sign in to comment.