On Mon, Nov 14, 2022 at 07:44:24PM -0800, Andrey Borodin wrote:
> patchset needs a heavy rebase. If no one shows up to fix it I'll do

Despite what its git timestamp says, this is based on the most recent
patch from January, which I've had floating around since then.  It
needed to be rebased over at least:

  - guc_tables patch;
  - build and test with meson;
  - doc/

Some of my changes are separate so you can see what I've done.
check_libpq_compression() is in the wrong place, but I couldn't
immediately see where else to put it, since src/common can't include the
backend's guc headers.

Some of the makefile changes seem unnecessary (now?), and my meson
changes don't seem quite right, either.

There's no reason for Zstd to be a separate patch anymore.

It should be updated to parse compression level and options using the
infrastructure introduced for basebackup.

And address the architectural issue from 2 years ago:
https://www.postgresql.org/message-id/20220118043919.GA23027%40telsasoft.com

The global variable PqStream should be moved into some libpq structure
(Port?) and handled within secure_read().  And pqsecure_read shouldn't
be passed as a function pointer/callback.  And verify it passes tests
with all supported compression algorithms and connects to old servers.

-- 
Justin
>From 6a74b2af0131499a3afa5132bc6dac08eb1884f6 Mon Sep 17 00:00:00 2001
From: usernamedt <usernam...@yandex-team.com>
Date: Fri, 14 Jan 2022 01:38:55 +0500
Subject: [PATCH 1/4] Implement libpq compression

---
 doc/src/sgml/config.sgml                      |   21 +
 doc/src/sgml/libpq.sgml                       |   37 +
 doc/src/sgml/protocol.sgml                    |  158 +++
 src/Makefile.global.in                        |    1 +
 src/backend/Makefile                          |    8 +
 src/backend/catalog/system_views.sql          |    9 +
 src/backend/libpq/pqcomm.c                    |  277 ++++-
 src/backend/postmaster/postmaster.c           |   10 +
 .../libpqwalreceiver/libpqwalreceiver.c       |   13 +-
 src/backend/utils/activity/backend_status.c   |   29 +
 src/backend/utils/adt/pgstatfuncs.c           |   50 +-
 src/backend/utils/misc/guc_funcs.c            |   21 +
 src/backend/utils/misc/guc_tables.c           |   10 +
 src/bin/pgbench/pgbench.c                     |   17 +-
 src/bin/psql/command.c                        |   17 +
 src/common/Makefile                           |    4 +-
 src/common/z_stream.c                         |  663 +++++++++++
 src/common/zpq_stream.c                       | 1022 +++++++++++++++++
 src/include/catalog/pg_proc.dat               |   18 +-
 src/include/common/z_stream.h                 |  109 ++
 src/include/common/zpq_stream.h               |  120 ++
 src/include/libpq/libpq-be.h                  |    3 +
 src/include/libpq/libpq.h                     |    1 +
 src/include/libpq/pqcomm.h                    |    3 +
 src/include/utils/backend_status.h            |    7 +
 src/interfaces/libpq/Makefile                 |   14 +
 src/interfaces/libpq/exports.txt              |    2 +
 src/interfaces/libpq/fe-connect.c             |  129 ++-
 src/interfaces/libpq/fe-exec.c                |   10 +-
 src/interfaces/libpq/fe-misc.c                |   92 +-
 src/interfaces/libpq/fe-protocol3.c           |   71 +-
 src/interfaces/libpq/libpq-fe.h               |    4 +
 src/interfaces/libpq/libpq-int.h              |   14 +
 src/test/regress/expected/rules.out           |   14 +-
 src/tools/msvc/Mkvcbuild.pm                   |    2 +-
 35 files changed, 2884 insertions(+), 96 deletions(-)
 create mode 100644 src/common/z_stream.c
 create mode 100644 src/common/zpq_stream.c
 create mode 100644 src/include/common/z_stream.h
 create mode 100644 src/include/common/zpq_stream.h

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index bd50ea8e480..de4d9532cc5 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1045,6 +1045,27 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+      <term><varname>libpq_compression</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>libpq_compression</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This parameter controls the available client-server traffic compression methods.
+        It allows rejecting compression requests even if it is supported by the server (for example, due to security, or CPU consumption).
+        The default is <literal>on</literal>, which means that all supported compression methods are allowed.
+        For more precise control, a list of the allowed compression methods can be specified.
+        For example, to allow only <literal>lz4</literal> and <literal>zlib</literal>, set the setting to <literal>lz4,zlib</literal>.
+        Also, maximal allowed compression level can be specified for each method, e.g. <literal>lz4:1,zlib:2</literal> setting will set the
+        maximal compression level for <literal>lz4</literal> to 1 and <literal>zlib</literal> to 2.
+        If a client requests the compression with a higher compression level, it will be set to the maximal allowed one.
+        Default (and recommended) maximal compression level for each algorithm is 1.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 3c9bd3d6730..d92f63ec350 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1259,6 +1259,43 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. The client sends a request with
+        a list of compression algorithms. Compression can be requested by a client
+        by including the <literal>"compression"</literal> option in its connection string.
+        This can either be a boolean value to enable or disable compression
+        (<literal>"true"</literal>/<literal>"false"</literal>,
+        <literal>"on"</literal>/<literal>"off"</literal>,
+        <literal>"yes"</literal>/<literal>"no"</literal>,
+        <literal>"1"</literal>/<literal>"0"</literal>),
+        <literal>"any"</literal>,
+        or an explicit list of comma-separated compression algorithms
+        which can optionally include compression level (<literal>"zlib,lz4:2"</literal>).
+        If compression is enabled but an algorithm is not explicitly specified,
+        the client library sends its full list of supported algorithms.
+        The server intersects the received compression algorithms with the allowed ones (controlled via the <literal>libpq_compression</literal> server config setting).
+        If the intersection is not empty, the server responds with CompressionAck containing the final list of the compression algorithms that can be used for the compression of libpq messages between the client and server.
+        If the intersection is empty (server does not accept any of the requested algorithms), then it replies with CompressionAck containing the empty list and it is up to the client whether to continue without compression or to report an error.
+      </para>
+      <para>
+        After sending the CompressionAck message, the server can send the SetCompressionMethod message to set the current compression algorithm for server-to-client traffic compression.
+        After receiving the CompressionAck message, the client can send the SetCompressionMethod message to set the current compression algorithm for client-to-server traffic compression.
+        Compressed data is transmitted via the CompressedData messages.
+      </para>
+      <para>
+        Support for compression algorithms must be enabled when the server is compiled.
+        Currently, two libraries are supported: zlib (default) and lz4 (if Postgres was
+        configured with --with-lz4 option). In both cases, streaming mode is used.
+        By default, compression is not requested by the client.
+        Please note that using compression together with SSL may expose extra vulnerabilities:
+        <ulink url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 5fdd429e05d..147d55eab23 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed-up client-server interaction.
+    Compression is especially useful for importing/exporting data to/from the database using the <literal>COPY</literal> command
+    and for replication (both physical and logical). Compression can also reduce the server's response time
+    for queries returning a large amount of data (for example, JSON, BLOBs, text, ...).
+    Currently, two libraries are supported: zlib (default) and lz4 (if Postgres was
+    configured with --with-lz4 option).
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -262,6 +271,21 @@
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         The server accepts the client's compression request.
+         Compression is requested when a client connection includes the "compression" option, which contains a list of the requested compression algorithms.
+         The server intersects the requested compression algorithms with the allowed ones (controlled via the <literal>libpq_compression</literal> server config setting).
+         If the intersection is not empty, the server responds with CompressionAck containing the final list of the compression algorithms that can be used for the compression of libpq messages between the client and server.
+         If the intersection is empty (server does not accept any of the requested algorithms), then it replies with CompressionAck containing the empty list and it is up to the client whether to continue without compression or to report an error.
+         After sending the CompressionAck message, the server can send the SetCompressionMethod message to set the current compression algorithm for server-to-client traffic compression.
+         After receiving the CompressionAck message, the client can send the SetCompressionMethod message to set the current compression algorithm for client-to-server traffic compression.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
@@ -4142,6 +4166,140 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
     </listitem>
    </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+                Acknowledge use of compression for protocol data.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                List of the negotiated compression algorithms.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
+<varlistentry>
+<term>
+SetCompressionMethod (F &amp; B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('k')
+</term>
+<listitem>
+<para>
+                Switch the current compression algorithm. Following CompressedData messages will be compressed via the compressed algorithm specified in this message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32(5)
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Index of the new compression algorithm.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+<varlistentry>
+<term>
+CompressedData (F &amp; B)
+</term>
+<listitem>
+<para>
+
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('m')
+</term>
+<listitem>
+<para>
+                Identifies the message as compressed data.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                Compressed message data.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
    <varlistentry id="protocol-message-formats-CopyData">
     <term>CopyData (F &amp; B)</term>
     <listitem>
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index e96bedd4e7b..1b3dc97e97d 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_lz4    = @with_lz4@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 181c217fae4..f90e078b6ed 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -55,6 +55,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_lz4),yes)
+LIBS += -llz4
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2d8104b0907..89d18c12ef2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -871,6 +871,15 @@ CREATE VIEW pg_stat_activity AS
         LEFT JOIN pg_database AS D ON (S.datid = D.oid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_network_traffic AS
+    SELECT
+	    S.pid,
+        S.rx_raw_bytes,
+        S.tx_raw_bytes,
+        S.rx_compressed_bytes,
+        S.tx_compressed_bytes
+    FROM pg_stat_get_activity(NULL) AS S;
+
 CREATE VIEW pg_stat_replication AS
     SELECT
             S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index ce56ab1d41d..9bf4672950c 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -60,6 +60,7 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <grp.h>
+#include <pgstat.h>
 #include <unistd.h>
 #include <sys/file.h>
 #include <sys/socket.h>
@@ -75,11 +76,14 @@
 
 #include "common/ip.h"
 #include "libpq/libpq.h"
+#include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "port/pg_bswap.h"
 #include "storage/ipc.h"
 #include "utils/guc_hooks.h"
 #include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -106,6 +110,9 @@
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
 
+/* GUC variable containing the allowed compression algorithms list (separated by comma) */
+char	   *libpq_compress_algorithms;
+
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
 
@@ -128,6 +135,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream * PqStream;
+
+
 /*
  * Message status
  */
@@ -163,6 +173,145 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+static ssize_t
+write_compressed(void *arg, void const *data, size_t size)
+{
+	ssize_t		rc = secure_write((Port *) arg, (void *) data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, 0, rc);
+	return rc;
+}
+
+static ssize_t
+read_compressed(void *arg, void *data, size_t size)
+{
+	ssize_t		rc = secure_read((Port *) arg, data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, rc, 0);
+	return rc;
+}
+
+/*
+ * Send the chosen compression algorithms to the client
+ */
+static void
+SendCompressionACK(char *compressors_str)
+{
+	StringInfoData buf;
+
+	pq_beginmessage(&buf, 'z');
+	pq_sendstring(&buf, compressors_str);
+	pq_endmessage(&buf);
+	pq_flush();
+}
+
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port *port)
+{
+	zpq_compressor *server_compressors;
+	zpq_compressor *client_compressors;
+	zpq_compressor *res_compressors;
+	size_t		n_client_compressors;
+	size_t		n_server_compressors;
+	size_t		n_res_compressors = 0;
+	char	   *res_compressors_str;
+	char	   *client_compression_algorithms = port->compression_algorithms;
+
+	if (!client_compression_algorithms || !libpq_compress_algorithms)
+	{
+		return 0;
+	}
+
+	if (zpq_parse_compression_setting(libpq_compress_algorithms, &server_compressors, &n_server_compressors) == -1)
+	{
+		ereport(LOG, errmsg("failed to parse configured compression setting: %s", libpq_compress_algorithms));
+		return 0;
+	}
+
+	if (n_server_compressors == 0)
+	{
+		/*
+		 * No enabled server compressors available, abort the compression
+		 * initialization.
+		 */
+		/* Send the compression acknowledgment with empty compressors list. */
+		char	   *empty_response = "";
+
+		SendCompressionACK(empty_response);
+		return 0;
+	}
+
+	if (!zpq_deserialize_compressors(client_compression_algorithms, &client_compressors, &n_client_compressors))
+	{
+		ereport(LOG, (errmsg("failed to parse received client compression methods: %s", client_compression_algorithms)));
+		return 0;
+	}
+
+	if (n_client_compressors == 0)
+	{
+		/*
+		 * client did not provide any compression algorithms that server is
+		 * compiled to support
+		 */
+		ereport(LOG, (errmsg("server doesn't support any of the compression methods requested by the client: %s", client_compression_algorithms)));
+		return 0;
+	}
+
+	res_compressors = malloc(Max(n_client_compressors, n_server_compressors) * sizeof(zpq_compressor));
+
+	/*
+	 * Intersect client and server compressors to determine the final list of
+	 * the supported compressors. O(N^2) is negligible because of a small
+	 * number of the compression methods.
+	 */
+	for (size_t i = 0; i < n_client_compressors; i++)
+	{
+		for (size_t j = 0; j < n_server_compressors; j++)
+		{
+			if (client_compressors[i].impl == server_compressors[j].impl)
+			{
+				res_compressors[n_res_compressors].impl = client_compressors[i].impl;
+				/* prefer the lower compression level */
+				res_compressors[n_res_compressors++].level = Min(client_compressors[i].level, server_compressors[j].level);
+				break;
+			}
+		}
+	}
+	free(client_compressors);
+	free(server_compressors);
+
+	if (n_res_compressors == 0)
+	{
+		char	   *empty_response = "";
+
+		ereport(LOG, (errmsg("did not find any matches between the client and server compression methods, not enabling compression")));
+		free(res_compressors);
+		/* send the compression ack with empty compressors list */
+		SendCompressionACK(empty_response);
+		return 0;
+	}
+
+	res_compressors_str = zpq_serialize_compressors(res_compressors, n_res_compressors);
+	SendCompressionACK(res_compressors_str);
+
+	/* Init compression */
+	PqStream = zpq_create(res_compressors, n_res_compressors, write_compressed, read_compressed, MyProcPort, NULL, 0);
+	if (!PqStream)
+	{
+		ereport(LOG, (errmsg("failed to initialize the compression stream")));
+		return -1;
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -266,6 +415,9 @@ socket_close(int code, Datum arg)
 		}
 #endif							/* ENABLE_GSS */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -902,12 +1054,15 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *      nowait parameter toggles non-blocking mode.
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -923,21 +1078,40 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		/*
+		 * If streaming compression is enabled then use correspondent
+		 * compression read function.
+		 */
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength, false)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
+			if (r == ZS_DECOMPRESS_ERROR)
+			{
+				char const *msg = zpq_decompress_error(PqStream);
+
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -958,7 +1132,8 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		pgstat_report_network_traffic(r, 0, 0, 0);
+		return r;
 	}
 }
 
@@ -973,7 +1148,8 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -992,7 +1168,8 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1009,48 +1186,15 @@ pq_peekbyte(void)
 int
 pq_getbyte_if_available(unsigned char *c)
 {
-	int			r;
+	int			r = 0;
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1071,7 +1215,8 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1105,7 +1250,8 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1129,7 +1275,6 @@ pq_buffer_has_data(void)
 	return (PqRecvPointer < PqRecvLength);
 }
 
-
 /* --------------------------------
  *		pq_startmsgread - begin reading a message from the client.
  *
@@ -1333,13 +1478,24 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+
+		/*
+		 * has more data to flush or unsent data in internal compression
+		 * buffer
+		 */
 	{
 		int			r;
+		size_t		processed = 0;
+		size_t		available = bufend - bufptr;
 
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
 
-		if (r <= 0)
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1382,12 +1538,12 @@ internal_flush(void)
 			InterruptPending = 1;
 			return EOF;
 		}
+		pgstat_report_network_traffic(0, r, 0, 0);
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
@@ -1404,7 +1560,7 @@ socket_flush_if_writable(void)
 	int			res;
 
 	/* Quick exit if nothing to do */
-	if (PqSendPointer == PqSendStart)
+	if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
 		return 0;
 
 	/* No-op if reentrant call */
@@ -1427,7 +1583,7 @@ socket_flush_if_writable(void)
 static bool
 socket_is_send_pending(void)
 {
-	return (PqSendStart < PqSendPointer);
+	return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
 }
 
 /* --------------------------------
@@ -2052,3 +2208,16 @@ retry:
 
 	return true;
 }
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+	char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL;
+
+	if (algorithm_name)
+		PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+	else
+		PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index 1da5752047f..7d747755368 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2230,6 +2230,8 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "_pq_.compression") == 0)
+				port->compression_algorithms = pstrdup(valptr);
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4406,6 +4408,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7f697b0f292..6584105ed77 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -201,11 +201,14 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
 		else
 			io_flag = WL_SOCKET_WRITEABLE;
 
-		rc = WaitLatchOrSocket(MyLatch,
-							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-							   PQsocket(conn->streamConn),
-							   0,
-							   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
+		if (status == PGRES_POLLING_READING && PQreadPending(conn->streamConn))
+			rc = WL_SOCKET_READABLE;
+		else
+			rc = WaitLatchOrSocket(MyLatch,
+								   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
+								   PQsocket(conn->streamConn),
+								   0,
+								   WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
 		/* Interrupted? */
 		if (rc & WL_LATCH_SET)
diff --git a/src/backend/utils/activity/backend_status.c b/src/backend/utils/activity/backend_status.c
index 1146a6c33cd..7cdd4acbb13 100644
--- a/src/backend/utils/activity/backend_status.c
+++ b/src/backend/utils/activity/backend_status.c
@@ -338,6 +338,9 @@ pgstat_bestart(void)
 	lbeentry.st_xact_start_timestamp = 0;
 	lbeentry.st_databaseid = MyDatabaseId;
 
+	lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+		lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
 	/* We have userid for client-backends, wal-sender and bgworker processes */
 	if (lbeentry.st_backendType == B_BACKEND
 		|| lbeentry.st_backendType == B_WAL_SENDER
@@ -716,6 +719,32 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*
+ * Report network raw or compressed tx/rx traffic as the specified values.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!pgstat_track_activities || !beentry)
+		return;
+
+	/*
+	 * Update my status entry, following the protocol of bumping
+	 * st_changecount before and after.  We use a volatile pointer here to
+	 * ensure the compiler doesn't try to get cute.
+	 */
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+	beentry->st_rx_raw_bytes += rx_raw_bytes;
+	beentry->st_tx_raw_bytes += tx_raw_bytes;
+	beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+	beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
 /* ----------
  * pgstat_read_current_status() -
  *
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index ae3365d9171..9ce5529f23d 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -559,7 +559,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	30
+#define PG_STAT_GET_ACTIVITY_COLS	34
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -858,6 +858,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				nulls[29] = true;
 			else
 				values[29] = UInt64GetDatum(beentry->st_query_id);
+			values[30] = beentry->st_rx_raw_bytes;
+			values[31] = beentry->st_tx_raw_bytes;
+			values[32] = beentry->st_rx_compressed_bytes;
+			values[33] = beentry->st_tx_compressed_bytes;
 		}
 		else
 		{
@@ -886,6 +890,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[27] = true;
 			nulls[28] = true;
 			nulls[29] = true;
+			nulls[30] = true;
+			nulls[31] = true;
+			nulls[32] = true;
+			nulls[33] = true;
 		}
 
 		tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
@@ -1082,6 +1090,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
 	PG_RETURN_TIMESTAMPTZ(result);
 }
 
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS	4
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_NETWORK_TRAFFIC_COLS];
+	bool		nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+	int32		beid = PG_GETARG_INT32(0);
+	PgBackendStatus *beentry;
+
+	if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+		PG_RETURN_NULL();
+	else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Fill values and NULLs */
+	values[0] = beentry->st_rx_raw_bytes;
+	values[1] = beentry->st_tx_raw_bytes;
+	values[2] = beentry->st_rx_compressed_bytes;
+	values[3] = beentry->st_tx_compressed_bytes;
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
 
 Datum
 pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc_funcs.c b/src/backend/utils/misc/guc_funcs.c
index 108b3bd1290..5585f992702 100644
--- a/src/backend/utils/misc/guc_funcs.c
+++ b/src/backend/utils/misc/guc_funcs.c
@@ -1049,3 +1049,24 @@ show_all_file_settings(PG_FUNCTION_ARGS)
 
 	return (Datum) 0;
 }
+
+
+#include "common/zpq_stream.h"
+static bool check_libpq_compression(char **newval, void **extra, GucSource source);
+
+static bool
+check_libpq_compression(char **newval, void **extra, GucSource source)
+{
+	zpq_compressor *compressors;
+	size_t		n_compressors;
+
+	if (!zpq_parse_compression_setting(*newval, &compressors, &n_compressors))
+	{
+		GUC_check_errdetail("Cannot parse the libpq_compression setting.");
+		return false;
+	}
+
+	free(compressors);
+	return true;
+}
+
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 836b49484a1..bef045e5be9 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4447,6 +4447,16 @@ struct config_string ConfigureNamesString[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+			gettext_noop("Sets the list of allowed libpq compression algorithms."),
+			NULL
+		},
+		&libpq_compress_algorithms,
+		"off",
+		check_libpq_compression, NULL, NULL
+	},
+
 	{
 		{"backtrace_functions", PGC_SUSET, DEVELOPER_OPTIONS,
 			gettext_noop("Log backtrace for errors in these functions."),
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 36905a89681..6b7c2366cf9 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -7355,6 +7355,9 @@ threadRun(void *arg)
 		int			nsocks;		/* number of sockets to be waited for */
 		pg_time_usec_t min_usec;
 		pg_time_usec_t now = 0; /* set this only if needed */
+		bool		buffered_rx = false;	/* true if some of the clients has
+											 * data left in SSL/ZPQ read
+											 * buffers */
 
 		/*
 		 * identify which client sockets should be checked for input, and
@@ -7390,6 +7393,9 @@ threadRun(void *arg)
 				 */
 				int			sock = PQsocket(st->con);
 
+				/* check if conn has buffered SSL / ZPQ read data */
+				buffered_rx = buffered_rx || PQreadPending(st->con);
+
 				if (sock < 0)
 				{
 					pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
@@ -7433,7 +7439,7 @@ threadRun(void *arg)
 			{
 				if (nsocks > 0)
 				{
-					rc = wait_on_socket_set(sockets, min_usec);
+					rc = buffered_rx ? 1 : wait_on_socket_set(sockets, min_usec);
 				}
 				else			/* nothing active, simple sleep */
 				{
@@ -7442,7 +7448,7 @@ threadRun(void *arg)
 			}
 			else				/* no explicit delay, wait without timeout */
 			{
-				rc = wait_on_socket_set(sockets, 0);
+				rc = buffered_rx ? 1 : wait_on_socket_set(sockets, 0);
 			}
 
 			if (rc < 0)
@@ -7482,8 +7488,11 @@ threadRun(void *arg)
 					pg_log_error("invalid socket: %s", PQerrorMessage(st->con));
 					goto done;
 				}
-
-				if (!socket_has_input(sockets, sock, nsocks++))
+				if (PQreadPending(st->con))
+				{
+					nsocks++;
+				}
+				else if (!socket_has_input(sockets, sock, nsocks++))
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index 7672ed9e9d5..64c0f4cce88 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -172,6 +172,7 @@ static void print_with_linenumbers(FILE *output, char *lines,
 								   const char *header_keyword);
 static void minimal_error_message(PGresult *res);
 
+static void printCompressionInfo(void);
 static void printSSLInfo(void);
 static void printGSSInfo(void);
 static bool printPsetInfo(const char *param, printQueryOpt *popt);
@@ -676,6 +677,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch)
 					printf(_("You are connected to database \"%s\" as user \"%s\" on host \"%s\" at port \"%s\".\n"),
 						   db, PQuser(pset.db), host, PQport(pset.db));
 			}
+			printCompressionInfo();
 			printSSLInfo();
 			printGSSInfo();
 		}
@@ -3693,6 +3695,21 @@ connection_warnings(bool in_startup)
 	}
 }
 
+/*
+ * printCompressionInfo
+ *
+ * Print information about used compressor/decompressor
+ */
+static void
+printCompressionInfo(void)
+{
+	char	   *algorithms = PQcompression (pset.db);
+
+	if (algorithms != NULL)
+	{
+		printf(_("Compression: %s\n"), algorithms);
+	}
+}
 
 /*
  * printSSLInfo
diff --git a/src/common/Makefile b/src/common/Makefile
index e9af7346c9c..d200f5feb58 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -79,7 +79,9 @@ OBJS_COMMON = \
 	unicode_norm.o \
 	username.o \
 	wait_error.o \
-	wchar.o
+	wchar.o \
+	z_stream.o \
+	zpq_stream.o
 
 ifeq ($(with_ssl),openssl)
 OBJS_COMMON += \
diff --git a/src/common/z_stream.c b/src/common/z_stream.c
new file mode 100644
index 00000000000..b75f194a1cf
--- /dev/null
+++ b/src/common/z_stream.c
@@ -0,0 +1,663 @@
+#include "c.h"
+#include "pg_config.h"
+#include "common/z_stream.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+	/*
+	 * Name of compression algorithm.
+	 */
+	char const *(*name) (void);
+
+	/*
+	 * Create new compression stream. level: compression level
+	 */
+	void	   *(*create_compressor) (int level);
+
+	/*
+	 * Create new decompression stream.
+	 */
+	void	   *(*create_decompressor) ();
+
+	/*
+	 * Decompress up to "src_size" compressed bytes from *src and write up to
+	 * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed
+	 * bytes written to *dst is stored in *dst_processed. Number of compressed
+	 * bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZS_OK if no errors were encountered during decompression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZS_DATA_PENDING means that there might be some data left within
+	 * decompressor internal buffers.
+	 *
+	 * ZS_STREAM_END if encountered end of compressed data stream.
+	 *
+	 * ZS_DECOMPRESS_ERROR if encountered an error during decompression
+	 * attempt.
+	 */
+	ssize_t		(*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Compress up to "src_size" raw (non-compressed) bytes from *src and
+	 * write up to "dst_size" compressed bytes to *dst. Number of compressed
+	 * bytes written to *dst is stored in *dst_processed. Number of
+	 * non-compressed bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZS_OK if no errors were encountered during compression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZS_DATA_PENDING means that there might be some data left within
+	 * compressor internal buffers.
+	 *
+	 * ZS_COMPRESS_ERROR if encountered an error during compression attempt.
+	 */
+	ssize_t		(*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Free compression stream created by create_compressor function.
+	 */
+	void		(*free_compressor) (void *cs);
+
+	/*
+	 * Free decompression stream created by create_decompressor function.
+	 */
+	void		(*free_decompressor) (void *ds);
+
+	/*
+	 * Get compressor error message.
+	 */
+	char const *(*compress_error) (void *cs);
+
+	/*
+	 * Get decompressor error message.
+	 */
+	char const *(*decompress_error) (void *ds);
+
+	ssize_t		(*end_compression) (void *cs, void *dst, size_t dst_size, size_t *dst_processed);
+}			ZAlgorithm;
+
+struct ZStream
+{
+	ZAlgorithm const *algorithm;
+	void	   *stream;
+	bool		not_flushed;
+};
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+
+static void *
+zlib_create_compressor(int level)
+{
+	int			rc;
+	z_stream   *c_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(c_stream, 0, sizeof(*c_stream));
+	rc = deflateInit(c_stream, level);
+	if (rc != Z_OK)
+	{
+		free(c_stream);
+		return NULL;
+	}
+	return c_stream;
+}
+
+static void *
+zlib_create_decompressor()
+{
+	int			rc;
+	z_stream   *d_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(d_stream, 0, sizeof(*d_stream));
+	rc = inflateInit(d_stream);
+	if (rc != Z_OK)
+	{
+		free(d_stream);
+		return NULL;
+	}
+	return d_stream;
+}
+
+static ssize_t
+zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+	int			rc;
+
+	ds->next_in = (Bytef *) src;
+	ds->avail_in = src_size;
+	ds->next_out = (Bytef *) dst;
+	ds->avail_out = dst_size;
+
+	rc = inflate(ds, Z_SYNC_FLUSH);
+	*src_processed = src_size - ds->avail_in;
+	*dst_processed = dst_size - ds->avail_out;
+
+	if (rc == Z_STREAM_END)
+	{
+		return ZS_STREAM_END;
+	}
+	if (rc != Z_OK && rc != Z_BUF_ERROR)
+	{
+		return ZS_DECOMPRESS_ERROR;
+	}
+
+	return ZS_OK;
+}
+
+static ssize_t
+zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+	int			rc PG_USED_FOR_ASSERTS_ONLY;
+	unsigned	deflate_pending = 0;
+
+
+	cs->next_out = (Bytef *) dst;
+	cs->avail_out = dst_size;
+	cs->next_in = (Bytef *) src;
+	cs->avail_in = src_size;
+
+	rc = deflate(cs, Z_SYNC_FLUSH);
+	Assert(rc == Z_OK);
+	*dst_processed = dst_size - cs->avail_out;
+	*src_processed = src_size - cs->avail_in;
+
+	deflatePending(cs, &deflate_pending, Z_NULL);	/* check if any data left
+													 * in deflate buffer */
+	if (deflate_pending > 0)
+	{
+		return ZS_DATA_PENDING;
+	}
+	return ZS_OK;
+}
+
+
+static ssize_t
+zlib_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+	int			rc;
+
+	cs->next_out = (Bytef *) dst;
+	cs->avail_out = dst_size;
+	cs->next_in = NULL;
+	cs->avail_in = 0;
+
+	rc = deflate(cs, Z_STREAM_END);
+	Assert(rc == Z_OK || rc == Z_STREAM_END);
+	*dst_processed = dst_size - cs->avail_out;
+	if (rc == Z_STREAM_END)
+	{
+		return ZS_OK;
+	}
+
+	return ZS_DATA_PENDING;
+}
+
+static void
+zlib_free_compressor(void *c_stream)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+
+	if (cs != NULL)
+	{
+		deflateEnd(cs);
+		free(cs);
+	}
+}
+
+static void
+zlib_free_decompressor(void *d_stream)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+
+	if (ds != NULL)
+	{
+		inflateEnd(ds);
+		free(ds);
+	}
+}
+
+static char const *
+zlib_error(void *stream)
+{
+	z_stream   *zs = (z_stream *) stream;
+
+	return zs->msg;
+}
+
+static char const *
+zlib_name(void)
+{
+	return "zlib";
+}
+
+#endif
+
+#if USE_LZ4
+#include <lz4.h>
+
+#define MESSAGE_MAX_BYTES 819200
+#define RING_BUFFER_BYTES (1024 * 64 + MESSAGE_MAX_BYTES)
+
+typedef struct ZS_LZ4_CStream
+{
+	LZ4_stream_t *stream;
+	int			level;
+	size_t		buf_pos;
+	char		buf[RING_BUFFER_BYTES];
+}			ZS_LZ4_CStream;
+
+typedef struct ZS_LZ4_DStream
+{
+	LZ4_streamDecode_t *stream;
+	size_t		buf_pos;
+	char		buf[RING_BUFFER_BYTES];
+}			ZS_LZ4_DStream;
+
+static void *
+lz4_create_compressor(int level)
+{
+	ZS_LZ4_CStream *c_stream = (ZS_LZ4_CStream *) malloc(sizeof(ZS_LZ4_CStream));
+
+	if (c_stream == NULL)
+	{
+		return NULL;
+	}
+	c_stream->stream = LZ4_createStream();
+	c_stream->level = level;
+	c_stream->buf_pos = 0;
+	if (c_stream->stream == NULL)
+	{
+		free(c_stream);
+		return NULL;
+	}
+	return c_stream;
+}
+
+static void *
+lz4_create_decompressor()
+{
+	ZS_LZ4_DStream *d_stream = (ZS_LZ4_DStream *) malloc(sizeof(ZS_LZ4_DStream));
+
+	if (d_stream == NULL)
+	{
+		return NULL;
+	}
+
+	d_stream->stream = LZ4_createStreamDecode();
+	d_stream->buf_pos = 0;
+	if (d_stream->stream == NULL)
+	{
+		free(d_stream);
+		return NULL;
+	}
+
+	return d_stream;
+}
+
+static ssize_t
+lz4_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZS_LZ4_DStream *ds = (ZS_LZ4_DStream *) d_stream;
+
+	char	   *const decPtr = &ds->buf[ds->buf_pos];
+
+	const int	decBytes = LZ4_decompress_safe_continue(
+														ds->stream, src, decPtr, (int) src_size, (int) dst_size);
+	Assert(decBytes > 0);
+
+	*dst_processed = decBytes;
+	*src_processed = src_size;
+
+	memcpy(dst, decPtr, decBytes);	/* write msg length */
+
+	ds->buf_pos += decBytes;
+	if (ds->buf_pos >= RING_BUFFER_BYTES - MESSAGE_MAX_BYTES)
+	{
+		ds->buf_pos = 0;
+	}
+
+	return ZS_OK;
+}
+
+static ssize_t
+lz4_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZS_LZ4_CStream *cs = (ZS_LZ4_CStream *) c_stream;
+	int			cmpBytes;
+
+	src_size = Min(MESSAGE_MAX_BYTES, src_size);
+
+	memcpy((char *) (cs->buf) + cs->buf_pos, src, src_size);	/* write msg length */
+
+	Assert(dst_size >= LZ4_compressBound(src_size));
+
+	cmpBytes = LZ4_compress_fast_continue(
+										  cs->stream, (char *) (cs->buf) + cs->buf_pos, dst, (int) src_size, (int) dst_size, cs->level);
+
+	Assert(cmpBytes > 0);
+	Assert(cmpBytes <= MESSAGE_MAX_BYTES);
+
+	*dst_processed = cmpBytes;
+	*src_processed = src_size;
+
+	cs->buf_pos += src_size;
+	if (cs->buf_pos >= RING_BUFFER_BYTES - MESSAGE_MAX_BYTES)
+	{
+		cs->buf_pos = 0;
+	}
+	return ZS_OK;
+}
+
+
+static ssize_t
+lz4_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	*dst_processed = 0;
+	return ZS_OK;
+}
+
+static void
+lz4_free_compressor(void *c_stream)
+{
+	ZS_LZ4_CStream *cs = (ZS_LZ4_CStream *) c_stream;
+
+	if (cs != NULL)
+	{
+		if (cs->stream != NULL)
+		{
+			LZ4_freeStream(cs->stream);
+		}
+		free(cs);
+	}
+}
+
+static void
+lz4_free_decompressor(void *d_stream)
+{
+	ZS_LZ4_DStream *ds = (ZS_LZ4_DStream *) d_stream;
+
+	if (ds != NULL)
+	{
+		if (ds->stream != NULL)
+		{
+			LZ4_freeStreamDecode(ds->stream);
+		}
+		free(ds);
+	}
+}
+
+static char const *
+lz4_error(void *stream)
+{
+	/* lz4 doesn't have any explicit API to get the error names */
+	return "NO_MSG";
+}
+
+static char const *
+lz4_name(void)
+{
+	return "lz4";
+}
+
+#endif
+
+static char const *
+no_compression_name(void)
+{
+	return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZAlgorithm const zs_algorithms[] =
+{
+#if HAVE_LIBZ
+	{zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error, zlib_end},
+#endif
+#if USE_LZ4
+	{lz4_name, lz4_create_compressor, lz4_create_decompressor, lz4_decompress, lz4_compress, lz4_free_compressor, lz4_free_decompressor, lz4_error, lz4_error, lz4_end},
+#endif
+	{no_compression_name}
+};
+
+inline bool
+zs_is_valid_impl_id(unsigned int id)
+{
+	return id >= 0 && id < lengthof(zs_algorithms);
+}
+
+static ssize_t
+zs_init_compressor(ZStream * zs, unsigned int c_alg_impl, int c_level)
+{
+	if (!zs_is_valid_impl_id(c_alg_impl))
+	{
+		return -1;
+	}
+	zs->algorithm = &zs_algorithms[c_alg_impl];
+	zs->stream = zs->algorithm->create_compressor(c_level);
+	if (zs->stream == NULL)
+	{
+		return -1;
+	}
+	return 0;
+}
+
+static ssize_t
+zs_init_decompressor(ZStream * zs, unsigned int d_alg_impl)
+{
+	if (!zs_is_valid_impl_id(d_alg_impl))
+	{
+		return -1;
+	}
+	zs->algorithm = &zs_algorithms[d_alg_impl];
+	zs->stream = zs->algorithm->create_decompressor();
+	if (zs->stream == NULL)
+	{
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Index of used compression algorithm in zs_algorithms array.
+ */
+ZStream *
+zs_create_compressor(unsigned int c_alg_impl, int c_level)
+{
+	ZStream    *zs = (ZStream *) malloc(sizeof(ZStream));
+
+	zs->not_flushed = false;
+
+	if (zs_init_compressor(zs, c_alg_impl, c_level))
+	{
+		free(zs);
+		return NULL;
+	}
+
+	return zs;
+}
+
+ZStream *
+zs_create_decompressor(unsigned int d_alg_impl)
+{
+	ZStream    *zs = (ZStream *) malloc(sizeof(ZStream));
+
+	zs->not_flushed = false;
+
+	if (zs_init_decompressor(zs, d_alg_impl))
+	{
+		free(zs);
+		return NULL;
+	}
+
+	return zs;
+}
+
+ssize_t
+zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ssize_t		rc;
+
+	*src_processed = 0;
+	*dst_processed = 0;
+
+	rc = zs->algorithm->decompress(zs->stream,
+								   src, src_size, src_processed,
+								   dst, dst_size, dst_processed);
+
+	zs->not_flushed = false;
+	if (rc == ZS_DATA_PENDING)
+	{
+		zs->not_flushed = true;
+		return ZS_OK;
+	}
+
+	if (rc == ZS_OK || rc == ZS_INCOMPLETE_SRC || rc == ZS_STREAM_END)
+	{
+		return rc;
+	}
+
+	return ZS_DECOMPRESS_ERROR;
+}
+
+ssize_t
+zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ssize_t		rc;
+
+	*processed = 0;
+	*dst_processed = 0;
+
+	rc = zs->algorithm->compress(zs->stream,
+								 buf, size, processed,
+								 dst, dst_size, dst_processed);
+
+	zs->not_flushed = false;
+	if (rc == ZS_DATA_PENDING)
+	{
+		zs->not_flushed = true;
+		return ZS_OK;
+	}
+	if (rc != ZS_OK)
+	{
+		return ZS_COMPRESS_ERROR;
+	}
+
+	return rc;
+}
+
+void
+zs_compressor_free(ZStream * zs)
+{
+	if (zs == NULL)
+	{
+		return;
+	}
+
+	if (zs->stream)
+	{
+		zs->algorithm->free_compressor(zs->stream);
+	}
+
+	free(zs);
+}
+
+void
+zs_decompressor_free(ZStream * zs)
+{
+	if (zs == NULL)
+	{
+		return;
+	}
+
+	if (zs->stream)
+	{
+		zs->algorithm->free_decompressor(zs->stream);
+	}
+
+	free(zs);
+}
+
+ssize_t
+zs_end_compression(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ssize_t		rc;
+
+	*dst_processed = 0;
+
+	rc = zs->algorithm->end_compression(zs->stream, dst, dst_size, dst_processed);
+
+	zs->not_flushed = false;
+	if (rc == ZS_DATA_PENDING)
+	{
+		zs->not_flushed = true;
+		return ZS_OK;
+	}
+	if (rc != ZS_OK)
+	{
+		return ZS_COMPRESS_ERROR;
+	}
+
+	return rc;
+}
+
+char const *
+zs_compress_error(ZStream * zs)
+{
+	return zs->algorithm->compress_error(zs->stream);
+}
+
+char const *
+zs_decompress_error(ZStream * zs)
+{
+	return zs->algorithm->decompress_error(zs->stream);
+}
+
+bool
+zs_buffered(ZStream * zs)
+{
+	return zs ? zs->not_flushed : 0;
+}
+
+
+/*
+ * Get list of the supported algorithms.
+ */
+char	  **
+zs_get_supported_algorithms(void)
+{
+	size_t		n_algorithms = lengthof(zs_algorithms);
+	char	  **algorithm_names = malloc(n_algorithms * sizeof(char *));
+
+	for (size_t i = 0; i < n_algorithms; i++)
+	{
+		algorithm_names[i] = (char *) zs_algorithms[i].name();
+	}
+
+	return algorithm_names;
+}
+
+char const *
+zs_compress_algorithm_name(ZStream * zs)
+{
+	return zs ? zs->algorithm->name() : NULL;
+}
+
+char const *
+zs_decompress_algorithm_name(ZStream * zs)
+{
+	return zs ? zs->algorithm->name() : NULL;
+}
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 00000000000..a712200e453
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,1022 @@
+#ifndef FRONTEND
+#include "postgres.h"
+#else
+#include "postgres_fe.h"
+#endif
+#include <unistd.h>
+#include <math.h>
+
+#include "common/zpq_stream.h"
+#include "pg_config.h"
+#include "port/pg_bswap.h"
+
+/* log warnings on backend */
+#ifndef FRONTEND
+#define pg_log_warning(...) elog(WARNING, __VA_ARGS__)
+#else
+#define pg_log_warning(...) (void)0
+#endif
+
+/* ZpqBuffer size, in bytes */
+#define ZPQ_BUFFER_SIZE 8192000
+/* CompressedData msg_type */
+#define ZPQ_COMPRESSED_MSG_TYPE 'm'
+/* SetCompressionMethod msg_type */
+#define ZPQ_SET_COMPRESSION_MSG_TYPE 'k'
+
+#define ZPQ_COMPRESS_THRESHOLD 60
+
+typedef struct ZpqBuffer ZpqBuffer;
+
+
+/* ZpqBuffer used as RX/TX buffer in ZpqStream */
+struct ZpqBuffer
+{
+	char		buf[ZPQ_BUFFER_SIZE];
+	size_t		size;			/* current size of buf */
+	size_t		pos;			/* current position in buf, in range [0, size] */
+};
+
+static inline void
+zpq_buf_init(ZpqBuffer * zb)
+{
+	zb->size = 0;
+	zb->pos = 0;
+}
+
+static inline size_t
+zpq_buf_left(ZpqBuffer * zb)
+{
+	Assert(zb->buf);
+	return ZPQ_BUFFER_SIZE - zb->size;
+}
+
+static inline size_t
+zpq_buf_unread(ZpqBuffer * zb)
+{
+	return zb->size - zb->pos;
+}
+
+static inline char *
+zpq_buf_size(ZpqBuffer * zb)
+{
+	return (char *) (zb->buf) + zb->size;
+}
+
+static inline char *
+zpq_buf_pos(ZpqBuffer * zb)
+{
+	return (char *) (zb->buf) + zb->pos;
+}
+
+static inline void
+zpq_buf_size_advance(ZpqBuffer * zb, size_t value)
+{
+	zb->size += value;
+}
+
+static inline void
+zpq_buf_pos_advance(ZpqBuffer * zb, size_t value)
+{
+	zb->pos += value;
+}
+
+static inline void
+zpq_buf_reuse(ZpqBuffer * zb)
+{
+	size_t		unread = zpq_buf_unread(zb);
+
+	if (unread > 5)				/* can read message header, don't do anything */
+		return;
+	if (unread == 0)
+	{
+		zb->size = 0;
+		zb->pos = 0;
+		return;
+	}
+	memmove(zb->buf, zb->buf + zb->pos, unread);
+	zb->size = unread;
+	zb->pos = 0;
+}
+
+struct ZpqStream
+{
+	ZStream    *c_stream;		/* underlying compression stream */
+	ZStream    *d_stream;		/* underlying decompression stream */
+
+	size_t		tx_total;		/* amount of bytes sent to tx_func */
+
+	size_t		tx_total_raw;	/* amount of bytes received by zpq_write */
+	size_t		rx_total;		/* amount of bytes read by rx_func */
+	size_t		rx_total_raw;	/* amount of bytes returned by zpq_write */
+	bool		is_compressing; /* current compression state */
+
+	bool		is_decompressing;	/* current decompression state */
+	size_t		rx_msg_bytes_left;	/* number of bytes left to process without
+									 * changing the decompression state */
+	size_t		tx_msg_bytes_left;	/* number of bytes left to process without
+									 * changing the compression state */
+
+	ZpqBuffer	rx_in;			/* buffer for unprocessed data read by rx_func */
+	ZpqBuffer	tx_in;			/* buffer for unprocessed data consumed by
+								 * zpq_write */
+	ZpqBuffer	tx_out;			/* buffer for processed data waiting for send
+								 * via tx_func */
+
+	zpq_rx_func rx_func;
+	zpq_tx_func tx_func;
+	void	   *arg;
+
+	zpq_compressor *compressors;	/* compressors array holds the available
+									 * compressors to use for
+									 * compression/decompression */
+	size_t		n_compressors;	/* size of the compressors array */
+	int			compress_alg_idx;	/* index of the active compression
+									 * algorithm */
+	int			decompress_alg_idx; /* index of the active decompression
+									 * algorithm */
+	int			compressor_by_msg_type[256];	/* map to choose a compressor
+												 * by the protocol message
+												 * type */
+
+	bool		reading_set_compression;	/* utility marker indicating
+											 * partial SetCompressionMethod
+											 * read */
+};
+
+/*
+ * Message compression map defines the logic for choosing the compressor
+ * based on the protocol message type. Currently, it is a basic prototype to demonstrate
+ * the capabilities of the on-the-fly compression switch.
+ */
+static inline void
+zpq_build_msg_compression_map(ZpqStream * zpq)
+{
+	int			i;
+
+	for (i = 0; i < 256; i++)
+	{
+		zpq->compressor_by_msg_type[i] = -1;
+	}
+
+	for (i = 0; i < zpq->n_compressors; i++)
+	{
+		/* compress CopyData, DataRow and Query messages */
+		if (zpq->compressor_by_msg_type['d'] == -1)
+		{
+			zpq->compressor_by_msg_type['d'] = i;
+		}
+		if (zpq->compressor_by_msg_type['D'] == -1)
+		{
+			zpq->compressor_by_msg_type['D'] = i;
+		}
+		if (zpq->compressor_by_msg_type['Q'] == -1)
+		{
+			zpq->compressor_by_msg_type['Q'] = i;
+		}
+	}
+}
+
+/*
+ * Choose the index of compressor to use for the message of msg_type with msg_len.
+ * Return values:
+ * - the non-negative index of zpq->compressors array
+ * - -1, if message should not be compressed
+ */
+static inline int
+zpq_choose_compressor(ZpqStream * zpq, char msg_type, uint32 msg_len)
+{
+	if (msg_len >= ZPQ_COMPRESS_THRESHOLD)
+	{
+		return zpq->compressor_by_msg_type[(unsigned char) msg_type];
+	}
+	return -1;
+}
+
+/*
+ * Check if should compress message of msg_type with msg_len.
+ * Return true if should, false if should not.
+ */
+static inline bool
+zpq_should_compress(ZpqStream * zpq, char msg_type, uint32 msg_len)
+{
+	return zpq_choose_compressor(zpq, msg_type, msg_len) != -1;
+}
+
+/*
+ * Check if message is a CompressedData.
+ * Return true if it is, otherwise false.
+ * */
+static inline bool
+zpq_is_compressed_msg(char msg_type)
+{
+	return msg_type == ZPQ_COMPRESSED_MSG_TYPE;
+}
+
+/*
+ * Check if message is a SetCompressionMethod.
+ * Return true if it is, otherwise false.
+ * */
+static inline bool
+zpq_is_set_compression_msg(char msg_type)
+{
+	return msg_type == ZPQ_SET_COMPRESSION_MSG_TYPE;
+}
+
+ZpqStream *
+zpq_create(zpq_compressor * compressors, size_t n_compressors, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size)
+{
+	ZpqStream  *zpq;
+
+	/* zpqStream needs at least one compressor */
+	if (n_compressors == 0 || compressors == NULL)
+	{
+		return NULL;
+	}
+	zpq = (ZpqStream *) malloc(sizeof(ZpqStream));
+
+	zpq->compressors = compressors;
+	zpq->n_compressors = n_compressors;
+	zpq->compress_alg_idx = -1;
+	zpq->decompress_alg_idx = -1;
+
+	zpq->is_compressing = false;
+	zpq->is_decompressing = false;
+	zpq->rx_msg_bytes_left = 0;
+	zpq->tx_msg_bytes_left = 0;
+	zpq_buf_init(&zpq->tx_in);
+
+	zpq->tx_total = 0;
+	zpq->tx_total_raw = 0;
+	zpq->rx_total = 0;
+	zpq->rx_total_raw = 0;
+
+	zpq_buf_init(&zpq->rx_in);
+	zpq_buf_size_advance(&zpq->rx_in, rx_data_size);
+	Assert(rx_data_size < ZPQ_BUFFER_SIZE);
+	memcpy(zpq->rx_in.buf, rx_data, rx_data_size);
+
+	zpq_buf_init(&zpq->tx_out);
+
+	zpq->rx_func = rx_func;
+	zpq->tx_func = tx_func;
+	zpq->arg = arg;
+	zpq->reading_set_compression = false;
+	zpq->c_stream = NULL;
+	zpq->d_stream = NULL;
+
+	zpq_build_msg_compression_map(zpq);
+
+	return zpq;
+}
+
+/* Compress up to src_size bytes from *src into CompressedData and write it to the tx buffer.
+ * Returns ZS_OK on success, ZS_COMPRESS_ERROR if encountered a compression error. */
+static inline ssize_t
+zpq_write_compressed_message(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed)
+{
+	size_t		compressed_len;
+	ssize_t		rc;
+	uint32		size;
+
+	/* check if have enough space */
+	if (zpq_buf_left(&zpq->tx_out) <= 5)
+	{
+		/* too little space for CompressedData, abort */
+		*src_processed = 0;
+		return ZS_OK;
+	}
+
+	compressed_len = 0;
+	rc = zs_write(zpq->c_stream, src, src_size, src_processed,
+				  zpq_buf_size(&zpq->tx_out) + 5, zpq_buf_left(&zpq->tx_out) - 5, &compressed_len);
+
+	if (compressed_len > 0)
+	{
+		/* write CompressedData type */
+		*zpq_buf_size(&zpq->tx_out) = ZPQ_COMPRESSED_MSG_TYPE;
+		size = pg_hton32(compressed_len + 4);
+
+		memcpy(zpq_buf_size(&zpq->tx_out) + 1, &size, sizeof(uint32));	/* write msg length */
+		compressed_len += 5;	/* append header length to compressed data
+								 * length */
+	}
+
+	zpq->tx_total_raw += *src_processed;
+	zpq->tx_total += compressed_len;
+	zpq_buf_size_advance(&zpq->tx_out, compressed_len);
+	return rc;
+}
+
+/* Copy the data directly from *src to the tx buffer */
+static void
+zpq_write_uncompressed(ZpqStream * zpq, char const *src, size_t src_size, size_t *src_processed)
+{
+	src_size = Min(zpq_buf_left(&zpq->tx_out), src_size);
+	memcpy(zpq_buf_size(&zpq->tx_out), src, src_size);
+
+	zpq->tx_total_raw += src_size;
+	zpq->tx_total += src_size;
+	zpq_buf_size_advance(&zpq->tx_out, src_size);
+	*src_processed = src_size;
+}
+
+static ssize_t
+zpq_write_set_compression_msg(ZpqStream * zpq, int new_compress_idx)
+{
+	uint32		len;
+	uint8		idx;
+
+	/*
+	 * check if have enough space: msg_type(1 byte) + msg_len(4 bytes) +
+	 * compress_alg_idx(1 byte)
+	 */
+	if (zpq_buf_left(&zpq->tx_out) < 6)
+	{
+		return -1;
+	}
+
+	/* write CompressedData type */
+	*zpq_buf_size(&zpq->tx_out) = ZPQ_SET_COMPRESSION_MSG_TYPE;
+	len = pg_hton32(5);
+	memcpy(zpq_buf_size(&zpq->tx_out) + 1, &len, sizeof(uint32));	/* write msg length */
+
+	/* currently we expect idx to be in range [0, 255] */
+	Assert(new_compress_idx >= 0 && new_compress_idx <= UINT8_MAX);
+	idx = (uint8) new_compress_idx;
+	memcpy(zpq_buf_size(&zpq->tx_out) + 5, &idx, sizeof(uint8));	/* write
+																	 * new_compress_idx */
+
+	zpq->tx_total_raw += 6;
+	zpq->tx_total += 6;
+	zpq_buf_size_advance(&zpq->tx_out, 6);
+	return 0;
+}
+
+/* Determine if should compress the next message and change the current compression state */
+static ssize_t
+zpq_toggle_compression(ZpqStream * zpq, char msg_type, uint32 msg_len)
+{
+	int			new_compress_idx = zpq_choose_compressor(zpq, msg_type, msg_len);
+	bool		should_compress = new_compress_idx != -1;
+
+	/*
+	 * negative new_compress_idx indicates that we should not compress this
+	 * message
+	 */
+	if (should_compress)
+	{
+		/*
+		 * if the new compressor does not match the current one, process the
+		 * switch
+		 */
+		if (zpq->compress_alg_idx != new_compress_idx)
+		{
+			if (zpq_write_set_compression_msg(zpq, new_compress_idx))
+			{
+				/*
+				 * come back later when we can write the entire
+				 * SetCompressionMethod message
+				 */
+				return 0;
+			}
+
+			zs_compressor_free(zpq->c_stream);
+			zpq->c_stream = zs_create_compressor(zpq->compressors[new_compress_idx].impl, zpq->compressors[new_compress_idx].level);
+			if (zpq->c_stream == NULL)
+			{
+				return ZPQ_FATAL_ERROR;
+			}
+			zpq->compress_alg_idx = new_compress_idx;
+		}
+	}
+
+	zpq->is_compressing = should_compress;
+	zpq->tx_msg_bytes_left = msg_len + 1;
+	return 0;
+}
+
+/*
+ * Internal write function. Reads the data from *src buffer,
+ * determines the postgres messages type and length.
+ * If message matches the compression criteria, it wraps the message into
+ * CompressedData. Otherwise, leaves the message unchanged.
+ * If *src data ends with incomplete message header, this function is not
+ * going to read this message header.
+ * Returns number of written raw bytes or error code.
+ * In the last case number of bytes written is stored in *processed.
+ */
+static ssize_t
+zpq_write_internal(ZpqStream * zpq, void const *src, size_t src_size, size_t *processed)
+{
+	size_t		src_pos = 0;
+	ssize_t		rc;
+
+	do
+	{
+		/*
+		 * try to read ahead the next message types and increase
+		 * tx_msg_bytes_left, if possible
+		 */
+		while (zpq->tx_msg_bytes_left > 0 && src_size - src_pos >= zpq->tx_msg_bytes_left + 5)
+		{
+			char		msg_type = *((char *) src + src_pos + zpq->tx_msg_bytes_left);
+			uint32		msg_len;
+
+			memcpy(&msg_len, (char *) src + src_pos + zpq->tx_msg_bytes_left + 1, 4);
+			msg_len = pg_ntoh32(msg_len);
+			if (zpq_should_compress(zpq, msg_type, msg_len) != zpq->is_compressing)
+			{
+				/*
+				 * cannot proceed further, encountered compression toggle
+				 * point
+				 */
+				break;
+			}
+			zpq->tx_msg_bytes_left += msg_len + 1;
+		}
+
+		/*
+		 * Write CompressedData if currently is compressing or have some
+		 * buffered data left in underlying compression stream
+		 */
+		if (zs_buffered(zpq->c_stream) || (zpq->is_compressing && zpq->tx_msg_bytes_left > 0))
+		{
+			size_t		buf_processed = 0;
+			size_t		to_compress = Min(zpq->tx_msg_bytes_left, src_size - src_pos);
+
+			rc = zpq_write_compressed_message(zpq, (char *) src + src_pos, to_compress, &buf_processed);
+			src_pos += buf_processed;
+			zpq->tx_msg_bytes_left -= buf_processed;
+
+			if (rc != ZS_OK)
+			{
+				*processed = src_pos;
+				return rc;
+			}
+		}
+
+		/*
+		 * If not going to compress the data from *src, just write it
+		 * uncompressed.
+		 */
+		else if (zpq->tx_msg_bytes_left > 0)
+		{						/* determine next message type */
+			size_t		copy_len = Min(src_size - src_pos, zpq->tx_msg_bytes_left);
+			size_t		copy_processed = 0;
+
+			zpq_write_uncompressed(zpq, (char *) src + src_pos, copy_len, &copy_processed);
+			src_pos += copy_processed;
+			zpq->tx_msg_bytes_left -= copy_processed;
+		}
+
+		/*
+		 * Reached the compression toggle point, fetch next message header to
+		 * determine compression state.
+		 */
+		else
+		{
+			char		msg_type;
+			uint32		msg_len;
+
+			if (src_size - src_pos < 5)
+			{
+				/*
+				 * must return here because we can't continue without full
+				 * message header
+				 */
+				*processed = src_pos;
+				return ZPQ_INCOMPLETE_HEADER;
+			}
+
+			msg_type = *((char *) src + src_pos);
+			memcpy(&msg_len, (char *) src + src_pos + 1, 4);
+			msg_len = pg_ntoh32(msg_len);
+			rc = zpq_toggle_compression(zpq, msg_type, msg_len);
+			if (rc)
+			{
+				return rc;
+			}
+		}
+
+		/*
+		 * repeat sending while there is some data in input or internal
+		 * compression buffer
+		 */
+	} while (src_pos < src_size && zpq_buf_left(&zpq->tx_out) > 6);
+
+	return src_pos;
+}
+
+ssize_t
+zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed)
+{
+	size_t		src_pos = 0;
+	ssize_t		rc;
+
+	/* try to process as much data as possible before calling the tx_func */
+	while (zpq_buf_left(&zpq->tx_out) > 6)
+	{
+		size_t		copy_len = Min(zpq_buf_left(&zpq->tx_in), src_size - src_pos);
+		size_t		processed;
+
+		memcpy(zpq_buf_size(&zpq->tx_in), (char *) src + src_pos, copy_len);
+		zpq_buf_size_advance(&zpq->tx_in, copy_len);
+		src_pos += copy_len;
+
+		if (zpq_buf_unread(&zpq->tx_in) == 0 && !zs_buffered(zpq->c_stream))
+		{
+			break;
+		}
+
+		processed = 0;
+
+		rc = zpq_write_internal(zpq, zpq_buf_pos(&zpq->tx_in), zpq_buf_unread(&zpq->tx_in), &processed);
+		if (rc > 0)
+		{
+			zpq_buf_pos_advance(&zpq->tx_in, rc);
+			zpq_buf_reuse(&zpq->tx_in);
+		}
+		else
+		{
+			zpq_buf_pos_advance(&zpq->tx_in, processed);
+			zpq_buf_reuse(&zpq->tx_in);
+			if (rc == ZPQ_INCOMPLETE_HEADER)
+			{
+				break;
+			}
+			*src_processed = src_pos;
+			return rc;
+		}
+	}
+
+	/*
+	 * call the tx_func if have any bytes to send
+	 */
+	while (zpq_buf_unread(&zpq->tx_out))
+	{
+		rc = zpq->tx_func(zpq->arg, zpq_buf_pos(&zpq->tx_out), zpq_buf_unread(&zpq->tx_out));
+		if (rc > 0)
+		{
+			zpq_buf_pos_advance(&zpq->tx_out, rc);
+		}
+		else
+		{
+			*src_processed = src_pos;
+			zpq_buf_reuse(&zpq->tx_out);
+			return rc;
+		}
+	}
+
+	zpq_buf_reuse(&zpq->tx_out);
+	return src_pos;
+}
+
+/* Decompress bytes from RX buffer and write up to dst_len of uncompressed data to *dst.
+ * Returns:
+ * ZS_OK on success,
+ * ZS_STREAM_END if reached end of compressed chunk
+ * ZS_DECOMPRESS_ERROR if encountered a decompression error */
+static inline ssize_t
+zpq_read_compressed_message(ZpqStream * zpq, char *dst, size_t dst_len, size_t *dst_processed)
+{
+	size_t		rx_processed = 0;
+	ssize_t		rc;
+	size_t		read_len = Min(zpq->rx_msg_bytes_left, zpq_buf_unread(&zpq->rx_in));
+
+	Assert(read_len == zpq->rx_msg_bytes_left);
+	rc = zs_read(zpq->d_stream, zpq_buf_pos(&zpq->rx_in), read_len, &rx_processed,
+				 dst, dst_len, dst_processed);
+
+	zpq_buf_pos_advance(&zpq->rx_in, rx_processed);
+	zpq->rx_total_raw += *dst_processed;
+	zpq->rx_msg_bytes_left -= rx_processed;
+	return rc;
+}
+
+/* Copy up to dst_len bytes from rx buffer to *dst.
+ * Returns amount of bytes copied. */
+static inline size_t
+zpq_read_uncompressed(ZpqStream * zpq, char *dst, size_t dst_len)
+{
+	size_t		copy_len;
+
+	Assert(zpq_buf_unread(&zpq->rx_in) > 0);
+	copy_len = Min(zpq->rx_msg_bytes_left, Min(zpq_buf_unread(&zpq->rx_in), dst_len));
+
+	memcpy(dst, zpq_buf_pos(&zpq->rx_in), copy_len);
+
+	zpq_buf_pos_advance(&zpq->rx_in, copy_len);
+	zpq->rx_total_raw += copy_len;
+	zpq->rx_msg_bytes_left -= copy_len;
+	return copy_len;
+}
+
+/* Determine if should decompress the next message and
+ * change the current decompression state */
+static inline void
+zpq_toggle_decompression(ZpqStream * zpq)
+{
+	uint32		msg_len;
+	char		msg_type = *zpq_buf_pos(&zpq->rx_in);
+
+	memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + 1, 4);
+	msg_len = pg_ntoh32(msg_len);
+
+	if (zpq_is_set_compression_msg(msg_type))
+	{
+		Assert(msg_len == 5);
+		zpq->reading_set_compression = true;
+		/* set compression message header is no longer needed, just skip it */
+		zpq_buf_pos_advance(&zpq->rx_in, 5);
+	}
+	else
+	{
+		zpq->is_decompressing = zpq_is_compressed_msg(msg_type);
+		zpq->rx_msg_bytes_left = msg_len + 1;
+
+		if (zpq->is_decompressing)
+		{
+			/* compressed message header is no longer needed, just skip it */
+			zpq_buf_pos_advance(&zpq->rx_in, 5);
+			zpq->rx_msg_bytes_left -= 5;
+		}
+	}
+}
+
+static inline ssize_t
+zpq_process_switch(ZpqStream * zpq)
+{
+	uint8		algorithm_idx;
+
+	if (zpq_buf_unread(&zpq->rx_in) < 1)
+	{
+		return 0;
+	}
+
+	algorithm_idx = *zpq_buf_pos(&zpq->rx_in);
+
+	zpq_buf_pos_advance(&zpq->rx_in, 1);
+	zpq->reading_set_compression = false;
+
+	if (algorithm_idx != zpq->decompress_alg_idx)
+	{
+		zs_decompressor_free(zpq->d_stream);
+		zpq->d_stream = zs_create_decompressor(zpq->compressors[algorithm_idx].impl);
+		if (zpq->d_stream == NULL)
+		{
+			return ZPQ_FATAL_ERROR;
+		}
+		zpq->decompress_alg_idx = algorithm_idx;
+	}
+
+	return 0;
+}
+
+ssize_t
+zpq_read(ZpqStream * zpq, void *dst, size_t dst_size, bool noblock)
+{
+	size_t		dst_pos = 0;
+	size_t		dst_processed = 0;
+	ssize_t		rc;
+
+	/* Read until some data fetched */
+	while (dst_pos == 0)
+	{
+		zpq_buf_reuse(&zpq->rx_in);
+
+		if (!zpq_buffered_rx(zpq) || (zpq->is_decompressing && zpq_buf_unread(&zpq->rx_in) < zpq->rx_msg_bytes_left))
+		{
+			if (noblock)
+			{
+				/*
+				 * can't read anything w/o the potentially blocking backend
+				 * call
+				 */
+				return dst_pos;
+			}
+			rc = zpq->rx_func(zpq->arg, zpq_buf_size(&zpq->rx_in), zpq_buf_left(&zpq->rx_in));
+			if (rc > 0)			/* read fetches some data */
+			{
+				zpq->rx_total += rc;
+				zpq_buf_size_advance(&zpq->rx_in, rc);
+			}
+			else				/* read failed */
+			{
+				return rc;
+			}
+		}
+
+		/*
+		 * try to read ahead the next message types and increase
+		 * rx_msg_bytes_left, if possible (ONLY UNCOMPRESSED MESSAGES)
+		 */
+		while (!zpq->is_decompressing && zpq->rx_msg_bytes_left > 0 && (zpq_buf_unread(&zpq->rx_in) >= zpq->rx_msg_bytes_left + 5))
+		{
+			char		msg_type;
+			uint32		msg_len;
+
+			msg_type = *(zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left);
+			if (zpq_is_compressed_msg(msg_type) || zpq_is_set_compression_msg(msg_type))
+			{
+				/*
+				 * cannot proceed further, encountered compression toggle
+				 * point
+				 */
+				break;
+			}
+
+			memcpy(&msg_len, zpq_buf_pos(&zpq->rx_in) + zpq->rx_msg_bytes_left + 1, 4);
+			zpq->rx_msg_bytes_left += pg_ntoh32(msg_len) + 1;
+		}
+
+
+		if (zpq->rx_msg_bytes_left > 0 || zs_buffered(zpq->d_stream))
+		{
+			dst_processed = 0;
+			if (zpq->is_decompressing || zs_buffered(zpq->d_stream))
+			{
+				if (!zs_buffered(zpq->d_stream) && zpq_buf_unread(&zpq->rx_in) < zpq->rx_msg_bytes_left)
+				{
+					/*
+					 * prefer to read only the fully compressed messages or
+					 * read if some data is buffered
+					 */
+					continue;
+				}
+				rc = zpq_read_compressed_message(zpq, dst, dst_size - dst_pos, &dst_processed);
+				dst_pos += dst_processed;
+				if (rc == ZS_STREAM_END)
+				{
+					continue;
+				}
+				if (rc != ZS_OK)
+				{
+					return rc;
+				}
+			}
+			else
+				dst_pos += zpq_read_uncompressed(zpq, dst, dst_size - dst_pos);
+		}
+		else if (zpq->reading_set_compression)
+		{
+			zpq_process_switch(zpq);
+		}
+		else if (zpq_buf_unread(&zpq->rx_in) >= 5)
+			zpq_toggle_decompression(zpq);
+	}
+	return dst_pos;
+}
+
+bool
+zpq_buffered_rx(ZpqStream * zpq)
+{
+	return zpq ? zpq_buf_unread(&zpq->rx_in) >= 5 || (zpq_buf_unread(&zpq->rx_in) > 0 && zpq->rx_msg_bytes_left > 0) ||
+		zs_buffered(zpq->d_stream) : 0;
+}
+
+bool
+zpq_buffered_tx(ZpqStream * zpq)
+{
+	return zpq ? zpq_buf_unread(&zpq->tx_in) >= 5 || (zpq_buf_unread(&zpq->tx_in) > 0 && zpq->tx_msg_bytes_left > 0) || zpq_buf_unread(&zpq->tx_out) > 0 ||
+		zs_buffered(zpq->c_stream) : 0;
+}
+
+void
+zpq_free(ZpqStream * zpq)
+{
+	if (zpq)
+	{
+		if (zpq->c_stream)
+		{
+			zs_compressor_free(zpq->c_stream);
+		}
+		if (zpq->d_stream)
+		{
+			zs_decompressor_free(zpq->d_stream);
+		}
+		free(zpq);
+	}
+}
+
+char const *
+zpq_compress_error(ZpqStream * zpq)
+{
+	return zs_compress_error(zpq->c_stream);
+}
+
+char const *
+zpq_decompress_error(ZpqStream * zpq)
+{
+	return zs_decompress_error(zpq->d_stream);
+}
+
+char const *
+zpq_compress_algorithm_name(ZpqStream * zpq)
+{
+	return zs_compress_algorithm_name(zpq->c_stream);
+}
+
+char const *
+zpq_decompress_algorithm_name(ZpqStream * zpq)
+{
+	return zs_decompress_algorithm_name(zpq->d_stream);
+}
+
+char *
+zpq_algorithms(ZpqStream * zpq)
+{
+	return zpq_serialize_compressors(zpq->compressors, zpq->n_compressors);
+}
+
+int
+zpq_parse_compression_setting(char *val, zpq_compressor * *compressors, size_t *n_compressors)
+{
+	int			i;
+	char	  **supported_algorithms = zs_get_supported_algorithms();
+	size_t		n_supported_algorithms = 0;
+	char	   *protocol_extension = strchr(val, ';');
+
+	*compressors = NULL;
+	*n_compressors = 0;
+
+	/* No protocol extensions are currently supported */
+	if (protocol_extension)
+		*protocol_extension = '\0';
+
+	while (supported_algorithms[n_supported_algorithms] != NULL)
+	{
+		n_supported_algorithms += 1;
+	}
+
+	if (pg_strcasecmp(val, "true") == 0 ||
+		pg_strcasecmp(val, "yes") == 0 ||
+		pg_strcasecmp(val, "on") == 0 ||
+		pg_strcasecmp(val, "any") == 0 ||
+		pg_strcasecmp(val, "1") == 0)
+	{
+		/* return all available compressors */
+		*n_compressors = n_supported_algorithms;
+
+		if (n_supported_algorithms)
+		{
+			*compressors = malloc(n_supported_algorithms * sizeof(zpq_compressor));
+			for (i = 0; i < n_supported_algorithms; i++)
+			{
+				(*compressors)[i].impl = i;
+				(*compressors)[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+			}
+		}
+		return 1;
+	}
+
+	if (*val == 0 ||
+		pg_strcasecmp(val, "false") == 0 ||
+		pg_strcasecmp(val, "no") == 0 ||
+		pg_strcasecmp(val, "off") == 0 ||
+		pg_strcasecmp(val, "0") == 0)
+	{
+		/* Compression is disabled */
+		return 0;
+	}
+
+	return zpq_deserialize_compressors(val, compressors, n_compressors) ? 1 : -1;
+}
+
+bool
+zpq_deserialize_compressors(char const *c_string, zpq_compressor * *compressors, size_t *n_compressors)
+{
+	int			selected_alg_mask = 0;	/* bitmask of already selected
+										 * algorithms to avoid duplicates in
+										 * compressors */
+	char	  **supported_algorithms = zs_get_supported_algorithms();
+	size_t		n_supported_algorithms = 0;
+	char	   *c_string_dup = strdup(c_string);	/* following parsing can
+													 * modify the string */
+	char	   *p = c_string_dup;
+
+	*n_compressors = 0;
+
+	while (supported_algorithms[n_supported_algorithms] != NULL)
+	{
+		n_supported_algorithms += 1;
+	}
+
+	*compressors = malloc(n_supported_algorithms * sizeof(zpq_compressor));
+
+	while (*p != '\0')
+	{
+		char	   *sep = strchr(p, ',');
+		char	   *col;
+		int			compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+		bool		found;
+
+		if (sep != NULL)
+			*sep = '\0';
+
+		col = strchr(p, ':');
+		if (col != NULL)
+		{
+			*col = '\0';
+			if (sscanf(col + 1, "%d", &compression_level) != 1)
+			{
+				pg_log_warning("invalid compression level %s in compression option '%s'", col + 1, p);
+				free(*compressors);
+				free(c_string_dup);
+				*compressors = NULL;
+				*n_compressors = 0;
+				return false;
+			}
+		}
+		found = false;
+		for (int i = 0; supported_algorithms[i] != NULL; i++)
+		{
+			if (pg_strcasecmp(p, supported_algorithms[i]) == 0)
+			{
+				if (selected_alg_mask & (1 << i))
+				{
+					/* duplicates are not allowed */
+					pg_log_warning("duplicate algorithm %s in compressors string %s", p, c_string);
+					free(*compressors);
+					free(c_string_dup);
+					*compressors = NULL;
+					*n_compressors = 0;
+					return false;
+				}
+
+				(*compressors)[*n_compressors].impl = i;
+				(*compressors)[*n_compressors].level = compression_level;
+
+				selected_alg_mask |= 1 << i;
+				*n_compressors += 1;
+				found = true;
+				break;
+			}
+		}
+		if (!found)
+		{
+			pg_log_warning("algorithm %s is not supported", p);
+		}
+		if (sep)
+			p = sep + 1;
+		else
+			break;
+	}
+
+	if (*n_compressors == 0)
+	{
+		free(*compressors);
+		*compressors = NULL;
+	}
+	free(c_string_dup);
+	return true;
+}
+
+char *
+zpq_serialize_compressors(zpq_compressor const *compressors, size_t n_compressors)
+{
+	char	   *res;
+	char	   *p;
+	size_t		i;
+	size_t		total_len = 0;
+	char	  **supported_algorithms = zs_get_supported_algorithms();
+
+	if (n_compressors == 0)
+	{
+		return NULL;
+	}
+
+	for (i = 0; i < n_compressors; i++)
+	{
+		size_t		level_len;
+
+		if (!zs_is_valid_impl_id(compressors[i].impl))
+		{
+			pg_log_warning("algorithm impl_id %d is incorrect", compressors[i].impl);
+			return NULL;
+		}
+
+		/* determine the length of the compression level string */
+		level_len = compressors[i].level == 0 ? 1 : (int) floor(log10(abs(compressors[i].level))) + 1;
+		if (compressors[i].level < 0)
+		{
+			level_len += 1;		/* add the leading "-" */
+		}
+
+		/*
+		 * single entry looks like "alg_name:compression_level," so +2 is for
+		 * ":" and "," symbols (or trailing null)
+		 */
+		total_len += strlen(supported_algorithms[compressors[i].impl]) + level_len + 2;
+	}
+
+	res = p = malloc(total_len);
+
+	for (i = 0; i < n_compressors; i++)
+	{
+		p += sprintf(p, "%s:%d", supported_algorithms[compressors[i].impl], compressors[i].level);
+		if (i < n_compressors - 1)
+			*p++ = ',';
+	}
+	return res;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index a104caf4040..b4d3122c6eb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5347,9 +5347,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,query_id,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
@@ -5668,6 +5668,14 @@
   proargnames => '{stats_reset,prefetch,hit,skip_init,skip_new,skip_fpw,skip_rep,wal_distance,block_distance,io_depth}',
   prosrc => 'pg_stat_get_recovery_prefetch' },
 
+{ oid => '9598', descr => 'statistics: information about network traffic',
+  proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+  proallargtypes => '{int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+  prosrc => 'pg_stat_get_network_traffic' },
+
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5809,6 +5817,10 @@
   proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text',
   proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
 
+{ oid => '9257', descr => 'connection compression algorithm',
+  proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text',
+  proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
 { oid => '1946',
   descr => 'convert bytea value into some ascii-only text string',
   proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/z_stream.h b/src/include/common/z_stream.h
new file mode 100644
index 00000000000..4316563aa7f
--- /dev/null
+++ b/src/include/common/z_stream.h
@@ -0,0 +1,109 @@
+/*
+ * z_stream.h
+ *     Streaming compression
+ */
+
+
+#ifndef Z_STREAM_H
+#define Z_STREAM_H
+
+#include <stdlib.h>
+
+#define ZS_OK (0)
+#define ZS_IO_ERROR (-1)
+#define ZS_DECOMPRESS_ERROR (-2)
+#define ZS_COMPRESS_ERROR (-3)
+#define ZS_STREAM_END (-4)
+#define ZS_DATA_PENDING (-5)
+#define ZS_INCOMPLETE_SRC (-6)	/* cannot decompress unless full src message
+								 * is fetched */
+#define ZS_EXPAND_DST (-7)		/* cannot compress: dst buffer is too small
+								 * for given src size */
+
+struct ZStream;
+typedef struct ZStream ZStream;
+
+#endif
+
+/*
+ * Create compression stream for sending compressed data.
+ * c_alg_impl: index of chosen compression algorithm
+ * c_level: compression c_level
+ */
+extern ZStream * zs_create_compressor(unsigned int c_alg_impl, int c_level);
+
+/*
+ * Create decompression stream for reading compressed data.
+ * d_alg_impl: index of chosen decompression algorithm
+ */
+extern ZStream * zs_create_decompressor(unsigned int d_alg_impl);
+
+/*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZS_DECOMPRESS_ERROR or error code returned by the rx function.
+ */
+extern ssize_t zs_read(ZStream * zs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+/*
+ * Write up to "size" raw (decompressed) bytes.
+ * Returns number of written raw bytes or error code.
+ * Error code is either ZS_COMPRESS_ERROR or error code returned by the tx function.
+ * In the last case number of bytes written is stored in *processed.
+ */
+extern ssize_t zs_write(ZStream * zs, void const *buf, size_t size, size_t *processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+/*
+ * Get decompressor error message.
+ */
+extern char const *zs_decompress_error(ZStream * zs);
+
+/*
+ * Get compressor error message.
+ */
+extern char const *zs_compress_error(ZStream * zs);
+
+/*
+ * Return true if non-flushed data might left in internal rx decompression buffer.
+ */
+extern bool zs_buffered(ZStream * zs);
+
+/*
+ * Return true if non-flushed data might left in internal tx compression buffer.
+ */
+extern bool zs_buffered(ZStream * zs);
+
+/*
+ * End the compression stream.
+ */
+extern ssize_t zs_end_compression(ZStream * zs, void *dst, size_t dst_size, size_t *dst_processed);
+
+/*
+ * Free stream created by zs_create_compressor function.
+ */
+extern void zs_compressor_free(ZStream * zs);
+
+/*
+ * Free stream created by zs_create_decompressor function.
+ */
+extern void zs_decompressor_free(ZStream * zs);
+
+/*
+ * Get the name of chosen compression algorithm.
+ */
+extern char const *zs_compress_algorithm_name(ZStream * zs);
+
+/*
+ * Get the name of chosen decompression algorithm.
+ */
+extern char const *zs_decompress_algorithm_name(ZStream * zs);
+
+/*
+  Returns zero terminated array with compression algorithms names
+*/
+extern char **zs_get_supported_algorithms(void);
+
+/*
+  Returns true if provided id is a valid compression algorithm id, otherwise returns false
+*/
+extern bool zs_is_valid_impl_id(unsigned int id);
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 00000000000..29b467015f2
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,120 @@
+/*
+ * zpq_stream.h
+ *     Streaming compression for libpq
+ */
+#include "z_stream.h"
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+#define ZPQ_INCOMPLETE_HEADER (-6)
+#define ZPQ_FATAL_ERROR (-7)
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size);
+typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size);
+
+/*
+ * Descriptor of compression algorithm chosen by client
+ */
+typedef struct zpq_compressor
+{
+	unsigned int impl;			/* compression algorithm index */
+	int			level;			/* compression level */
+}			zpq_compressor;
+
+#endif
+
+/*
+ * Create compression stream with rx/tx function for reading/sending compressed data.
+ * tx_func: function for writing compressed data in underlying stream
+ * rx_func: function for reading compressed data from underlying stream
+ * arg: context passed to the function
+ * rx_data: received data (compressed data already fetched from input stream)
+ * rx_data_size: size of data fetched from input stream
+ */
+extern ZpqStream * zpq_create(zpq_compressor * compressors, size_t n_compressors, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size);
+
+/*
+ * Write up to "src_size" raw (decompressed) bytes.
+ * Returns number of written raw bytes or error code.
+ * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function.
+ * In the last case number of bytes written is stored in *src_processed.
+ */
+extern ssize_t zpq_write(ZpqStream * zpq, void const *src, size_t src_size, size_t *src_processed);
+
+/*
+ * Read up to "dst_size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function.
+ */
+extern ssize_t zpq_read(ZpqStream * zpq, void *dst, size_t dst_size, bool noblock);
+
+/*
+ * Return true if non-flushed data left in internal rx decompression buffer.
+ */
+extern bool zpq_buffered_rx(ZpqStream * zpq);
+
+/*
+ * Return true if non-flushed data left in internal tx compression buffer.
+ */
+extern bool zpq_buffered_tx(ZpqStream * zpq);
+
+/*
+ * Free stream created by zs_create function.
+ */
+extern void zpq_free(ZpqStream * zpq);
+
+/*
+ * Get decompressor error message.
+ */
+extern char const *zpq_decompress_error(ZpqStream * zpq);
+
+/*
+ * Get compressor error message.
+ */
+extern char const *zpq_compress_error(ZpqStream * zpq);
+
+/*
+ * Get the name of the current compression algorithm.
+ */
+extern char const *zpq_compress_algorithm_name(ZpqStream * zpq);
+
+/*
+ * Get the name of the current decompression algorithm.
+ */
+extern char const *zpq_decompress_algorithm_name(ZpqStream * zpq);
+
+/*
+ * Parse the compression setting. Returns:
+ * - 1 if the compression setting is valid
+ * - 0 if the compression setting is valid but disabled
+ * - -1 if the compression setting is invalid
+ * It also populates the compressors array with the recognized compressors. Size of the array is stored in n_compressors.
+ * If no supported compressors recognized or if compression is disabled, then NULL is assigned to *compressors and n_compressors is set to 0.
+ */
+extern int
+			zpq_parse_compression_setting(char *val, zpq_compressor * *compressors, size_t *n_compressors);
+
+/* Serialize the compressors array to string so it can be transmitted to the other side during the compression startup.
+ * For example, for array of two compressors (zstd, level 1), (zlib, level 2) resulting string would look like "zstd:1,zlib:2".
+ * Returns the resulting string.
+ */
+extern char
+		   *zpq_serialize_compressors(zpq_compressor const *compressors, size_t n_compressors);
+
+/* Deserialize the compressors string received during the compression setup to a compressors array.
+ * For example, for string "zstd:1,zlib:2" compressors would be populated with 2 elements: (zstd, level 1), (zlib, level 2).
+ * Returns:
+ * - true if the compressors string is successfully parsed
+ * - false otherwise
+ * It also populates the compressors array with the recognized compressors. Size of the array is stored in n_compressors.
+ * If no supported compressors recognized or string is empty, then NULL is assigned to *compressors and n_compressors is set to 0.
+ */
+bool
+			zpq_deserialize_compressors(char const *c_string, zpq_compressor * *compressors, size_t *n_compressors);
+
+/* Return the currently enabled compression algorithms */
+char	   *zpq_algorithms(ZpqStream * zpq);
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 6d452ec6d95..b26808daa31 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -195,6 +195,9 @@ typedef struct Port
 	int			keepalives_count;
 	int			tcp_user_timeout;
 
+	char	   *compression_algorithms; /* Compression algorithms supported by
+										 * client */
+
 	/*
 	 * GSSAPI structures.
 	 */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index 2de7d9bad2c..0212a7d25f8 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -72,6 +72,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int	pq_configure(Port *port);
 extern int	pq_getbytes(char *s, size_t len);
 extern void pq_startmsgread(void);
 extern void pq_endmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index fcf68df39b9..98665015c51 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -99,6 +99,9 @@ typedef uint32 PacketLen;
 
 extern PGDLLIMPORT bool Db_user_namespace;
 
+/* List of allowed compression algorithms */
+extern char *libpq_compress_algorithms;
+
 /*
  * In protocol 3.0 and later, the startup packet length is not fixed, but
  * we set an arbitrary limit on it anyway.  This is just to prevent simple
diff --git a/src/include/utils/backend_status.h b/src/include/utils/backend_status.h
index b582b46e9f9..1bda101f368 100644
--- a/src/include/utils/backend_status.h
+++ b/src/include/utils/backend_status.h
@@ -145,6 +145,12 @@ typedef struct PgBackendStatus
 	/* application name; MUST be null-terminated */
 	char	   *st_appname;
 
+	/* client-server traffic information */
+	uint64		st_rx_raw_bytes;
+	uint64		st_tx_raw_bytes;
+	uint64		st_rx_compressed_bytes;
+	uint64		st_tx_compressed_bytes;
+
 	/*
 	 * Current command string; MUST be null-terminated. Note that this string
 	 * possibly is truncated in the middle of a multi-byte character. As
@@ -309,6 +315,7 @@ extern void pgstat_report_query_id(uint64 query_id, bool force);
 extern void pgstat_report_tempfile(size_t filesize);
 extern void pgstat_report_appname(const char *appname);
 extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
 extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
 extern const char *pgstat_get_crashed_backend_activity(int pid, char *buffer,
 													   int buflen);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 1d31b256fc9..27875cbee0f 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -30,6 +30,20 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_lz4),yes)
+LIBS += -llz4
+SHLIB_LINK += -llz4
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
 OBJS = \
 	$(WIN32RES) \
 	fe-auth-scram.o \
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index e8bcc883709..2759801b00e 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -186,3 +186,5 @@ PQpipelineStatus          183
 PQsetTraceFlags           184
 PQmblenBounded            185
 PQsendFlushRequest        186
+PQcompression             187
+PQreadPending             188
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index a6120bf58b8..c50f9ac602c 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -25,6 +25,7 @@
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "common/string.h"
 #include "fe-auth.h"
 #include "libpq-fe.h"
@@ -336,6 +337,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", "PGCOMPRESSION", "off", NULL,
+		"Libpq-compression", "", 16,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */
@@ -445,6 +450,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zpqStream);
+	conn->zpqStream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -1371,6 +1380,36 @@ connectOptions2(PGconn *conn)
 			goto oom_error;
 	}
 
+	/*
+	 * validate compression option
+	 */
+	if (conn->compression && conn->compression[0])
+	{
+		zpq_compressor *compressors;
+		size_t		n_compressors;
+		int			rc = zpq_parse_compression_setting(conn->compression, &compressors, &n_compressors);
+
+		if (rc == -1)
+		{
+			conn->status = CONNECTION_BAD;
+			appendPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("invalid %s value: \"%s\"\n"),
+							  "compression", conn->compression);
+			return false;
+		}
+
+		if (rc == 1 && n_compressors == 0)
+		{
+			conn->status = CONNECTION_BAD;
+			appendPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("no supported algorithms found, %s value: \"%s\"\n"),
+							  "compression", conn->compression);
+			return false;
+		}
+
+		free(compressors);
+	}
+
 	/*
 	 * validate target_session_attrs option, and set target_server_type
 	 */
@@ -3193,11 +3232,14 @@ keep_going:						/* We will come back to here until there is
 				}
 
 				/*
-				 * Validate message type: we expect only an authentication
-				 * request or an error here.  Anything else probably means
-				 * it's not Postgres on the other end at all.
+				 * Validate message type. We expect only:
+				 * - authentication request ('R')
+				 * - error ('E')
+				 * - protocol compression acknowledgment ('z')
+				 * - NegotiateProtocolVersion in cases when server does not support protocol compression
+				 * Anything else probably means it's not Postgres on the other end at all.
 				 */
-				if (!(beresp == 'R' || beresp == 'E'))
+				if (!(beresp == 'R' || beresp == 'E' || beresp == 'z' || beresp == 'v'))
 				{
 					libpq_append_conn_error(conn, "expected authentication request from server, but received %c",
 									   beresp);
@@ -3273,6 +3315,72 @@ keep_going:						/* We will come back to here until there is
 					return PGRES_POLLING_READING;
 				}
 
+				if (beresp == 'z')	/* Switch on compression */
+				{
+					zpq_compressor *compressors;
+					size_t		n_compressors;
+					char	   *resp = malloc(msgLength);
+
+					pqGetnchar(resp, msgLength, conn);
+
+					if (!zpq_deserialize_compressors(resp, &compressors, &n_compressors))
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext("server returned unrecognized compression setting: %s\n"),
+										  resp);
+						free(resp);
+						goto error_return;
+					}
+					free(resp);
+
+					if (n_compressors == 0)
+					{
+						/*
+						 * If there are no compressors returned, it means that
+						 * the server rejected all the proposed compression
+						 * algorithms. Report an error and exit.
+						 */
+						// conn->inStart = conn->inCursor;
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext("server rejected protocol compression\n"));
+						goto error_return;
+						// goto keep_going;
+					}
+
+					Assert(!conn->zpqStream);
+					conn->zpqStream = zpq_create(compressors, n_compressors,
+												 (zpq_tx_func) pqsecure_write, (zpq_rx_func) pqsecure_read,
+												 conn,
+												 &conn->inBuffer[conn->inCursor],
+												 conn->inEnd - conn->inCursor);
+					if (!conn->zpqStream)
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext("failed to initialize compression\n"));
+						free(compressors);
+						goto error_return;
+					}
+					/* reset buffer */
+					conn->inStart = conn->inCursor = conn->inEnd = 0;
+				}
+				else if (conn->n_compressors > 0 && !conn->zpqStream)
+				{
+					/*
+					 * Despite the client requesting the compression, the
+					 * backend did not reply with the CompressionACK message.
+					 * This case covers either no reply at all or
+					 * NegotiateProtocolVersion reply. If the backend supports
+					 * the protocol compression feature, it should reply with
+					 * CompressionACK in any case, even in case of compression
+					 * request rejection. If the backend did not reply with
+					 * the CompressionACK message, it means that it does not
+					 * support this feature. Report an error and exit.
+					 */
+					appendPQExpBuffer(&conn->errorMessage,
+									  libpq_gettext("server does not support protocol compression\n"));
+					goto error_return;
+				}
+
 				/* Handle errors. */
 				if (beresp == 'E')
 				{
@@ -3972,8 +4080,8 @@ freePGconn(PGconn *conn)
 			free(conn->connhost[i].password);
 		}
 	}
-	free(conn->connhost);
 
+	free(conn->connhost);
 	free(conn->client_encoding_initial);
 	free(conn->events);
 	free(conn->pghost);
@@ -3986,7 +4094,9 @@ freePGconn(PGconn *conn)
 	free(conn->fbappname);
 	free(conn->dbName);
 	free(conn->replication);
+	free(conn->compression);
 	free(conn->pguser);
+
 	if (conn->pgpass)
 	{
 		explicit_bzero(conn->pgpass, strlen(conn->pgpass));
@@ -6590,6 +6700,15 @@ PQuser(const PGconn *conn)
 	return conn->pguser;
 }
 
+char *
+PQcompression(const PGconn *conn)
+{
+	if (!conn || !conn->zpqStream)
+		return NULL;
+
+	return zpq_algorithms(conn->zpqStream);
+}
+
 char *
 PQpass(const PGconn *conn)
 {
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index da229d632a1..575e634639a 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2063,9 +2063,7 @@ PQgetResult(PGconn *conn)
 		 * EOF indication.  We expect therefore that this won't result in any
 		 * undue delay in reporting a previous write failure.)
 		 */
-		if (flushResult ||
-			pqWait(true, false, conn) ||
-			pqReadData(conn) < 0)
+		if (flushResult || pqWait(true, false, conn) || pqReadData(conn) < 0)
 		{
 			/* Report the error saved by pqWait or pqReadData */
 			pqSaveErrorResult(conn);
@@ -3854,6 +3852,12 @@ pqPipelineFlush(PGconn *conn)
 	return 0;
 }
 
+int
+PQreadPending(PGconn *conn)
+{
+	return pqReadPending(conn);
+}
+
 
 /*
  *		PQfreemem - safely frees memory allocated
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4159610f6c7..d3a4e34d93e 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -51,12 +51,24 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+#include  <common/zpq_stream.h>
+
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
 static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
 						  time_t end_time);
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
+/*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn)												\
+	(conn->zpqStream													\
+	 ? zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd,			\
+				conn->inBufSize - conn->inEnd, false)							\
+	 : pqsecure_read(conn, conn->inBuffer + conn->inEnd,				\
+					 conn->inBufSize - conn->inEnd))
+
 /*
  * PQlibVersion: return the libpq version number
  */
@@ -614,10 +626,17 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
 	if (nread < 0)
 	{
+		if (nread == ZS_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_decompress_error(conn->zpqStream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -709,10 +728,18 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
+
 	if (nread < 0)
 	{
+		if (nread == ZS_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_decompress_error(conn->zpqStream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -822,12 +849,18 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered_tx(conn->zpqStream))
 	{
 		int			sent;
+		size_t		processed = 0;
 
+		/*
+		 * Use zpq_write if compression is switched on
+		 */
+		sent = conn->zpqStream
+			? zpq_write(conn->zpqStream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -835,8 +868,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+:			pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -885,7 +921,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zpqStream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
@@ -953,7 +989,7 @@ pqSendSome(PGconn *conn, int len)
 int
 pqFlush(PGconn *conn)
 {
-	if (conn->outCount > 0)
+	if (conn->outCount > 0 || zpq_buffered_tx(conn->zpqStream))
 	{
 		if (conn->Pfdebug)
 			fflush(conn->Pfdebug);
@@ -978,6 +1014,8 @@ pqFlush(PGconn *conn)
 int
 pqWait(int forRead, int forWrite, PGconn *conn)
 {
+	if (forRead && conn->inCursor < conn->inEnd)
+		return 0;
 	return pqWaitTimed(forRead, forWrite, conn, (time_t) -1);
 }
 
@@ -1034,6 +1072,9 @@ pqWriteReady(PGconn *conn)
  *
  * If SSL is in use, the SSL buffer is checked prior to checking the socket
  * for read data directly.
+ *
+ * If ZPQ stream is in use, the ZPQ buffer is checked prior to checking
+ * the socket for read data directly.
  */
 static int
 pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
@@ -1048,14 +1089,10 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
 		return -1;
 	}
 
-#ifdef USE_SSL
-	/* Check for SSL library buffering read bytes */
-	if (forRead && conn->ssl_in_use && pgtls_read_pending(conn))
+	if (forRead && (pqReadPending(conn) > 0))
 	{
-		/* short-circuit the select */
 		return 1;
 	}
-#endif
 
 	/* We will retry as long as we get EINTR */
 	do
@@ -1073,6 +1110,33 @@ pqSocketCheck(PGconn *conn, int forRead, int forWrite, time_t end_time)
 	return result;
 }
 
+/*
+ * Check if there is some data pending in ZPQ / SSL read buffers.
+ * Returns -1 on failure, 0 if no, 1 if yes.
+ */
+int
+pqReadPending(PGconn *conn)
+{
+	if (!conn)
+		return -1;
+
+	/* check for ZPQ stream buffered read bytes */
+	if (zpq_buffered_rx(conn->zpqStream))
+	{
+		/* short-circuit the select */
+		return 1;
+	}
+
+#ifdef USE_SSL
+	/* Check for SSL library buffering read bytes */
+	if (conn->ssl_in_use && pgtls_read_pending(conn))
+	{
+		/* short-circuit the select */
+		return 1;
+	}
+#endif
+	return 0;
+}
 
 /*
  * Check a file descriptor for read and/or write data, possibly waiting.
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 88dd360c905..e76a1a9b36c 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -69,6 +69,19 @@ pqParseInput3(PGconn *conn)
 	 */
 	for (;;)
 	{
+		/*
+		 * Read the buffered compressed data w/o blocking
+		 */
+		if (conn->zpqStream && pqReadPending(conn) && (conn->inBufSize - conn->inEnd > 0))
+		{
+			int			rc = zpq_read(conn->zpqStream, conn->inBuffer + conn->inEnd, conn->inBufSize - conn->inEnd, true);
+
+			if (rc > 0)
+			{
+				conn->inEnd += rc;
+			}
+		}
+
 		/*
 		 * Try to read a message.  First get the type code and length. Return
 		 * if not enough data.
@@ -1692,7 +1705,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 		if (msgLength == 0)
 		{
 			/* Don't block if async read requested */
-			if (async)
+			if (async && !pqReadPending(conn))
 				return 0;
 			/* Need to load more data */
 			if (pqWait(true, false, conn) ||
@@ -2174,6 +2187,50 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
 	return startpacket;
 }
 
+/*
+ * Build comma-separated list of compression algorithms requested by client.
+ * It can be either explicitly specified by user in connection string, or
+ * include all algorithms supported by client library.
+ * This function returns true if the compression string is successfully parsed and
+ * stores a comma-separated list of algorithms in *client_compressors.
+ * If compression is disabled, then NULL is assigned to *client_compressors.
+ * Also it creates an array of compressor descriptors, each element of which corresponds to
+ * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn
+ * and is used during handshake when a compression acknowledgment response is received from the server.
+ */
+static bool
+build_compressors_list(PGconn *conn, char **client_compressors, bool build_descriptors)
+{
+	zpq_compressor *compressors;
+	size_t		n_compressors;
+
+	if (!zpq_parse_compression_setting(conn->compression, &compressors, &n_compressors))
+	{
+		return false;
+	}
+
+	*client_compressors = NULL;
+	if (build_descriptors)
+	{
+		conn->compressors = compressors;
+		conn->n_compressors = n_compressors;
+	}
+
+	if (n_compressors == 0)
+	{
+		/* no compressors available, return */
+		return true;
+	}
+
+	*client_compressors = zpq_serialize_compressors(compressors, n_compressors);
+
+	if (!build_descriptors)
+	{
+		free(compressors);
+	}
+	return true;
+}
+
 /*
  * Build a startup packet given a filled-in PGconn structure.
  *
@@ -2220,6 +2277,18 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("replication", conn->replication);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
+	if (conn->compression && conn->compression[0])
+	{
+		char	   *client_compression_algorithms;
+
+		if (build_compressors_list((PGconn *) conn, &client_compression_algorithms, packet == NULL))
+		{
+			if (client_compression_algorithms)
+			{
+				ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms);
+			}
+		}
+	}
 	if (conn->send_appname)
 	{
 		/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index b7df3224c0f..6fd24d2dda3 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -342,6 +342,7 @@ extern char *PQhostaddr(const PGconn *conn);
 extern char *PQport(const PGconn *conn);
 extern char *PQtty(const PGconn *conn);
 extern char *PQoptions(const PGconn *conn);
+extern char *PQcompression(const PGconn *conn);
 extern ConnStatusType PQstatus(const PGconn *conn);
 extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn);
 extern const char *PQparameterStatus(const PGconn *conn,
@@ -500,6 +501,9 @@ extern PGPing PQpingParams(const char *const *keywords,
 /* Force the write buffer to be written (or at least try) */
 extern int	PQflush(PGconn *conn);
 
+extern int
+			PQreadPending(PGconn *conn);
+
 /*
  * "Fast path" interface --- not really recommended for application
  * use
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index c24645b4696..43ba1c6ad88 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -41,6 +41,7 @@
 
 /* include stuff common to fe and be */
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "fe-auth-sasl.h"
 #include "pqexpbuffer.h"
@@ -343,6 +344,7 @@ typedef struct pg_conn_host
 								 * found in password file. */
 } pg_conn_host;
 
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -395,6 +397,14 @@ struct pg_conn
 								 * "sspi") */
 	char	   *ssl_min_protocol_version;	/* minimum TLS protocol version */
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
+
+	char	   *compression;	/* stream compression (boolean value, "any" or
+								 * list of compression algorithms separated by
+								 * comma) */
+	zpq_compressor *compressors;	/* descriptors of compression algorithms
+									 * chosen by client */
+	unsigned	n_compressors;	/* size of compressors array  */
+
 	char	   *target_session_attrs;	/* desired session properties */
 
 	/* Optional file to write trace info to */
@@ -590,6 +600,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream  *zpqStream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
@@ -722,6 +735,7 @@ extern int	pqWaitTimed(int forRead, int forWrite, PGconn *conn,
 						time_t finish_time);
 extern int	pqReadReady(PGconn *conn);
 extern int	pqWriteReady(PGconn *conn);
+extern int	pqReadPending(PGconn *conn);
 
 /* === in fe-secure.c === */
 
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 624d0e5aae1..606b72e9c1e 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1756,7 +1756,7 @@ pg_stat_activity| SELECT s.datid,
     s.query_id,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1871,8 +1871,14 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+    s.rx_raw_bytes,
+    s.tx_raw_bytes,
+    s.rx_compressed_bytes,
+    s.tx_compressed_bytes
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
@@ -2052,7 +2058,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
@@ -2086,7 +2092,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, query_id, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 83a3e404254..9dec3431ce9 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -139,7 +139,7 @@ sub mkvcbuild
 	  keywords.c kwlookup.c link-canary.c md5_common.c
 	  pg_get_line.c pg_lzcompress.c pg_prng.c pgfnames.c psprintf.c relpath.c
 	  rmtree.c saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c
-	  username.c wait_error.c wchar.c);
+	  username.c wait_error.c wchar.c z_stream.c zpq_stream.c);
 
 	if ($solution->{options}->{openssl})
 	{
-- 
2.25.1

>From e32a9655ea7892c3f89abbdd4a401bc8172ef507 Mon Sep 17 00:00:00 2001
From: usernamedt <usernam...@protonmail.ch>
Date: Thu, 30 Dec 2021 15:59:01 +0500
Subject: [PATCH 2/4] Add ZSTD support

---
 src/Makefile.global.in        |   1 +
 src/backend/Makefile          |   4 +
 src/common/z_stream.c         | 228 ++++++++++++++++++++++++++++++++++
 src/interfaces/libpq/Makefile |   5 +
 src/tools/msvc/Solution.pm    |   3 +-
 5 files changed, 240 insertions(+), 1 deletion(-)

diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 1b3dc97e97d..da3abeddc94 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -197,6 +197,7 @@ with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
 with_lz4    = @with_lz4@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index f90e078b6ed..d7041c9aede 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -55,6 +55,10 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
 ifeq ($(with_lz4),yes)
 LIBS += -llz4
 endif
diff --git a/src/common/z_stream.c b/src/common/z_stream.c
index b75f194a1cf..98f4e4d1d9e 100644
--- a/src/common/z_stream.c
+++ b/src/common/z_stream.c
@@ -89,6 +89,231 @@ struct ZStream
 	bool		not_flushed;
 };
 
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+/*
+ * Maximum allowed back-reference distance, expressed as power of 2.
+ * This setting controls max compressor/decompressor window size.
+ * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536
+ */
+#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */
+
+
+typedef struct ZS_ZSTD_CStream
+{
+	ZSTD_CStream *stream;
+	char const *error;			/* error message */
+}			ZS_ZSTD_CStream;
+
+typedef struct ZS_ZSTD_DStream
+{
+	ZSTD_DStream *stream;
+	char const *error;			/* error message */
+}			ZS_ZSTD_DStream;
+
+static void *
+zstd_create_compressor(int level)
+{
+	size_t		rc;
+	ZS_ZSTD_CStream *c_stream = (ZS_ZSTD_CStream *) malloc(sizeof(ZS_ZSTD_CStream));
+
+	c_stream->stream = ZSTD_createCStream();
+	rc = ZSTD_initCStream(c_stream->stream, level);
+	if (ZSTD_isError(rc))
+	{
+		ZSTD_freeCStream(c_stream->stream);
+		free(c_stream);
+		return NULL;
+	}
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	c_stream->error = NULL;
+	return c_stream;
+}
+
+static void *
+zstd_create_decompressor()
+{
+	size_t		rc;
+	ZS_ZSTD_DStream *d_stream = (ZS_ZSTD_DStream *) malloc(sizeof(ZS_ZSTD_DStream));
+
+	d_stream->stream = ZSTD_createDStream();
+	rc = ZSTD_initDStream(d_stream->stream);
+	if (ZSTD_isError(rc))
+	{
+		ZSTD_freeDStream(d_stream->stream);
+		free(d_stream);
+		return NULL;
+	}
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	d_stream->error = NULL;
+	return d_stream;
+}
+
+static ssize_t
+zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+	size_t		rc;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	rc = ZSTD_decompressStream(ds->stream, &out, &in);
+
+	*src_processed = in.pos;
+	*dst_processed = out.pos;
+	if (ZSTD_isError(rc))
+	{
+		ds->error = ZSTD_getErrorName(rc);
+		return ZS_DECOMPRESS_ERROR;
+	}
+
+	if (rc == 0)
+	{
+		return ZS_STREAM_END;
+	}
+
+	if (out.pos == out.size)
+	{
+		/*
+		 * if `output.pos == output.size`, there might be some data left
+		 * within internal buffers
+		 */
+		return ZS_DATA_PENDING;
+	}
+	return ZS_OK;
+}
+
+static ssize_t
+zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	if (in.pos < src_size)		/* Has something to compress in input buffer */
+	{
+		size_t		rc = ZSTD_compressStream(cs->stream, &out, &in);
+
+		*dst_processed = out.pos;
+		*src_processed = in.pos;
+		if (ZSTD_isError(rc))
+		{
+			cs->error = ZSTD_getErrorName(rc);
+			return ZS_COMPRESS_ERROR;
+		}
+	}
+
+	if (in.pos == src_size)		/* All data is compressed: flush internal zstd
+								 * buffer */
+	{
+		size_t		tx_not_flushed = ZSTD_flushStream(cs->stream, &out);
+
+		*dst_processed = out.pos;
+		if (tx_not_flushed > 0)
+		{
+			return ZS_DATA_PENDING;
+		}
+	}
+
+	return ZS_OK;
+}
+
+static ssize_t
+zstd_end(void *c_stream, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	size_t		tx_not_flushed;
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+	ZSTD_outBuffer output;
+
+	output.dst = dst;
+	output.pos = 0;
+	output.size = dst_size;
+
+	do
+	{
+		tx_not_flushed = ZSTD_endStream(cs->stream, &output);
+	} while ((tx_not_flushed > 0) && (output.pos < output.size));
+
+	*dst_processed = output.pos;
+
+	if (tx_not_flushed > 0)
+	{
+		return ZS_DATA_PENDING;
+	}
+	return ZS_OK;
+}
+
+static void
+zstd_free_compressor(void *c_stream)
+{
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+
+	if (cs != NULL)
+	{
+		ZSTD_freeCStream(cs->stream);
+		free(cs);
+	}
+}
+
+static void
+zstd_free_decompressor(void *d_stream)
+{
+	ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream;
+
+	if (ds != NULL)
+	{
+		ZSTD_freeDStream(ds->stream);
+		free(ds);
+	}
+}
+
+static char const *
+zstd_compress_error(void *c_stream)
+{
+	ZS_ZSTD_CStream *cs = (ZS_ZSTD_CStream *) c_stream;
+
+	return cs->error;
+}
+
+static char const *
+zstd_decompress_error(void *d_stream)
+{
+	ZS_ZSTD_DStream *ds = (ZS_ZSTD_DStream *) d_stream;
+
+	return ds->error;
+}
+
+static char const *
+zstd_name(void)
+{
+	return "zstd";
+}
+
+#endif
+
 #if HAVE_LIBZ
 
 #include <stdlib.h>
@@ -424,6 +649,9 @@ no_compression_name(void)
  */
 static ZAlgorithm const zs_algorithms[] =
 {
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error, zstd_end},
+#endif
 #if HAVE_LIBZ
 	{zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error, zlib_end},
 #endif
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 27875cbee0f..9b1db2c0dc5 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -35,6 +35,11 @@ LIBS += -llz4
 SHLIB_LINK += -llz4
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
 ifeq ($(with_zlib),yes)
 LIBS += -lz
 SHLIB_LINK += -lz
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index c2acb58df0e..f35b29f8ebd 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -293,7 +293,8 @@ sub GenerateFiles
 		HAVE_LIBXML2                                => undef,
 		HAVE_LIBXSLT                                => undef,
 		HAVE_LIBZ                   => $self->{options}->{zlib} ? 1 : undef,
-		HAVE_LIBZSTD                => undef,
+		HAVE_LIBZSTD                => $self->{options}->{zstd} ? 1 : undef,
+		HAVE_LINK                   => undef,
 		HAVE_LOCALE_T               => 1,
 		HAVE_LONG_INT_64            => undef,
 		HAVE_LONG_LONG_INT_64       => 1,
-- 
2.25.1

>From 707f406252e2b38699677081bd0da9bf5a19b04e Mon Sep 17 00:00:00 2001
From: usernamedt <usernam...@yandex-team.com>
Date: Wed, 12 Jan 2022 23:14:02 +0500
Subject: [PATCH 3/4] Turn on zlib compression for CI test runs

---
 src/backend/utils/misc/guc_tables.c |  2 +-
 src/common/zpq_stream.c             | 11 +++++------
 src/interfaces/libpq/fe-connect.c   |  2 +-
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index bef045e5be9..e1e5cab63af 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4453,7 +4453,7 @@ struct config_string ConfigureNamesString[] =
 			NULL
 		},
 		&libpq_compress_algorithms,
-		"off",
+		"zlib",
 		check_libpq_compression, NULL, NULL
 	},
 
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
index a712200e453..9bdb3527072 100644
--- a/src/common/zpq_stream.c
+++ b/src/common/zpq_stream.c
@@ -156,9 +156,11 @@ zpq_build_msg_compression_map(ZpqStream * zpq)
 
 	for (i = 0; i < 256; i++)
 	{
-		zpq->compressor_by_msg_type[i] = -1;
+		/* Compress all messages during the CI test runs */
+		zpq->compressor_by_msg_type[i] = 0;
 	}
 
+	// XXX why is this looping around compressors and overwriting the same bytes??
 	for (i = 0; i < zpq->n_compressors; i++)
 	{
 		/* compress CopyData, DataRow and Query messages */
@@ -186,11 +188,8 @@ zpq_build_msg_compression_map(ZpqStream * zpq)
 static inline int
 zpq_choose_compressor(ZpqStream * zpq, char msg_type, uint32 msg_len)
 {
-	if (msg_len >= ZPQ_COMPRESS_THRESHOLD)
-	{
-		return zpq->compressor_by_msg_type[(unsigned char) msg_type];
-	}
-	return -1;
+    /* Compress messages with any length during the CI test runs */
+    return zpq->compressor_by_msg_type[(unsigned char) msg_type];
 }
 
 /*
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index c50f9ac602c..f8f831f6665 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -337,7 +337,7 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
-	{"compression", "PGCOMPRESSION", "off", NULL,
+	{"compression", "PGCOMPRESSION", "zlib", NULL,
 		"Libpq-compression", "", 16,
 	offsetof(struct pg_conn, compression)},
 
-- 
2.25.1

>From 7f49573af55ea8824c919cf12271175cb8983497 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 12 Nov 2022 16:20:59 -0600
Subject: [PATCH 4/4] rebase and convert to meson

---
 src/backend/Makefile                          | 12 ------------
 src/backend/libpq/pqcomm.c                    |  4 ++--
 src/backend/utils/misc/guc_funcs.c            |  4 ++--
 src/backend/utils/misc/guc_tables.c           |  4 +++-
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/common/meson.build                        |  4 +++-
 6 files changed, 11 insertions(+), 18 deletions(-)

diff --git a/src/backend/Makefile b/src/backend/Makefile
index d7041c9aede..181c217fae4 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -55,18 +55,6 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
-ifeq ($(with_zstd),yes)
-LIBS += -lzstd
-endif
-
-ifeq ($(with_lz4),yes)
-LIBS += -llz4
-endif
-
-ifeq ($(with_zlib),yes)
-LIBS += -lz
-endif
-
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 9bf4672950c..55fede3dc7d 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -111,7 +111,7 @@ int			Unix_socket_permissions;
 char	   *Unix_socket_group;
 
 /* GUC variable containing the allowed compression algorithms list (separated by comma) */
-char	   *libpq_compress_algorithms;
+char	   *libpq_compress_algorithms = "on";
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -2209,7 +2209,7 @@ retry:
 	return true;
 }
 
-PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+// PG_FUNCTION_INFO_V1(pg_compression_algorithm);
 
 Datum
 pg_compression_algorithm(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc_funcs.c b/src/backend/utils/misc/guc_funcs.c
index 5585f992702..767764bfcbf 100644
--- a/src/backend/utils/misc/guc_funcs.c
+++ b/src/backend/utils/misc/guc_funcs.c
@@ -1052,9 +1052,9 @@ show_all_file_settings(PG_FUNCTION_ARGS)
 
 
 #include "common/zpq_stream.h"
-static bool check_libpq_compression(char **newval, void **extra, GucSource source);
+bool check_libpq_compression(char **newval, void **extra, GucSource source);
 
-static bool
+bool
 check_libpq_compression(char **newval, void **extra, GucSource source)
 {
 	zpq_compressor *compressors;
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index e1e5cab63af..d270d339e0e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -3691,6 +3691,8 @@ struct config_real ConfigureNamesReal[] =
 };
 
 
+extern bool check_libpq_compression(char **newval, void **extra, GucSource source);
+
 struct config_string ConfigureNamesString[] =
 {
 	{
@@ -4453,7 +4455,7 @@ struct config_string ConfigureNamesString[] =
 			NULL
 		},
 		&libpq_compress_algorithms,
-		"zlib",
+		"on",
 		check_libpq_compression, NULL, NULL
 	},
 
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 868d21c351e..da3d0bfcc16 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -742,6 +742,7 @@
 #dynamic_library_path = '$libdir'
 #gin_fuzzy_search_limit = 0
 
+#libpq_compression = on
 
 #------------------------------------------------------------------------------
 # LOCK MANAGEMENT
diff --git a/src/common/meson.build b/src/common/meson.build
index 1c9b8a3a018..c6b3bae6b54 100644
--- a/src/common/meson.build
+++ b/src/common/meson.build
@@ -30,6 +30,8 @@ common_sources = files(
   'username.c',
   'wait_error.c',
   'wchar.c',
+  'z_stream.c',
+  'zpq_stream.c',
 )
 
 if ssl.found()
@@ -132,7 +134,7 @@ pgcommon_variants = {
   '_shlib': default_lib_args + {
     'pic': true,
     'sources': common_sources_frontend_shlib,
-    'dependencies': [frontend_common_code],
+    'dependencies': [frontend_common_code, lz4],
   },
 }
 
-- 
2.25.1

Reply via email to