From e52919c7656ac598b9aa63d1ad70e8dff34f6685 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v2] Compress big WAL records

This approach replaces FPI compression
---
 contrib/pg_walinspect/pg_walinspect.c         |   6 -
 src/backend/access/rmgrdesc/xlogdesc.c        |  44 +-
 src/backend/access/transam/xlog.c             |  19 +-
 src/backend/access/transam/xloginsert.c       | 388 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 248 ++++++-----
 src/backend/utils/misc/guc_tables.c           |  11 +
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/include/access/xlog.h                     |   1 +
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   6 +
 src/include/access/xlogrecord.h               |  37 +-
 src/include/pg_config_manual.h                |   2 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 src/test/recovery/t/039_end_of_wal.pl         |   2 +-
 14 files changed, 401 insertions(+), 367 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 9e60941578..511bd2978a 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -311,12 +311,6 @@ GetWALBlockInfo(FunctionCallInfo fcinfo, XLogReaderState *record,
 				flags[cnt++] = CStringGetTextDatum("HAS_HOLE");
 			if (blk->apply_image)
 				flags[cnt++] = CStringGetTextDatum("APPLY");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_PGLZ");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_LZ4");
-			if ((blk->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-				flags[cnt++] = CStringGetTextDatum("COMPRESS_ZSTD");
 
 			Assert(cnt <= bitcnt);
 			block_fpi_info = construct_array_builtin(flags, cnt, TEXTOID);
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index 58040f2865..55dcfee59d 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -267,46 +267,18 @@ XLogRecGetBlockRefInfo(XLogReaderState *record, bool pretty,
 
 			if (XLogRecHasBlockImage(record, block_id))
 			{
-				uint8		bimg_info = XLogRecGetBlock(record, block_id)->bimg_info;
-
 				/* Calculate the amount of FPI data in the record. */
 				if (fpi_len)
 					*fpi_len += XLogRecGetBlock(record, block_id)->bimg_len;
 
-				if (BKPIMAGE_COMPRESSED(bimg_info))
-				{
-					const char *method;
-
-					if ((bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-						method = "pglz";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-						method = "lz4";
-					else if ((bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-						method = "zstd";
-					else
-						method = "unknown";
-
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u, "
-									 "compression saved: %u, method: %s",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length,
-									 BLCKSZ -
-									 XLogRecGetBlock(record, block_id)->hole_length -
-									 XLogRecGetBlock(record, block_id)->bimg_len,
-									 method);
-				}
-				else
-				{
-					appendStringInfo(buf,
-									 " (FPW%s); hole: offset: %u, length: %u",
-									 XLogRecBlockImageApply(record, block_id) ?
-									 "" : " for WAL verification",
-									 XLogRecGetBlock(record, block_id)->hole_offset,
-									 XLogRecGetBlock(record, block_id)->hole_length);
-				}
+				
+				appendStringInfo(buf,
+								 " (FPW%s); hole: offset: %u, length: %u",
+								 XLogRecBlockImageApply(record, block_id) ?
+								 "" : " for WAL verification",
+								 XLogRecGetBlock(record, block_id)->hole_offset,
+								 XLogRecGetBlock(record, block_id)->hole_length);
+				
 			}
 
 			if (pretty)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bf3dbda901..b54b4260ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -135,6 +135,7 @@ int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
 int			wal_decode_buffer_size = 512 * 1024;
 bool		track_wal_io_timing = false;
+int			wal_compression_threshold = 512;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -715,6 +716,22 @@ static void WALInsertLockAcquireExclusive(void);
 static void WALInsertLockRelease(void);
 static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt);
 
+static uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
+
+/* Read length of a record, accounting for possible compression */
+static uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+		Assert(((int32_t)c->decompressed_length) > 0);
+		return c->decompressed_length;
+	}
+	return record->xl_tot_len;
+}
+
 /*
  * Insert an XLOG record represented by an already-constructed chain of data
  * chunks.  This is a low-level routine; to construct the WAL record header
@@ -1031,7 +1048,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index efed097092..6ae7e4015a 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -40,27 +40,6 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -81,9 +60,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kep in StringInfor for a prototype
+ * compression_buffer_current_size == -1 means we entered critical section
+ * before could prepare compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const char *page, uint16 hole_offset,
-									uint16 hole_length, char *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -160,6 +144,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -231,6 +220,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -386,6 +379,7 @@ XLogRegisterData(const char *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -459,6 +454,144 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure we have buffers needed for compression.
+ * We cannot do it during WALInsert(), because we will be in a critial section.
+ */
+void XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64 compressed_buffer_size;
+	uint64 desired_buffer_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1)
+	{
+		/* We cannot prepare buffer during critical section, so bail out early */
+		compression_buffer_current_size = -1;
+		return;
+	}
+
+	compression_buffer_current_size += extraLen;
+	desired_buffer_size = compression_buffer_current_size + SizeOfXLogRecord;
+	Assert(data_before_compression->len == 0);
+	enlargeStringInfo(data_before_compression, desired_buffer_size);
+
+	compressed_buffer_size = PGLZ_MAX_OUTPUT(desired_buffer_size);
+
+#ifdef USE_LZ4
+	compressed_buffer_size = Max(compressed_buffer_size, LZ4_COMPRESSBOUND(desired_buffer_size));
+#endif
+#ifdef USE_ZSTD
+	compressed_buffer_size = Max(compressed_buffer_size, ZSTD_COMPRESSBOUND(desired_buffer_size));
+#endif
+	compressed_buffer_size = compressed_buffer_size + SizeOfXLogCompressedRecord;
+	Assert(compressed_data->len == 0);
+	enlargeStringInfo(compressed_data, compressed_buffer_size);
+}
+
+/* Compress assembled record on top of compression buffers */
+static XLogRecData*
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionHeader *compressed_header;
+	XLogRecord *src_header;
+	uint32 orig_len;
+	int32 compr_len = -1;
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* Build the whole record */
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	src_header = (XLogRecord*) data_before_compression->data;
+	compressed_header = (XLogCompressionHeader*) compressed_data->data;
+
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+
+	orig_len = src_header->xl_tot_len - SizeOfXLogRecord;
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = XLR_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char*)&src_header[1], orig_len, (char*)&compressed_header[1], PGLZ_strategy_default);
+			if (compr_len == -1)
+				return NULL;
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = XLR_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char*)&src_header[1], (char*)&compressed_header[1], orig_len,
+									   compressed_data->maxlen);
+			if (compr_len <= 0)
+				return NULL;
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = XLR_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char*)&compressed_header[1], compressed_data->maxlen, (char*)&src_header[1], orig_len,
+								ZSTD_CLEVEL_DEFAULT);
+			if (ZSTD_isError(compr_len))
+				return NULL;
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			return NULL;
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	Assert(compr_len > 0);
+
+	compressed_header->record_header.xl_tot_len = SizeOfXLogCompressedRecord + compr_len;
+
+	compressed_header->record_header.xl_info = compressed_header->record_header.xl_info | XLR_COMPRESSED;
+
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum assebled record, possibly compressed */
+static void XLogChecksumRecord(XLogRecData *rdt)
+{
+	pg_crc32c	rdata_crc;
+	XLogRecord *rechdr = (XLogRecord*) rdt->data;
+	/*
+	 * Calculate CRC of the data
+	 *
+	 * Note that the record header isn't added into the CRC initially since we
+	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
+	 * the whole record in the order: rdata, then backup blocks, then record
+	 * header.
+	 */
+	INIT_CRC32C(rdata_crc);
+	COMP_CRC32C(rdata_crc, rdt->data + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (rdt = rdt->next; rdt != NULL; rdt = rdt->next)
+		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
+	rechdr->xl_crc = rdata_crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -509,6 +642,8 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
 		int			num_fpi = 0;
+		uint64		rec_size;
+
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -518,7 +653,17 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn, &num_fpi, &topxid_included);
+								 &fpw_lsn, &num_fpi, &topxid_included,
+								 &rec_size);
+
+		if (rec_size > wal_compression_threshold && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  topxid_included);
@@ -547,12 +692,11 @@ XLogInsert(RmgrId rmid, uint8 info)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included,
+				   uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -593,9 +737,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -649,7 +792,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		if (include_image)
 		{
 			const char *page = regbuf->page;
-			uint16		compressed_len = 0;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -666,32 +808,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -709,7 +839,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -720,48 +850,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
-			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
-
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -776,9 +868,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -821,12 +913,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -892,19 +978,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -928,92 +1001,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const char *page, uint16 hole_offset, uint16 hole_length,
-						char *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const char *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1389,4 +1381,10 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	if (data_before_compression == NULL)
+		data_before_compression = makeStringInfo();
+	if (compressed_data == NULL)
+		compressed_data = makeStringInfo();
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..558f40c1fa 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -32,6 +32,7 @@
 #include "catalog/pg_control.h"
 #include "common/pg_lzcompress.h"
 #include "replication/origin.h"
+#include "utils/memutils.h"
 
 #ifndef FRONTEND
 #include "pgstat.h"
@@ -53,6 +54,8 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state, XLogRecord *record,
+												XLogRecPtr recptr);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -169,6 +172,8 @@ XLogReaderFree(XLogReaderState *state)
 	pfree(state->errormsg_buf);
 	if (state->readRecordBuf)
 		pfree(state->readRecordBuf);
+	if (state->decompression_buffer)
+		pfree(state->decompression_buffer);
 	pfree(state->readBuf);
 	pfree(state);
 }
@@ -532,7 +537,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_phisical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -643,8 +649,27 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_phisical = record->xl_tot_len;
 
+	/* TODO: Actually, we should not trust this compression bit too... */
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogCompressedRecord)
+		{
+			total_len_decomp = -1; /* Need reassemble to know the size */
+		}
+		else
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			// Assert(((int32_t)c->decompressed_length) > 0); // We cannot assert this, this might be a garbage
+			total_len_decomp = c->decompressed_length;
+		}
+	}
+	else
+		if (targetRecOff > XLOG_BLCKSZ - SizeOfXLogRecord)
+			total_len_decomp = record->xl_tot_len;
+		else 
+			total_len_decomp = -1; /* We are not sure record is not compressed */
 	/*
 	 * If the whole record header is on this page, validate it immediately.
 	 * Otherwise do just a basic sanity check on xl_tot_len, and validate the
@@ -659,16 +684,18 @@ restart:
 								   randAccess))
 			goto err;
 		gotheader = true;
+		if (record->xl_info & XLR_COMPRESSED)
+			gotheader = targetRecOff <= XLOG_BLCKSZ - SizeOfXLogCompressedRecord;
 	}
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_phisical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_phisical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -680,9 +707,11 @@ restart:
 	 * calling palloc.  If we can't, we'll try again below after we've
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
-	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+	if (total_len_decomp != -1)
+		decoded = XLogReadRecordAlloc(state,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
+
 	if (decoded == NULL && nonblocking)
 	{
 		/*
@@ -694,7 +723,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_phisical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -724,7 +753,7 @@ restart:
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_phisical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 
 			if (readOff == XLREAD_WOULDBLOCK)
@@ -765,12 +794,12 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_phisical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_phisical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
@@ -813,7 +842,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_phisical > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -824,11 +853,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_phisical);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_phisical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -843,8 +872,9 @@ restart:
 	else
 	{
 		/* Wait for the record data to become available */
+		Assert(targetRecOff + total_len_phisical <= XLOG_BLCKSZ);
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   targetRecOff + total_len_phisical);
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -854,7 +884,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -877,8 +907,19 @@ restart:
 	if (decoded == NULL)
 	{
 		Assert(!nonblocking);
+
+		/* total_len_decomp might be not actual */
+		if (record->xl_info & XLR_COMPRESSED)
+		{
+			XLogCompressionHeader *c = (XLogCompressionHeader*) record;
+			Assert(((int32_t)c->decompressed_length) > 0);
+			Assert(((int32_t)c->decompressed_length) < MaxAllocSize);
+			total_len_decomp = c->decompressed_length;
+		}
+		else
+			total_len_decomp = record->xl_tot_len;
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1646,6 +1687,84 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogReaderState *state,
+												XLogRecord *record,
+												XLogRecPtr recptr)
+{
+	if (record->xl_info & XLR_COMPRESSED)
+	{
+		XLogCompressionHeader	*src = (XLogCompressionHeader*) record;
+		bool				decomp_success = true;
+		uint32				srclen = src->record_header.xl_tot_len - SizeOfXLogCompressedRecord;
+		char				*dst;
+		XLogRecord			*dst_h;
+
+		if (state->decompression_buffer_size < src->decompressed_length + SizeOfXLogRecord)
+		{
+			if (state->decompression_buffer)
+				pfree(state->decompression_buffer);
+			/* Avoid small steps in growths, we compress only big records */
+			state->decompression_buffer_size = TYPEALIGN(BLCKSZ, src->decompressed_length + SizeOfXLogRecord);
+			state->decompression_buffer = palloc(state->decompression_buffer_size);
+		}
+		dst_h = (XLogRecord*) state->decompression_buffer;
+		*dst_h = src->record_header;
+		dst_h->xl_tot_len = src->decompressed_length;
+		dst = (char*) &dst_h[1];
+
+		/* If a backup block image is compressed, decompress it */
+
+		if (src->method == XLR_COMPRESS_PGLZ)
+		{
+			if (pglz_decompress((char*) &src[1], srclen, dst,
+								state->decompression_buffer_size, true) < 0)
+				decomp_success = false;
+		}
+		else if (src->method == XLR_COMPRESS_LZ4)
+		{
+#ifdef USE_LZ4
+			if (LZ4_decompress_safe((char*) &src[1], dst,
+									srclen, state->decompression_buffer_size) <= 0)
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "lz4");
+			return NULL;
+#endif
+		}
+		else if (src->method == XLR_COMPRESS_ZSTD)
+		{
+#ifdef USE_ZSTD
+			size_t		decomp_result = ZSTD_decompress(dst,
+														state->decompression_buffer_size,
+														(char*) &src[1], srclen);
+			if (ZSTD_isError(decomp_result))
+				decomp_success = false;
+#else
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with %s not supported by build",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr), "zstd");
+			return NULL;
+#endif
+		}
+		else
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X compressed with unknown method",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		if (!decomp_success)
+		{
+			report_invalid_record(state, "could not decompress record at %X/%X",
+								  LSN_FORMAT_ARGS((XLogRecPtr)recptr));
+			return NULL;
+		}
+
+		return (XLogRecord*) state->decompression_buffer;
+	}
+	return record;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1684,6 +1803,14 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(state, record, lsn);
+
+	if (!record)
+	{
+		/* Decompression failed, error must be reported already */
+		return false;
+	}
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1791,16 +1918,7 @@ DecodeXLogRecord(XLogReaderState *state,
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
-
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
-				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
-				}
-				else
-					blk->hole_length = BLCKSZ - blk->bimg_len;
+				blk->hole_length = BLCKSZ - blk->bimg_len;
 				datatotal += blk->bimg_len;
 
 				/*
@@ -1836,29 +1954,15 @@ DecodeXLogRecord(XLogReaderState *state,
 					goto err;
 				}
 
-				/*
-				 * Cross-check that bimg_len < BLCKSZ if it is compressed.
-				 */
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
-					blk->bimg_len == BLCKSZ)
-				{
-					report_invalid_record(state,
-										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
-										  (unsigned int) blk->bimg_len,
-										  LSN_FORMAT_ARGS(state->ReadRecPtr));
-					goto err;
-				}
-
 				/*
 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
 				 * set nor COMPRESSED().
 				 */
 				if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) &&
-					!BKPIMAGE_COMPRESSED(blk->bimg_info) &&
 					blk->bimg_len != BLCKSZ)
 				{
 					report_invalid_record(state,
-										  "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X",
+										  "BKPIMAGE_HAS_HOLE is not set, but block image length is %u at %X/%X",
 										  (unsigned int) blk->data_len,
 										  LSN_FORMAT_ARGS(state->ReadRecPtr));
 					goto err;
@@ -2057,7 +2161,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2079,67 +2182,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	bkpb = &record->record->blocks[block_id];
 	ptr = bkpb->bkp_image;
 
-	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
-	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
-	}
-
 	/* generate page, taking into account hole if necessary */
 	if (bkpb->hole_length == 0)
 	{
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c9d8cd796a..795e93a014 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -2980,6 +2980,17 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_compression_threshold", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Minimum WAL record length to engage compression."),
+			NULL,
+			GUC_UNIT_BYTE
+		},
+		&wal_compression_threshold,
+		512, 32, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"wal_writer_flush_after", PGC_SIGHUP, WAL_SETTINGS,
 			gettext_noop("Amount of WAL written out by WAL writer that triggers a flush."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b2bc43383d..90245c9b8d 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -239,6 +239,7 @@
 					# (change requires restart)
 #wal_compression = off			# enables compression of full-page writes;
 					# off, pglz, lz4, zstd, or on
+#wal_compression_threshold = 512	# min 32, minimal record length to be compressed
 #wal_init_zero = on			# zero-fill new WAL files
 #wal_recycle = on			# recycle WAL files
 #wal_buffers = -1			# min 32kB, -1 sets based on shared_buffers
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 4411c1468a..f40c2ae76b 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -56,6 +56,7 @@ extern PGDLLIMPORT int CommitDelay;
 extern PGDLLIMPORT int CommitSiblings;
 extern PGDLLIMPORT bool track_wal_io_timing;
 extern PGDLLIMPORT int wal_decode_buffer_size;
+extern PGDLLIMPORT int wal_compression_threshold;
 
 extern PGDLLIMPORT int CheckPointSegments;
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 71894262fb..0266e224bb 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -44,6 +44,7 @@ extern void XLogBeginInsert(void);
 extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const char *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9738462d3c..f41106cbdc 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -252,6 +252,12 @@ struct XLogReaderState
 	char	   *decode_buffer_head; /* data is read from the head */
 	char	   *decode_buffer_tail; /* new data is written at the tail */
 
+	/*
+	 * Buffer to decompress records
+	 */
+	char	   *decompression_buffer;
+	uint32 		decompression_buffer_size;
+
 	/*
 	 * Queue of records that have been decoded.  This is a linked list that
 	 * usually consists of consecutive records in decode_buffer, but may also
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index a06833ce0a..9261760272 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -48,7 +48,7 @@ typedef struct XLogRecord
 	/* 2 bytes of padding here, initialize to zero */
 	pg_crc32c	xl_crc;			/* CRC for this record */
 
-	/* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */
+	/* XLogRecordBlockHeaders, XLogRecordDataHeader or compression header follow, no padding */
 
 } XLogRecord;
 
@@ -90,6 +90,9 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+/* This bit in xl_info means the record is compressed */
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -143,11 +146,6 @@ typedef struct XLogRecordBlockImageHeader
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
 	uint8		bimg_info;		/* flag bits, see below */
-
-	/*
-	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_COMPRESSED(), an
-	 * XLogRecordBlockCompressHeader struct follows.
-	 */
 } XLogRecordBlockImageHeader;
 
 #define SizeOfXLogRecordBlockImageHeader	\
@@ -158,25 +156,19 @@ typedef struct XLogRecordBlockImageHeader
 #define BKPIMAGE_APPLY			0x02	/* page image should be restored
 										 * during replay */
 /* compression methods supported */
-#define BKPIMAGE_COMPRESS_PGLZ	0x04
-#define BKPIMAGE_COMPRESS_LZ4	0x08
-#define BKPIMAGE_COMPRESS_ZSTD	0x10
-
-#define	BKPIMAGE_COMPRESSED(info) \
-	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
-			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
+#define XLR_COMPRESS_PGLZ	0x04
+#define XLR_COMPRESS_LZ4	0x08
+#define XLR_COMPRESS_ZSTD	0x10
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Record of a compressed header */
+typedef struct XLogCompressionHeader
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionHeader;
 
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+#define SizeOfXLogCompressedRecord	(offsetof(XLogCompressionHeader, decompressed_length) + sizeof(uint32))
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +177,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..9ff11e3e6a 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -355,7 +355,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of syncscan operations (see also the trace_syncscan GUC var).
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index f408d4f69b..942cf26f3f 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
diff --git a/src/test/recovery/t/039_end_of_wal.pl b/src/test/recovery/t/039_end_of_wal.pl
index ab751eb271..d4c3d65877 100644
--- a/src/test/recovery/t/039_end_of_wal.pl
+++ b/src/test/recovery/t/039_end_of_wal.pl
@@ -81,7 +81,7 @@ sub emit_message
 	return int(
 		$node->safe_psql(
 			'postgres',
-			"SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'"
+			"SET wal_compression to off;SELECT pg_logical_emit_message(true, '', repeat('a', $size)) - '0/0'"
 		));
 }
 
-- 
2.39.5 (Apple Git-154)

