From 58b6120fe2ac5b75d9a93e01e73051d739862309 Mon Sep 17 00:00:00 2001 From: Konstantin Knizhnik Date: Tue, 24 Dec 2024 09:55:14 +0200 Subject: [PATCH] Unmdo unrelated changes --- libs/pageserver_api/src/models.rs | 290 ++----------- pageserver/client/src/page_service.rs | 2 +- .../pagebench/src/cmd/getpage_latest_lsn.rs | 1 - pageserver/src/metrics.rs | 1 - pageserver/src/page_service.rs | 200 ++------- pgxn/neon/libpagestore.c | 5 +- pgxn/neon/pagestore_client.h | 88 ++-- pgxn/neon/pagestore_smgr.c | 383 +++++------------- 8 files changed, 223 insertions(+), 747 deletions(-) diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3d9f8f2159c4..f3fc9fad760a 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -1460,17 +1460,13 @@ impl TryFrom for PagestreamBeMessageTag { // interface allows sending both LSNs, and let the pageserver do the right thing. There was no // difference in the responses between V1 and V2. // -#[derive(Debug, PartialEq, Eq, Clone, Copy)] +#[derive(Clone, Copy)] pub enum PagestreamProtocolVersion { V2, - V3, } -pub type RequestId = u64; - #[derive(Debug, PartialEq, Eq)] pub struct PagestreamExistsRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1478,7 +1474,6 @@ pub struct PagestreamExistsRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamNblocksRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1486,7 +1481,6 @@ pub struct PagestreamNblocksRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetPageRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub rel: RelTag, @@ -1495,7 +1489,6 @@ pub struct PagestreamGetPageRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamDbSizeRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub dbnode: u32, @@ -1503,7 +1496,6 @@ pub struct PagestreamDbSizeRequest { #[derive(Debug, PartialEq, Eq)] pub struct PagestreamGetSlruSegmentRequest { - pub reqid: RequestId, pub request_lsn: Lsn, pub not_modified_since: Lsn, pub kind: u8, @@ -1512,56 +1504,31 @@ pub struct PagestreamGetSlruSegmentRequest { #[derive(Debug)] pub struct PagestreamExistsResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub rel: RelTag, pub exists: bool, } #[derive(Debug)] pub struct PagestreamNblocksResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub rel: RelTag, pub n_blocks: u32, } #[derive(Debug)] pub struct PagestreamGetPageResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub rel: RelTag, - pub blkno: u32, pub page: Bytes, } #[derive(Debug)] pub struct PagestreamGetSlruSegmentResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub kind: u8, - pub segno: u32, pub segment: Bytes, } #[derive(Debug)] pub struct PagestreamErrorResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, pub message: String, } #[derive(Debug)] pub struct PagestreamDbSizeResponse { - pub reqid: RequestId, - pub request_lsn: Lsn, - pub not_modified_since: Lsn, - pub db_node: u32, pub db_size: i64, } @@ -1585,7 +1552,6 @@ impl PagestreamFeMessage { match self { Self::Exists(req) => { bytes.put_u8(0); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1596,7 +1562,6 @@ impl PagestreamFeMessage { Self::Nblocks(req) => { bytes.put_u8(1); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1607,7 +1572,6 @@ impl PagestreamFeMessage { Self::GetPage(req) => { bytes.put_u8(2); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.rel.spcnode); @@ -1619,7 +1583,6 @@ impl PagestreamFeMessage { Self::DbSize(req) => { bytes.put_u8(3); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u32(req.dbnode); @@ -1627,7 +1590,6 @@ impl PagestreamFeMessage { Self::GetSlruSegment(req) => { bytes.put_u8(4); - bytes.put_u64(req.reqid); bytes.put_u64(req.request_lsn.0); bytes.put_u64(req.not_modified_since.0); bytes.put_u8(req.kind); @@ -1638,31 +1600,19 @@ impl PagestreamFeMessage { bytes.into() } - pub fn parse( - body: &mut R, - protocol_version: PagestreamProtocolVersion, - ) -> anyhow::Result { + pub fn parse(body: &mut R) -> anyhow::Result { // these correspond to the NeonMessageTag enum in pagestore_client.h // // TODO: consider using protobuf or serde bincode for less error prone // serialization. let msg_tag = body.read_u8()?; - let (reqid, request_lsn, not_modified_since) = match protocol_version { - PagestreamProtocolVersion::V2 => ( - 0, - Lsn::from(body.read_u64::()?), - Lsn::from(body.read_u64::()?), - ), - PagestreamProtocolVersion::V3 => ( - body.read_u64::()?, - Lsn::from(body.read_u64::()?), - Lsn::from(body.read_u64::()?), - ), - }; + + // these two fields are the same for every request type + let request_lsn = Lsn::from(body.read_u64::()?); + let not_modified_since = Lsn::from(body.read_u64::()?); match msg_tag { 0 => Ok(PagestreamFeMessage::Exists(PagestreamExistsRequest { - reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1673,7 +1623,6 @@ impl PagestreamFeMessage { }, })), 1 => Ok(PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1684,7 +1633,6 @@ impl PagestreamFeMessage { }, })), 2 => Ok(PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - reqid, request_lsn, not_modified_since, rel: RelTag { @@ -1696,14 +1644,12 @@ impl PagestreamFeMessage { blkno: body.read_u32::()?, })), 3 => Ok(PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - reqid, request_lsn, not_modified_since, dbnode: body.read_u32::()?, })), 4 => Ok(PagestreamFeMessage::GetSlruSegment( PagestreamGetSlruSegmentRequest { - reqid, request_lsn, not_modified_since, kind: body.read_u8()?, @@ -1716,114 +1662,43 @@ impl PagestreamFeMessage { } impl PagestreamBeMessage { - pub fn serialize(&self, protocol_version: PagestreamProtocolVersion) -> Bytes { + pub fn serialize(&self) -> Bytes { let mut bytes = BytesMut::new(); use PagestreamBeMessageTag as Tag; - match protocol_version { - PagestreamProtocolVersion::V2 => { - match self { - Self::Exists(resp) => { - bytes.put_u8(Tag::Exists as u8); - bytes.put_u8(resp.exists as u8); - } - - Self::Nblocks(resp) => { - bytes.put_u8(Tag::Nblocks as u8); - bytes.put_u32(resp.n_blocks); - } - - Self::GetPage(resp) => { - bytes.put_u8(Tag::GetPage as u8); - bytes.put(&resp.page[..]) - } - - Self::Error(resp) => { - bytes.put_u8(Tag::Error as u8); - bytes.put(resp.message.as_bytes()); - bytes.put_u8(0); // null terminator - } - Self::DbSize(resp) => { - bytes.put_u8(Tag::DbSize as u8); - bytes.put_i64(resp.db_size); - } - - Self::GetSlruSegment(resp) => { - bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); - bytes.put(&resp.segment[..]); - } - } + match self { + Self::Exists(resp) => { + bytes.put_u8(Tag::Exists as u8); + bytes.put_u8(resp.exists as u8); } - _ => { - match self { - Self::Exists(resp) => { - bytes.put_u8(Tag::Exists as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.rel.spcnode); - bytes.put_u32(resp.rel.dbnode); - bytes.put_u32(resp.rel.relnode); - bytes.put_u8(resp.rel.forknum); - bytes.put_u8(resp.exists as u8); - } - - Self::Nblocks(resp) => { - bytes.put_u8(Tag::Nblocks as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.rel.spcnode); - bytes.put_u32(resp.rel.dbnode); - bytes.put_u32(resp.rel.relnode); - bytes.put_u8(resp.rel.forknum); - bytes.put_u32(resp.n_blocks); - } - - Self::GetPage(resp) => { - bytes.put_u8(Tag::GetPage as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.rel.spcnode); - bytes.put_u32(resp.rel.dbnode); - bytes.put_u32(resp.rel.relnode); - bytes.put_u8(resp.rel.forknum); - bytes.put_u32(resp.blkno); - bytes.put(&resp.page[..]) - } - - Self::Error(resp) => { - bytes.put_u8(Tag::Error as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put(resp.message.as_bytes()); - bytes.put_u8(0); // null terminator - } - Self::DbSize(resp) => { - bytes.put_u8(Tag::DbSize as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u32(resp.db_node); - bytes.put_i64(resp.db_size); - } - - Self::GetSlruSegment(resp) => { - bytes.put_u8(Tag::GetSlruSegment as u8); - bytes.put_u64(resp.reqid); - bytes.put_u64(resp.request_lsn.0); - bytes.put_u64(resp.not_modified_since.0); - bytes.put_u8(resp.kind); - bytes.put_u32(resp.segno); - bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); - bytes.put(&resp.segment[..]); - } - } + + Self::Nblocks(resp) => { + bytes.put_u8(Tag::Nblocks as u8); + bytes.put_u32(resp.n_blocks); + } + + Self::GetPage(resp) => { + bytes.put_u8(Tag::GetPage as u8); + bytes.put(&resp.page[..]); + } + + Self::Error(resp) => { + bytes.put_u8(Tag::Error as u8); + bytes.put(resp.message.as_bytes()); + bytes.put_u8(0); // null terminator + } + Self::DbSize(resp) => { + bytes.put_u8(Tag::DbSize as u8); + bytes.put_i64(resp.db_size); + } + + Self::GetSlruSegment(resp) => { + bytes.put_u8(Tag::GetSlruSegment as u8); + bytes.put_u32((resp.segment.len() / BLCKSZ as usize) as u32); + bytes.put(&resp.segment[..]); } } + bytes.into() } @@ -1835,109 +1710,38 @@ impl PagestreamBeMessage { let ok = match Tag::try_from(msg_tag).map_err(|tag: u8| anyhow::anyhow!("invalid tag {tag}"))? { Tag::Exists => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let rel = RelTag { - spcnode: buf.read_u32::()?, - dbnode: buf.read_u32::()?, - relnode: buf.read_u32::()?, - forknum: buf.read_u8()?, - }; - let exists = buf.read_u8()? != 0; + let exists = buf.read_u8()?; Self::Exists(PagestreamExistsResponse { - reqid, - request_lsn, - not_modified_since, - rel, - exists, + exists: exists != 0, }) } Tag::Nblocks => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let rel = RelTag { - spcnode: buf.read_u32::()?, - dbnode: buf.read_u32::()?, - relnode: buf.read_u32::()?, - forknum: buf.read_u8()?, - }; let n_blocks = buf.read_u32::()?; - Self::Nblocks(PagestreamNblocksResponse { - reqid, - request_lsn, - not_modified_since, - rel, - n_blocks, - }) + Self::Nblocks(PagestreamNblocksResponse { n_blocks }) } Tag::GetPage => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let rel = RelTag { - spcnode: buf.read_u32::()?, - dbnode: buf.read_u32::()?, - relnode: buf.read_u32::()?, - forknum: buf.read_u8()?, - }; - let blkno = buf.read_u32::()?; let mut page = vec![0; 8192]; // TODO: use MaybeUninit buf.read_exact(&mut page)?; - Self::GetPage(PagestreamGetPageResponse { - reqid, - request_lsn, - not_modified_since, - rel, - blkno, - page: page.into(), - }) + PagestreamBeMessage::GetPage(PagestreamGetPageResponse { page: page.into() }) } Tag::Error => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); let mut msg = Vec::new(); buf.read_until(0, &mut msg)?; let cstring = std::ffi::CString::from_vec_with_nul(msg)?; let rust_str = cstring.to_str()?; - Self::Error(PagestreamErrorResponse { - reqid, - request_lsn, - not_modified_since, + PagestreamBeMessage::Error(PagestreamErrorResponse { message: rust_str.to_owned(), }) } Tag::DbSize => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let db_node = buf.read_u32::()?; let db_size = buf.read_i64::()?; - Self::DbSize(PagestreamDbSizeResponse { - reqid, - request_lsn, - not_modified_since, - db_node, - db_size, - }) + Self::DbSize(PagestreamDbSizeResponse { db_size }) } Tag::GetSlruSegment => { - let reqid = buf.read_u64::()?; - let request_lsn = Lsn(buf.read_u64::()?); - let not_modified_since = Lsn(buf.read_u64::()?); - let kind = buf.read_u8()?; - let segno = buf.read_u32::()?; let n_blocks = buf.read_u32::()?; let mut segment = vec![0; n_blocks as usize * BLCKSZ as usize]; buf.read_exact(&mut segment)?; Self::GetSlruSegment(PagestreamGetSlruSegmentResponse { - reqid, - request_lsn, - not_modified_since, - kind, - segno, segment: segment.into(), }) } @@ -1976,7 +1780,6 @@ mod tests { // Test serialization/deserialization of PagestreamFeMessage let messages = vec![ PagestreamFeMessage::Exists(PagestreamExistsRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), rel: RelTag { @@ -1987,7 +1790,6 @@ mod tests { }, }), PagestreamFeMessage::Nblocks(PagestreamNblocksRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(4), rel: RelTag { @@ -1998,7 +1800,6 @@ mod tests { }, }), PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), rel: RelTag { @@ -2010,7 +1811,6 @@ mod tests { blkno: 7, }), PagestreamFeMessage::DbSize(PagestreamDbSizeRequest { - reqid: 0, request_lsn: Lsn(4), not_modified_since: Lsn(3), dbnode: 7, @@ -2018,9 +1818,7 @@ mod tests { ]; for msg in messages { let bytes = msg.serialize(); - let reconstructed = - PagestreamFeMessage::parse(&mut bytes.reader(), PagestreamProtocolVersion::V3) - .unwrap(); + let reconstructed = PagestreamFeMessage::parse(&mut bytes.reader()).unwrap(); assert!(msg == reconstructed); } } diff --git a/pageserver/client/src/page_service.rs b/pageserver/client/src/page_service.rs index 207ec4166cb8..f9507fc47a3a 100644 --- a/pageserver/client/src/page_service.rs +++ b/pageserver/client/src/page_service.rs @@ -60,7 +60,7 @@ impl Client { ) -> anyhow::Result { let copy_both: tokio_postgres::CopyBothDuplex = self .client - .copy_both_simple(&format!("pagestream_v3 {tenant_id} {timeline_id}")) + .copy_both_simple(&format!("pagestream_v2 {tenant_id} {timeline_id}")) .await?; let Client { cancel_on_client_drop, diff --git a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs index 910708c04a80..b2df01714d31 100644 --- a/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs +++ b/pageserver/pagebench/src/cmd/getpage_latest_lsn.rs @@ -322,7 +322,6 @@ async fn main_impl( .to_rel_block() .expect("we filter non-rel-block keys out above"); PagestreamGetPageRequest { - reqid: 0, request_lsn: if rng.gen_bool(args.req_latest_probability) { Lsn::MAX } else { diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 9a3bd7f931e0..bdbabf3f7511 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1845,7 +1845,6 @@ pub(crate) static LIVE_CONNECTIONS: Lazy = Lazy::new(|| { #[derive(Clone, Copy, enum_map::Enum, IntoStaticStr)] pub(crate) enum ComputeCommandKind { - PageStreamV3, PageStreamV2, Basebackup, Fullbackup, diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 2a393c335c0f..d00ec11a7611 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -17,7 +17,7 @@ use pageserver_api::models::{ PagestreamErrorResponse, PagestreamExistsRequest, PagestreamExistsResponse, PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetSlruSegmentRequest, PagestreamGetSlruSegmentResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, - PagestreamProtocolVersion, RequestId, + PagestreamProtocolVersion, }; use pageserver_api::shard::TenantShardId; use postgres_backend::{ @@ -537,29 +537,6 @@ impl From for QueryError { } } -#[derive(thiserror::Error, Debug)] -struct BatchedPageStreamError { - err: PageStreamError, - reqid: RequestId, - request_lsn: Lsn, - not_modified_since: Lsn, -} - -impl std::fmt::Display for BatchedPageStreamError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.err.fmt(f) - } -} - -struct BatchedGetPageRequest { - rel: RelTag, - blkno: BlockNumber, - reqid: RequestId, - request_lsn: Lsn, - not_modified_since: Lsn, - timer: SmgrOpTimer, -} - enum BatchedFeMessage { Exists { span: Span, @@ -577,7 +554,7 @@ enum BatchedFeMessage { span: Span, shard: timeline::handle::Handle, effective_request_lsn: Lsn, - pages: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, + pages: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, }, DbSize { span: Span, @@ -593,7 +570,7 @@ enum BatchedFeMessage { }, RespondError { span: Span, - error: BatchedPageStreamError, + error: PageStreamError, }, } @@ -618,7 +595,7 @@ impl BatchedFeMessage { BatchedFeMessage::GetPage { shard, pages, .. } => ( shard, pages.len(), - itertools::Either::Right(pages.iter_mut().map(|p| &mut p.timer)), + itertools::Either::Right(pages.iter_mut().map(|(_, _, timer)| timer)), ), BatchedFeMessage::RespondError { .. } => return Ok(()), }; @@ -677,7 +654,6 @@ impl PageServerHandler { ) } - #[allow(clippy::too_many_arguments)] async fn pagestream_read_message( pgb: &mut PostgresBackendReader, tenant_id: TenantId, @@ -685,7 +661,6 @@ impl PageServerHandler { timeline_handles: &mut TimelineHandles, cancel: &CancellationToken, ctx: &RequestContext, - protocol_version: PagestreamProtocolVersion, parent_span: Span, ) -> Result, QueryError> where @@ -720,8 +695,7 @@ impl PageServerHandler { fail::fail_point!("ps::handle-pagerequest-message"); // parse request - let neon_fe_msg = - PagestreamFeMessage::parse(&mut copy_data_bytes.reader(), protocol_version)?; + let neon_fe_msg = PagestreamFeMessage::parse(&mut copy_data_bytes.reader())?; let batched_msg = match neon_fe_msg { PagestreamFeMessage::Exists(req) => { @@ -789,7 +763,6 @@ impl PageServerHandler { } } PagestreamFeMessage::GetPage(PagestreamGetPageRequest { - reqid, request_lsn, not_modified_since, rel, @@ -801,12 +774,7 @@ impl PageServerHandler { ($error:expr) => {{ let error = BatchedFeMessage::RespondError { span, - error: BatchedPageStreamError { - err: $error, - reqid, - request_lsn, - not_modified_since, - }, + error: $error, }; Ok(Some(error)) }}; @@ -863,14 +831,7 @@ impl PageServerHandler { span, shard, effective_request_lsn, - pages: smallvec::smallvec![BatchedGetPageRequest { - rel, - blkno, - reqid, - request_lsn, - not_modified_since, - timer - }], + pages: smallvec::smallvec![(rel, blkno, timer)], } } }; @@ -949,7 +910,6 @@ impl PageServerHandler { pgb_writer: &mut PostgresBackend, batch: BatchedFeMessage, cancel: &CancellationToken, - protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> Result<(), QueryError> where @@ -957,7 +917,7 @@ impl PageServerHandler { { // invoke handler function let (handler_results, span): ( - Vec>, + Vec>, _, ) = match batch { BatchedFeMessage::Exists { @@ -972,13 +932,7 @@ impl PageServerHandler { .handle_get_rel_exists_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -994,13 +948,7 @@ impl PageServerHandler { .handle_get_nblocks_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -1042,13 +990,7 @@ impl PageServerHandler { .handle_db_size_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -1064,13 +1006,7 @@ impl PageServerHandler { .handle_get_slru_segment_request(&shard, &req, ctx) .instrument(span.clone()) .await - .map(|msg| (msg, timer)) - .map_err(|err| BatchedPageStreamError { - err, - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - })], + .map(|msg| (msg, timer))], span, ) } @@ -1086,7 +1022,7 @@ impl PageServerHandler { // Other handler errors are sent back as an error message and we stay in pagestream protocol. for handler_result in handler_results { let (response_msg, timer) = match handler_result { - Err(e) => match &e.err { + Err(e) => match &e { PageStreamError::Shutdown => { // If we fail to fulfil a request during shutdown, which may be _because_ of // shutdown, then do not send the error to the client. Instead just drop the @@ -1105,16 +1041,13 @@ impl PageServerHandler { // print the all details to the log with {:#}, but for the client the // error message is enough. Do not log if shutting down, as the anyhow::Error // here includes cancellation which is not an error. - let full = utils::error::report_compact_sources(&e.err); + let full = utils::error::report_compact_sources(&e); span.in_scope(|| { error!("error reading relation or page version: {full:#}") }); ( PagestreamBeMessage::Error(PagestreamErrorResponse { - reqid: e.reqid, - request_lsn: e.request_lsn, - not_modified_since: e.not_modified_since, - message: e.err.to_string(), + message: e.to_string(), }), None, // TODO: measure errors ) @@ -1127,9 +1060,7 @@ impl PageServerHandler { // marshal & transmit response message // - pgb_writer.write_message_noflush(&BeMessage::CopyData( - &response_msg.serialize(protocol_version), - ))?; + pgb_writer.write_message_noflush(&BeMessage::CopyData(&response_msg.serialize()))?; // We purposefully don't count flush time into the timer. // @@ -1192,7 +1123,7 @@ impl PageServerHandler { pgb: &mut PostgresBackend, tenant_id: TenantId, timeline_id: TimelineId, - protocol_version: PagestreamProtocolVersion, + _protocol_version: PagestreamProtocolVersion, ctx: RequestContext, ) -> Result<(), QueryError> where @@ -1232,7 +1163,6 @@ impl PageServerHandler { timeline_handles, request_span, pipelining_config, - protocol_version, &ctx, ) .await @@ -1245,7 +1175,6 @@ impl PageServerHandler { timeline_id, timeline_handles, request_span, - protocol_version, &ctx, ) .await @@ -1272,7 +1201,6 @@ impl PageServerHandler { timeline_id: TimelineId, mut timeline_handles: TimelineHandles, request_span: Span, - protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1290,7 +1218,6 @@ impl PageServerHandler { &mut timeline_handles, &cancel, ctx, - protocol_version, request_span.clone(), ) .await; @@ -1311,7 +1238,7 @@ impl PageServerHandler { } let err = self - .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, protocol_version, ctx) + .pagesteam_handle_batched_message(pgb_writer, msg, &cancel, ctx) .await; match err { Ok(()) => {} @@ -1334,7 +1261,6 @@ impl PageServerHandler { mut timeline_handles: TimelineHandles, request_span: Span, pipelining_config: PageServicePipeliningConfigPipelined, - protocol_version: PagestreamProtocolVersion, ctx: &RequestContext, ) -> ( (PostgresBackendReader, TimelineHandles), @@ -1432,7 +1358,6 @@ impl PageServerHandler { &mut timeline_handles, &cancel_batcher, &ctx, - protocol_version, request_span.clone(), ) .await; @@ -1478,14 +1403,8 @@ impl PageServerHandler { batch .throttle_and_record_start_processing(&self.cancel) .await?; - self.pagesteam_handle_batched_message( - pgb_writer, - batch, - &cancel, - protocol_version, - &ctx, - ) - .await?; + self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) + .await?; } } }); @@ -1671,10 +1590,6 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::Exists(PagestreamExistsResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, exists, })) } @@ -1701,10 +1616,6 @@ impl PageServerHandler { .await?; Ok(PagestreamBeMessage::Nblocks(PagestreamNblocksResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, n_blocks, })) } @@ -1732,10 +1643,6 @@ impl PageServerHandler { let db_size = total_blocks as i64 * BLCKSZ as i64; Ok(PagestreamBeMessage::DbSize(PagestreamDbSizeResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - db_node: req.dbnode, db_size, })) } @@ -1745,9 +1652,9 @@ impl PageServerHandler { &mut self, timeline: &Timeline, effective_lsn: Lsn, - requests: smallvec::SmallVec<[BatchedGetPageRequest; 1]>, + requests: smallvec::SmallVec<[(RelTag, BlockNumber, SmgrOpTimer); 1]>, ctx: &RequestContext, - ) -> Vec> { + ) -> Vec> { debug_assert_current_span_has_tenant_and_timeline_id(); timeline @@ -1756,7 +1663,7 @@ impl PageServerHandler { let results = timeline .get_rel_page_at_lsn_batched( - requests.iter().map(|p| (&p.rel, &p.blkno)), + requests.iter().map(|(reltag, blkno, _)| (reltag, blkno)), effective_lsn, ctx, ) @@ -1768,26 +1675,16 @@ impl PageServerHandler { requests .into_iter() .zip(results.into_iter()) - .map(|(req, res)| { + .map(|((_, _, timer), res)| { res.map(|page| { ( PagestreamBeMessage::GetPage(models::PagestreamGetPageResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - rel: req.rel, - blkno: req.blkno, page, }), - req.timer, + timer, ) }) - .map_err(|e| BatchedPageStreamError { - err: PageStreamError::from(e), - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - }) + .map_err(PageStreamError::from) }), ) } @@ -1814,14 +1711,7 @@ impl PageServerHandler { let segment = timeline.get_slru_segment(kind, req.segno, lsn, ctx).await?; Ok(PagestreamBeMessage::GetSlruSegment( - PagestreamGetSlruSegmentResponse { - reqid: req.reqid, - request_lsn: req.request_lsn, - not_modified_since: req.not_modified_since, - kind: req.kind, - segno: req.segno, - segment, - }, + PagestreamGetSlruSegmentResponse { segment }, )) } @@ -2016,7 +1906,6 @@ struct FullBackupCmd { struct PageStreamCmd { tenant_id: TenantId, timeline_id: TimelineId, - protocol_version: PagestreamProtocolVersion, } /// `lease lsn tenant timeline lsn` @@ -2037,7 +1926,7 @@ enum PageServiceCmd { } impl PageStreamCmd { - fn parse(query: &str, protocol_version: PagestreamProtocolVersion) -> anyhow::Result { + fn parse(query: &str) -> anyhow::Result { let parameters = query.split_whitespace().collect_vec(); if parameters.len() != 2 { bail!( @@ -2052,7 +1941,6 @@ impl PageStreamCmd { Ok(Self { tenant_id, timeline_id, - protocol_version, }) } } @@ -2190,14 +2078,7 @@ impl PageServiceCmd { bail!("cannot parse query: {query}") }; match cmd.to_ascii_lowercase().as_str() { - "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse( - other, - PagestreamProtocolVersion::V2, - )?)), - "pagestream_v3" => Ok(Self::PageStream(PageStreamCmd::parse( - other, - PagestreamProtocolVersion::V3, - )?)), + "pagestream_v2" => Ok(Self::PageStream(PageStreamCmd::parse(other)?)), "basebackup" => Ok(Self::BaseBackup(BaseBackupCmd::parse(other)?)), "fullbackup" => Ok(Self::FullBackup(FullBackupCmd::parse(other)?)), "lease" => { @@ -2279,21 +2160,25 @@ where PageServiceCmd::PageStream(PageStreamCmd { tenant_id, timeline_id, - protocol_version, }) => { tracing::Span::current() .record("tenant_id", field::display(tenant_id)) .record("timeline_id", field::display(timeline_id)); self.check_permission(Some(tenant_id))?; - let command_kind = match protocol_version { - PagestreamProtocolVersion::V2 => ComputeCommandKind::PageStreamV2, - PagestreamProtocolVersion::V3 => ComputeCommandKind::PageStreamV3, - }; - COMPUTE_COMMANDS_COUNTERS.for_command(command_kind).inc(); - self.handle_pagerequests(pgb, tenant_id, timeline_id, protocol_version, ctx) - .await?; + COMPUTE_COMMANDS_COUNTERS + .for_command(ComputeCommandKind::PageStreamV2) + .inc(); + + self.handle_pagerequests( + pgb, + tenant_id, + timeline_id, + PagestreamProtocolVersion::V2, + ctx, + ) + .await?; } PageServiceCmd::BaseBackup(BaseBackupCmd { tenant_id, @@ -2472,8 +2357,7 @@ mod tests { cmd, PageServiceCmd::PageStream(PageStreamCmd { tenant_id, - timeline_id, - protocol_version: PagestreamProtocolVersion::V2, + timeline_id }) ); let cmd = PageServiceCmd::parse(&format!("basebackup {tenant_id} {timeline_id}")).unwrap(); diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index c21f07b1d7d7..88d0a5292bf7 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -556,9 +556,6 @@ pageserver_connect(shardno_t shard_no, int elevel) switch (neon_protocol_version) { - case 3: - pagestream_query = psprintf("pagestream_v3 %s %s", neon_tenant, neon_timeline); - break; case 2: pagestream_query = psprintf("pagestream_v2 %s %s", neon_tenant, neon_timeline); break; @@ -1140,7 +1137,7 @@ pg_init_libpagestore(void) &neon_protocol_version, 2, /* use protocol version 2 */ 2, /* min */ - 3, /* max */ + 2, /* max */ PGC_SU_BACKEND, 0, /* no flags required */ NULL, NULL, NULL); diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index de1a22b690d3..f905e3b0faa3 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -44,15 +44,10 @@ typedef enum T_NeonGetSlruSegmentResponse, } NeonMessageTag; -typedef uint64 NeonRequestId; - /* base struct for c-style inheritance */ typedef struct { - NeonMessageTag tag; - NeonRequestId reqid; - XLogRecPtr lsn; - XLogRecPtr not_modified_since; + NeonMessageTag tag; } NeonMessage; #define messageTag(m) (((const NeonMessage *)(m))->tag) @@ -72,7 +67,6 @@ typedef enum { SLRU_MULTIXACT_OFFSETS } SlruKind; - /*-- * supertype of all the Neon*Request structs below. * @@ -94,94 +88,92 @@ typedef enum { * These structs describe the V2 of these requests. (The old now-defunct V1 * protocol contained just one LSN and a boolean 'latest' flag.) */ -typedef NeonMessage NeonRequest; +typedef struct +{ + NeonMessageTag tag; + XLogRecPtr lsn; + XLogRecPtr not_modified_since; +} NeonRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; } NeonExistsRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; } NeonNblocksRequest; typedef struct { - NeonRequest req; - Oid dbNode; + NeonRequest req; + Oid dbNode; } NeonDbSizeRequest; typedef struct { - NeonRequest req; - NRelFileInfo rinfo; - ForkNumber forknum; - BlockNumber blkno; + NeonRequest req; + NRelFileInfo rinfo; + ForkNumber forknum; + BlockNumber blkno; } NeonGetPageRequest; typedef struct { - NeonRequest req; - SlruKind kind; - int segno; + NeonRequest req; + SlruKind kind; + int segno; } NeonGetSlruSegmentRequest; /* supertype of all the Neon*Response structs below */ -typedef NeonMessage NeonResponse; +typedef struct +{ + NeonMessageTag tag; +} NeonResponse; typedef struct { - NeonResponse resp; - NRelFileInfo rinfo; - ForkNumber forknum; - bool exists; + NeonMessageTag tag; + bool exists; } NeonExistsResponse; typedef struct { - NeonResponse resp; - NRelFileInfo rinfo; - ForkNumber forknum; - uint32 n_blocks; + NeonMessageTag tag; + uint32 n_blocks; } NeonNblocksResponse; typedef struct { - NeonResponse resp; - NRelFileInfo rinfo; - ForkNumber forknum; - BlockNumber blkno; - char page[FLEXIBLE_ARRAY_MEMBER]; + NeonMessageTag tag; + char page[FLEXIBLE_ARRAY_MEMBER]; } NeonGetPageResponse; #define PS_GETPAGERESPONSE_SIZE (MAXALIGN(offsetof(NeonGetPageResponse, page) + BLCKSZ)) typedef struct { - NeonResponse resp; - Oid dbNode; - int64 db_size; + NeonMessageTag tag; + int64 db_size; } NeonDbSizeResponse; typedef struct { - NeonResponse resp; - char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error - * message */ + NeonMessageTag tag; + char message[FLEXIBLE_ARRAY_MEMBER]; /* null-terminated error + * message */ } NeonErrorResponse; typedef struct { - NeonResponse resp; - SlruKind kind; - int segno; - int n_blocks; - char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; + NeonMessageTag tag; + int n_blocks; + char data[BLCKSZ * SLRU_PAGES_PER_SEGMENT]; } NeonGetSlruSegmentResponse; diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index ec13dd72e0b8..385905d9cee9 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -120,9 +120,6 @@ static bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block static BlockNumber neon_nblocks(SMgrRelation reln, ForkNumber forknum); -static uint32 local_request_counter; -#define GENERATE_REQUEST_ID() (((NeonRequestId)MyProcPid << 32) | ++local_request_counter) - /* * Prefetch implementation: * @@ -191,11 +188,15 @@ typedef struct PrefetchRequest uint8 status; /* see PrefetchStatus for valid values */ uint8 flags; /* see PrefetchRequestFlags */ neon_request_lsns request_lsns; - NeonRequestId reqid; NeonResponse *response; /* may be null */ uint64 my_ring_index; } PrefetchRequest; +StaticAssertDecl(sizeof(PrefetchRequest) == 64, + "We prefer to have a power-of-2 size for this struct. Please" + " try to find an alternative solution before reaching to" + " increase the expected size here"); + /* prefetch buffer lookup hash table */ typedef struct PrfHashEntry @@ -364,7 +365,6 @@ compact_prefetch_buffers(void) target_slot->shard_no = source_slot->shard_no; target_slot->status = source_slot->status; target_slot->response = source_slot->response; - target_slot->reqid = source_slot->reqid; target_slot->request_lsns = source_slot->request_lsns; target_slot->my_ring_index = empty_ring_index; @@ -797,7 +797,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns NeonGetPageRequest request = { .req.tag = T_NeonGetPageRequest, - .req.reqid = GENERATE_REQUEST_ID(), /* lsn and not_modified_since are filled in below */ .rinfo = BufTagGetNRelFileInfo(slot->buftag), .forknum = slot->buftag.forkNum, @@ -806,8 +805,6 @@ prefetch_do_request(PrefetchRequest *slot, neon_request_lsns *force_request_lsns Assert(mySlotNo == MyPState->ring_unused); - slot->reqid = request.req.reqid; - if (force_request_lsns) slot->request_lsns = *force_request_lsns; else @@ -1180,10 +1177,6 @@ nm_pack_request(NeonRequest *msg) initStringInfo(&s); pq_sendbyte(&s, msg->tag); - if (neon_protocol_version >= 3) - { - pq_sendint64(&s, msg->reqid); - } pq_sendint64(&s, msg->lsn); pq_sendint64(&s, msg->not_modified_since); @@ -1261,16 +1254,8 @@ NeonResponse * nm_unpack_response(StringInfo s) { NeonMessageTag tag = pq_getmsgbyte(s); - NeonResponse resp_hdr = {0}; /* make valgrind happy */ NeonResponse *resp = NULL; - resp_hdr.tag = tag; - if (neon_protocol_version >= 3) - { - resp_hdr.reqid = pq_getmsgint64(s); - resp_hdr.lsn = pq_getmsgint64(s); - resp_hdr.not_modified_since = pq_getmsgint64(s); - } switch (tag) { /* pagestore -> pagestore_client */ @@ -1278,14 +1263,7 @@ nm_unpack_response(StringInfo s) { NeonExistsResponse *msg_resp = palloc0(sizeof(NeonExistsResponse)); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); - msg_resp->forknum = pq_getmsgbyte(s); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; msg_resp->exists = pq_getmsgbyte(s); pq_getmsgend(s); @@ -1297,14 +1275,7 @@ nm_unpack_response(StringInfo s) { NeonNblocksResponse *msg_resp = palloc0(sizeof(NeonNblocksResponse)); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); - msg_resp->forknum = pq_getmsgbyte(s); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; msg_resp->n_blocks = pq_getmsgint(s, 4); pq_getmsgend(s); @@ -1317,20 +1288,12 @@ nm_unpack_response(StringInfo s) NeonGetPageResponse *msg_resp; msg_resp = MemoryContextAllocZero(MyPState->bufctx, PS_GETPAGERESPONSE_SIZE); - if (neon_protocol_version >= 3) - { - NInfoGetSpcOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetDbOid(msg_resp->rinfo) = pq_getmsgint(s, 4); - NInfoGetRelNumber(msg_resp->rinfo) = pq_getmsgint(s, 4); - msg_resp->forknum = pq_getmsgbyte(s); - msg_resp->blkno = pq_getmsgint(s, 4); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; /* XXX: should be varlena */ memcpy(msg_resp->page, pq_getmsgbytes(s, BLCKSZ), BLCKSZ); pq_getmsgend(s); - Assert(msg_resp->resp.tag == T_NeonGetPageResponse); + Assert(msg_resp->tag == T_NeonGetPageResponse); resp = (NeonResponse *) msg_resp; break; @@ -1340,11 +1303,7 @@ nm_unpack_response(StringInfo s) { NeonDbSizeResponse *msg_resp = palloc0(sizeof(NeonDbSizeResponse)); - if (neon_protocol_version >= 3) - { - msg_resp->dbNode = pq_getmsgint(s, 4); - } - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; msg_resp->db_size = pq_getmsgint64(s); pq_getmsgend(s); @@ -1362,7 +1321,7 @@ nm_unpack_response(StringInfo s) msglen = strlen(msgtext); msg_resp = palloc0(sizeof(NeonErrorResponse) + msglen + 1); - msg_resp->resp = resp_hdr; + msg_resp->tag = tag; memcpy(msg_resp->message, msgtext, msglen + 1); pq_getmsgend(s); @@ -1373,17 +1332,9 @@ nm_unpack_response(StringInfo s) case T_NeonGetSlruSegmentResponse: { NeonGetSlruSegmentResponse *msg_resp; - int n_blocks; + int n_blocks = pq_getmsgint(s, 4); msg_resp = palloc(sizeof(NeonGetSlruSegmentResponse)); - - if (neon_protocol_version >= 3) - { - msg_resp->kind = pq_getmsgbyte(s); - msg_resp->segno = pq_getmsgint(s, 4); - } - msg_resp->resp = resp_hdr; - - n_blocks = pq_getmsgint(s, 4); + msg_resp->tag = tag; msg_resp->n_blocks = n_blocks; memcpy(msg_resp->data, pq_getmsgbytes(s, n_blocks * BLCKSZ), n_blocks * BLCKSZ); pq_getmsgend(s); @@ -2355,7 +2306,6 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) { NeonExistsRequest request = { .req.tag = T_NeonExistsRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .rinfo = InfoFromSMgrRel(reln), @@ -2363,59 +2313,31 @@ neon_exists(SMgrRelation reln, ForkNumber forkNum) }; resp = page_server_request(&request); + } - switch (resp->tag) - { - case T_NeonExistsResponse: - { - NeonExistsResponse* exists_resp = (NeonExistsResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - !RelFileInfoEquals(exists_resp->rinfo, request.rinfo) || - exists_resp->forknum != forkNum) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to exits request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(exists_resp->rinfo), exists_resp->forknum, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), RelFileInfoFmt(request.rinfo), forkNum); - } - } - exists = exists_resp->exists; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match exists request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", - resp->reqid, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forkNum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; + switch (resp->tag) + { + case T_NeonExistsResponse: + exists = ((NeonExistsResponse *) resp)->exists; + break; - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", - T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); - } - pfree(resp); + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read relation existence of rel %u/%u/%u.%u from page server at lsn %X/%08X", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forkNum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; + + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected Exists (0x%02x) or Error (0x%02x) response to ExistsRequest, but got 0x%02x", + T_NeonExistsResponse, T_NeonErrorResponse, resp->tag); } + pfree(resp); return exists; } @@ -3023,43 +2945,15 @@ neon_read_at_lsnv(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber base_block switch (resp->tag) { case T_NeonGetPageResponse: - { - NeonGetPageResponse* getpage_resp = (NeonGetPageResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since || - !RelFileInfoEquals(getpage_resp->rinfo, rinfo) || - getpage_resp->forknum != forkNum || - getpage_resp->blkno != base_blockno + i) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u} to get page request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u, block=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(getpage_resp->rinfo), getpage_resp->forknum, getpage_resp->blkno, - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since), RelFileInfoFmt(rinfo), forkNum, base_blockno + i); - } - } - memcpy(buffer, getpage_resp->page, BLCKSZ); + memcpy(buffer, ((NeonGetPageResponse *) resp)->page, BLCKSZ); lfc_write(rinfo, forkNum, blockno, buffer); break; - } + case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != slot->reqid || - resp->lsn != slot->request_lsns.request_lsn || - resp->not_modified_since != slot->request_lsns.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - slot->reqid, LSN_FORMAT_ARGS(slot->request_lsns.request_lsn), LSN_FORMAT_ARGS(slot->request_lsns.not_modified_since)); - } - } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[shard %d, reqid %lx] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", - slot->shard_no, resp->reqid, blockno, RelFileInfoFmt(rinfo), + errmsg(NEON_TAG "[shard %d] could not read block %u in rel %u/%u/%u.%u from page server at lsn %X/%08X", + slot->shard_no, blockno, RelFileInfoFmt(rinfo), forkNum, LSN_FORMAT_ARGS(reqlsns->effective_request_lsn)), errdetail("page server returned error: %s", ((NeonErrorResponse *) resp)->message))); @@ -3543,7 +3437,6 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) { NeonNblocksRequest request = { .req.tag = T_NeonNblocksRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .rinfo = InfoFromSMgrRel(reln), @@ -3551,67 +3444,39 @@ neon_nblocks(SMgrRelation reln, ForkNumber forknum) }; resp = page_server_request(&request); + } - switch (resp->tag) - { - case T_NeonNblocksResponse: - { - NeonNblocksResponse * relsize_resp = (NeonNblocksResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - !RelFileInfoEquals(relsize_resp->rinfo, request.rinfo) || - relsize_resp->forknum != forknum) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u} to get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, rel=%u/%u/%u.%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), RelFileInfoFmt(relsize_resp->rinfo), relsize_resp->forknum, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), RelFileInfoFmt(request.rinfo), forknum); - } - } - n_blocks = relsize_resp->n_blocks; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get relsize request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", - resp->reqid, - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", - T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); - } - update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); + switch (resp->tag) + { + case T_NeonNblocksResponse: + n_blocks = ((NeonNblocksResponse *) resp)->n_blocks; + break; - neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", - RelFileInfoFmt(InfoFromSMgrRel(reln)), - forknum, - LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), - n_blocks); + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read relation size of rel %u/%u/%u.%u from page server at lsn %X/%08X", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - pfree(resp); + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected Nblocks (0x%02x) or Error (0x%02x) response to NblocksRequest, but got 0x%02x", + T_NeonNblocksResponse, T_NeonErrorResponse, resp->tag); } + update_cached_relsize(InfoFromSMgrRel(reln), forknum, n_blocks); + + neon_log(SmgrTrace, "neon_nblocks: rel %u/%u/%u fork %u (request LSN %X/%08X): %u blocks", + RelFileInfoFmt(InfoFromSMgrRel(reln)), + forknum, + LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), + n_blocks); + + pfree(resp); return n_blocks; } @@ -3632,67 +3497,39 @@ neon_dbsize(Oid dbNode) { NeonDbSizeRequest request = { .req.tag = T_NeonDbSizeRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsns.request_lsn, .req.not_modified_since = request_lsns.not_modified_since, .dbNode = dbNode, }; resp = page_server_request(&request); + } - switch (resp->tag) - { - case T_NeonDbSizeResponse: - { - NeonDbSizeResponse* dbsize_resp = (NeonDbSizeResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - dbsize_resp->dbNode != dbNode) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u} to get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, dbNode=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), dbsize_resp->dbNode, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), dbNode); - } - } - db_size = dbsize_resp->db_size; - break; - } - case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get DB size request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } - ereport(ERROR, - (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read db size of db %u from page server at lsn %X/%08X", - resp->reqid, - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), - errdetail("page server returned error: %s", - ((NeonErrorResponse *) resp)->message))); - break; - - default: - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", - T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); - } + switch (resp->tag) + { + case T_NeonDbSizeResponse: + db_size = ((NeonDbSizeResponse *) resp)->db_size; + break; - neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", - dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); + case T_NeonErrorResponse: + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg(NEON_TAG "could not read db size of db %u from page server at lsn %X/%08X", + dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn)), + errdetail("page server returned error: %s", + ((NeonErrorResponse *) resp)->message))); + break; - pfree(resp); + default: + NEON_PANIC_CONNECTION_STATE(-1, PANIC, + "Expected DbSize (0x%02x) or Error (0x%02x) response to DbSizeRequest, but got 0x%02x", + T_NeonDbSizeResponse, T_NeonErrorResponse, resp->tag); } + + neon_log(SmgrTrace, "neon_dbsize: db %u (request LSN %X/%08X): %ld bytes", + dbNode, LSN_FORMAT_ARGS(request_lsns.effective_request_lsn), db_size); + + pfree(resp); return db_size; } @@ -4025,7 +3862,6 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf request = (NeonGetSlruSegmentRequest) { .req.tag = T_NeonGetSlruSegmentRequest, - .req.reqid = GENERATE_REQUEST_ID(), .req.lsn = request_lsn, .req.not_modified_since = not_modified_since, .kind = kind, @@ -4044,42 +3880,14 @@ neon_read_slru_segment(SMgrRelation reln, const char* path, int segno, void* buf switch (resp->tag) { case T_NeonGetSlruSegmentResponse: - { - NeonGetSlruSegmentResponse* slru_resp = (NeonGetSlruSegmentResponse *) resp; - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since || - slru_resp->kind != kind || - slru_resp->segno != segno) - { - NEON_PANIC_CONNECTION_STATE(-1, PANIC, - "Unexpect response {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u} to get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X, kind=%u, segno=%u}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), slru_resp->kind, slru_resp->segno, - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since), kind, segno); - } - } - n_blocks = slru_resp->n_blocks; - memcpy(buffer, slru_resp->data, n_blocks*BLCKSZ); + n_blocks = ((NeonGetSlruSegmentResponse *) resp)->n_blocks; + memcpy(buffer, ((NeonGetSlruSegmentResponse *) resp)->data, n_blocks*BLCKSZ); break; - } + case T_NeonErrorResponse: - if (neon_protocol_version >= 3) - { - if (resp->reqid != request.req.reqid || - resp->lsn != request.req.lsn || - resp->not_modified_since != request.req.not_modified_since) - { - elog(WARNING, NEON_TAG "Error message {reqid=%lx,lsn=%X/%08X, since=%X/%08X} doesn't match get SLRU segment request {reqid=%lx,lsn=%X/%08X, since=%X/%08X}", - resp->reqid, LSN_FORMAT_ARGS(resp->lsn), LSN_FORMAT_ARGS(resp->not_modified_since), - request.req.reqid, LSN_FORMAT_ARGS(request.req.lsn), LSN_FORMAT_ARGS(request.req.not_modified_since)); - } - } ereport(ERROR, (errcode(ERRCODE_IO_ERROR), - errmsg(NEON_TAG "[reqid %lx] could not read SLRU %d segment %d at lsn %X/%08X", - resp->reqid, + errmsg(NEON_TAG "could not read SLRU %d segment %d at lsn %X/%08X", kind, segno, LSN_FORMAT_ARGS(request_lsn)), @@ -4220,7 +4028,6 @@ neon_extend_rel_size(NRelFileInfo rinfo, ForkNumber forknum, BlockNumber blkno, NeonNblocksRequest request = { .req = (NeonRequest) { .tag = T_NeonNblocksRequest, - .reqid = GENERATE_REQUEST_ID(), .lsn = end_recptr, .not_modified_since = end_recptr, },