diff --git a/control_plane/src/pageserver.rs b/control_plane/src/pageserver.rs index ef5b3d65934c..c15042e42a0d 100644 --- a/control_plane/src/pageserver.rs +++ b/control_plane/src/pageserver.rs @@ -418,6 +418,21 @@ impl PageServerNode { .map(serde_json::from_str) .transpose() .context("parse `wal_receiver_protocol_override` from json")?, + gc_compaction_enabled: settings + .remove("gc_compaction_enabled") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'gc_compaction_enabled' as bool")?, + gc_compaction_initial_threshold_mb: settings + .remove("gc_compaction_initial_threshold_mb") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'gc_compaction_initial_threshold_mb' as integer")?, + gc_compaction_ratio_percent: settings + .remove("gc_compaction_ratio_percent") + .map(|x| x.parse::()) + .transpose() + .context("Failed to parse 'gc_compaction_ratio_percent' as integer")?, }; if !settings.is_empty() { bail!("Unrecognized tenant settings: {settings:?}") diff --git a/libs/pageserver_api/src/config.rs b/libs/pageserver_api/src/config.rs index 09cfbc55fd1c..b30e93414202 100644 --- a/libs/pageserver_api/src/config.rs +++ b/libs/pageserver_api/src/config.rs @@ -301,6 +301,11 @@ pub struct TenantConfigToml { pub timeline_offloading: bool, pub wal_receiver_protocol_override: Option, + + /// gc-compaction related configs + pub gc_compaction_enabled: bool, + pub gc_compaction_initial_threshold_mb: u64, + pub gc_compaction_ratio_percent: u64, } pub mod defaults { @@ -494,6 +499,9 @@ pub mod tenant_conf_defaults { // By default ingest enough WAL for two new L0 layers before checking if new image // image layers should be created. pub const DEFAULT_IMAGE_LAYER_CREATION_CHECK_THRESHOLD: u8 = 2; + pub const DEFAULT_GC_COMPACTION_ENABLED: bool = false; + pub const DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_MB: u64 = 10240; + pub const DEFAULT_GC_COMPACTION_RATIO_PERCENT: u64 = 100; } impl Default for TenantConfigToml { @@ -538,6 +546,9 @@ impl Default for TenantConfigToml { lsn_lease_length_for_ts: LsnLease::DEFAULT_LENGTH_FOR_TS, timeline_offloading: false, wal_receiver_protocol_override: None, + gc_compaction_enabled: DEFAULT_GC_COMPACTION_ENABLED, + gc_compaction_initial_threshold_mb: DEFAULT_GC_COMPACTION_INITIAL_THRESHOLD_MB, + gc_compaction_ratio_percent: DEFAULT_GC_COMPACTION_RATIO_PERCENT, } } } diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index f3fc9fad760a..2912c9bcf694 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -272,6 +272,8 @@ pub struct CompactInfoResponse { pub compact_key_range: Option, pub compact_lsn_range: Option, pub sub_compaction: bool, + pub running: bool, + pub job_id: usize, } #[derive(Serialize, Deserialize, Clone)] @@ -496,6 +498,12 @@ pub struct TenantConfigPatch { pub timeline_offloading: FieldPatch, #[serde(skip_serializing_if = "FieldPatch::is_noop")] pub wal_receiver_protocol_override: FieldPatch, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub gc_compaction_enabled: FieldPatch, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub gc_compaction_initial_threshold_mb: FieldPatch, + #[serde(skip_serializing_if = "FieldPatch::is_noop")] + pub gc_compaction_ratio_percent: FieldPatch, } /// An alternative representation of `pageserver::tenant::TenantConf` with @@ -527,6 +535,9 @@ pub struct TenantConfig { pub lsn_lease_length_for_ts: Option, pub timeline_offloading: Option, pub wal_receiver_protocol_override: Option, + pub gc_compaction_enabled: Option, + pub gc_compaction_initial_threshold_mb: Option, + pub gc_compaction_ratio_percent: Option, } impl TenantConfig { @@ -556,6 +567,9 @@ impl TenantConfig { mut lsn_lease_length_for_ts, mut timeline_offloading, mut wal_receiver_protocol_override, + mut gc_compaction_enabled, + mut gc_compaction_initial_threshold_mb, + mut gc_compaction_ratio_percent, } = self; patch.checkpoint_distance.apply(&mut checkpoint_distance); @@ -600,6 +614,15 @@ impl TenantConfig { patch .wal_receiver_protocol_override .apply(&mut wal_receiver_protocol_override); + patch + .gc_compaction_enabled + .apply(&mut gc_compaction_enabled); + patch + .gc_compaction_initial_threshold_mb + .apply(&mut gc_compaction_initial_threshold_mb); + patch + .gc_compaction_ratio_percent + .apply(&mut gc_compaction_ratio_percent); Self { checkpoint_distance, @@ -626,6 +649,9 @@ impl TenantConfig { lsn_lease_length_for_ts, timeline_offloading, wal_receiver_protocol_override, + gc_compaction_enabled, + gc_compaction_initial_threshold_mb, + gc_compaction_ratio_percent, } } } diff --git a/pageserver/src/http/routes.rs b/pageserver/src/http/routes.rs index 60ef4c3702f4..94e0b101bd23 100644 --- a/pageserver/src/http/routes.rs +++ b/pageserver/src/http/routes.rs @@ -97,8 +97,8 @@ use crate::tenant::{LogicalSizeCalculationCause, PageReconstructError}; use crate::DEFAULT_PG_VERSION; use crate::{disk_usage_eviction_task, tenant}; use pageserver_api::models::{ - CompactInfoResponse, StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, - TimelineGcRequest, TimelineInfo, + StatusResponse, TenantConfigRequest, TenantInfo, TimelineCreateRequest, TimelineGcRequest, + TimelineInfo, }; use utils::{ auth::SwappableJwtAuth, @@ -2052,15 +2052,7 @@ async fn timeline_compact_info_handler( let tenant = state .tenant_manager .get_attached_tenant_shard(tenant_shard_id)?; - let res = tenant.get_scheduled_compaction_tasks(timeline_id); - let mut resp = Vec::new(); - for item in res { - resp.push(CompactInfoResponse { - compact_key_range: item.compact_key_range, - compact_lsn_range: item.compact_lsn_range, - sub_compaction: item.sub_compaction, - }); - } + let resp = tenant.get_scheduled_compaction_tasks(timeline_id); json_response(StatusCode::OK, resp) } .instrument(info_span!("timeline_compact_info", tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %timeline_id)) diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index 2e4c47c6e40f..3a166db4707b 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -21,6 +21,7 @@ use enumset::EnumSet; use futures::stream::FuturesUnordered; use futures::StreamExt; use pageserver_api::models; +use pageserver_api::models::CompactInfoResponse; use pageserver_api::models::LsnLease; use pageserver_api::models::TimelineArchivalState; use pageserver_api::models::TimelineState; @@ -37,20 +38,16 @@ use remote_timeline_client::manifest::{ }; use remote_timeline_client::UploadQueueNotReadyError; use std::collections::BTreeMap; -use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::sync::atomic::AtomicBool; use std::sync::Weak; use std::time::SystemTime; use storage_broker::BrokerClientChannel; -use timeline::compaction::GcCompactJob; -use timeline::compaction::ScheduledCompactionTask; +use timeline::compaction::GcCompactionQueue; use timeline::import_pgdata; use timeline::offload::offload_timeline; -use timeline::CompactFlags; use timeline::CompactOptions; -use timeline::CompactionError; use timeline::ShutdownMode; use tokio::io::BufReader; use tokio::sync::watch; @@ -346,10 +343,8 @@ pub struct Tenant { /// Overhead of mutex is acceptable because compaction is done with a multi-second period. compaction_circuit_breaker: std::sync::Mutex, - /// Scheduled compaction tasks. Currently, this can only be populated by triggering - /// a manual gc-compaction from the manual compaction API. - scheduled_compaction_tasks: - std::sync::Mutex>>, + /// Scheduled gc-compaction tasks. + scheduled_compaction_tasks: std::sync::Mutex>>, /// If the tenant is in Activating state, notify this to encourage it /// to proceed to Active as soon as possible, rather than waiting for lazy @@ -2990,104 +2985,18 @@ impl Tenant { if has_pending_l0_compaction_task { Some(true) } else { - let mut has_pending_scheduled_compaction_task; - let next_scheduled_compaction_task = { - let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); - if let Some(tline_pending_tasks) = guard.get_mut(timeline_id) { - if !tline_pending_tasks.is_empty() { - info!( - "{} tasks left in the compaction schedule queue", - tline_pending_tasks.len() - ); - } - let next_task = tline_pending_tasks.pop_front(); - has_pending_scheduled_compaction_task = !tline_pending_tasks.is_empty(); - next_task - } else { - has_pending_scheduled_compaction_task = false; - None - } + let queue = { + let guard = self.scheduled_compaction_tasks.lock().unwrap(); + guard.get(timeline_id).cloned() }; - if let Some(mut next_scheduled_compaction_task) = next_scheduled_compaction_task - { - if !next_scheduled_compaction_task - .options - .flags - .contains(CompactFlags::EnhancedGcBottomMostCompaction) - { - warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", next_scheduled_compaction_task.options); - } else if next_scheduled_compaction_task.options.sub_compaction { - info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"); - let jobs: Vec = timeline - .gc_compaction_split_jobs( - GcCompactJob::from_compact_options( - next_scheduled_compaction_task.options.clone(), - ), - next_scheduled_compaction_task - .options - .sub_compaction_max_job_size_mb, - ) - .await - .map_err(CompactionError::Other)?; - if jobs.is_empty() { - info!("no jobs to run, skipping scheduled compaction task"); - } else { - has_pending_scheduled_compaction_task = true; - let jobs_len = jobs.len(); - let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); - let tline_pending_tasks = guard.entry(*timeline_id).or_default(); - for (idx, job) in jobs.into_iter().enumerate() { - // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions` - // until we do further refactors to allow directly call `compact_with_gc`. - let mut flags: EnumSet = EnumSet::default(); - flags |= CompactFlags::EnhancedGcBottomMostCompaction; - if job.dry_run { - flags |= CompactFlags::DryRun; - } - let options = CompactOptions { - flags, - sub_compaction: false, - compact_key_range: Some(job.compact_key_range.into()), - compact_lsn_range: Some(job.compact_lsn_range.into()), - sub_compaction_max_job_size_mb: None, - }; - tline_pending_tasks.push_back(if idx == jobs_len - 1 { - ScheduledCompactionTask { - options, - // The last job in the queue sends the signal and releases the gc guard - result_tx: next_scheduled_compaction_task - .result_tx - .take(), - gc_block: next_scheduled_compaction_task - .gc_block - .take(), - } - } else { - ScheduledCompactionTask { - options, - result_tx: None, - gc_block: None, - } - }); - } - info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len); - } - } else { - let _ = timeline - .compact_with_options( - cancel, - next_scheduled_compaction_task.options, - ctx, - ) - .instrument(info_span!("scheduled_compact_timeline", %timeline_id)) - .await?; - if let Some(tx) = next_scheduled_compaction_task.result_tx.take() { - // TODO: we can send compaction statistics in the future - tx.send(()).ok(); - } - } + if let Some(queue) = queue { + let has_pending_tasks = queue + .iteration(cancel, ctx, &self.gc_block, timeline) + .await?; + Some(has_pending_tasks) + } else { + Some(false) } - Some(has_pending_scheduled_compaction_task) } } else { None @@ -3109,34 +3018,32 @@ impl Tenant { } /// Cancel scheduled compaction tasks - pub(crate) fn cancel_scheduled_compaction( - &self, - timeline_id: TimelineId, - ) -> Vec { + pub(crate) fn cancel_scheduled_compaction(&self, timeline_id: TimelineId) { let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); - if let Some(tline_pending_tasks) = guard.get_mut(&timeline_id) { - let current_tline_pending_tasks = std::mem::take(tline_pending_tasks); - current_tline_pending_tasks.into_iter().collect() - } else { - Vec::new() + if let Some(q) = guard.get_mut(&timeline_id) { + q.cancel_scheduled(); } } pub(crate) fn get_scheduled_compaction_tasks( &self, timeline_id: TimelineId, - ) -> Vec { - use itertools::Itertools; - let guard = self.scheduled_compaction_tasks.lock().unwrap(); - guard - .get(&timeline_id) - .map(|tline_pending_tasks| { - tline_pending_tasks - .iter() - .map(|x| x.options.clone()) - .collect_vec() - }) - .unwrap_or_default() + ) -> Vec { + let res = { + let guard = self.scheduled_compaction_tasks.lock().unwrap(); + guard.get(&timeline_id).map(|q| q.remaining_jobs()) + }; + let Some((running, remaining)) = res else { + return Vec::new(); + }; + let mut result = Vec::new(); + if let Some((id, running)) = running { + result.extend(running.into_compact_info_resp(id, true)); + } + for (id, job) in remaining { + result.extend(job.into_compact_info_resp(id, false)); + } + result } /// Schedule a compaction task for a timeline. @@ -3145,20 +3052,12 @@ impl Tenant { timeline_id: TimelineId, options: CompactOptions, ) -> anyhow::Result> { - let gc_guard = match self.gc_block.start().await { - Ok(guard) => guard, - Err(e) => { - bail!("cannot run gc-compaction because gc is blocked: {}", e); - } - }; let (tx, rx) = tokio::sync::oneshot::channel(); let mut guard = self.scheduled_compaction_tasks.lock().unwrap(); - let tline_pending_tasks = guard.entry(timeline_id).or_default(); - tline_pending_tasks.push_back(ScheduledCompactionTask { - options, - result_tx: Some(tx), - gc_block: Some(gc_guard), - }); + let q = guard + .entry(timeline_id) + .or_insert_with(|| Arc::new(GcCompactionQueue::new())); + q.schedule_manual_compaction(options, Some(tx)); Ok(rx) } @@ -5537,6 +5436,9 @@ pub(crate) mod harness { lsn_lease_length_for_ts: Some(tenant_conf.lsn_lease_length_for_ts), timeline_offloading: Some(tenant_conf.timeline_offloading), wal_receiver_protocol_override: tenant_conf.wal_receiver_protocol_override, + gc_compaction_enabled: None, + gc_compaction_initial_threshold_mb: None, + gc_compaction_ratio_percent: None, } } } diff --git a/pageserver/src/tenant/config.rs b/pageserver/src/tenant/config.rs index d54dded7782d..50a95a574e6c 100644 --- a/pageserver/src/tenant/config.rs +++ b/pageserver/src/tenant/config.rs @@ -357,6 +357,15 @@ pub struct TenantConfOpt { #[serde(skip_serializing_if = "Option::is_none")] pub wal_receiver_protocol_override: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub gc_compaction_enabled: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub gc_compaction_initial_threshold_mb: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub gc_compaction_ratio_percent: Option, } impl TenantConfOpt { @@ -425,6 +434,15 @@ impl TenantConfOpt { wal_receiver_protocol_override: self .wal_receiver_protocol_override .or(global_conf.wal_receiver_protocol_override), + gc_compaction_enabled: self + .gc_compaction_enabled + .unwrap_or(global_conf.gc_compaction_enabled), + gc_compaction_initial_threshold_mb: self + .gc_compaction_initial_threshold_mb + .unwrap_or(global_conf.gc_compaction_initial_threshold_mb), + gc_compaction_ratio_percent: self + .gc_compaction_ratio_percent + .unwrap_or(global_conf.gc_compaction_ratio_percent), } } @@ -454,6 +472,9 @@ impl TenantConfOpt { mut lsn_lease_length_for_ts, mut timeline_offloading, mut wal_receiver_protocol_override, + mut gc_compaction_enabled, + mut gc_compaction_initial_threshold_mb, + mut gc_compaction_ratio_percent, } = self; patch.checkpoint_distance.apply(&mut checkpoint_distance); @@ -522,6 +543,15 @@ impl TenantConfOpt { patch .wal_receiver_protocol_override .apply(&mut wal_receiver_protocol_override); + patch + .gc_compaction_enabled + .apply(&mut gc_compaction_enabled); + patch + .gc_compaction_initial_threshold_mb + .apply(&mut gc_compaction_initial_threshold_mb); + patch + .gc_compaction_ratio_percent + .apply(&mut gc_compaction_ratio_percent); Ok(Self { checkpoint_distance, @@ -548,6 +578,9 @@ impl TenantConfOpt { lsn_lease_length_for_ts, timeline_offloading, wal_receiver_protocol_override, + gc_compaction_enabled, + gc_compaction_initial_threshold_mb, + gc_compaction_ratio_percent, }) } } @@ -603,6 +636,9 @@ impl From for models::TenantConfig { lsn_lease_length_for_ts: value.lsn_lease_length_for_ts.map(humantime), timeline_offloading: value.timeline_offloading, wal_receiver_protocol_override: value.wal_receiver_protocol_override, + gc_compaction_enabled: value.gc_compaction_enabled, + gc_compaction_initial_threshold_mb: value.gc_compaction_initial_threshold_mb, + gc_compaction_ratio_percent: value.gc_compaction_ratio_percent, } } } diff --git a/pageserver/src/tenant/remote_timeline_client/index.rs b/pageserver/src/tenant/remote_timeline_client/index.rs index 506990fb2fa4..78879a819592 100644 --- a/pageserver/src/tenant/remote_timeline_client/index.rs +++ b/pageserver/src/tenant/remote_timeline_client/index.rs @@ -79,8 +79,14 @@ pub struct IndexPart { /// /// None means no aux files have been written to the storage before the point /// when this flag is introduced. + /// + /// This flag is not used any more as all tenants have been transitioned to the new aux file policy. #[serde(skip_serializing_if = "Option::is_none", default)] pub(crate) last_aux_file_policy: Option, + + /// The LSN of gc-compaction horizon. Once gc-compaction is finished for all layer files below an LSN, this LSN will be updated. + #[serde(skip_serializing_if = "Option::is_none", default)] + pub(crate) l2_lsn: Option, } impl IndexPart { @@ -99,10 +105,11 @@ impl IndexPart { /// - 8: added `archived_at` /// - 9: +gc_blocking /// - 10: +import_pgdata - const LATEST_VERSION: usize = 10; + /// - 11: +l2_lsn + const LATEST_VERSION: usize = 11; // Versions we may see when reading from a bucket. - pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; + pub const KNOWN_VERSIONS: &'static [usize] = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]; pub const FILE_NAME: &'static str = "index_part.json"; @@ -118,6 +125,7 @@ impl IndexPart { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, } } @@ -392,6 +400,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -437,6 +446,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -483,6 +493,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -532,6 +543,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, }; let empty_layers_parsed = IndexPart::from_json_bytes(empty_layers_json.as_bytes()).unwrap(); @@ -576,6 +588,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -623,6 +636,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: None, import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -675,6 +689,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: Some(AuxFilePolicy::V2), import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -732,6 +747,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: Default::default(), import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -790,6 +806,7 @@ mod tests { gc_blocking: None, last_aux_file_policy: Default::default(), import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -853,6 +870,7 @@ mod tests { last_aux_file_policy: Default::default(), archived_at: None, import_pgdata: None, + l2_lsn: None, }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); @@ -928,7 +946,86 @@ mod tests { started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"), finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"), idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()), - }))) + }))), + l2_lsn: None, + }; + + let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); + assert_eq!(part, expected); + } + + #[test] + fn v11_importpgdata_is_parsed() { + let example = r#"{ + "version": 10, + "layer_metadata":{ + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9": { "file_size": 25600000 }, + "000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51": { "file_size": 9007199254741001 } + }, + "disk_consistent_lsn":"0/16960E8", + "metadata": { + "disk_consistent_lsn": "0/16960E8", + "prev_record_lsn": "0/1696070", + "ancestor_timeline": "e45a7f37d3ee2ff17dc14bf4f4e3f52e", + "ancestor_lsn": "0/0", + "latest_gc_cutoff_lsn": "0/1696070", + "initdb_lsn": "0/1696070", + "pg_version": 14 + }, + "gc_blocking": { + "started_at": "2024-07-19T09:00:00.123", + "reasons": ["DetachAncestor"] + }, + "import_pgdata": { + "V1": { + "Done": { + "idempotency_key": "specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5", + "started_at": "2024-11-13T09:23:42.123", + "finished_at": "2024-11-13T09:42:23.123" + } + } + }, + "l2_lsn": "0/1686070" + }"#; + + let expected = IndexPart { + version: 10, + layer_metadata: HashMap::from([ + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__0000000001696070-00000000016960E9".parse().unwrap(), LayerFileMetadata { + file_size: 25600000, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }), + ("000000000000000000000000000000000000-FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF__00000000016B59D8-00000000016B5A51".parse().unwrap(), LayerFileMetadata { + file_size: 9007199254741001, + generation: Generation::none(), + shard: ShardIndex::unsharded() + }) + ]), + disk_consistent_lsn: "0/16960E8".parse::().unwrap(), + metadata: TimelineMetadata::new( + Lsn::from_str("0/16960E8").unwrap(), + Some(Lsn::from_str("0/1696070").unwrap()), + Some(TimelineId::from_str("e45a7f37d3ee2ff17dc14bf4f4e3f52e").unwrap()), + Lsn::INVALID, + Lsn::from_str("0/1696070").unwrap(), + Lsn::from_str("0/1696070").unwrap(), + 14, + ).with_recalculated_checksum().unwrap(), + deleted_at: None, + lineage: Default::default(), + gc_blocking: Some(GcBlocking { + started_at: parse_naive_datetime("2024-07-19T09:00:00.123000000"), + reasons: enumset::EnumSet::from_iter([GcBlockingReason::DetachAncestor]), + }), + last_aux_file_policy: Default::default(), + archived_at: None, + import_pgdata: Some(import_pgdata::index_part_format::Root::V1(import_pgdata::index_part_format::V1::Done(import_pgdata::index_part_format::Done{ + started_at: parse_naive_datetime("2024-11-13T09:23:42.123000000"), + finished_at: parse_naive_datetime("2024-11-13T09:42:23.123000000"), + idempotency_key: import_pgdata::index_part_format::IdempotencyKey::new("specified-by-client-218a5213-5044-4562-a28d-d024c5f057f5".to_string()), + }))), + l2_lsn: Some("0/1686070".parse::().unwrap()), }; let part = IndexPart::from_json_bytes(example.as_bytes()).unwrap(); diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 94c65631b206..35a5a7bda592 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -4,7 +4,7 @@ //! //! The old legacy algorithm is implemented directly in `timeline.rs`. -use std::collections::{BinaryHeap, HashMap, HashSet}; +use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque}; use std::ops::{Deref, Range}; use std::sync::Arc; @@ -16,10 +16,12 @@ use super::{ use anyhow::{anyhow, bail, Context}; use bytes::Bytes; +use enumset::EnumSet; use fail::fail_point; use itertools::Itertools; use pageserver_api::key::KEY_SIZE; use pageserver_api::keyspace::ShardedRange; +use pageserver_api::models::CompactInfoResponse; use pageserver_api::shard::{ShardCount, ShardIdentity, TenantShardId}; use serde::Serialize; use tokio_util::sync::CancellationToken; @@ -30,6 +32,7 @@ use crate::context::{AccessStatsBehavior, RequestContext, RequestContextBuilder} use crate::page_cache; use crate::statvfs::Statvfs; use crate::tenant::checks::check_valid_layermap; +use crate::tenant::gc_block::GcBlock; use crate::tenant::remote_timeline_client::WaitCompletionError; use crate::tenant::storage_layer::batch_split_writer::{ BatchWriterResult, SplitDeltaLayerWriter, SplitImageLayerWriter, @@ -63,16 +66,259 @@ use super::CompactionError; /// Maximum number of deltas before generating an image layer in bottom-most compaction. const COMPACTION_DELTA_THRESHOLD: usize = 5; -/// A scheduled compaction task. -pub(crate) struct ScheduledCompactionTask { - /// It's unfortunate that we need to store a compact options struct here because the only outer - /// API we can call here is `compact_with_options` which does a few setup calls before starting the - /// actual compaction job... We should refactor this to store `GcCompactionJob` in the future. - pub options: CompactOptions, - /// The channel to send the compaction result. If this is a subcompaction, the last compaction job holds the sender. - pub result_tx: Option>, - /// Hold the GC block. If this is a subcompaction, the last compaction job holds the gc block guard. - pub gc_block: Option, +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)] +pub struct GcCompactionJobId(pub usize); + +#[derive(Debug, Clone)] +pub enum GcCompactionQueueItem { + Manual(CompactOptions), + SubCompactionJob(CompactOptions), + #[allow(dead_code)] + UpdateL2Lsn(Lsn), + Notify(GcCompactionJobId), +} + +impl GcCompactionQueueItem { + pub fn into_compact_info_resp( + self, + id: GcCompactionJobId, + running: bool, + ) -> Option { + match self { + GcCompactionQueueItem::Manual(options) => Some(CompactInfoResponse { + compact_key_range: options.compact_key_range, + compact_lsn_range: options.compact_lsn_range, + sub_compaction: options.sub_compaction, + running, + job_id: id.0, + }), + GcCompactionQueueItem::SubCompactionJob(options) => Some(CompactInfoResponse { + compact_key_range: options.compact_key_range, + compact_lsn_range: options.compact_lsn_range, + sub_compaction: options.sub_compaction, + running, + job_id: id.0, + }), + GcCompactionQueueItem::UpdateL2Lsn(_) => None, + GcCompactionQueueItem::Notify(_) => None, + } + } +} + +struct GcCompactionQueueInner { + running: Option<(GcCompactionJobId, GcCompactionQueueItem)>, + queued: VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>, + notify: HashMap>, + gc_guards: HashMap, + last_id: GcCompactionJobId, +} + +impl GcCompactionQueueInner { + fn next_id(&mut self) -> GcCompactionJobId { + let id = self.last_id; + self.last_id = GcCompactionJobId(id.0 + 1); + id + } +} + +/// A structure to store gc_compaction jobs. +pub struct GcCompactionQueue { + /// All items in the queue, and the currently-running job. + inner: std::sync::Mutex, + /// Ensure only one thread is consuming the queue. + queue_lock: tokio::sync::Mutex<()>, +} + +impl GcCompactionQueue { + pub fn new() -> Self { + GcCompactionQueue { + inner: std::sync::Mutex::new(GcCompactionQueueInner { + running: None, + queued: VecDeque::new(), + notify: HashMap::new(), + gc_guards: HashMap::new(), + last_id: GcCompactionJobId(0), + }), + queue_lock: tokio::sync::Mutex::new(()), + } + } + + pub fn cancel_scheduled(&self) { + let mut guard = self.inner.lock().unwrap(); + guard.queued.clear(); + guard.notify.clear(); + guard.gc_guards.clear(); + } + + /// Schedule a manual compaction job. + pub fn schedule_manual_compaction( + &self, + options: CompactOptions, + notify: Option>, + ) -> GcCompactionJobId { + let mut guard = self.inner.lock().unwrap(); + let id = guard.next_id(); + guard + .queued + .push_back((id, GcCompactionQueueItem::Manual(options))); + if let Some(notify) = notify { + guard.notify.insert(id, notify); + } + id + } + + /// Trigger an auto compaction. + #[allow(dead_code)] + pub fn trigger_auto_compaction(&self, _: &Arc) {} + + /// Notify the caller the job has finished and unblock GC. + fn notify_and_unblock(&self, id: GcCompactionJobId) { + let mut guard = self.inner.lock().unwrap(); + if let Some(blocking) = guard.gc_guards.remove(&id) { + drop(blocking) + } + if let Some(tx) = guard.notify.remove(&id) { + let _ = tx.send(()); + } + } + + /// Take a job from the queue and process it. Returns if there are still pending tasks. + pub async fn iteration( + &self, + cancel: &CancellationToken, + ctx: &RequestContext, + gc_block: &GcBlock, + timeline: &Arc, + ) -> Result { + let _one_op_at_a_time_guard = self.queue_lock.lock().await; + let has_pending_tasks; + let (id, item) = { + let mut guard = self.inner.lock().unwrap(); + let Some((id, item)) = guard.queued.pop_front() else { + return Ok(false); + }; + guard.running = Some((id, item.clone())); + has_pending_tasks = !guard.queued.is_empty(); + (id, item) + }; + + match item { + GcCompactionQueueItem::Manual(options) => { + if !options + .flags + .contains(CompactFlags::EnhancedGcBottomMostCompaction) + { + warn!("ignoring scheduled compaction task: scheduled task must be gc compaction: {:?}", options); + } else if options.sub_compaction { + info!("running scheduled enhanced gc bottom-most compaction with sub-compaction, splitting compaction jobs"); + let jobs: Vec = timeline + .gc_compaction_split_jobs( + GcCompactJob::from_compact_options(options.clone()), + options.sub_compaction_max_job_size_mb, + ) + .await + .map_err(CompactionError::Other)?; + if jobs.is_empty() { + info!("no jobs to run, skipping scheduled compaction task"); + self.notify_and_unblock(id); + } else { + let gc_guard = match gc_block.start().await { + Ok(guard) => guard, + Err(e) => { + return Err(CompactionError::Other(anyhow!( + "cannot run gc-compaction because gc is blocked: {}", + e + ))); + } + }; + + let jobs_len = jobs.len(); + let mut pending_tasks = Vec::new(); + for job in jobs { + // Unfortunately we need to convert the `GcCompactJob` back to `CompactionOptions` + // until we do further refactors to allow directly call `compact_with_gc`. + let mut flags: EnumSet = EnumSet::default(); + flags |= CompactFlags::EnhancedGcBottomMostCompaction; + if job.dry_run { + flags |= CompactFlags::DryRun; + } + let options = CompactOptions { + flags, + sub_compaction: false, + compact_key_range: Some(job.compact_key_range.into()), + compact_lsn_range: Some(job.compact_lsn_range.into()), + sub_compaction_max_job_size_mb: None, + }; + pending_tasks.push(GcCompactionQueueItem::SubCompactionJob(options)); + } + pending_tasks.push(GcCompactionQueueItem::Notify(id)); + { + let mut guard = self.inner.lock().unwrap(); + guard.gc_guards.insert(id, gc_guard); + for task in pending_tasks { + let id = guard.next_id(); + guard.queued.push_back((id, task)); + } + } + info!("scheduled enhanced gc bottom-most compaction with sub-compaction, split into {} jobs", jobs_len); + } + } else { + let gc_guard = match gc_block.start().await { + Ok(guard) => guard, + Err(e) => { + return Err(CompactionError::Other(anyhow!( + "cannot run gc-compaction because gc is blocked: {}", + e + ))); + } + }; + { + let mut guard = self.inner.lock().unwrap(); + guard.gc_guards.insert(id, gc_guard); + } + let _ = timeline + .compact_with_options(cancel, options, ctx) + .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id)) + .await?; + self.notify_and_unblock(id); + } + } + GcCompactionQueueItem::SubCompactionJob(options) => { + let _ = timeline + .compact_with_options(cancel, options, ctx) + .instrument(info_span!("scheduled_compact_timeline", %timeline.timeline_id)) + .await?; + } + GcCompactionQueueItem::Notify(id) => { + self.notify_and_unblock(id); + } + GcCompactionQueueItem::UpdateL2Lsn(_) => { + unreachable!() + } + } + { + let mut guard = self.inner.lock().unwrap(); + guard.running = None; + } + Ok(has_pending_tasks) + } + + #[allow(clippy::type_complexity)] + pub fn remaining_jobs( + &self, + ) -> ( + Option<(GcCompactionJobId, GcCompactionQueueItem)>, + VecDeque<(GcCompactionJobId, GcCompactionQueueItem)>, + ) { + let guard = self.inner.lock().unwrap(); + (guard.running.clone(), guard.queued.clone()) + } + + #[allow(dead_code)] + pub fn remaining_jobs_num(&self) -> usize { + let guard = self.inner.lock().unwrap(); + guard.queued.len() + if guard.running.is_some() { 1 } else { 0 } + } } /// A job description for the gc-compaction job. This structure describes the rectangle range that the job will