On 17.12.2020 16:39, Denis Smirnov wrote:
Hello all,

I’ve finally read the whole thread (it was huge). It is extremely sad that this 
patch hang without progress for such a long time. It seems that the main 
problem in discussion is that everyone has its own view what problems should be 
solve with this patch. Here are some of positions (not all of them):

1. Add a compression for networks with a bad bandwidth (and make a patch as 
simple and maintainable as possible) - author’s position.
2. Don’t change current network protocol and related code much.
3. Refactor compression API (and network compression as well)
4. Solve cloud provider’s problems: on demand buy network bandwidth with CPU 
utilisation and vice versa.

All of these requirements have a different nature and sometimes conflict with 
each other. Without clearly formed requirements this patch would never be 
released.

Anyway, I have rebased it to the current master branch, applied pgindent, 
tested on MacOS and fixed a MacOS specific problem with strcpy in 
build_compressors_list(): it has an undefined behaviour when source and 
destination strings overlap.
-       *client_compressors = src = dst = strdup(value);
+       *client_compressors = src = strdup(value);
+       dst = strdup(value);

According to my very simple tests with randomly generated data, zstd gives 
about 3x compression (zlib has a little worse compression ratio and a little 
bigger CPU utilisation). It seems to be a normal ratio for any streaming data - 
Greenplum also uses zstd/zlib to compress append optimised tables and 
compression ratio is usually about 3-5x. Also according to my Greenplum 
experience, the most commonly used zstd ratio is 1, while for zlib it is 
usually in a range of 1-5. CPU and execution time were not affected much 
according to uncompressed data (but my tests were very simple and they should 
not be treated as reliable).



Best regards,
Denis Smirnov | Developer
s...@arenadata.io
Arenadata | Godovikova 9-17, Moscow 129085 Russia


Thank you very much for reporting the problem.
Sorry, but you fix is entirely correct: it is necessary to assign dst to *client_compressors, not  *src. Also extra strdup requires correspondent memory deallocation. I prepared new version of the patch with this fix + some other code refactoring requested by reviewers (like sending ack message in more standard way).

I am maintaining this code in g...@github.com:postgrespro/libpq_compression.git repository. I will be pleased if anybody, who wants to suggest any bug fixes/improvements of libpq compression, create pull requests: it will be much easier for me to merge them.

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/configure b/configure
index dd64692..9f70773 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LD
 LDFLAGS_SL
 LDFLAGS_EX
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 XML2_LIBS
@@ -867,6 +868,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -8571,6 +8573,85 @@ fi
 
 
 
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
 
 #
 # Zlib
diff --git a/configure.ac b/configure.ac
index 748fb50..4e2435c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1000,6 +1000,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
 AC_SUBST(with_zlib)
 
 #
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+              [use zstd])
+AC_SUBST(with_zstd)
+
+#
 # Assignments
 #
 
@@ -1186,6 +1193,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1414,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d88d06..0fc11c1 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3795c57..e0feb36 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -974,6 +974,24 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+      <term><varname>libpq_compression</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>libpq_compression</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        When this parameter is <literal>on</literal> (default), the <productname>PostgreSQL</productname>
+        server can switch on compression of traffic between server and client if it is requested by client.
+        Client sends to the server list of compression algorithms supported by frontend library,
+        server chooses one which is supported by backend library and sends it in compression acknowledgement message
+        to the client. This option allows to reject compression request even if it is supported by server
+        (due to security, CPU consumption or whatever else reasons...).
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9d4b6ab..a8f6919 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1225,6 +1225,22 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. Client sends to the server list of compression algorithms, supported by client library.
+        If server supports one of this algorithms, then it acknowledges use of this algorithm and then all libpq messages send both from client to server and
+        visa versa will be compressed. If server is not supporting any of the suggested algorithms, then it rejects client request to use compression
+        and it is up to the client whether to continue work without compression or report error.
+        Supported compression algorithms are chosen at configure time. Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+        configured with --with-zstd option). In both cases streaming mode is used.
+        By default compression is disabled. Please notice that using compression together with SSL may add extra vulnerabilities:
+        <ulink url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cee2888..d5fd532 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed-up client-server interaction.
+    Compression is especially useful for importing/exporting data to/from database using COPY command
+    and for replication (both physical and logical). Also compression can reduce server response time
+    in case of queries returning large amount of data (for example returning JSON, BLOBs, text,...)
+    Right now two libraries are supported: zlib (default) and zstd (if Postgres was
+    configured with --with-zstd option).
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -263,6 +272,27 @@
      </varlistentry>
 
      <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         Server acknowledges using compression for client-server communication protocol.
+         Compression can be requested by client by including "compression" option in connection string.
+         It can be just boolean values enabling or disabling compression
+         ("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "auto" or explicit list of compression algorithms
+         separated by comma with optional specification of compression level: "zlib,zstd:5".
+         If compression algorithm is not explicitly specified the most efficient one supported both by
+         client and server is chosen. Client sends to the server list of compression algorithms,
+         supported by client library.
+         If server supports one of this algorithms, then it acknowledges use of this algorithm and
+         all subsequent libpq messages send both from client to server and
+         visa versa will be compressed. Server selects most efficient algorithm among list specified by client and returns to the client
+         index of chosen algorithm in this list. If server is not supporting any of the suggested algorithms, then it replies with -1
+         and it is up to the client whether to continue work without compression or report error.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
        <para>
@@ -3398,6 +3428,57 @@ following:
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+  Acknowledge use of compression for protocol data. Client sends to the server list of compression algorithms, supported by client library.
+  If server supports one of this algorithms, then it acknowledges use of this algorithm and all subsequent libpq messages send both from client to server and
+  visa versa will be compressed. Server selects most efficient algorithm among list specified by client and returns to the client
+  index of chosen algorithm in this list. If server is not supporting any of the suggested algorithms, then it replies with -1
+  and it is up to the client whether to continue work without compression or report error.
+  After receiving this message with algorithm index other than -1, both server and client are switched to compression mode
+  and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte1
+</term>
+<listitem>
+<para>
+        Index of algorithm in the list of supported algotihms specified by client or -1 if none of them is supported.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 
 <varlistentry>
 <term>
@@ -5964,6 +6045,22 @@ StartupMessage (F)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+                <literal>_pq_.compression</literal>
+</term>
+<listitem>
+<para>
+                        Request compression of libpq traffic. Value is list of compression algorithms supported by client with optional
+                        specification of compression level: <literal>"zlib,zstd:5"</literal>.
+                        When connecting to an older backend, which does not support compression, or in case when the backend support compression
+                        but for some reason wants to disable it, the backend will just ignore the _pq_.compression parameter and won’t send
+                        the compressionAck message to the frontend.
+                        By default compression is disabled. Please notice that using compression together with SSL may add extra vulnerabilities:
+                        <ulink url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
                 In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9a..9e11599 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95..f32a780 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2e4aa1c..9d05ec2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
         LEFT JOIN pg_database AS D ON (S.datid = D.oid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_network_traffic AS
+    SELECT
+	    S.pid,
+        S.rx_raw_bytes,
+        S.tx_raw_bytes,
+        S.rx_compressed_bytes,
+        S.tx_compressed_bytes
+    FROM pg_stat_get_activity(NULL) AS S;
+
 CREATE VIEW pg_stat_replication AS
     SELECT
             S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index d7de962..471f786 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <grp.h>
+#include <pgstat.h>
 #include <unistd.h>
 #include <sys/file.h>
 #include <sys/socket.h>
@@ -88,11 +89,14 @@
 
 #include "common/ip.h"
 #include "libpq/libpq.h"
+#include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "port/pg_bswap.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -118,6 +122,7 @@
  */
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
+bool        libpq_compression;
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -141,6 +146,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream* PqStream;
+
+
 /*
  * Message status
  */
@@ -183,6 +191,108 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+static ssize_t write_compressed(void* arg, void const* data, size_t size)
+{
+	ssize_t rc = secure_write((Port*)arg, (void*)data, size);
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, 0, rc);
+	return rc;
+}
+
+static ssize_t read_compressed(void* arg, void* data, size_t size)
+{
+	ssize_t rc = secure_read((Port*)arg, data, size);
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, rc, 0);
+	return rc;
+}
+
+/*
+ * Send to the client index of chosen compression algorithm
+ */
+static void
+SendCompressionACK(int algorithm)
+{
+	StringInfoData buf;
+	pq_beginmessage(&buf, 'z');
+	pq_sendbyte(&buf, (uint8)algorithm);
+	pq_endmessage(&buf);
+	pq_flush();
+}
+
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port* port)
+{
+	char* client_compression_algorithms = port->compression_algorithms;
+
+	/*
+	 * If client request compression, it sends list of supported compression algorithms separated by comma.
+	 */
+	if (client_compression_algorithms && libpq_compression)
+	{
+		int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+		int impl = -1;
+		char** server_compression_algorithms = zpq_get_supported_algorithms();
+		int index = -1;
+		char* protocol_extension = strchr(client_compression_algorithms, ';');
+
+		/* No protocol extension are currently supported */
+		if (protocol_extension)
+			*protocol_extension = '\0';
+
+		for (int i = 0; *client_compression_algorithms; i++)
+		{
+			char* sep = strchr(client_compression_algorithms, ',');
+			char* level;
+			if (sep != NULL)
+				*sep = '\0';
+
+			level = strchr(client_compression_algorithms, ':');
+			if (level != NULL)
+			{
+				*level = '\0'; /* compression level is ignored now */
+				if (sscanf("%d", level+1, &compression_level) != 1)
+					ereport(LOG,
+							(errmsg("Invalid compression level: %s", level+1)));
+			}
+			for (impl = 0; server_compression_algorithms[impl] != NULL; impl++)
+			{
+				if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0)
+				{
+					index = i;
+					goto SendCompressionAck;
+				}
+			}
+
+			if (sep != NULL)
+				client_compression_algorithms = sep+1;
+			else
+				break;
+		}
+	  SendCompressionAck:
+		free(server_compression_algorithms);
+		SendCompressionACK(index);
+
+		if (index >= 0) /* Use compression */
+		{
+			PqStream = zpq_create(impl, compression_level, write_compressed, read_compressed, MyProcPort, NULL, 0);
+			if (!PqStream)
+			{
+				ereport(LOG,
+						(errmsg("Failed to initialize compressor %s", server_compression_algorithms[impl])));
+				return -1;
+			}
+		}
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -280,6 +390,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -919,12 +1032,15 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *      nowait parameter toggles non-blocking mode. 
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			   r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -940,21 +1056,36 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		/* If streaming compression is enabled then use correspondent compression read function. */
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				char const* msg = zpq_error(PqStream);
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -975,7 +1106,8 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		pgstat_report_network_traffic(r, 0, 0, 0);
+		return r;
 	}
 }
 
@@ -990,7 +1122,7 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1009,7 +1141,7 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)		/* If nothing in buffer, then recv some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1026,48 +1158,15 @@ pq_peekbyte(void)
 int
 pq_getbyte_if_available(unsigned char *c)
 {
-	int			r;
+	int			r = 0;
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1088,7 +1187,7 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1122,7 +1221,7 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1163,7 +1262,7 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1413,13 +1512,19 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+    /* has more data to flush or unsent data in internal compression buffer */
 	{
-		int			r;
-
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
-
-		if (r <= 0)
+		int		r;
+		size_t  processed = 0;
+		size_t  available = bufend - bufptr;
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
+
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1462,12 +1567,12 @@ internal_flush(void)
 			InterruptPending = 1;
 			return EOF;
 		}
+		pgstat_report_network_traffic(0, r, 0, 0);
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
@@ -1484,7 +1589,7 @@ socket_flush_if_writable(void)
 	int			res;
 
 	/* Quick exit if nothing to do */
-	if (PqSendPointer == PqSendStart)
+	if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
 		return 0;
 
 	/* No-op if reentrant call */
@@ -1507,7 +1612,7 @@ socket_flush_if_writable(void)
 static bool
 socket_is_send_pending(void)
 {
-	return (PqSendStart < PqSendPointer);
+	return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
 }
 
 /* --------------------------------
@@ -1985,3 +2090,15 @@ pq_settcpusertimeout(int timeout, Port *port)
 
 	return STATUS_OK;
 }
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+	char const* algorithm_name = PqStream ? zpq_algorithm_name(PqStream) : NULL;
+	if (algorithm_name)
+		PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+	else
+		PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e76e627..4714b51 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3115,6 +3115,9 @@ pgstat_bestart(void)
 	lbeentry.st_xact_start_timestamp = 0;
 	lbeentry.st_databaseid = MyDatabaseId;
 
+	lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+		lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
 	/* We have userid for client-backends, wal-sender and bgworker processes */
 	if (lbeentry.st_backendType == B_BACKEND
 		|| lbeentry.st_backendType == B_WAL_SENDER
@@ -3496,6 +3499,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*
+ * Report current transaction start timestamp as the specified value.
+ * Zero means there is no active transaction.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!pgstat_track_activities || !beentry)
+		return;
+
+	/*
+	 * Update my status entry, following the protocol of bumping
+	 * st_changecount before and after.  We use a volatile pointer here to
+	 * ensure the compiler doesn't try to get cute.
+	 */
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+	beentry->st_rx_raw_bytes += rx_raw_bytes;
+	beentry->st_tx_raw_bytes += tx_raw_bytes;
+	beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+	beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
 /* ----------
  * pgstat_read_current_status() -
  *
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b7799ed..330ddaf 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2136,6 +2136,8 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "_pq_.compression") == 0)
+				port->compression_algorithms = pstrdup(valptr);
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4443,6 +4445,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a210fc9..b93ac11 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -567,7 +567,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	30
+#define PG_STAT_GET_ACTIVITY_COLS	34
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -913,6 +913,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				values[28] = BoolGetDatum(false);	/* GSS Encryption not in
 													 * use */
 			}
+			values[30] = beentry->st_rx_raw_bytes;
+			values[31] = beentry->st_tx_raw_bytes;
+			values[32] = beentry->st_rx_compressed_bytes;
+			values[33] = beentry->st_tx_compressed_bytes;
 		}
 		else
 		{
@@ -941,6 +945,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[27] = true;
 			nulls[28] = true;
 			nulls[29] = true;
+			nulls[30] = true;
+			nulls[31] = true;
+			nulls[32] = true;
+			nulls[33] = true;
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1140,6 +1148,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
 	PG_RETURN_TIMESTAMPTZ(result);
 }
 
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS	4
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_NETWORK_TRAFFIC_COLS];
+	bool		nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+	int32		beid = PG_GETARG_INT32(0);
+	PgBackendStatus *beentry;
+
+	if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+		PG_RETURN_NULL();
+	else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Fill values and NULLs */
+	values[0] = beentry->st_rx_raw_bytes;
+	values[1] = beentry->st_tx_raw_bytes;
+	values[2] = beentry->st_rx_compressed_bytes;
+	values[3] = beentry->st_tx_compressed_bytes;
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
 
 Datum
 pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bb34630..83a3c1f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1290,6 +1290,16 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+			gettext_noop("Compress client-server traffic."),
+			NULL
+		},
+		&libpq_compression,
+		true,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
 			gettext_noop("Logs each checkpoint."),
 			NULL
diff --git a/src/common/Makefile b/src/common/Makefile
index 25c55bd..bc6cba8 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
 	unicode_norm.o \
 	username.o \
 	wait_error.o \
-	wchar.o
+	wchar.o \
+	zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..4595662
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,543 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+	/*
+	 * Returns name of compression algorithm.
+	 */
+	char const* (*name)(void);
+
+	/*
+	 * Create compression stream with using rx/tx function for fetching/sending compressed data.
+	 * level: compression level
+	 * tx_func: function for writing compressed data in underlying stream
+	 * rx_func: function for receiving compressed data from underlying stream
+	 * arg: context passed to the function
+     * rx_data: received data (compressed data already fetched from input stream)
+	 * rx_data_size: size of data fetched from input stream
+	 */
+	ZpqStream* (*create)(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size);
+
+	/*
+	 * Read up to "size" raw (decompressed) bytes.
+	 * Returns number of decompressed bytes or error code.
+	 * Error code is either ZPQ_DECOMPRESS_ERROR either error code returned by the rx function.
+	 */
+	ssize_t (*read)(ZpqStream *zs, void *buf, size_t size);
+
+	/*
+	 * Write up to "size" raw (decompressed) bytes.
+	 * Returns number of written raw bytes or error code returned by tx function.
+	 * In the last case amount of written raw bytes is stored in *processed.
+	 */
+	ssize_t (*write)(ZpqStream *zs, void const *buf, size_t size, size_t *processed);
+
+	/*
+	 * Free stream created by create function.
+	 */
+	void    (*free)(ZpqStream *zs);
+
+	/*
+	 * Get error message.
+	 */
+	char const* (*error)(ZpqStream *zs);
+
+	/*
+	 * Returns amount of data in internal tx decompression buffer.
+	 */
+	size_t  (*buffered_tx)(ZpqStream *zs);
+
+	/*
+	 * Returns amount of data in internal rx compression buffer.
+	 */
+	size_t  (*buffered_rx)(ZpqStream *zs);
+} ZpqAlgorithm;
+
+struct ZpqStream
+{
+	ZpqAlgorithm const* algorithm;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+
+typedef struct ZstdStream
+{
+	ZpqStream      common;
+	ZSTD_CStream*  tx_stream;
+	ZSTD_DStream*  rx_stream;
+	ZSTD_outBuffer tx;
+	ZSTD_inBuffer  rx;
+	size_t         tx_not_flushed; /* Amount of data in internal zstd buffer */
+	size_t         tx_buffered;    /* Data which is consumed by ztd_write but not yet sent */
+	size_t         rx_buffered;    /* Data which is needed for ztd_read */
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+	char const*    rx_error;    /* Decompress error message */
+	size_t         tx_total;
+	size_t         tx_total_raw;
+	size_t         rx_total;
+	size_t         rx_total_raw;
+	char           tx_buf[ZSTD_BUFFER_SIZE];
+	char           rx_buf[ZSTD_BUFFER_SIZE];
+} ZstdStream;
+
+static ZpqStream*
+zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+	ZstdStream* zs = (ZstdStream*)malloc(sizeof(ZstdStream));
+
+	zs->tx_stream = ZSTD_createCStream();
+	ZSTD_initCStream(zs->tx_stream, level);
+	zs->rx_stream = ZSTD_createDStream();
+	ZSTD_initDStream(zs->rx_stream);
+	zs->tx.dst = zs->tx_buf;
+	zs->tx.pos = 0;
+	zs->tx.size = ZSTD_BUFFER_SIZE;
+	zs->rx.src = zs->rx_buf;
+	zs->rx.pos = 0;
+	zs->rx.size = 0;
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->tx_buffered = 0;
+	zs->rx_buffered = 0;
+	zs->tx_not_flushed = 0;
+	zs->rx_error = NULL;
+	zs->arg = arg;
+	zs->tx_total = zs->tx_total_raw = 0;
+	zs->rx_total = zs->rx_total_raw = 0;
+	zs->rx.size = rx_data_size;
+	Assert(rx_data_size < ZSTD_BUFFER_SIZE);
+	memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+	return (ZpqStream*)zs;
+}
+
+static ssize_t
+zstd_read(ZpqStream *zstream, void *buf, size_t size)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	ssize_t rc;
+	ZSTD_outBuffer out;
+	out.dst = buf;
+	out.pos = 0;
+	out.size = size;
+
+	while (1)
+	{
+		if (zs->rx.pos != zs->rx.size || zs->rx_buffered == 0)
+		{
+			rc = ZSTD_decompressStream(zs->rx_stream, &out, &zs->rx);
+			if (ZSTD_isError(rc))
+			{
+				zs->rx_error = ZSTD_getErrorName(rc);
+				return ZPQ_DECOMPRESS_ERROR;
+			}
+			/* Return result if we fill requested amount of bytes or read operation was performed */
+			if (out.pos != 0)
+			{
+				zs->rx_total_raw += out.pos;
+				zs->rx_buffered = 0;
+				return out.pos;
+			}
+			zs->rx_buffered = rc;
+			if (zs->rx.pos == zs->rx.size)
+			{
+				zs->rx.pos = zs->rx.size = 0; /* Reset rx buffer */
+			}
+		}
+		rc = zs->rx_func(zs->arg, (char*)zs->rx.src + zs->rx.size, ZSTD_BUFFER_SIZE - zs->rx.size);
+		if (rc > 0) /* read fetches some data */
+		{
+			zs->rx.size += rc;
+			zs->rx_total += rc;
+		}
+		else /* read failed */
+		{
+			zs->rx_total_raw += out.pos;
+			return rc;
+		}
+	}
+}
+
+static ssize_t
+zstd_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	ssize_t rc;
+	ZSTD_inBuffer in_buf;
+	in_buf.src = buf;
+	in_buf.pos = 0;
+	in_buf.size = size;
+
+	do
+	{
+		if (zs->tx.pos == 0) /* Compress buffer is empty */
+		{
+			zs->tx.dst = zs->tx_buf; /* Reset pointer to the beginning of buffer */
+
+			if (in_buf.pos < size) /* Has something to compress in input buffer */
+				ZSTD_compressStream(zs->tx_stream, &zs->tx, &in_buf);
+
+			if (in_buf.pos == size) /* All data is compressed: flushed internal zstd buffer */
+			{
+				zs->tx_not_flushed = ZSTD_flushStream(zs->tx_stream, &zs->tx);
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+		if (rc > 0)
+		{
+			zs->tx.pos -= rc;
+			zs->tx.dst = (char*)zs->tx.dst + rc;
+			zs->tx_total += rc;
+		}
+		else
+		{
+			*processed = in_buf.pos;
+			zs->tx_buffered = zs->tx.pos;
+			zs->tx_total_raw += in_buf.pos;
+			return rc;
+		}
+    /* repeat sending while there is some data in input or internal zstd buffer */
+    } while (in_buf.pos < size || zs->tx_not_flushed);
+
+	zs->tx_total_raw += in_buf.pos;
+	zs->tx_buffered = zs->tx.pos;
+	return in_buf.pos;
+}
+
+static void
+zstd_free(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	if (zs != NULL)
+	{
+		ZSTD_freeCStream(zs->tx_stream);
+		ZSTD_freeDStream(zs->rx_stream);
+		free(zs);
+	}
+}
+
+static char const*
+zstd_error(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	return zs->rx_error;
+}
+
+static size_t
+zstd_buffered_tx(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+static size_t
+zstd_buffered_rx(ZpqStream *zstream)
+{
+	ZstdStream* zs = (ZstdStream*)zstream;
+	return zs != NULL ? zs->rx.size - zs->rx.pos : 0;
+}
+
+static char const*
+zstd_name(void)
+{
+	return "zstd";
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+#define ZLIB_BUFFER_SIZE       8192 /* We have to flush stream after each protocol command
+									 * and command is mostly limited by record length,
+									 * which in turn usually less than page size (except TOAST)
+									 */
+
+typedef struct ZlibStream
+{
+	ZpqStream      common;
+
+	z_stream tx;
+	z_stream rx;
+
+	zpq_tx_func    tx_func;
+	zpq_rx_func    rx_func;
+	void*          arg;
+    unsigned       tx_deflate_pending;
+	size_t         tx_buffered;
+
+	Bytef          tx_buf[ZLIB_BUFFER_SIZE];
+	Bytef          rx_buf[ZLIB_BUFFER_SIZE];
+} ZlibStream;
+
+static ZpqStream*
+zlib_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+	int rc;
+	ZlibStream* zs = (ZlibStream*)malloc(sizeof(ZlibStream));
+	memset(&zs->tx, 0, sizeof(zs->tx));
+	zs->tx.next_out = zs->tx_buf;
+	zs->tx.avail_out = ZLIB_BUFFER_SIZE;
+	zs->tx_buffered = 0;
+	rc = deflateInit(&zs->tx, level);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == ZLIB_BUFFER_SIZE);
+
+	memset(&zs->rx, 0, sizeof(zs->tx));
+	zs->rx.next_in = zs->rx_buf;
+	zs->rx.avail_in = ZLIB_BUFFER_SIZE;
+	zs->tx_deflate_pending = 0;
+	rc = inflateInit(&zs->rx);
+	if (rc != Z_OK)
+	{
+		free(zs);
+		return NULL;
+	}
+	Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == ZLIB_BUFFER_SIZE);
+
+	zs->rx.avail_in = rx_data_size;
+	Assert(rx_data_size < ZLIB_BUFFER_SIZE);
+	memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+	zs->rx_func = rx_func;
+	zs->tx_func = tx_func;
+	zs->arg = arg;
+
+	return (ZpqStream*)zs;
+}
+
+static ssize_t
+zlib_read(ZpqStream *zstream, void *buf, size_t size)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	int rc;
+	zs->rx.next_out = (Bytef *)buf;
+	zs->rx.avail_out = size;
+
+	while (1)
+	{
+		if (zs->rx.avail_in != 0) /* If there is some data in receiver buffer, then decompress it */
+		{
+			rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+			if (rc != Z_OK && rc != Z_BUF_ERROR)
+			{
+				return ZPQ_DECOMPRESS_ERROR;
+			}
+			if (zs->rx.avail_out != size)
+			{
+				return size - zs->rx.avail_out;
+			}
+			if (zs->rx.avail_in == 0)
+			{
+				zs->rx.next_in = zs->rx_buf;
+			}
+		}
+		else
+		{
+			zs->rx.next_in = zs->rx_buf;
+		}
+		rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+		if (rc > 0)
+		{
+			zs->rx.avail_in += rc;
+		}
+		else
+		{
+			return rc;
+		}
+	}
+}
+
+static ssize_t
+zlib_write(ZpqStream *zstream, void const *buf, size_t size, size_t *processed)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+    int rc;
+	zs->tx.next_in = (Bytef *)buf;
+	zs->tx.avail_in = size;
+	do
+	{
+		if (zs->tx.avail_out == ZLIB_BUFFER_SIZE) /* Compress buffer is empty */
+		{
+			zs->tx.next_out = zs->tx_buf; /* Reset pointer to the  beginning of buffer */
+
+			if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 0)) /* Has something in input or deflate buffer */
+			{
+				rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+				Assert(rc == Z_OK);
+                deflatePending(&zs->tx, &zs->tx_deflate_pending, Z_NULL); /* check if any data left in deflate buffer */
+				zs->tx.next_out = zs->tx_buf; /* Reset pointer to the  beginning of buffer */
+			}
+		}
+		rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - zs->tx.avail_out);
+		if (rc > 0)
+		{
+			zs->tx.next_out += rc;
+			zs->tx.avail_out += rc;
+		}
+		else
+		{
+			*processed = size - zs->tx.avail_in;
+			zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+			return rc;
+		}
+    /* repeat sending while there is some data in input or deflate buffer */
+	} while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0);
+
+	zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+
+	return size - zs->tx.avail_in;
+}
+
+static void
+zlib_free(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	if (zs != NULL)
+	{
+		inflateEnd(&zs->rx);
+		deflateEnd(&zs->tx);
+		free(zs);
+	}
+}
+
+static char const*
+zlib_error(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	return zs->rx.msg;
+}
+
+static size_t
+zlib_buffered_tx(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0;
+}
+
+static size_t
+zlib_buffered_rx(ZpqStream *zstream)
+{
+	ZlibStream* zs = (ZlibStream*)zstream;
+	return zs != NULL ? zs->rx.avail_in : 0;
+}
+
+static char const*
+zlib_name(void)
+{
+	return "zlib";
+}
+
+#endif
+
+static char const*
+no_compression_name(void)
+{
+	return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx},
+#endif
+#if HAVE_LIBZ
+	{zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx},
+#endif
+	{no_compression_name}
+};
+
+/*
+ * Index of used compression algorithm in zpq_algorithms array.
+ */
+ZpqStream*
+zpq_create(int algorithm_impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+{
+	ZpqStream* stream = zpq_algorithms[algorithm_impl].create(level, tx_func, rx_func, arg, rx_data, rx_data_size);
+	if (stream)
+		stream->algorithm = &zpq_algorithms[algorithm_impl];
+	return stream;
+}
+
+ssize_t
+zpq_read(ZpqStream *zs, void *buf, size_t size)
+{
+	return zs->algorithm->read(zs, buf, size);
+}
+
+ssize_t
+zpq_write(ZpqStream *zs, void const *buf, size_t size, size_t* processed)
+{
+	return zs->algorithm->write(zs, buf, size, processed);
+}
+
+void
+zpq_free(ZpqStream *zs)
+{
+	if (zs)
+		zs->algorithm->free(zs);
+}
+
+char const*
+zpq_error(ZpqStream *zs)
+{
+	return zs->algorithm->error(zs);
+}
+
+
+size_t
+zpq_buffered_rx(ZpqStream *zs)
+{
+	return zs ? zs->algorithm->buffered_rx(zs) : 0;
+}
+
+size_t
+zpq_buffered_tx(ZpqStream *zs)
+{
+	return zs ? zs->algorithm->buffered_tx(zs) : 0;
+}
+
+/*
+ * Get list of the supported algorithms.
+ */
+char**
+zpq_get_supported_algorithms(void)
+{
+	size_t n_algorithms = sizeof(zpq_algorithms)/sizeof(*zpq_algorithms);
+	char** algorithm_names = malloc(n_algorithms*sizeof(char*));
+
+	for (size_t i = 0; i < n_algorithms; i++)
+	{
+		algorithm_names[i] = (char*)zpq_algorithms[i].name();
+	}
+
+	return algorithm_names;
+}
+
+char const*
+zpq_algorithm_name(ZpqStream *zs)
+{
+	return zs ? zs->algorithm->name() : NULL;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 33dacfd..baa4d6d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5239,9 +5239,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
@@ -5507,6 +5507,14 @@
   proargnames => '{wal_buffers_full,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 
+{ oid => '1137', descr => 'statistics: information about network traffic',
+  proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+  proallargtypes => '{int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+  prosrc => 'pg_stat_get_network_traffic' },
+
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5638,6 +5646,10 @@
   proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text',
   proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
 
+{ oid => '4142', descr => 'connection compression algorithm',
+  proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text',
+  proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
 { oid => '1946',
   descr => 'convert bytea value into some ascii-only text string',
   proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..27aef0a
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,36 @@
+/*
+ * zpq_stream.h
+ *     Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t(*zpq_tx_func)(void* arg, void const* data, size_t size);
+typedef ssize_t(*zpq_rx_func)(void* arg, void* data, size_t size);
+
+ZpqStream* zpq_create(int impl, int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void* arg, char* rx_data, size_t rx_data_size);
+ssize_t zpq_read(ZpqStream* zs, void* buf, size_t size);
+ssize_t zpq_write(ZpqStream* zs, void const* buf, size_t size, size_t* processed);
+char const* zpq_error(ZpqStream* zs);
+size_t zpq_buffered_rx(ZpqStream* zs);
+size_t zpq_buffered_tx(ZpqStream* zs);
+void zpq_free(ZpqStream* zs);
+char const* zpq_algorithm_name(ZpqStream* zs);
+
+/*
+  Returns zero terminated array with compression algorithms names
+*/
+char** zpq_get_supported_algorithms(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281..5289f9f 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,8 @@ typedef struct Port
 	int			keepalives_count;
 	int			tcp_user_timeout;
 
+	char*       compression_algorithms; /* Compression algorithms supported by client */
+
 	/*
 	 * GSSAPI structures.
 	 */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b115247..224ff3d 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int  pq_configure(Port* port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index 781d86c..c0d15f1 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -150,6 +150,7 @@ typedef struct StartupPacket
 } StartupPacket;
 
 extern bool Db_user_namespace;
+extern bool libpq_compression;
 
 /*
  * In protocol 3.0 and later, the startup packet length is not fixed, but
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838..b336eeb 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 257e515..b43efc0 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1190,6 +1190,12 @@ typedef struct PgBackendStatus
 	/* application name; MUST be null-terminated */
 	char	   *st_appname;
 
+	/* client-server traffic information */
+	uint64      st_rx_raw_bytes;
+	uint64      st_tx_raw_bytes;
+	uint64      st_rx_compressed_bytes;
+	uint64      st_tx_compressed_bytes;
+
 	/*
 	 * Current command string; MUST be null-terminated. Note that this string
 	 * possibly is truncated in the middle of a multi-byte character. As
@@ -1403,6 +1409,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
 extern void pgstat_report_tempfile(size_t filesize);
 extern void pgstat_report_appname(const char *appname);
 extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
 extern const char *pgstat_get_wait_event(uint32 wait_event_info);
 extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
 extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b..be8cb34 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
 OBJS = \
 	$(WIN32RES) \
 	fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d0..7486482 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "common/string.h"
 #include "fe-auth.h"
 #include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", "PGCOMPRESSION", NULL, NULL,
+		"Libpq-compression", "", 16,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2148,6 +2157,12 @@ connectDBComplete(PGconn *conn)
 				return 1;		/* success! */
 
 			case PGRES_POLLING_READING:
+			    /* if there is some buffered RX data in ZpqStream
+			     * then don't proceed to pqWaitTimed */
+			    if (zpq_buffered_rx(conn->zstream)) {
+			        break;
+			    }
+
 				ret = pqWaitTimed(1, 0, conn, finish_time);
 				if (ret == -1)
 				{
@@ -3216,11 +3231,62 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						int index;
+						char resp;
+						/* Read message length word */
+						if (pqGetInt(&msgLength, 4, conn))
+						{
+							/* We'll come back when there is more data */
+							return PGRES_POLLING_READING;
+						}
+						if (msgLength != 5)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "expected compression algorithm specification message length is 5 bytes, but %d is received\n"),
+											  msgLength);
+							goto error_return;
+						}
+						pqGetc(&resp, conn);
+						index = resp;
+						if (index == (char)-1)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server is not supported requested compression algorithms %s\n"),
+											  conn->compression);
+							goto error_return;
+						}
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create(conn->compressors[index].impl,
+												   conn->compressors[index].level,
+												   (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+												   &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+						if (!conn->zstream)
+						{
+							char** supported_algorithms = zpq_get_supported_algorithms();
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "failed to initialize compressor %s\n"),
+											  supported_algorithms[conn->compressors[index].impl]);
+							free(supported_algorithms);
+							goto error_return;
+						}
+						/* reset buffer */
+						conn->inStart = conn->inCursor = conn->inEnd = 0;
+					} else
+						break;
 				}
 
 				/*
@@ -4020,6 +4086,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237..00ccefb 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn)
 		 * EOF indication.  We expect therefore that this won't result in any
 		 * undue delay in reporting a previous write failure.)
 		 */
-		if (flushResult ||
-			pqWait(true, false, conn) ||
+		if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
+			pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			/*
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f3..4c69aec 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,6 +53,8 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+#include  <common/zpq_stream.h>
+
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
 static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
@@ -60,6 +62,16 @@ static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 /*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn)												\
+	(conn->zstream														\
+	 ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,			\
+				conn->inBufSize - conn->inEnd)							\
+	 : pqsecure_read(conn, conn->inBuffer + conn->inEnd,				\
+					 conn->inBufSize - conn->inEnd))
+
+/*
  * PQlibVersion: return the libpq version number
  */
 int
@@ -664,10 +676,17 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -759,10 +778,18 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+							  libpq_gettext("decompress error: %s\n"),
+							  zpq_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -875,12 +902,17 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered_tx(conn->zstream))
 	{
 		int			sent;
-
+		size_t      processed = 0;
+        /*
+		 * Use zpq_write if compression is switched on
+		 */
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -888,8 +920,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -943,7 +978,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
@@ -1034,6 +1069,8 @@ pqFlush(PGconn *conn)
 int
 pqWait(int forRead, int forWrite, PGconn *conn)
 {
+	if (forRead && conn->inCursor < conn->inEnd)
+		return 0;
 	return pqWaitTimed(forRead, forWrite, conn, (time_t) -1);
 }
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525..0558d50 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 			if (async)
 				return 0;
 			/* Need to load more data */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				return -2;
 			continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
 	while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
 	{
 		/* need to load more data */
-		if (pqWait(true, false, conn) ||
+		if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			*s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
 		if (needInput)
 		{
 			/* Wait for some data to arrive (or for the channel to close) */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				break;
 		}
@@ -2135,6 +2135,150 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
 }
 
 /*
+ * Build comma-separated list of compression algorithms suggested by client to the server.
+ * It can be either explicitly specified by user in connection string, either
+ * include all algorithms supported by clit library.
+ * This functions returns true if compression string is successfully parsed and
+ * stores comma-separated list of algorithms in *client_compressors.
+ * If compression is disabled, then NULL is assigned to  *client_compressors.
+ * Also it creates array of compressor descriptors, each element of which corresponds
+ * the correspondent algorithm name in *client_compressors list. This array is stored in PGconn
+ * and is used during handshake when compassion acknowledgment response is received from the server.
+ */
+static bool
+build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors)
+{
+	char** supported_algorithms = zpq_get_supported_algorithms();
+	char* value = conn->compression;
+	int n_supported_algorithms;
+	int total_len = 0;
+	int i;
+
+	for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++)
+	{
+		total_len += strlen(supported_algorithms[n_supported_algorithms])+1;
+	}
+
+	if (pg_strcasecmp(value, "true") == 0 ||
+		pg_strcasecmp(value, "yes") == 0 ||
+		pg_strcasecmp(value, "on") == 0 ||
+		pg_strcasecmp(value, "any") == 0 ||
+		pg_strcasecmp(value, "1") == 0)
+	{
+		/* Compression is enabled: choose algorithm automatically */
+		char* p;
+
+		if (n_supported_algorithms == 0)
+		{
+			*client_compressors = NULL; /* no compressors are avaialable */
+			conn->compressors = NULL;
+			return true;
+		}
+		*client_compressors = p = malloc(total_len);
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+		for (i = 0; i < n_supported_algorithms; i++)
+		{
+			strcpy(p, supported_algorithms[i]);
+			p += strlen(p);
+			*p++ = ',';
+			if (build_descriptors)
+			{
+				conn->compressors[i].impl = i;
+				conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+			}
+		}
+		p[-1] = '\0';
+		return true;
+	}
+	else if (*value == 0 ||
+			 pg_strcasecmp(value, "false") == 0 ||
+			 pg_strcasecmp(value, "no") == 0 ||
+			 pg_strcasecmp(value, "off") == 0 ||
+			 pg_strcasecmp(value, "0") == 0)
+	{
+		/* Compression is disabled */
+		*client_compressors = NULL;
+		conn->compressors = NULL;
+		return true;
+	}
+	else
+	{
+		/* List of compresison algorithms separated by commas */
+		char *src, *dst;
+		int n_suggested_algorithms = 0;
+		char* suggested_algorithms = strdup(value);
+		src = suggested_algorithms;
+		*client_compressors = dst = strdup(value);
+
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+
+		while (*src != '\0')
+		{
+			char* sep = strchr(src, ',');
+			char* col;
+			int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			strcpy(dst, src);
+
+			col = strchr(src, ':');
+			if (col != NULL)
+			{
+				*col = '\0';
+				if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors)
+				{
+					fprintf(stderr,
+							libpq_gettext("WARNING: invlaid compression level %s in compression option '%s'\n"),
+							col+1, value);
+					return false;
+				}
+			}
+			for (i = 0; supported_algorithms[i] != NULL; i++)
+			{
+				if (pg_strcasecmp(src, supported_algorithms[i]) == 0)
+				{
+					if (build_descriptors)
+					{
+						conn->compressors[n_suggested_algorithms].impl = i;
+						conn->compressors[n_suggested_algorithms].level = compression_level;
+					}
+					n_suggested_algorithms += 1;
+					dst += strlen(dst);
+					*dst++ = ',';
+					break;
+				}
+			}
+			if (sep)
+				src = sep+1;
+			else
+				break;
+		}
+		free(suggested_algorithms);
+		if (n_suggested_algorithms == 0)
+		{
+			if (!build_descriptors)
+				fprintf(stderr,
+						libpq_gettext("WARNING: none of specified algirthms %s is supported by client\n"),
+						value);
+			else
+			{
+				free(conn->compressors);
+				conn->compressors = NULL;
+			}
+			free(*client_compressors);
+			*client_compressors = NULL;
+			return false;
+		}
+		dst[-1] = '\0';
+		return true;
+	}
+}
+
+/*
  * Build a startup packet given a filled-in PGconn structure.
  *
  * We need to figure out how much space is needed, then fill it in.
@@ -2180,6 +2324,17 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("replication", conn->replication);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
+	if (conn->compression && conn->compression[0])
+	{
+		char* client_compression_algorithms;
+		if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL))
+		{
+			if (client_compression_algorithms)
+			{
+				ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms);
+			}
+		}
+	}
 	if (conn->send_appname)
 	{
 		/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae..89128df 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -317,6 +318,16 @@ typedef struct pg_conn_host
 								 * found in password file. */
 } pg_conn_host;
 
+
+/*
+ * Descriptors of compression algorithms chosen by client
+ */
+typedef struct pg_conn_compressor
+{
+	int        impl;            /* compression implementation index */
+	int        level;           /* compression level */
+} pg_conn_compressor;
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -370,6 +381,9 @@ struct pg_conn
 	char	   *ssl_min_protocol_version;	/* minimum TLS protocol version */
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
 
+	char	   *compression;    /* stream compression (boolean value, "any" or list of compression algorithms separated by comma) */
+	pg_conn_compressor* compressors; /* descriptors of compression algorithms chosen by client */
+
 	/* Type of connection to make.  Possible values: any, read-write. */
 	char	   *target_session_attrs;
 
@@ -527,6 +541,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream* zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 097ff5d..aa9359c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid,
     s.backend_xmin,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+    s.rx_raw_bytes,
+    s.tx_raw_bytes,
+    s.rx_compressed_bytes,
+    s.tx_compressed_bytes
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
@@ -2015,7 +2021,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
@@ -2046,7 +2052,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd..daac58c 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -123,7 +123,7 @@ sub mkvcbuild
 	  config_info.c controldata_utils.c d2s.c encnames.c exec.c
 	  f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c
 	  keywords.c kwlookup.c link-canary.c md5.c
-	  pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+	  pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
 	  saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
 	  wait_error.c wchar.c);
 

Reply via email to