Hello all,

I’ve finally read the whole thread (it was huge). It is extremely sad that this 
patch hang without progress for such a long time. It seems that the main 
problem in discussion is that everyone has its own view what problems should be 
solve with this patch. Here are some of positions (not all of them):

1. Add a compression for networks with a bad bandwidth (and make a patch as 
simple and maintainable as possible) - author’s position.
2. Don’t change current network protocol and related code much.
3. Refactor compression API (and network compression as well)
4. Solve cloud provider’s problems: on demand buy network bandwidth with CPU 
utilisation and vice versa.

All of these requirements have a different nature and sometimes conflict with 
each other. Without clearly formed requirements this patch would never be 
released.

Anyway, I have rebased it to the current master branch, applied pgindent, 
tested on MacOS and fixed a MacOS specific problem with strcpy in 
build_compressors_list(): it has an undefined behaviour when source and 
destination strings overlap.
-       *client_compressors = src = dst = strdup(value);
+       *client_compressors = src = strdup(value);
+       dst = strdup(value);

According to my very simple tests with randomly generated data, zstd gives 
about 3x compression (zlib has a little worse compression ratio and a little 
bigger CPU utilisation). It seems to be a normal ratio for any streaming data - 
Greenplum also uses zstd/zlib to compress append optimised tables and 
compression ratio is usually about 3-5x. Also according to my Greenplum 
experience, the most commonly used zstd ratio is 1, while for zlib it is 
usually in a range of 1-5. CPU and execution time were not affected much 
according to uncompressed data (but my tests were very simple and they should 
not be treated as reliable).
From e33b9c99a6fdac54df40aa0c14eb43b6b1f3709d Mon Sep 17 00:00:00 2001
From: Denis Smirnov <s...@arenadata.io>
Date: Wed, 16 Dec 2020 22:02:49 +1000
Subject: [PATCH] Rebase patch-27 to actual master and fix strcpy

1. Patch-27 had conflicts with actual master. They were fixed.
2. On MacOS psql failed with EXC_BAD_INSTRUCTION on "compression"
   option. The problem was in strcpy OS-specific implementation:
   "The source and destination strings should not overlap, as the
   behavior is undefined."
3. pgindent was applied to the code.
---
 configure                                     | 102 +++-
 configure.ac                                  |  21 +
 .../postgres_fdw/expected/postgres_fdw.out    |   2 +-
 doc/src/sgml/config.sgml                      |  18 +
 doc/src/sgml/libpq.sgml                       |  16 +
 doc/src/sgml/protocol.sgml                    |  97 +++
 src/Makefile.global.in                        |   1 +
 src/backend/Makefile                          |   8 +
 src/backend/catalog/system_views.sql          |   9 +
 src/backend/libpq/pqcomm.c                    | 245 ++++++--
 src/backend/postmaster/pgstat.c               |  30 +
 src/backend/postmaster/postmaster.c           |  13 +-
 src/backend/utils/adt/pgstatfuncs.c           |  50 +-
 src/backend/utils/misc/guc.c                  |  10 +
 src/common/Makefile                           |   3 +-
 src/common/zpq_stream.c                       | 578 ++++++++++++++++++
 src/include/catalog/pg_proc.dat               |  18 +-
 src/include/common/zpq_stream.h               |  36 ++
 src/include/libpq/libpq-be.h                  |   3 +
 src/include/libpq/libpq.h                     |   1 +
 src/include/libpq/pqcomm.h                    |   1 +
 src/include/pg_config.h.in                    |   3 +
 src/include/pgstat.h                          |   7 +
 src/interfaces/libpq/Makefile                 |  14 +
 src/interfaces/libpq/fe-connect.c             |  83 ++-
 src/interfaces/libpq/fe-exec.c                |   4 +-
 src/interfaces/libpq/fe-misc.c                |  54 +-
 src/interfaces/libpq/fe-protocol3.c           | 162 ++++-
 src/interfaces/libpq/libpq-int.h              |  20 +
 src/test/regress/expected/rules.out           |  14 +-
 src/tools/msvc/Mkvcbuild.pm                   |   2 +-
 31 files changed, 1532 insertions(+), 93 deletions(-)
 create mode 100644 src/common/zpq_stream.c
 create mode 100644 src/include/common/zpq_stream.h

diff --git a/configure b/configure
index 11a4284e5b..a64125f3ec 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ LD
 LDFLAGS_SL
 LDFLAGS_EX
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 XML2_LIBS
@@ -866,6 +867,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -8573,40 +8575,120 @@ fi
 
 
 #
-# Zlib
+# Assignments
 #
 
+CPPFLAGS="$CPPFLAGS $INCLUDES"
+LDFLAGS="$LDFLAGS $LIBDIRS"
 
 
-# Check whether --with-zlib was given.
-if test "${with_zlib+set}" = set; then :
-  withval=$with_zlib;
+
+
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
   case $withval in
     yes)
-      :
       ;;
     no)
       :
       ;;
     *)
-      as_fn_error $? "no argument expected for --with-zlib option" "$LINENO" 5
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
       ;;
   esac
 
 else
-  with_zlib=yes
+  with_zstd=no
 
 fi
 
 
 
 
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in 
-lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: 
$ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
+
 #
-# Assignments
+# Zlib
 #
 
-CPPFLAGS="$CPPFLAGS $INCLUDES"
-LDFLAGS="$LDFLAGS $LIBDIRS"
+
+
+# Check whether --with-zlib was given.
+if test "${with_zlib+set}" = set; then :
+  withval=$with_zlib;
+  case $withval in
+    yes)
+      :
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zlib option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zlib=yes
+
+fi
 
 
 
diff --git a/configure.ac b/configure.ac
index fc523c6aeb..ebc8182f2b 100644
--- a/configure.ac
+++ b/configure.ac
@@ -999,6 +999,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
               [do not use Zlib])
 AC_SUBST(with_zlib)
 
+#
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+              [use zstd])
+AC_SUBST(with_zstd)
+
 #
 # Assignments
 #
@@ -1186,6 +1193,13 @@ failure.  It is possible the compiler isn't looking in 
the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper 
directory.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1414,13 @@ failure.  It is possible the compiler isn't looking in 
the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
        [AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is 
required for GSSAPI])])])
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out 
b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d88d06358..0fc11c12fc 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, 
connect_timeout, dbname, host, hostaddr, port, options, application_name, 
keepalives, keepalives_idle, keepalives_interval, keepalives_count, 
tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, 
sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, 
gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, 
fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, 
connect_timeout, dbname, host, hostaddr, port, options, application_name, 
keepalives, keepalives_idle, keepalives_interval, keepalives_count, 
tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, 
sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, 
gssencmode, krbsrvname, gsslib, compression, target_session_attrs, 
use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, 
fetch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 
'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 4b60382778..61c6ecbc96 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -996,6 +996,24 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+      <term><varname>libpq_compression</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>libpq_compression</varname> configuration 
parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        When this parameter is <literal>on</literal> (default), the 
<productname>PostgreSQL</productname>
+        server can switch on compression of traffic between server and client 
if it is requested by client.
+        Client sends to the server list of compression algorithms supported by 
frontend library,
+        server chooses one which is supported by backend library and sends it 
in compression acknowledgement message
+        to the client. This option allows to reject compression request even 
if it is supported by server
+        (due to security, CPU consumption or whatever else reasons...).
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 67c5d4c36b..5ab1b08527 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1240,6 +1240,22 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. Client sends to the server list 
of compression algorithms, supported by client library.
+        If server supports one of this algorithms, then it acknowledges use of 
this algorithm and then all libpq messages send both from client to server and
+        visa versa will be compressed. If server is not supporting any of the 
suggested algorithms, then it rejects client request to use compression
+        and it is up to the client whether to continue work without 
compression or report error.
+        Supported compression algorithms are chosen at configure time. Right 
now two libraries are supported: zlib (default) and zstd (if Postgres was
+        configured with --with-zstd option). In both cases streaming mode is 
used.
+        By default compression is disabled. Please notice that using 
compression together with SSL may add extra vulnerabilities:
+        <ulink url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" 
xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 4899bacda7..db3f94c4b1 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed-up 
client-server interaction.
+    Compression is especially useful for importing/exporting data to/from 
database using COPY command
+    and for replication (both physical and logical). Also compression can 
reduce server response time
+    in case of queries returning large amount of data (for example returning 
JSON, BLOBs, text,...)
+    Right now two libraries are supported: zlib (default) and zstd (if 
Postgres was
+    configured with --with-zstd option).
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -262,6 +271,27 @@
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         Server acknowledges using compression for client-server communication 
protocol.
+         Compression can be requested by client by including "compression" 
option in connection string.
+         It can be just boolean values enabling or disabling compression
+         ("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "auto" or explicit 
list of compression algorithms
+         separated by comma with optional specification of compression level: 
"zlib,zstd:5".
+         If compression algorithm is not explicitly specified the most 
efficient one supported both by
+         client and server is chosen. Client sends to the server list of 
compression algorithms,
+         supported by client library.
+         If server supports one of this algorithms, then it acknowledges use 
of this algorithm and
+         all subsequent libpq messages send both from client to server and
+         visa versa will be compressed. Server selects most efficient 
algorithm among list specified by client and returns to the client
+         index of chosen algorithm in this list. If server is not supporting 
any of the suggested algorithms, then it replies with -1
+         and it is up to the client whether to continue work without 
compression or report error.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
@@ -3400,6 +3430,57 @@ following:
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+  Acknowledge use of compression for protocol data. Client sends to the server 
list of compression algorithms, supported by client library.
+  If server supports one of this algorithms, then it acknowledges use of this 
algorithm and all subsequent libpq messages send both from client to server and
+  visa versa will be compressed. Server selects most efficient algorithm among 
list specified by client and returns to the client
+  index of chosen algorithm in this list. If server is not supporting any of 
the suggested algorithms, then it replies with -1
+  and it is up to the client whether to continue work without compression or 
report error.
+  After receiving this message with algorithm index other than -1, both server 
and client are switched to compression mode
+  and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte1
+</term>
+<listitem>
+<para>
+        Index of algorithm in the list of supported algotihms specified by 
client or -1 if none of them is supported.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 
 <varlistentry>
 <term>
@@ -5966,6 +6047,22 @@ StartupMessage (F)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+                <literal>_pq_.compression</literal>
+</term>
+<listitem>
+<para>
+                        Request compression of libpq traffic. Value is list of 
compression algorithms supported by client with optional
+                        specification of compression level: 
<literal>"zlib,zstd:5"</literal>.
+                        When connecting to an older backend, which does not 
support compression, or in case when the backend support compression
+                        but for some reason wants to disable it, the backend 
will just ignore the _pq_.compression parameter and won’t send
+                        the compressionAck message to the frontend.
+                        By default compression is disabled. Please notice that 
using compression together with SSL may add extra vulnerabilities:
+                        <ulink 
url="https://en.wikipedia.org/wiki/CRIME";>CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
                 In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9aac5..9e11599002 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm   = @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid      = @with_uuid@
 with_zlib      = @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath   = @enable_rpath@
 enable_nls     = @enable_nls@
 enable_debug   = @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95848..f32a780b81 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres 
$(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql 
b/src/backend/catalog/system_views.sql
index b140c210bc..30c784eeba 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
         LEFT JOIN pg_database AS D ON (S.datid = D.oid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_network_traffic AS
+    SELECT
+           S.pid,
+        S.rx_raw_bytes,
+        S.tx_raw_bytes,
+        S.rx_compressed_bytes,
+        S.tx_compressed_bytes
+    FROM pg_stat_get_activity(NULL) AS S;
+
 CREATE VIEW pg_stat_replication AS
     SELECT
             S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index 0b511008fc..1bbcd2adc8 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <grp.h>
+#include <pgstat.h>
 #include <unistd.h>
 #include <sys/file.h>
 #include <sys/socket.h>
@@ -93,6 +94,8 @@
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -118,6 +121,7 @@
  */
 int                    Unix_socket_permissions;
 char      *Unix_socket_group;
+bool           libpq_compression;
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -141,6 +145,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int     PqRecvPointer;          /* Next index to read a byte from 
PqRecvBuffer */
 static int     PqRecvLength;           /* End of data available in 
PqRecvBuffer */
 
+static ZpqStream * PqStream;
+
+
 /*
  * Message status
  */
@@ -183,6 +190,114 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+static ssize_t
+write_compressed(void *arg, void const *data, size_t size)
+{
+       ssize_t         rc = secure_write((Port *) arg, (void *) data, size);
+
+       if (rc > 0)
+               pgstat_report_network_traffic(0, 0, 0, rc);
+       return rc;
+}
+
+static ssize_t
+read_compressed(void *arg, void *data, size_t size)
+{
+       ssize_t         rc = secure_read((Port *) arg, data, size);
+
+       if (rc > 0)
+               pgstat_report_network_traffic(0, 0, rc, 0);
+       return rc;
+}
+
+
+/* --------------------------------
+ *             pq_configure - configure connection using port settings
+ *
+ * Right now only compression is toggled in the configure.
+ * Function returns 0 in case of success, non-null in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port *port)
+{
+       char       *client_compression_algorithms = 
port->compression_algorithms;
+
+       /*
+        * If client request compression, it sends list of supported compression
+        * algorithms separated by comma.
+        */
+       if (client_compression_algorithms && libpq_compression)
+       {
+               int                     compression_level = 
ZPQ_DEFAULT_COMPRESSION_LEVEL;
+               char            compression[6] = {'z', 0, 0, 0, 5, 0};  /* 
message length = 5 */
+               int                     impl = -1;
+               int                     rc;
+               char      **server_compression_algorithms = 
zpq_get_supported_algorithms();
+               int                     index = -1;
+               char       *protocol_extension = 
strchr(client_compression_algorithms, ';');
+
+               /* No protocol extension are currently supported */
+               if (protocol_extension)
+                       *protocol_extension = '\0';
+
+               for (int i = 0; *client_compression_algorithms; i++)
+               {
+                       char       *sep = strchr(client_compression_algorithms, 
',');
+                       char       *level;
+
+                       if (sep != NULL)
+                               *sep = '\0';
+
+                       level = strchr(client_compression_algorithms, ':');
+                       if (level != NULL)
+                       {
+                               *level = '\0';  /* compression level is ignored 
now */
+                               if (sscanf("%d", level + 1, &compression_level) 
!= 1)
+                                       ereport(LOG,
+                                                       (errmsg("Invalid 
compression level: %s", level + 1)));
+                       }
+                       for (impl = 0; server_compression_algorithms[impl] != 
NULL; impl++)
+                       {
+                               if 
(pg_strcasecmp(client_compression_algorithms, 
server_compression_algorithms[impl]) == 0)
+                               {
+                                       index = i;
+                                       goto SendCompressionAck;
+                               }
+                       }
+
+                       if (sep != NULL)
+                               client_compression_algorithms = sep + 1;
+                       else
+                               break;
+               }
+SendCompressionAck:
+               free(server_compression_algorithms);
+               compression[5] = (char) index;
+
+               /*
+                * Send 'z' message to the client with selected compression 
algorithm
+                * (or -1 if not found)
+                */
+               socket_set_nonblocking(false);
+               while ((rc = secure_write(MyProcPort, compression, 
sizeof(compression))) < 0
+                          && errno == EINTR);
+               if ((size_t) rc != sizeof(compression))
+                       return -1;
+
+               if (index >= 0)                 /* Use compression */
+               {
+                       PqStream = zpq_create(impl, compression_level, 
write_compressed, read_compressed, MyProcPort, NULL, 0);
+                       if (!PqStream)
+                       {
+                               ereport(LOG,
+                                               (errmsg("Failed to initialize 
compressor %s", server_compression_algorithms[impl])));
+                               return -1;
+                       }
+               }
+       }
+       return 0;
+}
 
 /* --------------------------------
  *             pq_init - initialize libpq at backend startup
@@ -280,6 +395,9 @@ socket_close(int code, Datum arg)
                free(MyProcPort->gss);
 #endif                                                 /* ENABLE_GSS || 
ENABLE_SSPI */
 
+               /* Release compression streams */
+               zpq_free(PqStream);
+
                /*
                 * Cleanly shut down SSL layer.  Nowhere else does a postmaster 
child
                 * call this, so this is safe when interrupting 
BackendInitialize().
@@ -934,12 +1052,15 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *             pq_recvbuf - load some bytes into the input buffer
  *
- *             returns 0 if OK, EOF if trouble
+ *      nowait parameter toggles non-blocking mode.
+ *             returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+       int                     r;
+
        if (PqRecvPointer > 0)
        {
                if (PqRecvLength > PqRecvPointer)
@@ -955,21 +1076,40 @@ pq_recvbuf(void)
        }
 
        /* Ensure that we're in blocking mode */
-       socket_set_nonblocking(false);
+       socket_set_nonblocking(nowait);
 
        /* Can fill buffer from PqRecvLength and upwards */
        for (;;)
        {
-               int                     r;
-
-               r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-                                               PQ_RECV_BUFFER_SIZE - 
PqRecvLength);
+               /*
+                * If streaming compression is enabled then use correspondent
+                * compression read function.
+                */
+               r = PqStream
+                       ? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+                                          PQ_RECV_BUFFER_SIZE - PqRecvLength)
+                       : secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+                                                 PQ_RECV_BUFFER_SIZE - 
PqRecvLength);
 
                if (r < 0)
                {
+                       if (r == ZPQ_DECOMPRESS_ERROR)
+                       {
+                               char const *msg = zpq_error(PqStream);
+
+                               if (msg == NULL)
+                                       msg = "end of stream";
+                               ereport(COMMERROR,
+                                               (errcode_for_socket_access(),
+                                                errmsg("failed to decompress 
data: %s", msg)));
+                               return EOF;
+                       }
                        if (errno == EINTR)
                                continue;               /* Ok if interrupted */
 
+                       if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+                               return 0;
+
                        /*
                         * Careful: an ereport() that tries to write to the 
client would
                         * cause recursion to here, leading to stack overflow 
and core
@@ -990,7 +1130,8 @@ pq_recvbuf(void)
                }
                /* r contains number of bytes read, so just incr length */
                PqRecvLength += r;
-               return 0;
+               pgstat_report_network_traffic(r, 0, 0, 0);
+               return r;
        }
 }
 
@@ -1005,7 +1146,8 @@ pq_getbyte(void)
 
        while (PqRecvPointer >= PqRecvLength)
        {
-               if (pq_recvbuf())               /* If nothing in buffer, then 
recv some */
+               if (pq_recvbuf(false) == EOF)   /* If nothing in buffer, then 
recv
+                                                                               
 * some */
                        return EOF;                     /* Failed to recv data 
*/
        }
        return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1024,7 +1166,8 @@ pq_peekbyte(void)
 
        while (PqRecvPointer >= PqRecvLength)
        {
-               if (pq_recvbuf())               /* If nothing in buffer, then 
recv some */
+               if (pq_recvbuf(false) == EOF)   /* If nothing in buffer, then 
recv
+                                                                               
 * some */
                        return EOF;                     /* Failed to recv data 
*/
        }
        return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1041,48 +1184,15 @@ pq_peekbyte(void)
 int
 pq_getbyte_if_available(unsigned char *c)
 {
-       int                     r;
+       int                     r = 0;
 
        Assert(PqCommReadingMsg);
 
-       if (PqRecvPointer < PqRecvLength)
+       if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
        {
                *c = PqRecvBuffer[PqRecvPointer++];
                return 1;
        }
-
-       /* Put the socket into non-blocking mode */
-       socket_set_nonblocking(true);
-
-       r = secure_read(MyProcPort, c, 1);
-       if (r < 0)
-       {
-               /*
-                * Ok if no data available without blocking or interrupted 
(though
-                * EINTR really shouldn't happen with a non-blocking socket). 
Report
-                * other errors.
-                */
-               if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-                       r = 0;
-               else
-               {
-                       /*
-                        * Careful: an ereport() that tries to write to the 
client would
-                        * cause recursion to here, leading to stack overflow 
and core
-                        * dump!  This message must go *only* to the postmaster 
log.
-                        */
-                       ereport(COMMERROR,
-                                       (errcode_for_socket_access(),
-                                        errmsg("could not receive data from 
client: %m")));
-                       r = EOF;
-               }
-       }
-       else if (r == 0)
-       {
-               /* EOF detected */
-               r = EOF;
-       }
-
        return r;
 }
 
@@ -1103,7 +1213,8 @@ pq_getbytes(char *s, size_t len)
        {
                while (PqRecvPointer >= PqRecvLength)
                {
-                       if (pq_recvbuf())       /* If nothing in buffer, then 
recv some */
+                       if (pq_recvbuf(false) == EOF)   /* If nothing in 
buffer, then recv
+                                                                               
         * some */
                                return EOF;             /* Failed to recv data 
*/
                }
                amount = PqRecvLength - PqRecvPointer;
@@ -1137,7 +1248,8 @@ pq_discardbytes(size_t len)
        {
                while (PqRecvPointer >= PqRecvLength)
                {
-                       if (pq_recvbuf())       /* If nothing in buffer, then 
recv some */
+                       if (pq_recvbuf(false) == EOF)   /* If nothing in 
buffer, then recv
+                                                                               
         * some */
                                return EOF;             /* Failed to recv data 
*/
                }
                amount = PqRecvLength - PqRecvPointer;
@@ -1178,7 +1290,8 @@ pq_getstring(StringInfo s)
        {
                while (PqRecvPointer >= PqRecvLength)
                {
-                       if (pq_recvbuf())       /* If nothing in buffer, then 
recv some */
+                       if (pq_recvbuf(false) == EOF)   /* If nothing in 
buffer, then recv
+                                                                               
         * some */
                                return EOF;             /* Failed to recv data 
*/
                }
 
@@ -1428,13 +1541,24 @@ internal_flush(void)
        char       *bufptr = PqSendBuffer + PqSendStart;
        char       *bufend = PqSendBuffer + PqSendPointer;
 
-       while (bufptr < bufend)
+       while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+
+               /*
+                * has more data to flush or unsent data in internal compression
+                * buffer
+                */
        {
                int                     r;
+               size_t          processed = 0;
+               size_t          available = bufend - bufptr;
 
-               r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+               r = PqStream
+                       ? zpq_write(PqStream, bufptr, available, &processed)
+                       : secure_write(MyProcPort, bufptr, available);
+               bufptr += processed;
+               PqSendStart += processed;
 
-               if (r <= 0)
+               if (r < 0 || (r == 0 && available))
                {
                        if (errno == EINTR)
                                continue;               /* Ok if we were 
interrupted */
@@ -1477,12 +1601,12 @@ internal_flush(void)
                        InterruptPending = 1;
                        return EOF;
                }
+               pgstat_report_network_traffic(0, r, 0, 0);
 
                last_reported_send_errno = 0;   /* reset after any successful 
send */
                bufptr += r;
                PqSendStart += r;
        }
-
        PqSendStart = PqSendPointer = 0;
        return 0;
 }
@@ -1499,7 +1623,7 @@ socket_flush_if_writable(void)
        int                     res;
 
        /* Quick exit if nothing to do */
-       if (PqSendPointer == PqSendStart)
+       if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
                return 0;
 
        /* No-op if reentrant call */
@@ -1522,7 +1646,7 @@ socket_flush_if_writable(void)
 static bool
 socket_is_send_pending(void)
 {
-       return (PqSendStart < PqSendPointer);
+       return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 
0));
 }
 
 /* --------------------------------
@@ -2013,3 +2137,16 @@ pq_settcpusertimeout(int timeout, Port *port)
 
        return STATUS_OK;
 }
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+       char const *algorithm_name = PqStream ? zpq_algorithm_name(PqStream) : 
NULL;
+
+       if (algorithm_name)
+               PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+       else
+               PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 6b60f293e9..773df0eaa6 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3130,6 +3130,9 @@ pgstat_bestart(void)
        lbeentry.st_xact_start_timestamp = 0;
        lbeentry.st_databaseid = MyDatabaseId;
 
+       lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+               lbeentry.st_tx_compressed_bytes = 
lbeentry.st_rx_compressed_bytes = 0;
+
        /* We have userid for client-backends, wal-sender and bgworker 
processes */
        if (lbeentry.st_backendType == B_BACKEND
                || lbeentry.st_backendType == B_WAL_SENDER
@@ -3511,6 +3514,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
        PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*
+ * Report current transaction start timestamp as the specified value.
+ * Zero means there is no active transaction.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 
rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+       volatile PgBackendStatus *beentry = MyBEEntry;
+
+       if (!pgstat_track_activities || !beentry)
+               return;
+
+       /*
+        * Update my status entry, following the protocol of bumping
+        * st_changecount before and after.  We use a volatile pointer here to
+        * ensure the compiler doesn't try to get cute.
+        */
+       PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+       beentry->st_rx_raw_bytes += rx_raw_bytes;
+       beentry->st_tx_raw_bytes += tx_raw_bytes;
+       beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+       beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+       PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
 /* ----------
  * pgstat_read_current_status() -
  *
diff --git a/src/backend/postmaster/postmaster.c 
b/src/backend/postmaster/postmaster.c
index 5d09822c81..039f14c46d 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -218,6 +218,7 @@ int                 ReservedBackends;
 /* The socket(s) we're listening to. */
 #define MAXLISTEN      64
 static pgsocket ListenSocket[MAXLISTEN];
+
 /*
  * These globals control the behavior of the postmaster in case some
  * backend dumps core.  Normally, it kills all peers of the dead backend
@@ -345,7 +346,7 @@ typedef enum
        ALLOW_ALL_CONNS,                        /* normal not-shutting-down 
state */
        ALLOW_SUPERUSER_CONNS,          /* only superusers can connect */
        ALLOW_NO_CONNS                          /* no new connections allowed, 
period */
-} ConnsAllowedState;
+}                      ConnsAllowedState;
 
 static ConnsAllowedState connsAllowed = ALLOW_ALL_CONNS;
 
@@ -2138,6 +2139,8 @@ retry1:
                                port->database_name = pstrdup(valptr);
                        else if (strcmp(nameptr, "user") == 0)
                                port->user_name = pstrdup(valptr);
+                       else if (strcmp(nameptr, "_pq_.compression") == 0)
+                               port->compression_algorithms = pstrdup(valptr);
                        else if (strcmp(nameptr, "options") == 0)
                                port->cmdline_options = pstrdup(valptr);
                        else if (strcmp(nameptr, "replication") == 0)
@@ -4445,6 +4448,14 @@ BackendInitialize(Port *port)
        if (status != STATUS_OK)
                proc_exit(0);
 
+       if (pq_configure(port))
+       {
+               ereport(COMMERROR,
+                               (errcode_for_socket_access(),
+                                errmsg("failed to send compression message: 
%m")));
+               proc_exit(0);
+       }
+
        /*
         * Now that we have the user and database name, we can set the process
         * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c 
b/src/backend/utils/adt/pgstatfuncs.c
index 6afe1b6f56..e3256d7098 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -567,7 +567,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS      30
+#define PG_STAT_GET_ACTIVITY_COLS      34
        int                     num_backends = pgstat_fetch_stat_numbackends();
        int                     curr_backend;
        int                     pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -913,6 +913,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
                                values[28] = BoolGetDatum(false);       /* GSS 
Encryption not in
                                                                                
                         * use */
                        }
+                       values[30] = beentry->st_rx_raw_bytes;
+                       values[31] = beentry->st_tx_raw_bytes;
+                       values[32] = beentry->st_rx_compressed_bytes;
+                       values[33] = beentry->st_tx_compressed_bytes;
                }
                else
                {
@@ -941,6 +945,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
                        nulls[27] = true;
                        nulls[28] = true;
                        nulls[29] = true;
+                       nulls[30] = true;
+                       nulls[31] = true;
+                       nulls[32] = true;
+                       nulls[33] = true;
                }
 
                tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1140,6 +1148,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
        PG_RETURN_TIMESTAMPTZ(result);
 }
 
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS   4
+       TupleDesc       tupdesc;
+       Datum           values[PG_STAT_NETWORK_TRAFFIC_COLS];
+       bool            nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+       int32           beid = PG_GETARG_INT32(0);
+       PgBackendStatus *beentry;
+
+       if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+               PG_RETURN_NULL();
+       else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+               PG_RETURN_NULL();
+
+       /* Initialise values and NULL flags arrays */
+       MemSet(values, 0, sizeof(values));
+       MemSet(nulls, 0, sizeof(nulls));
+
+       /* Initialise attributes information in the tuple descriptor */
+       tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+                                          INT8OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+                                          INT8OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+                                          INT8OID, -1, 0);
+       TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+                                          INT8OID, -1, 0);
+       BlessTupleDesc(tupdesc);
+
+       /* Fill values and NULLs */
+       values[0] = beentry->st_rx_raw_bytes;
+       values[1] = beentry->st_tx_raw_bytes;
+       values[2] = beentry->st_rx_compressed_bytes;
+       values[3] = beentry->st_tx_compressed_bytes;
+
+       /* Returns the record as Datum */
+       PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, 
nulls)));
+}
 
 Datum
 pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index dabcbb0736..bbae6f92a4 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1289,6 +1289,16 @@ static struct config_bool ConfigureNamesBool[] =
                NULL, NULL, NULL
        },
 
+       {
+               {"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+                       gettext_noop("Compress client-server traffic."),
+                       NULL
+               },
+               &libpq_compression,
+               true,
+               NULL, NULL, NULL
+       },
+
        {
                {"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
                        gettext_noop("Logs each checkpoint."),
diff --git a/src/common/Makefile b/src/common/Makefile
index af891cb0ce..0ae5eb1a02 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
        unicode_norm.o \
        username.o \
        wait_error.o \
-       wchar.o
+       wchar.o \
+       zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000000..1c8c07bfcb
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,578 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+       /*
+        * Returns name of compression algorithm.
+        */
+       char const *(*name) (void);
+
+       /*
+        * Create compression stream with using rx/tx function for
+        * fetching/sending compressed data. level: compression level tx_func:
+        * function for writing compressed data in underlying stream rx_func:
+        * function for receiving compressed data from underlying stream arg:
+        * context passed to the function rx_data: received data (compressed 
data
+        * already fetched from input stream) rx_data_size: size of data fetched
+        * from input stream
+        */
+       ZpqStream  *(*create) (int level, zpq_tx_func tx_func, zpq_rx_func 
rx_func, void *arg, char *rx_data, size_t rx_data_size);
+
+       /*
+        * Read up to "size" raw (decompressed) bytes. Returns number of
+        * decompressed bytes or error code. Error code is either
+        * ZPQ_DECOMPRESS_ERROR either error code returned by the rx function.
+        */
+       ssize_t         (*read) (ZpqStream * zs, void *buf, size_t size);
+
+       /*
+        * Write up to "size" raw (decompressed) bytes. Returns number of 
written
+        * raw bytes or error code returned by tx function. In the last case
+        * amount of written raw bytes is stored in *processed.
+        */
+       ssize_t         (*write) (ZpqStream * zs, void const *buf, size_t size, 
size_t *processed);
+
+       /*
+        * Free stream created by create function.
+        */
+       void            (*free) (ZpqStream * zs);
+
+       /*
+        * Get error message.
+        */
+       char const *(*error) (ZpqStream * zs);
+
+       /*
+        * Returns amount of data in internal tx decompression buffer.
+        */
+       size_t          (*buffered_tx) (ZpqStream * zs);
+
+       /*
+        * Returns amount of data in internal rx compression buffer.
+        */
+       size_t          (*buffered_rx) (ZpqStream * zs);
+}                      ZpqAlgorithm;
+
+struct ZpqStream
+{
+       ZpqAlgorithm const *algorithm;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+#define ZSTD_BUFFER_SIZE (8*1024)
+
+typedef struct ZstdStream
+{
+       ZpqStream       common;
+       ZSTD_CStream *tx_stream;
+       ZSTD_DStream *rx_stream;
+       ZSTD_outBuffer tx;
+       ZSTD_inBuffer rx;
+       size_t          tx_not_flushed; /* Amount of data in internal zstd 
buffer */
+       size_t          tx_buffered;    /* Data which is consumed by ztd_write 
but not
+                                                                * yet sent */
+       size_t          rx_buffered;    /* Data which is needed for ztd_read */
+       zpq_tx_func tx_func;
+       zpq_rx_func rx_func;
+       void       *arg;
+       char const *rx_error;           /* Decompress error message */
+       size_t          tx_total;
+       size_t          tx_total_raw;
+       size_t          rx_total;
+       size_t          rx_total_raw;
+       char            tx_buf[ZSTD_BUFFER_SIZE];
+       char            rx_buf[ZSTD_BUFFER_SIZE];
+}                      ZstdStream;
+
+static ZpqStream *
+zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, 
char *rx_data, size_t rx_data_size)
+{
+       ZstdStream *zs = (ZstdStream *) malloc(sizeof(ZstdStream));
+
+       zs->tx_stream = ZSTD_createCStream();
+       ZSTD_initCStream(zs->tx_stream, level);
+       zs->rx_stream = ZSTD_createDStream();
+       ZSTD_initDStream(zs->rx_stream);
+       zs->tx.dst = zs->tx_buf;
+       zs->tx.pos = 0;
+       zs->tx.size = ZSTD_BUFFER_SIZE;
+       zs->rx.src = zs->rx_buf;
+       zs->rx.pos = 0;
+       zs->rx.size = 0;
+       zs->rx_func = rx_func;
+       zs->tx_func = tx_func;
+       zs->tx_buffered = 0;
+       zs->rx_buffered = 0;
+       zs->tx_not_flushed = 0;
+       zs->rx_error = NULL;
+       zs->arg = arg;
+       zs->tx_total = zs->tx_total_raw = 0;
+       zs->rx_total = zs->rx_total_raw = 0;
+       zs->rx.size = rx_data_size;
+       Assert(rx_data_size < ZSTD_BUFFER_SIZE);
+       memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+       return (ZpqStream *) zs;
+}
+
+static ssize_t
+zstd_read(ZpqStream * zstream, void *buf, size_t size)
+{
+       ZstdStream *zs = (ZstdStream *) zstream;
+       ssize_t         rc;
+       ZSTD_outBuffer out;
+
+       out.dst = buf;
+       out.pos = 0;
+       out.size = size;
+
+       while (1)
+       {
+               if (zs->rx.pos != zs->rx.size || zs->rx_buffered == 0)
+               {
+                       rc = ZSTD_decompressStream(zs->rx_stream, &out, 
&zs->rx);
+                       if (ZSTD_isError(rc))
+                       {
+                               zs->rx_error = ZSTD_getErrorName(rc);
+                               return ZPQ_DECOMPRESS_ERROR;
+                       }
+
+                       /*
+                        * Return result if we fill requested amount of bytes 
or read
+                        * operation was performed
+                        */
+                       if (out.pos != 0)
+                       {
+                               zs->rx_total_raw += out.pos;
+                               zs->rx_buffered = 0;
+                               return out.pos;
+                       }
+                       zs->rx_buffered = rc;
+                       if (zs->rx.pos == zs->rx.size)
+                       {
+                               zs->rx.pos = zs->rx.size = 0;   /* Reset rx 
buffer */
+                       }
+               }
+               rc = zs->rx_func(zs->arg, (char *) zs->rx.src + zs->rx.size, 
ZSTD_BUFFER_SIZE - zs->rx.size);
+               if (rc > 0)                             /* read fetches some 
data */
+               {
+                       zs->rx.size += rc;
+                       zs->rx_total += rc;
+               }
+               else                                    /* read failed */
+               {
+                       zs->rx_total_raw += out.pos;
+                       return rc;
+               }
+       }
+}
+
+static ssize_t
+zstd_write(ZpqStream * zstream, void const *buf, size_t size, size_t 
*processed)
+{
+       ZstdStream *zs = (ZstdStream *) zstream;
+       ssize_t         rc;
+       ZSTD_inBuffer in_buf;
+
+       in_buf.src = buf;
+       in_buf.pos = 0;
+       in_buf.size = size;
+
+       do
+       {
+               if (zs->tx.pos == 0)    /* Compress buffer is empty */
+               {
+                       zs->tx.dst = zs->tx_buf;        /* Reset pointer to the 
beginning of
+                                                                               
 * buffer */
+
+                       if (in_buf.pos < size)  /* Has something to compress in 
input
+                                                                        * 
buffer */
+                               ZSTD_compressStream(zs->tx_stream, &zs->tx, 
&in_buf);
+
+                       if (in_buf.pos == size) /* All data is compressed: 
flushed
+                                                                        * 
internal zstd buffer */
+                       {
+                               zs->tx_not_flushed = 
ZSTD_flushStream(zs->tx_stream, &zs->tx);
+                       }
+               }
+               rc = zs->tx_func(zs->arg, zs->tx.dst, zs->tx.pos);
+               if (rc > 0)
+               {
+                       zs->tx.pos -= rc;
+                       zs->tx.dst = (char *) zs->tx.dst + rc;
+                       zs->tx_total += rc;
+               }
+               else
+               {
+                       *processed = in_buf.pos;
+                       zs->tx_buffered = zs->tx.pos;
+                       zs->tx_total_raw += in_buf.pos;
+                       return rc;
+               }
+
+               /*
+                * repeat sending while there is some data in input or internal 
zstd
+                * buffer
+                */
+       } while (in_buf.pos < size || zs->tx_not_flushed);
+
+       zs->tx_total_raw += in_buf.pos;
+       zs->tx_buffered = zs->tx.pos;
+       return in_buf.pos;
+}
+
+static void
+zstd_free(ZpqStream * zstream)
+{
+       ZstdStream *zs = (ZstdStream *) zstream;
+
+       if (zs != NULL)
+       {
+               ZSTD_freeCStream(zs->tx_stream);
+               ZSTD_freeDStream(zs->rx_stream);
+               free(zs);
+       }
+}
+
+static char const *
+zstd_error(ZpqStream * zstream)
+{
+       ZstdStream *zs = (ZstdStream *) zstream;
+
+       return zs->rx_error;
+}
+
+static size_t
+zstd_buffered_tx(ZpqStream * zstream)
+{
+       ZstdStream *zs = (ZstdStream *) zstream;
+
+       return zs != NULL ? zs->tx_buffered + zs->tx_not_flushed : 0;
+}
+
+static size_t
+zstd_buffered_rx(ZpqStream * zstream)
+{
+       ZstdStream *zs = (ZstdStream *) zstream;
+
+       return zs != NULL ? zs->rx.size - zs->rx.pos : 0;
+}
+
+static char const *
+zstd_name(void)
+{
+       return "zstd";
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+#define ZLIB_BUFFER_SIZE       8192 /* We have to flush stream after each
+                                                                        * 
protocol command and command is mostly
+                                                                        * 
limited by record length, which in turn
+                                                                        * 
usually less than page size (except
+                                                                        * 
TOAST) */
+
+typedef struct ZlibStream
+{
+       ZpqStream       common;
+
+       z_stream        tx;
+       z_stream        rx;
+
+       zpq_tx_func tx_func;
+       zpq_rx_func rx_func;
+       void       *arg;
+       unsigned        tx_deflate_pending;
+       size_t          tx_buffered;
+
+       Bytef           tx_buf[ZLIB_BUFFER_SIZE];
+       Bytef           rx_buf[ZLIB_BUFFER_SIZE];
+}                      ZlibStream;
+
+static ZpqStream *
+zlib_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, 
char *rx_data, size_t rx_data_size)
+{
+       int                     rc;
+       ZlibStream *zs = (ZlibStream *) malloc(sizeof(ZlibStream));
+
+       memset(&zs->tx, 0, sizeof(zs->tx));
+       zs->tx.next_out = zs->tx_buf;
+       zs->tx.avail_out = ZLIB_BUFFER_SIZE;
+       zs->tx_buffered = 0;
+       rc = deflateInit(&zs->tx, level);
+       if (rc != Z_OK)
+       {
+               free(zs);
+               return NULL;
+       }
+       Assert(zs->tx.next_out == zs->tx_buf && zs->tx.avail_out == 
ZLIB_BUFFER_SIZE);
+
+       memset(&zs->rx, 0, sizeof(zs->tx));
+       zs->rx.next_in = zs->rx_buf;
+       zs->rx.avail_in = ZLIB_BUFFER_SIZE;
+       zs->tx_deflate_pending = 0;
+       rc = inflateInit(&zs->rx);
+       if (rc != Z_OK)
+       {
+               free(zs);
+               return NULL;
+       }
+       Assert(zs->rx.next_in == zs->rx_buf && zs->rx.avail_in == 
ZLIB_BUFFER_SIZE);
+
+       zs->rx.avail_in = rx_data_size;
+       Assert(rx_data_size < ZLIB_BUFFER_SIZE);
+       memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+       zs->rx_func = rx_func;
+       zs->tx_func = tx_func;
+       zs->arg = arg;
+
+       return (ZpqStream *) zs;
+}
+
+static ssize_t
+zlib_read(ZpqStream * zstream, void *buf, size_t size)
+{
+       ZlibStream *zs = (ZlibStream *) zstream;
+       int                     rc;
+
+       zs->rx.next_out = (Bytef *) buf;
+       zs->rx.avail_out = size;
+
+       while (1)
+       {
+               if (zs->rx.avail_in != 0)       /* If there is some data in 
receiver
+                                                                        * 
buffer, then decompress it */
+               {
+                       rc = inflate(&zs->rx, Z_SYNC_FLUSH);
+                       if (rc != Z_OK && rc != Z_BUF_ERROR)
+                       {
+                               return ZPQ_DECOMPRESS_ERROR;
+                       }
+                       if (zs->rx.avail_out != size)
+                       {
+                               return size - zs->rx.avail_out;
+                       }
+                       if (zs->rx.avail_in == 0)
+                       {
+                               zs->rx.next_in = zs->rx_buf;
+                       }
+               }
+               else
+               {
+                       zs->rx.next_in = zs->rx_buf;
+               }
+               rc = zs->rx_func(zs->arg, zs->rx.next_in + zs->rx.avail_in, 
zs->rx_buf + ZLIB_BUFFER_SIZE - zs->rx.next_in - zs->rx.avail_in);
+               if (rc > 0)
+               {
+                       zs->rx.avail_in += rc;
+               }
+               else
+               {
+                       return rc;
+               }
+       }
+}
+
+static ssize_t
+zlib_write(ZpqStream * zstream, void const *buf, size_t size, size_t 
*processed)
+{
+       ZlibStream *zs = (ZlibStream *) zstream;
+       int                     rc;
+
+       zs->tx.next_in = (Bytef *) buf;
+       zs->tx.avail_in = size;
+       do
+       {
+               if (zs->tx.avail_out == ZLIB_BUFFER_SIZE)       /* Compress 
buffer is
+                                                                               
                         * empty */
+               {
+                       zs->tx.next_out = zs->tx_buf;   /* Reset pointer to the 
 beginning
+                                                                               
         * of buffer */
+
+                       if (zs->tx.avail_in != 0 || (zs->tx_deflate_pending > 
0))       /* Has something in
+                                                                               
                                                                 * input or 
deflate
+                                                                               
                                                                 * buffer */
+                       {
+                               rc = deflate(&zs->tx, Z_SYNC_FLUSH);
+                               Assert(rc == Z_OK);
+                               deflatePending(&zs->tx, 
&zs->tx_deflate_pending, Z_NULL);       /* check if any data
+                                                                               
                                                                         * left 
in deflate
+                                                                               
                                                                         * 
buffer */
+                               zs->tx.next_out = zs->tx_buf;   /* Reset 
pointer to the
+                                                                               
                 * beginning of buffer */
+                       }
+               }
+               rc = zs->tx_func(zs->arg, zs->tx.next_out, ZLIB_BUFFER_SIZE - 
zs->tx.avail_out);
+               if (rc > 0)
+               {
+                       zs->tx.next_out += rc;
+                       zs->tx.avail_out += rc;
+               }
+               else
+               {
+                       *processed = size - zs->tx.avail_in;
+                       zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+                       return rc;
+               }
+               /* repeat sending while there is some data in input or deflate 
buffer */
+       } while (zs->tx.avail_in != 0 || zs->tx_deflate_pending > 0);
+
+       zs->tx_buffered = ZLIB_BUFFER_SIZE - zs->tx.avail_out;
+
+       return size - zs->tx.avail_in;
+}
+
+static void
+zlib_free(ZpqStream * zstream)
+{
+       ZlibStream *zs = (ZlibStream *) zstream;
+
+       if (zs != NULL)
+       {
+               inflateEnd(&zs->rx);
+               deflateEnd(&zs->tx);
+               free(zs);
+       }
+}
+
+static char const *
+zlib_error(ZpqStream * zstream)
+{
+       ZlibStream *zs = (ZlibStream *) zstream;
+
+       return zs->rx.msg;
+}
+
+static size_t
+zlib_buffered_tx(ZpqStream * zstream)
+{
+       ZlibStream *zs = (ZlibStream *) zstream;
+
+       return zs != NULL ? zs->tx_buffered + zs->tx_deflate_pending : 0;
+}
+
+static size_t
+zlib_buffered_rx(ZpqStream * zstream)
+{
+       ZlibStream *zs = (ZlibStream *) zstream;
+
+       return zs != NULL ? zs->rx.avail_in : 0;
+}
+
+static char const *
+zlib_name(void)
+{
+       return "zlib";
+}
+
+#endif
+
+static char const *
+no_compression_name(void)
+{
+       return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+       {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, 
zstd_buffered_tx, zstd_buffered_rx},
+#endif
+#if HAVE_LIBZ
+       {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, 
zlib_buffered_tx, zlib_buffered_rx},
+#endif
+       {no_compression_name}
+};
+
+/*
+ * Index of used compression algorithm in zpq_algorithms array.
+ */
+ZpqStream *
+zpq_create(int algorithm_impl, int level, zpq_tx_func tx_func, zpq_rx_func 
rx_func, void *arg, char *rx_data, size_t rx_data_size)
+{
+       ZpqStream  *stream = zpq_algorithms[algorithm_impl].create(level, 
tx_func, rx_func, arg, rx_data, rx_data_size);
+
+       if (stream)
+               stream->algorithm = &zpq_algorithms[algorithm_impl];
+       return stream;
+}
+
+ssize_t
+zpq_read(ZpqStream * zs, void *buf, size_t size)
+{
+       return zs->algorithm->read(zs, buf, size);
+}
+
+ssize_t
+zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed)
+{
+       return zs->algorithm->write(zs, buf, size, processed);
+}
+
+void
+zpq_free(ZpqStream * zs)
+{
+       if (zs)
+               zs->algorithm->free(zs);
+}
+
+char const *
+zpq_error(ZpqStream * zs)
+{
+       return zs->algorithm->error(zs);
+}
+
+
+size_t
+zpq_buffered_rx(ZpqStream * zs)
+{
+       return zs ? zs->algorithm->buffered_rx(zs) : 0;
+}
+
+size_t
+zpq_buffered_tx(ZpqStream * zs)
+{
+       return zs ? zs->algorithm->buffered_tx(zs) : 0;
+}
+
+/*
+ * Get list of the supported algorithms.
+ */
+char     **
+zpq_get_supported_algorithms(void)
+{
+       size_t          n_algorithms = sizeof(zpq_algorithms) / 
sizeof(*zpq_algorithms);
+       char      **algorithm_names = malloc(n_algorithms * sizeof(char *));
+
+       for (size_t i = 0; i < n_algorithms; i++)
+       {
+               algorithm_names[i] = (char *) zpq_algorithms[i].name();
+       }
+
+       return algorithm_names;
+}
+
+char const *
+zpq_algorithm_name(ZpqStream * zs)
+{
+       return zs ? zs->algorithm->name() : NULL;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index e6c7b070f6..0b25c39825 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5236,9 +5236,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => 
'{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
-  proargmodes => 
'{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => 
'{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+  proallargtypes => 
'{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+  proargmodes => 
'{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => 
'{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running 
maintenance command',
@@ -5505,6 +5505,14 @@
    proargnames => 
'{wal_records,wal_fpi,wal_bytes,wal_buffers_full,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 
+{ oid => '1137', descr => 'statistics: information about network traffic',
+  proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 
's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+  proallargtypes => '{int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => 
'{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+  prosrc => 'pg_stat_get_network_traffic' },
+
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5636,6 +5644,10 @@
   proname => 'pg_tablespace_location', provolatile => 's', prorettype => 
'text',
   proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
 
+{ oid => '4142', descr => 'connection compression algorithm',
+  proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 
'text',
+  proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
 { oid => '1946',
   descr => 'convert bytea value into some ascii-only text string',
   proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000000..4e1e03852f
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,36 @@
+/*
+ * zpq_stream.h
+ *     Streaiming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size);
+typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size);
+
+ZpqStream  *zpq_create(int impl, int level, zpq_tx_func tx_func, zpq_rx_func 
rx_func, void *arg, char *rx_data, size_t rx_data_size);
+ssize_t                zpq_read(ZpqStream * zs, void *buf, size_t size);
+ssize_t                zpq_write(ZpqStream * zs, void const *buf, size_t size, 
size_t *processed);
+char const *zpq_error(ZpqStream * zs);
+size_t         zpq_buffered_rx(ZpqStream * zs);
+size_t         zpq_buffered_tx(ZpqStream * zs);
+void           zpq_free(ZpqStream * zs);
+char const *zpq_algorithm_name(ZpqStream * zs);
+
+/*
+  Returns zero terminated array with compression algorithms names
+*/
+char     **zpq_get_supported_algorithms(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281ad5..a24bbca1a6 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,9 @@ typedef struct Port
        int                     keepalives_count;
        int                     tcp_user_timeout;
 
+       char       *compression_algorithms; /* Compression algorithms supported 
by
+                                                                               
 * client */
+
        /*
         * GSSAPI structures.
         */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b1152475ac..de24a3f76a 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int     pq_configure(Port *port);
 extern int     pq_getbytes(char *s, size_t len);
 extern int     pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index cf967c3987..f4560f9d51 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -159,6 +159,7 @@ typedef struct StartupPacket
 } StartupPacket;
 
 extern bool Db_user_namespace;
+extern bool libpq_compression;
 
 /*
  * In protocol 3.0 and later, the startup packet length is not fixed, but
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838e53..b336eeb175 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 5954068dec..09b137ff89 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1196,6 +1196,12 @@ typedef struct PgBackendStatus
        /* application name; MUST be null-terminated */
        char       *st_appname;
 
+       /* client-server traffic information */
+       uint64          st_rx_raw_bytes;
+       uint64          st_tx_raw_bytes;
+       uint64          st_rx_compressed_bytes;
+       uint64          st_tx_compressed_bytes;
+
        /*
         * Current command string; MUST be null-terminated. Note that this 
string
         * possibly is truncated in the middle of a multi-byte character. As
@@ -1409,6 +1415,7 @@ extern void pgstat_report_activity(BackendState state, 
const char *cmd_str);
 extern void pgstat_report_tempfile(size_t filesize);
 extern void pgstat_report_appname(const char *appname);
 extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 
tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
 extern const char *pgstat_get_wait_event(uint32 wait_event_info);
 extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
 extern const char *pgstat_get_backend_current_activity(int pid, bool 
checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b340..be8cb34e40 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
 OBJS = \
        $(WIN32RES) \
        fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c 
b/src/interfaces/libpq/fe-connect.c
index 7d04d3664e..de5d0e2ae4 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "common/string.h"
 #include "fe-auth.h"
 #include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] 
= {
                "Replication", "D", 5,
        offsetof(struct pg_conn, replication)},
 
+       {"compression", "PGCOMPRESSION", NULL, NULL,
+               "Libpq-compression", "", 16,
+       offsetof(struct pg_conn, compression)},
+
        {"target_session_attrs", "PGTARGETSESSIONATTRS",
                DefaultTargetSessionAttrs, NULL,
                "Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+       /* Release compression streams */
+       zpq_free(conn->zstream);
+       conn->zstream = NULL;
+
        /* Drop any SSL state */
        pqsecure_close(conn);
 
@@ -2148,6 +2157,16 @@ connectDBComplete(PGconn *conn)
                                return 1;               /* success! */
 
                        case PGRES_POLLING_READING:
+
+                               /*
+                                * if there is some buffered RX data in 
ZpqStream then don't
+                                * proceed to pqWaitTimed
+                                */
+                               if (zpq_buffered_rx(conn->zstream))
+                               {
+                                       break;
+                               }
+
                                ret = pqWaitTimed(1, 0, conn, finish_time);
                                if (ret == -1)
                                {
@@ -3216,11 +3235,65 @@ keep_going:                                             
/* We will come back to here until there is
                                 */
                                conn->inCursor = conn->inStart;
 
-                               /* Read type byte */
-                               if (pqGetc(&beresp, conn))
+                               while (1)
                                {
-                                       /* We'll come back when there is more 
data */
-                                       return PGRES_POLLING_READING;
+                                       /* Read type byte */
+                                       if (pqGetc(&beresp, conn))
+                                       {
+                                               /* We'll come back when there 
is more data */
+                                               return PGRES_POLLING_READING;
+                                       }
+
+                                       if (beresp == 'z')      /* Switch on 
compression */
+                                       {
+                                               int                     index;
+                                               char            resp;
+
+                                               /* Read message length word */
+                                               if (pqGetInt(&msgLength, 4, 
conn))
+                                               {
+                                                       /* We'll come back when 
there is more data */
+                                                       return 
PGRES_POLLING_READING;
+                                               }
+                                               if (msgLength != 5)
+                                               {
+                                                       
appendPQExpBuffer(&conn->errorMessage,
+                                                                               
          libpq_gettext(
+                                                                               
                                        "expected compression algorithm 
specification message length is 5 bytes, but %d is received\n"),
+                                                                               
          msgLength);
+                                                       goto error_return;
+                                               }
+                                               pqGetc(&resp, conn);
+                                               index = resp;
+                                               if (index == (char) -1)
+                                               {
+                                                       
appendPQExpBuffer(&conn->errorMessage,
+                                                                               
          libpq_gettext(
+                                                                               
                                        "server is not supported requested 
compression algorithms %s\n"),
+                                                                               
          conn->compression);
+                                                       goto error_return;
+                                               }
+                                               Assert(!conn->zstream);
+                                               conn->zstream = 
zpq_create(conn->compressors[index].impl,
+                                                                               
                   conn->compressors[index].level,
+                                                                               
                   (zpq_tx_func) pqsecure_write, (zpq_rx_func) pqsecure_read, 
conn,
+                                                                               
                   &conn->inBuffer[conn->inCursor], conn->inEnd - 
conn->inCursor);
+                                               if (!conn->zstream)
+                                               {
+                                                       char      
**supported_algorithms = zpq_get_supported_algorithms();
+
+                                                       
appendPQExpBuffer(&conn->errorMessage,
+                                                                               
          libpq_gettext(
+                                                                               
                                        "failed to initialize compressor %s\n"),
+                                                                               
          supported_algorithms[conn->compressors[index].impl]);
+                                                       
free(supported_algorithms);
+                                                       goto error_return;
+                                               }
+                                               /* reset buffer */
+                                               conn->inStart = conn->inCursor 
= conn->inEnd = 0;
+                                       }
+                                       else
+                                               break;
                                }
 
                                /*
@@ -4020,6 +4093,8 @@ freePGconn(PGconn *conn)
                free(conn->dbName);
        if (conn->replication)
                free(conn->replication);
+       if (conn->compression)
+               free(conn->compression);
        if (conn->pguser)
                free(conn->pguser);
        if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237c3a..6d5574aebe 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn)
                 * EOF indication.  We expect therefore that this won't result 
in any
                 * undue delay in reporting a previous write failure.)
                 */
-               if (flushResult ||
-                       pqWait(true, false, conn) ||
+               if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
+                                                       pqWait(true, false, 
conn)) ||
                        pqReadData(conn) < 0)
                {
                        /*
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f33fb..94025e311f 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,12 +53,24 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+#include  <common/zpq_stream.h>
+
 static int     pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int     pqSendSome(PGconn *conn, int len);
 static int     pqSocketCheck(PGconn *conn, int forRead, int forWrite,
                                                  time_t end_time);
 static int     pqSocketPoll(int sock, int forRead, int forWrite, time_t 
end_time);
 
+/*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn)                                                     
                                        \
+       (conn->zstream                                                          
                                                \
+        ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,                
        \
+                               conn->inBufSize - conn->inEnd)                  
                                \
+        : pqsecure_read(conn, conn->inBuffer + conn->inEnd,                    
        \
+                                        conn->inBufSize - conn->inEnd))
+
 /*
  * PQlibVersion: return the libpq version number
  */
@@ -664,10 +676,17 @@ pqReadData(PGconn *conn)
 
        /* OK, try to read some data */
 retry3:
-       nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-                                                 conn->inBufSize - 
conn->inEnd);
+       nread = pq_read_conn(conn);
        if (nread < 0)
        {
+               if (nread == ZPQ_DECOMPRESS_ERROR)
+               {
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         
libpq_gettext("decompress error: %s\n"),
+                                                         
zpq_error(conn->zstream));
+                       return -1;
+               }
+
                switch (SOCK_ERRNO)
                {
                        case EINTR:
@@ -759,10 +778,18 @@ retry3:
         * arrived.
         */
 retry4:
-       nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-                                                 conn->inBufSize - 
conn->inEnd);
+       nread = pq_read_conn(conn);
+
        if (nread < 0)
        {
+               if (nread == ZPQ_DECOMPRESS_ERROR)
+               {
+                       printfPQExpBuffer(&conn->errorMessage,
+                                                         
libpq_gettext("decompress error: %s\n"),
+                                                         
zpq_error(conn->zstream));
+                       return -1;
+               }
+
                switch (SOCK_ERRNO)
                {
                        case EINTR:
@@ -875,12 +902,18 @@ pqSendSome(PGconn *conn, int len)
        }
 
        /* while there's still data to send */
-       while (len > 0)
+       while (len > 0 || zpq_buffered_tx(conn->zstream))
        {
                int                     sent;
+               size_t          processed = 0;
 
+               /*
+                * Use zpq_write if compression is switched on
+                */
+               sent = conn->zstream
+                       ? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-               sent = pqsecure_write(conn, ptr, len);
+                       : pqsecure_write(conn, ptr, len);
 #else
 
                /*
@@ -888,8 +921,11 @@ pqSendSome(PGconn *conn, int len)
                 * failure-point appears to be different in different versions 
of
                 * Windows, but 64k should always be safe.
                 */
-               sent = pqsecure_write(conn, ptr, Min(len, 65536));
+:                      pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+               ptr += processed;
+               len -= processed;
+               remaining -= processed;
 
                if (sent < 0)
                {
@@ -943,7 +979,7 @@ pqSendSome(PGconn *conn, int len)
                        remaining -= sent;
                }
 
-               if (len > 0)
+               if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
                {
                        /*
                         * We didn't send it all, wait till we can send more.
@@ -1034,6 +1070,8 @@ pqFlush(PGconn *conn)
 int
 pqWait(int forRead, int forWrite, PGconn *conn)
 {
+       if (forRead && conn->inCursor < conn->inEnd)
+               return 0;
        return pqWaitTimed(forRead, forWrite, conn, (time_t) -1);
 }
 
diff --git a/src/interfaces/libpq/fe-protocol3.c 
b/src/interfaces/libpq/fe-protocol3.c
index 1696525475..4c1e8789ff 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
                        if (async)
                                return 0;
                        /* Need to load more data */
-                       if (pqWait(true, false, conn) ||
+                       if ((zpq_buffered_rx(conn->zstream) == 0 && 
pqWait(true, false, conn)) ||
                                pqReadData(conn) < 0)
                                return -2;
                        continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
        while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
        {
                /* need to load more data */
-               if (pqWait(true, false, conn) ||
+               if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, 
conn)) ||
                        pqReadData(conn) < 0)
                {
                        *s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
                if (needInput)
                {
                        /* Wait for some data to arrive (or for the channel to 
close) */
-                       if (pqWait(true, false, conn) ||
+                       if ((zpq_buffered_rx(conn->zstream) == 0 && 
pqWait(true, false, conn)) ||
                                pqReadData(conn) < 0)
                                break;
                }
@@ -2134,6 +2134,150 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
        return startpacket;
 }
 
+/*
+ * Build comma-separated list of compression algorithms suggested by client to 
the server.
+ * It can be either explicitly specified by user in connection string, either
+ * include all algorithms supported by client library.
+ * This functions returns true if compression string is successfully parsed and
+ * stores comma-separated list of algorithms in *client_compressors.
+ * If compression is disabled, then NULL is assigned to  *client_compressors.
+ * Also it creates array of compressor descriptors, each element of which 
corresponds
+ * the correspondent algorithm name in *client_compressors list. This array is 
stored in PGconn
+ * and is used during handshake when compassion acknowledgment response is 
received from the server.
+ */
+static bool
+build_compressors_list(PGconn *conn, char **client_compressors, bool 
build_descriptors)
+{
+       char      **supported_algorithms = zpq_get_supported_algorithms();
+       char       *value = conn->compression;
+       int                     n_supported_algorithms;
+       int                     total_len = 0;
+       int                     i;
+
+       for (n_supported_algorithms = 0; 
supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++)
+       {
+               total_len += 
strlen(supported_algorithms[n_supported_algorithms]) + 1;
+       }
+
+       if (pg_strcasecmp(value, "true") == 0 ||
+               pg_strcasecmp(value, "yes") == 0 ||
+               pg_strcasecmp(value, "on") == 0 ||
+               pg_strcasecmp(value, "any") == 0 ||
+               pg_strcasecmp(value, "1") == 0)
+       {
+               /* Compression is enabled: choose algorithm automatically */
+               char       *p;
+
+               if (n_supported_algorithms == 0)
+               {
+                       *client_compressors = NULL; /* no compressors are 
avaialable */
+                       conn->compressors = NULL;
+                       return true;
+               }
+               *client_compressors = p = malloc(total_len);
+               if (build_descriptors)
+                       conn->compressors = malloc(n_supported_algorithms * 
sizeof(pg_conn_compressor));
+               for (i = 0; i < n_supported_algorithms; i++)
+               {
+                       strcpy(p, supported_algorithms[i]);
+                       p += strlen(p);
+                       *p++ = ',';
+                       if (build_descriptors)
+                       {
+                               conn->compressors[i].impl = i;
+                               conn->compressors[i].level = 
ZPQ_DEFAULT_COMPRESSION_LEVEL;
+                       }
+               }
+               p[-1] = '\0';
+               return true;
+       }
+       else if (*value == 0 ||
+                        pg_strcasecmp(value, "false") == 0 ||
+                        pg_strcasecmp(value, "no") == 0 ||
+                        pg_strcasecmp(value, "off") == 0 ||
+                        pg_strcasecmp(value, "0") == 0)
+       {
+               /* Compression is disabled */
+               *client_compressors = NULL;
+               conn->compressors = NULL;
+               return true;
+       }
+       else
+       {
+               /* List of compresison algorithms separated by commas */
+               char       *src,
+                                  *dst;
+               int                     n_suggested_algorithms = 0;
+
+               *client_compressors = src = strdup(value);
+               dst = strdup(value);
+
+               if (build_descriptors)
+                       conn->compressors = malloc(n_supported_algorithms * 
sizeof(pg_conn_compressor));
+
+               while (*src != '\0')
+               {
+                       char       *sep = strchr(src, ',');
+                       char       *col;
+                       int                     compression_level = 
ZPQ_DEFAULT_COMPRESSION_LEVEL;
+
+                       if (sep != NULL)
+                               *sep = '\0';
+
+                       strcpy(dst, src);
+
+                       col = strchr(src, ':');
+                       if (col != NULL)
+                       {
+                               *col = '\0';
+                               if (sscanf(col + 1, "%d", &compression_level) 
!= 1 && !build_descriptors)
+                               {
+                                       fprintf(stderr,
+                                                       libpq_gettext("WARNING: 
invlaid compression level %s in compression option '%s'\n"),
+                                                       col + 1, value);
+                                       return false;
+                               }
+                       }
+                       for (i = 0; supported_algorithms[i] != NULL; i++)
+                       {
+                               if (pg_strcasecmp(src, supported_algorithms[i]) 
== 0)
+                               {
+                                       if (build_descriptors)
+                                       {
+                                               
conn->compressors[n_suggested_algorithms].impl = i;
+                                               
conn->compressors[n_suggested_algorithms].level = compression_level;
+                                       }
+                                       n_suggested_algorithms += 1;
+                                       dst += strlen(dst);
+                                       *dst++ = ',';
+                                       break;
+                               }
+                       }
+                       if (sep)
+                               src = sep + 1;
+                       else
+                               break;
+               }
+               if (n_suggested_algorithms == 0)
+               {
+                       if (!build_descriptors)
+                               fprintf(stderr,
+                                               libpq_gettext("WARNING: none of 
specified algirthms %s is supported by client\n"),
+                                               value);
+                       else
+                       {
+                               free(conn->compressors);
+                               conn->compressors = NULL;
+                       }
+                       free(*client_compressors);
+                       *client_compressors = NULL;
+                       return false;
+               }
+               dst[-1] = '\0';
+               return true;
+       }
+}
+
 /*
  * Build a startup packet given a filled-in PGconn structure.
  *
@@ -2180,6 +2324,18 @@ build_startup_packet(const PGconn *conn, char *packet,
                ADD_STARTUP_OPTION("replication", conn->replication);
        if (conn->pgoptions && conn->pgoptions[0])
                ADD_STARTUP_OPTION("options", conn->pgoptions);
+       if (conn->compression && conn->compression[0])
+       {
+               char       *client_compression_algorithms;
+
+               if (build_compressors_list((PGconn *) conn, 
&client_compression_algorithms, packet == NULL))
+               {
+                       if (client_compression_algorithms)
+                       {
+                               ADD_STARTUP_OPTION("_pq_.compression", 
client_compression_algorithms);
+                       }
+               }
+       }
        if (conn->send_appname)
        {
                /* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae295..932666f671 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -317,6 +318,16 @@ typedef struct pg_conn_host
                                                                 * found in 
password file. */
 } pg_conn_host;
 
+
+/*
+ * Descriptors of compression algorithms chosen by client
+ */
+typedef struct pg_conn_compressor
+{
+       int                     impl;                   /* compression 
implementation index */
+       int                     level;                  /* compression level */
+}                      pg_conn_compressor;
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -370,6 +381,12 @@ struct pg_conn
        char       *ssl_min_protocol_version;   /* minimum TLS protocol version 
*/
        char       *ssl_max_protocol_version;   /* maximum TLS protocol version 
*/
 
+       char       *compression;        /* stream compression (boolean value, 
"any" or
+                                                                * list of 
compression algorithms separated by
+                                                                * comma) */
+       pg_conn_compressor *compressors;        /* descriptors of compression
+                                                                               
 * algorithms chosen by client */
+
        /* Type of connection to make.  Possible values: any, read-write. */
        char       *target_session_attrs;
 
@@ -527,6 +544,9 @@ struct pg_conn
 
        /* Buffer for receiving various parts of messages */
        PQExpBufferData workBuffer; /* expansible string */
+
+       /* Compression stream */
+       ZpqStream  *zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/test/regress/expected/rules.out 
b/src/test/regress/expected/rules.out
index 6293ab57bc..c01fb20dee 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid,
     s.backend_xmin,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, 
tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, 
tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+    s.rx_raw_bytes,
+    s.tx_raw_bytes,
+    s.rx_compressed_bytes,
+    s.tx_compressed_bytes
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, 
tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
@@ -2015,7 +2021,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, 
tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, 
flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, 
sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
@@ -2046,7 +2052,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, 
application_name, state, query, wait_event_type, wait_event, xact_start, 
query_start, backend_start, state_change, client_addr, client_hostname, 
client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, 
sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, 
ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, 
tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index f92c14030d..6dde97165f 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -123,7 +123,7 @@ sub mkvcbuild
          config_info.c controldata_utils.c d2s.c encnames.c exec.c
          f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c
          keywords.c kwlookup.c link-canary.c md5_common.c
-         pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+         pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c 
relpath.c rmtree.c
          saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c 
username.c
          wait_error.c wchar.c);
 
-- 
2.24.3 (Apple Git-128)



Best regards,
Denis Smirnov | Developer
s...@arenadata.io 
Arenadata | Godovikova 9-17, Moscow 129085 Russia

Reply via email to