From b26154cd972bca454ed3e21a8f66b47600a07313 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Sun, 27 Dec 2020 12:01:35 +0500
Subject: [PATCH v16 7/7] Add Lz4 compression to WAL FPIs

---
 src/backend/access/transam/xlog.c       |  1 +
 src/backend/access/transam/xloginsert.c | 41 ++++++++++++++++++++++---
 src/backend/access/transam/xlogreader.c | 32 +++++++++++++++++--
 src/backend/utils/misc/guc.c            | 17 ++++++++++
 src/include/access/xlog.h               |  2 ++
 src/include/access/xlogreader.h         |  1 +
 src/include/access/xlogrecord.h         |  1 +
 src/include/catalog/pg_am.dat           |  4 +--
 src/include/catalog/pg_proc.dat         |  6 ++--
 9 files changed, 93 insertions(+), 12 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 48ca46a941..8e4f31819e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -112,6 +112,7 @@ int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
+int			wal_compression_method = LZ4_COMPRESSION_ID;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 1f0e4e01e6..a7f4accb1c 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/compressamapi.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
@@ -33,6 +34,10 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
+#ifdef HAVE_LIBLZ4
+#include "lz4.h"
+#endif
+
 /* Buffer size required to store a compressed version of backup block image */
 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
 
@@ -113,7 +118,8 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi);
 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
-									uint16 hole_length, char *dest, uint16 *dlen);
+									uint16 hole_length, char *dest,
+									uint16 *dlen, CompressionId compression);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -630,11 +636,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			 */
 			if (wal_compression)
 			{
+				bimg.compression_method = PGLZ_COMPRESSION_ID;
+#ifdef HAVE_LIBLZ4
+				bimg.compression_method = wal_compression_method;
+#endif
 				is_compressed =
 					XLogCompressBackupBlock(page, bimg.hole_offset,
 											cbimg.hole_length,
 											regbuf->compressed_page,
-											&compressed_len);
+											&compressed_len, bimg.compression_method);
 			}
 
 			/*
@@ -827,7 +837,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
  */
 static bool
 XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
-						char *dest, uint16 *dlen)
+						char *dest, uint16 *dlen, CompressionId compression)
 {
 	int32		orig_len = BLCKSZ - hole_length;
 	int32		len;
@@ -853,12 +863,33 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 	else
 		source = page;
 
+	if (compression == LZ4_COMPRESSION_ID)
+	{
+#ifndef HAVE_LIBLZ4
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("not built with lz4 support")));
+#else
+		len = LZ4_compress_fast(source, dest, orig_len, PGLZ_MAX_BLCKSZ, 1);
+#endif
+	}
+	else if (compression == PGLZ_COMPRESSION_ID)
+	{
+		len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
+	}
+	else
+	{
+		ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("unknown compression method requested")));
+	}
+	
+	
 	/*
-	 * We recheck the actual size even if pglz_compress() reports success and
+	 * We recheck the actual size even if compression reports success and
 	 * see if the number of bytes saved by compression is larger than the
 	 * length of extra data needed for the compressed version of block image.
 	 */
-	len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
 	if (len >= 0 &&
 		len + extra_bytes < orig_len)
 	{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a63ad8cfd0..d1da11b79b 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -33,6 +33,10 @@
 #include "utils/memutils.h"
 #endif
 
+#ifdef HAVE_LIBLZ4
+#include "lz4.h"
+#endif
+
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
 			pg_attribute_printf(2, 3);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
@@ -1291,6 +1295,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			{
 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
+				COPY_HEADER_FIELD(&blk->compression_method, sizeof(uint8));
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
@@ -1565,8 +1570,31 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
 	{
 		/* If a backup block image is compressed, decompress it */
-		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-							BLCKSZ - bkpb->hole_length, true) < 0)
+		int32 decomp_result = -1;
+		if (bkpb->compression_method == PGLZ_COMPRESSION_ID)
+		{
+			decomp_result = pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
+							BLCKSZ - bkpb->hole_length, true);
+		}
+		else if (bkpb->compression_method == LZ4_COMPRESSION_ID)
+		{
+#ifndef HAVE_LIBLZ4
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					errmsg("not built with lz4 support")));
+#else
+			decomp_result = LZ4_decompress_safe(ptr, tmp.data, bkpb->bimg_len, BLCKSZ);
+#endif
+		}
+		else
+		{
+#ifndef FRONTEND
+			ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("unknown compression method requested")));
+#endif
+		}
+		if ( decomp_result < 0)
 		{
 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
 								  (uint32) (record->ReadRecPtr >> 32),
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bbaf037bc6..290fd00687 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -30,6 +30,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/compressamapi.h"
 #include "access/gin.h"
 #include "access/rmgr.h"
 #include "access/tableam.h"
@@ -483,6 +484,12 @@ const struct config_enum_entry ssl_protocol_versions_info[] = {
 	{NULL, 0, false}
 };
 
+const struct config_enum_entry wal_compression_options[] = {
+	{"pglz", PGLZ_COMPRESSION_ID, false},
+	{"lz4", LZ4_COMPRESSION_ID, false},
+	{NULL, 0, false}
+};
+
 StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
 				 "array length mismatch");
 
@@ -4679,6 +4686,16 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_compression_method", PGC_SIGHUP, WAL_SETTINGS,
+			gettext_noop("Set the method used to compress full page images in the WAL."),
+			NULL
+		},
+		&wal_compression_method,
+		LZ4_COMPRESSION_ID, wal_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"dynamic_shared_memory_type", PGC_POSTMASTER, RESOURCES_MEM,
 			gettext_noop("Selects the dynamic shared memory implementation used."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 221af87e71..5099c07b63 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -11,6 +11,7 @@
 #ifndef XLOG_H
 #define XLOG_H
 
+#include "access/compressamapi.h"
 #include "access/rmgr.h"
 #include "access/xlogdefs.h"
 #include "access/xloginsert.h"
@@ -175,6 +176,7 @@ typedef enum RecoveryState
 } RecoveryState;
 
 extern PGDLLIMPORT int wal_level;
+extern PGDLLIMPORT int wal_compression_method;
 
 /* Is WAL archiving enabled (always or only while server is running normally)? */
 #define XLogArchivingActive() \
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 0b6d00dd7d..791b6f435c 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -133,6 +133,7 @@ typedef struct
 	bool		apply_image;	/* has image that should be restored */
 	char	   *bkp_image;
 	uint16		hole_offset;
+	uint8		compression_method;
 	uint16		hole_length;
 	uint16		bimg_len;
 	uint8		bimg_info;
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 2f0c8bf589..7eee5d42d0 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -131,6 +131,7 @@ typedef struct XLogRecordBlockImageHeader
 {
 	uint16		length;			/* number of page image bytes */
 	uint16		hole_offset;	/* number of bytes before "hole" */
+	uint8		compression_method;
 	uint8		bimg_info;		/* flag bits, see below */
 
 	/*
diff --git a/src/include/catalog/pg_am.dat b/src/include/catalog/pg_am.dat
index 3a8d0ac09c..a984eeeadf 100644
--- a/src/include/catalog/pg_am.dat
+++ b/src/include/catalog/pg_am.dat
@@ -33,10 +33,10 @@
 { oid => '3580', oid_symbol => 'BRIN_AM_OID',
   descr => 'block range index (BRIN) access method',
   amname => 'brin', amhandler => 'brinhandler', amtype => 'i' },
-{ oid => '4225', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
+{ oid => '8022', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
   descr => 'pglz compression access method',
   amname => 'pglz', amhandler => 'pglzhandler', amtype => 'c' },
-{ oid => '4226', oid_symbol => 'LZ4_COMPRESSION_AM_OID',
+{ oid => '8023', oid_symbol => 'LZ4_COMPRESSION_AM_OID',
   descr => 'lz4 compression access method',
   amname => 'lz4', amhandler => 'lz4handler', amtype => 'c' },  
 ]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d851619543..3148285f45 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -941,11 +941,11 @@
   prorettype => 'void', proargtypes => 'regclass int8',
   prosrc => 'brin_desummarize_range' },
 
-{ oid => '4388', descr => 'pglz compression access method handler',
+{ oid => '8024', descr => 'pglz compression access method handler',
   proname => 'pglzhandler', provolatile => 'v',
   prorettype => 'compression_am_handler', proargtypes => 'internal',
   prosrc => 'pglzhandler' },
-{ oid => '4389', descr => 'lz4 compression access method handler',
+{ oid => '8021', descr => 'lz4 compression access method handler',
   proname => 'lz4handler', provolatile => 'v',
   prorettype => 'compression_am_handler', proargtypes => 'internal',
   prosrc => 'lz4handler' },
@@ -7056,7 +7056,7 @@
   descr => 'bytes required to store the value, perhaps with compression',
   proname => 'pg_column_size', provolatile => 's', prorettype => 'int4',
   proargtypes => 'any', prosrc => 'pg_column_size' },
-{ oid => '2228',
+{ oid => '8025',
   descr => 'compression method for the compressed datum',
   proname => 'pg_column_compression', provolatile => 's', prorettype => 'text',
   proargtypes => 'any', prosrc => 'pg_column_compression' },
-- 
2.24.3 (Apple Git-128)

