On 09.01.2021 23:31, Justin Pryzby wrote:
On Thu, Dec 17, 2020 at 05:54:28PM +0300, Konstantin Knizhnik wrote:
I am maintaining this code in
g...@github.com:postgrespro/libpq_compression.git repository.
I will be pleased if anybody who wants to suggest bug fixes/improvements
of libpq compression creates pull requests: it will be
much easier for me to merge them.
Thanks for working on this.
I have a patch for zstd compression in pg_dump so I looked at your patch.
I'm attaching some language fixes.

Thank you very much.
I applied your patch on top of the pull request by Daniil Zakhlystov, who has implemented support for using different compression algorithms in each direction.

Frankly speaking, I am still very skeptical about too much flexibility in compression configuration:
- toggling compression on the fly
- using different compression algorithms in each direction
- specifying an arbitrary compression level

According to Daniil's results, there is only about a 30% difference in compression ratio between zstd:1 and zstd:19. But by making it possible to specify an arbitrary compression level, we give users a simple tool to attack the server (by causing CPU/memory exhaustion).
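If we do accept a client-specified level at all, the server should at least clamp it to some safe maximum. A minimal sketch (ZPQ_MAX_SAFE_LEVEL and zpq_clamp_level are invented names, not something from the patch):

/* Hypothetical server-side guard: clamp the client-requested compression
 * level so a client cannot force the most expensive settings. */
#define ZPQ_MAX_SAFE_LEVEL 5

static int
zpq_clamp_level(int requested_level)
{
	if (requested_level < 1)
		return 1;
	if (requested_level > ZPQ_MAX_SAFE_LEVEL)
		return ZPQ_MAX_SAFE_LEVEL;
	return requested_level;
}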


+zstd_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+zlib_create(int level, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char* rx_data, size_t rx_data_size)
+build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors)
Are you able to run pgindent to fix all the places where "*" is before the
space?  (And similar style issues.)

Also done by Daniil.


There are several compression patches in the commitfest; I'm not sure how much
they need to be coordinated, but we should at least coordinate the list of
compression algorithms available at compile time.

Maybe there should be a central structure for this, or maybe just the ID
numbers of compressors should be a common enum/define.  In my patch, I have:

+struct compressLibs {
+       const CompressionAlgorithm alg;
+       const char      *name;                  /* Name in -Z alg= */
+       const char      *suffix;                /* file extension */
+       const int       defaultlevel;   /* Default compression level */
+};

Maybe we'd also want to store the "magic number" of each compression library.
Maybe there'd also be a common parsing of compression options.
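For illustration, a shared definition might look roughly like this (the header location and all identifiers are hypothetical, not taken from either patch):

/* Hypothetical shared header, e.g. src/include/common/compression.h:
 * stable wire-visible IDs plus per-algorithm metadata that libpq,
 * pg_dump, pg_basebackup, etc. could all reference. */
typedef enum pg_compression_id
{
	PG_COMPRESSION_NONE = 0,
	PG_COMPRESSION_ZLIB = 1,
	PG_COMPRESSION_ZSTD = 2
} pg_compression_id;

typedef struct pg_compression_spec
{
	pg_compression_id id;		/* stable protocol/file identifier */
	const char *name;			/* name used in options, e.g. "zstd" */
	const char *suffix;			/* file extension for pg_dump output */
	int			default_level;	/* default compression level */
} pg_compression_spec;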

You're supporting a syntax like zlib,zstd:5, but zstd also supports long-range,
checksum, and rsyncable modes (rsyncable is relevant to pg_dump, but not to
libpq).

There are at least three places in Postgres where compression is used right now:
1. TOAST and extended attribute compression
2. pg_basebackup compression
3. pg_dump compression

And there are also patches for
4. page compression
5. protocol compression


It is awful that all five of these places use compression in their own way. It seems to me that compression (as well as other system-dependent machinery such as socket I/O, file I/O, synchronization primitives, ...) should be extracted into a SAL (system-abstraction layer) where it can be used by the backend, frontend and utilities, including external utilities like pg_probackup, PgBouncer, Odyssey, ... Unfortunately such refactoring requires so much effort that it only has a chance of being committed if the work is coordinated by one of the core committers.


I think your patch has an issue here.  You have this:

src/interfaces/libpq/fe-connect.c

+            pqGetc(&resp, conn);
+            index = resp;
+            if (index == (char)-1)
+            {
+                    appendPQExpBuffer(&conn->errorMessage,
+                                                      libpq_gettext("server is not supported requested compression algorithms %s\n"),
+                                                      conn->compression);
+                    goto error_return;
+            }
+            Assert(!conn->zstream);
+            conn->zstream = zpq_create(conn->compressors[index].impl,
+                                                               conn->compressors[index].level,
+                                                               (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+                                                               &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);

This takes the "index" returned by the server and then accesses
conn->compressors[index] without first checking if the index is out of range,
so a malicious server could (at least) crash the client by returning index=666.

Thank you for pointing out this problem. I will add the check, although I think the problem of a malicious server is less critical than that of a malicious client. Also, such a "byzantine" server may return wrong data, which in many cases
is more fatal than a client crash.
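The check can be a trivial helper like the following (the function name and the n_offered parameter are just illustrative, not part of the patch):

#include <stdbool.h>

/* Sketch of validating the index returned in CompressionAck before it is
 * used to index conn->compressors[]; -1 is the legitimate "no compression"
 * answer, anything else must fall within the list the client offered. */
static bool
compression_index_is_valid(int index, int n_offered)
{
	if (index == -1)
		return true;
	return index >= 0 && index < n_offered;
}
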
I suggest that there should be an enum of algorithms, which is constant across
all servers.  They would be unconditionally included and not #ifdef depending
on compilation options.

I do not think that is possible (even right now it is possible to build Postgres without zlib support). Also, if new compression algorithms are added, we will in any case have to handle the situation when
an old client connects to a new server and vice versa.


That would affect the ZpqAlgorithm data structure, which would include an ID
number similar to
src/bin/pg_dump/compress_io.h:typedef enum...CompressionAlgorithm;

The CompressionAck would send the ID rather than the "index".
A protocol analyzer like wireshark could show "Compression: Zstd".
You'd have to verify that the ID is supported (and not bogus).

Right now, when I try to connect to an unpatched server, I get:
psql: error: expected authentication request from server, but received v

Thank you for pointing it out: fixed.
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+       {zstd_name, zstd_create, zstd_read, zstd_write, zstd_free, zstd_error, zstd_buffered_tx, zstd_buffered_rx},
+#endif
+#if HAVE_LIBZ
+       {zlib_name, zlib_create, zlib_read, zlib_write, zlib_free, zlib_error, zlib_buffered_tx, zlib_buffered_rx},
+#endif
+       {no_compression_name}
+};

In config.sgml, it says that libpq_compression defaults to on (on the server
side), but in libpq.sgml it says that it defaults to off (on the client side).
Is that what's intended ?  I would've thought the defaults would match, or that
the server would enforce a default more conservative than the client's (the DBA
should probably have to explicitly enable compression, and need to "opt-in"
rather than "opt-out").

Yes, this is intended behavior: the libpq_compression GUC allows prohibiting compression requests from clients
if that is not desired for some reason (security, CPU consumption).
But by default the server should support compression if it is requested by a client. The client, however, should not request compression by default: it makes sense only for queries returning large result sets
or transferring a lot of data (like COPY).


Maybe instead of a boolean, this should be a list of permitted compression
algorithms.  This allows the admin to set a "policy" rather than using the
server's hard-coded preferences.  This could be important to disable an
algorithm at run-time if there's a vulnerability found, or performance problem,
or buggy client, or for diagnostics, or performance testing.  Actually, I think
it may be important to allow the admin to disable not just an algorithm, but
also its options.  It should be possible for the server to allow "zstd"
compression but not "zstd --long", or zstd:99 or arbitrarily large window
sizes.  This seems similar to SSL cipher strings.  I think we'd want to be able
to allow (or prohibit) at least alg:* (with options) or alg (with default
options).
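As a sketch, enforcing such a list-valued policy on the server side could look something like this (all of these names are made up for illustration; none of them exist in the patch):

#include <stdbool.h>
#include <string.h>

/* Illustrative only: how a list-valued libpq_compression setting such as
 * "zstd:5,zlib" might be enforced once parsed into a policy array. */
typedef struct allowed_algorithm
{
	const char *name;
	int			max_level;		/* highest level the DBA permits */
} allowed_algorithm;

static bool
compression_request_allowed(const allowed_algorithm *policy, int n_policy,
							const char *name, int level)
{
	for (int i = 0; i < n_policy; i++)
	{
		if (strcmp(policy[i].name, name) == 0)
			return level <= policy[i].max_level;
	}
	return false;				/* algorithm not listed: reject */
}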

Sorry, maybe you are not looking at the latest version of the patch?
Right now the "compression" parameter accepts not only boolean values but also a list of suggested algorithms with an optional compression level, like "zstd:1,zlib".

Your patch documents "libpq_compression=auto" but that doesn't work:
WARNING: none of specified algirthms auto is supported by client
I guess you mean "any" which is what's implemented.
I suggest to just remove that.
This is an inconsistency with the documentation. It seems to me that I have already fixed it: "auto" was renamed to "any".
I think maybe your patch should include a way to trivially change the client's
compile-time default:
"if (!conn->compression) conn->compression = DefaultCompression"

It can be done using the PGCOMPRESSION environment variable.
Do we need some other mechanism for it?


Your patch warns if *none* of the specified algorithms are supported, but I
wonder if it should warn if *any* of them are unsupported (like if someone
writes libz instead of zlib, or zst/libzstd instead of zstd, which I guess
about half of us will do).
Ugh... Handling mistyping in the connection string does not seem like such a good idea to me. And the situation where some of the algorithms are not supported by the server seems normal (e.g. when a new client connects to an old server).
So I do not want to produce a warning in this case.

I once again want to repeat my opinion: the choice of compression algorithm should be done automatically. The client should just express an intention to use compression (using compression=on or compression=any) and the server should choose the most efficient algorithm that is supported by both of them.
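Roughly, the negotiation I have in mind looks like the sketch below (the helper name is invented, and it assumes the server keeps its algorithm list in preference order, most efficient first):

#include <string.h>

/* Sketch only: pick the first algorithm in the server's preference-ordered
 * list that the client also offered, and return its index in the client's
 * list (which is what CompressionAck carries), or -1 if there is none. */
static int
choose_compression(char **server_algorithms, int n_server,
				   char **client_algorithms, int n_client)
{
	for (int s = 0; s < n_server; s++)
	{
		for (int c = 0; c < n_client; c++)
		{
			if (strcmp(server_algorithms[s], client_algorithms[c]) == 0)
				return c;
		}
	}
	return -1;
}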



$ LD_LIBRARY_PATH=./src/interfaces/libpq PGCOMPRESSION=abc,def src/bin/psql/psql 'host=localhost port=1111'
WARNING: none of the specified algorithms are supported by client: abc,def
$ LD_LIBRARY_PATH=./src/interfaces/libpq PGCOMPRESSION=abc,zlib src/bin/psql/psql 'host=localhost port=1111'
(no warning)

The libpq_compression GUC can be set for a user, like ALTER ROLE .. SET ...
Is there any utility in making it configurable by client address?  Or by
encryption?  I'm thinking of a policy like "do not allow compression from LOCAL
connection" or "do not allow compression on encrypted connection".  Maybe this
would be somehow integrated into pg_hba.  But maybe it's not needed (a separate
user could be used to allow/disallow compression).

There is definitely no sense in using compression together with encryption: if compression is desired in that case, it can be done at the SSL level. But I do not think that we need a more sophisticated mechanism to prohibit compression requests at the server level. Yes, it would be possible to grant the privilege to use compression to some particular role, or to prohibit it for SSL connections.
But I can't imagine natural arguments for it.
Maybe I am wrong. I will be pleased if somebody can describe some realistic scenarios where any of these features may be needed (taking into account that compression of libpq traffic is not something very bad, insecure or expensive that can harm the server). Misuse of libpq compression may just consume some extra CPU (but not enough to overload the server). In any case, we have no such mechanism to restrict the use of SSL connections to particular roles or to disable it for local connections. Why do we need it for compression, which is very similar to encryption?


A new version of the libpq compression patch is attached.
It can also be found at g...@github.com:postgrespro/libpq_compression.git

--
Konstantin Knizhnik
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/configure b/configure
index dd64692..9f70773 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LD
 LDFLAGS_SL
 LDFLAGS_EX
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 XML2_LIBS
@@ -867,6 +868,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -8571,6 +8573,85 @@ fi
 
 
 
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
 
 #
 # Zlib
diff --git a/configure.ac b/configure.ac
index 748fb50..4e2435c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1000,6 +1000,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
 AC_SUBST(with_zlib)
 
 #
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+              [use zstd])
+AC_SUBST(with_zstd)
+
+#
 # Assignments
 #
 
@@ -1186,6 +1193,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1414,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d88d06..0fc11c1 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3795c57..28935d8 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -974,6 +974,22 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+      <term><varname>libpq_compression</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>libpq_compression</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This parameter enables compression of libpq traffic between client and server.
+        The default is <literal>on</literal>.
+        Setting it to <literal>off</literal> allows rejecting compression requests from clients
+        even if compression is supported by the server (for example, for security or CPU consumption reasons).
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9d4b6ab..3267c90 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1225,6 +1225,31 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. The client sends a request with a list of compression algorithms.
+        Compression can be requested by a client by including the "compression" option in its connection string.
+        This can either be a boolean value to enable or disable compression
+        ("true"/"false", "on"/"off", "yes"/"no", "1"/"0"), "any", or an explicit list of comma-separated compression algorithms
+        which can optionally include compression level ("zlib,zstd:5").
+        If compression is enabled but an algorithm is not explicitly specified, the client library sends its full list of
+        supported algorithms and the server chooses a preferred algorithm.
+
+        If the server accepts one of the algorithms, it replies with an acknowledgment and all future libpq messages between client and server
+        will be compressed.
+        If the server rejects the compression request, it is up to the client whether to continue without compression or to report an error.
+        Support for compression algorithms must be enabled when the server is compiled.
+        Currently, two libraries are supported: zlib (default) and zstd (if Postgres was
+        configured with --with-zstd option). In both cases, streaming mode is used.
+        By default, compression is not requested by the client.
+        Please note that using compression together with SSL may expose extra vulnerabilities:
+        <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>.
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cee2888..530a657 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed up client-server interaction.
+    Compression is especially useful for importing/exporting data to/from the database using the <literal>COPY</literal> command
+    and for replication (both physical and logical). Compression can also reduce the server's response time
+    for queries returning a large amount of data (for example, JSON, BLOBs, text, ...).
+    Currently, two libraries are supported: zlib (default) and zstd (if Postgres was
+    configured with --with-zstd option).
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -263,6 +272,22 @@
      </varlistentry>
 
      <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         The server accepts the client's compression request.
+         Compression is requested when a client connection includes the "compression" option, which includes
+         a list of requested compression algorithms.
+         If the server accepts one of these algorithms, it acknowledges use of compression and
+         all subsequent libpq messages between the client and server will be compressed.
+         The server chooses an algorithm from the list specified by client and responds with the index of the chosen algorithm from the client-supplied list.
+         If the server does not accept any of the requested algorithms, then it replies with an index of -1
+         and it is up to the client whether to continue without compression or to report an error.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
        <para>
@@ -3398,6 +3423,59 @@ following:
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+  Acknowledge use of compression for protocol data. The client sends to the server a list of requested compression algorithms.
+  If the server supports any of these algorithms, it acknowledges use of this algorithm and all subsequent libpq messages between client and server
+  will be compressed.
+  The server selects the preferred algorithm from the list specified by client and responds with the
+  index of the chosen algorithm in this list.
+  If the server does not support any of the requested algorithms, it replies with -1
+  and it is up to the client whether to continue without compression or to report an error.
+  After receiving this message with algorithm index other than -1, both server and client switch to compressed mode
+  and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte1
+</term>
+<listitem>
+<para>
+        Index of algorithm in the list of supported algorithms specified by client or -1 if none of them are supported.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 
 <varlistentry>
 <term>
@@ -5964,6 +6042,21 @@ StartupMessage (F)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+                <literal>_pq_.compression</literal>
+</term>
+<listitem>
+<para>
+                        Request compression of libpq traffic. The value is a list of compression algorithms requested by the client with an optional
+                        specification of compression level: <literal>"zlib,zstd:5"</literal>.
+                        If the server does not accept compression, the backend will ignore the _pq_.compression
+                        parameter and will not send the CompressionAck message to the frontend.
+                        By default, compression is disabled. Please note that using compression together with SSL may expose extra vulnerabilities:
+                        <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
                 In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9a..9e11599 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95..f32a780 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2e4aa1c..9d05ec2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
         LEFT JOIN pg_database AS D ON (S.datid = D.oid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_network_traffic AS
+    SELECT
+	    S.pid,
+        S.rx_raw_bytes,
+        S.tx_raw_bytes,
+        S.rx_compressed_bytes,
+        S.tx_compressed_bytes
+    FROM pg_stat_get_activity(NULL) AS S;
+
 CREATE VIEW pg_stat_replication AS
     SELECT
             S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index d7de962..7e25757 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <grp.h>
+#include <pgstat.h>
 #include <unistd.h>
 #include <sys/file.h>
 #include <sys/socket.h>
@@ -88,11 +89,14 @@
 
 #include "common/ip.h"
 #include "libpq/libpq.h"
+#include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "port/pg_bswap.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -118,6 +122,7 @@
  */
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
+bool		libpq_compression;
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -141,6 +146,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream * PqStream;
+
+
 /*
  * Message status
  */
@@ -183,6 +191,115 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+static ssize_t
+write_compressed(void *arg, void const *data, size_t size)
+{
+	ssize_t		rc = secure_write((Port *) arg, (void *) data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, 0, rc);
+	return rc;
+}
+
+static ssize_t
+read_compressed(void *arg, void *data, size_t size)
+{
+	ssize_t		rc = secure_read((Port *) arg, data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, rc, 0);
+	return rc;
+}
+
+/*
+ * Send the index of the chosen compression algorithm to the client
+ */
+static void
+SendCompressionACK(int algorithm)
+{
+	StringInfoData buf;
+
+	pq_beginmessage(&buf, 'z');
+	pq_sendbyte(&buf, (uint8) algorithm);
+	pq_endmessage(&buf);
+	pq_flush();
+}
+
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is configured here.
+ * Returns 0 in case of success, non-zero in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port *port)
+{
+	char	   *client_compression_algorithms = port->compression_algorithms;
+
+	/*
+	 * If the client requests compression, it sends a comma-separated list of
+	 * supported compression algorithms.
+	 */
+	if (client_compression_algorithms && libpq_compression)
+	{
+		int			compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+		int			impl = -1;
+		char	  **server_compression_algorithms = zpq_get_supported_algorithms();
+		int			index = -1;
+		char	   *protocol_extension = strchr(client_compression_algorithms, ';');
+
+		/* No protocol extension are currently supported */
+		/* No protocol extensions are currently supported */
+			*protocol_extension = '\0';
+
+		for (int i = 0; *client_compression_algorithms; i++)
+		{
+			char	   *sep = strchr(client_compression_algorithms, ',');
+			char	   *level;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			level = strchr(client_compression_algorithms, ':');
+			if (level != NULL)
+			{
+				*level = '\0';	/* separate compression level from algorithm name */
+				if (sscanf(level + 1, "%d", &compression_level) != 1)
+					ereport(LOG,
+							(errmsg("Invalid compression level: %s", level + 1)));
+			}
+			for (impl = 0; server_compression_algorithms[impl] != NULL; impl++)
+			{
+				if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0)
+				{
+					index = i;
+					goto SendCompressionAck;
+				}
+			}
+
+			if (sep != NULL)
+				client_compression_algorithms = sep + 1;
+			else
+				break;
+		}
+SendCompressionAck:
+		SendCompressionACK(index);
+
+		if (index >= 0)			/* Use compression */
+		{
+			PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0);
+			if (!PqStream)
+			{
+				ereport(LOG,
+						(errmsg("Failed to initialize compressor %s", server_compression_algorithms[impl])));
+				return -1;
+			}
+		}
+		free(server_compression_algorithms);
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -280,6 +397,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -919,12 +1039,15 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *      nowait parameter toggles non-blocking mode.
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -940,21 +1063,40 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		/*
+		 * If streaming compression is enabled then use the corresponding
+		 * compression read function.
+		 */
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				char const *msg = zpq_decompress_error(PqStream);
+
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -975,7 +1117,8 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		pgstat_report_network_traffic(r, 0, 0, 0);
+		return r;
 	}
 }
 
@@ -990,7 +1133,8 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1009,7 +1153,8 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1026,48 +1171,15 @@ pq_peekbyte(void)
 int
 pq_getbyte_if_available(unsigned char *c)
 {
-	int			r;
+	int			r = 0;
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1088,7 +1200,8 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1122,7 +1235,8 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1163,7 +1277,8 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1413,13 +1528,24 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+
+		/*
+		 * has more data to flush or unsent data in internal compression
+		 * buffer
+		 */
 	{
 		int			r;
+		size_t		processed = 0;
+		size_t		available = bufend - bufptr;
 
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
 
-		if (r <= 0)
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1462,12 +1588,12 @@ internal_flush(void)
 			InterruptPending = 1;
 			return EOF;
 		}
+		pgstat_report_network_traffic(0, r, 0, 0);
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
@@ -1484,7 +1610,7 @@ socket_flush_if_writable(void)
 	int			res;
 
 	/* Quick exit if nothing to do */
-	if (PqSendPointer == PqSendStart)
+	if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
 		return 0;
 
 	/* No-op if reentrant call */
@@ -1507,7 +1633,7 @@ socket_flush_if_writable(void)
 static bool
 socket_is_send_pending(void)
 {
-	return (PqSendStart < PqSendPointer);
+	return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
 }
 
 /* --------------------------------
@@ -1985,3 +2111,16 @@ pq_settcpusertimeout(int timeout, Port *port)
 
 	return STATUS_OK;
 }
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+	char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL;
+
+	if (algorithm_name)
+		PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+	else
+		PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e76e627..4714b51 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3115,6 +3115,9 @@ pgstat_bestart(void)
 	lbeentry.st_xact_start_timestamp = 0;
 	lbeentry.st_databaseid = MyDatabaseId;
 
+	lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+		lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
 	/* We have userid for client-backends, wal-sender and bgworker processes */
 	if (lbeentry.st_backendType == B_BACKEND
 		|| lbeentry.st_backendType == B_WAL_SENDER
@@ -3496,6 +3499,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*
+ * Report the amount of network traffic (raw and compressed bytes
+ * sent and received) for this backend.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!pgstat_track_activities || !beentry)
+		return;
+
+	/*
+	 * Update my status entry, following the protocol of bumping
+	 * st_changecount before and after.  We use a volatile pointer here to
+	 * ensure the compiler doesn't try to get cute.
+	 */
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+	beentry->st_rx_raw_bytes += rx_raw_bytes;
+	beentry->st_tx_raw_bytes += tx_raw_bytes;
+	beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+	beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
 /* ----------
  * pgstat_read_current_status() -
  *
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b7799ed..330ddaf 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2136,6 +2136,8 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "_pq_.compression") == 0)
+				port->compression_algorithms = pstrdup(valptr);
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4443,6 +4445,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a210fc9..b93ac11 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -567,7 +567,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	30
+#define PG_STAT_GET_ACTIVITY_COLS	34
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -913,6 +913,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				values[28] = BoolGetDatum(false);	/* GSS Encryption not in
 													 * use */
 			}
+			values[30] = beentry->st_rx_raw_bytes;
+			values[31] = beentry->st_tx_raw_bytes;
+			values[32] = beentry->st_rx_compressed_bytes;
+			values[33] = beentry->st_tx_compressed_bytes;
 		}
 		else
 		{
@@ -941,6 +945,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[27] = true;
 			nulls[28] = true;
 			nulls[29] = true;
+			nulls[30] = true;
+			nulls[31] = true;
+			nulls[32] = true;
+			nulls[33] = true;
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1140,6 +1148,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
 	PG_RETURN_TIMESTAMPTZ(result);
 }
 
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS	4
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_NETWORK_TRAFFIC_COLS];
+	bool		nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+	int32		beid = PG_GETARG_INT32(0);
+	PgBackendStatus *beentry;
+
+	if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+		PG_RETURN_NULL();
+	else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Fill values and NULLs */
+	values[0] = beentry->st_rx_raw_bytes;
+	values[1] = beentry->st_tx_raw_bytes;
+	values[2] = beentry->st_rx_compressed_bytes;
+	values[3] = beentry->st_tx_compressed_bytes;
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
 
 Datum
 pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bb34630..83a3c1f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1290,6 +1290,16 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+			gettext_noop("Compress client-server traffic."),
+			NULL
+		},
+		&libpq_compression,
+		true,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
 			gettext_noop("Logs each checkpoint."),
 			NULL
diff --git a/src/common/Makefile b/src/common/Makefile
index 25c55bd..bc6cba8 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
 	unicode_norm.o \
 	username.o \
 	wait_error.o \
-	wchar.o
+	wchar.o \
+	zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..0451d2d
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,684 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+	/*
+	 * Name of compression algorithm.
+	 */
+	char const *(*name) (void);
+
+	/*
+	 * Create new compression stream.
+	 * level: compression level
+	 */
+	void	   *(*create_compressor) (int level);
+
+	/*
+	 * Create new decompression stream.
+	 */
+	void	   *(*create_decompressor) ();
+
+	/*
+	 * Decompress up to "src_size" compressed bytes from *src and write up to
+	 * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed
+	 * bytes written to *dst is stored in *dst_processed. Number of compressed
+	 * bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZPQ_OK if no errors were encountered during decompression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZPQ_DATA_PENDING means that there might be some data left within
+	 * decompressor internal buffers.
+	 *
+	 * ZPQ_STREAM_END if encountered end of compressed data stream.
+	 *
+	 * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression
+	 * attempt.
+	 */
+	ssize_t		(*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Compress up to "src_size" raw (non-compressed) bytes from *src and
+	 * write up to "dst_size" compressed bytes to *dst. Number of compressed
+	 * bytes written to *dst is stored in *dst_processed. Number of
+	 * non-compressed bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZPQ_OK if no errors were encountered during compression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZPQ_DATA_PENDING means that there might be some data left within
+	 * compressor internal buffers.
+	 *
+	 * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt.
+	 */
+	ssize_t		(*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Free compression stream created by create_compressor function.
+	 */
+	void		(*free_compressor) (void *cs);
+
+	/*
+	 * Free decompression stream created by create_decompressor function.
+	 */
+	void		(*free_decompressor) (void *ds);
+
+	/*
+	 * Get compressor error message.
+	 */
+	char const *(*compress_error) (void *cs);
+
+	/*
+	 * Get decompressor error message.
+	 */
+	char const *(*decompress_error) (void *ds);
+}			ZpqAlgorithm;
+
+
+#define ZPQ_BUFFER_SIZE       8192	/* We have to flush stream after each
+									 * protocol command and command is mostly
+									 * limited by record length, which in turn
+									 * is usually less than page size (except
+									 * TOAST)
+									 */
+
+struct ZpqStream
+{
+	ZpqAlgorithm const *c_algorithm;
+	void	   *c_stream;
+
+	ZpqAlgorithm const *d_algorithm;
+	void	   *d_stream;
+
+	char		tx_buf[ZPQ_BUFFER_SIZE];
+	size_t		tx_pos;
+	size_t		tx_size;
+
+	char		rx_buf[ZPQ_BUFFER_SIZE];
+	size_t		rx_pos;
+	size_t		rx_size;
+
+	zpq_tx_func tx_func;
+	zpq_rx_func rx_func;
+	void	   *arg;
+
+	size_t		tx_total;
+	size_t		tx_total_raw;
+	size_t		rx_total;
+	size_t		rx_total_raw;
+
+	bool		rx_not_flushed;
+	bool		tx_not_flushed;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+/*
+ * Maximum allowed back-reference distance, expressed as power of 2.
+ * This setting controls max compressor/decompressor window size.
+ * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536
+ */
+#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */
+
+typedef struct ZPQ_ZSTD_CStream
+{
+	ZSTD_CStream *stream;
+	char const *error;			/* error message */
+}			ZPQ_ZSTD_CStream;
+
+typedef struct ZPQ_ZSTD_DStream
+{
+	ZSTD_DStream *stream;
+	char const *error;			/* error message */
+}			ZPQ_ZSTD_DStream;
+
+static void *
+zstd_create_compressor(int level)
+{
+	ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream));
+
+	c_stream->stream = ZSTD_createCStream();
+	ZSTD_initCStream(c_stream->stream, level);
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	c_stream->error = NULL;
+	return c_stream;
+}
+
+static void *
+zstd_create_decompressor()
+{
+	ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream));
+
+	d_stream->stream = ZSTD_createDStream();
+	ZSTD_initDStream(d_stream->stream);
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	d_stream->error = NULL;
+	return d_stream;
+}
+
+static ssize_t
+zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+	size_t		rc;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	rc = ZSTD_decompressStream(ds->stream, &out, &in);
+
+	*src_processed = in.pos;
+	*dst_processed = out.pos;
+	if (ZSTD_isError(rc))
+	{
+		ds->error = ZSTD_getErrorName(rc);
+		return ZPQ_DECOMPRESS_ERROR;
+	}
+
+	if (out.pos == out.size)
+	{
+		/*
+		 * if `output.pos == output.size`, there might be some data left
+		 * within internal buffers
+		 */
+		return ZPQ_DATA_PENDING;
+	}
+	return ZPQ_OK;
+}
+
+static ssize_t
+zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	if (in.pos < src_size)		/* Has something to compress in input buffer */
+	{
+		size_t		rc = ZSTD_compressStream(cs->stream, &out, &in);
+
+		*dst_processed = out.pos;
+		*src_processed = in.pos;
+		if (ZSTD_isError(rc))
+		{
+			cs->error = ZSTD_getErrorName(rc);
+			return ZPQ_COMPRESS_ERROR;
+		}
+	}
+
+	if (in.pos == src_size)		/* All data is compressed: flush internal zstd
+								 * buffer */
+	{
+		size_t		tx_not_flushed = ZSTD_flushStream(cs->stream, &out);
+
+		*dst_processed = out.pos;
+		if (tx_not_flushed > 0)
+		{
+			return ZPQ_DATA_PENDING;
+		}
+	}
+
+	return ZPQ_OK;
+}
+
+static void
+zstd_free_compressor(void *c_stream)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+
+	if (cs != NULL)
+	{
+		ZSTD_freeCStream(cs->stream);
+		free(cs);
+	}
+}
+
+static void
+zstd_free_decompressor(void *d_stream)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+
+	if (ds != NULL)
+	{
+		ZSTD_freeDStream(ds->stream);
+		free(ds);
+	}
+}
+
+static char const *
+zstd_compress_error(void *c_stream)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+
+	return cs->error;
+}
+
+static char const *
+zstd_decompress_error(void *d_stream)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+
+	return ds->error;
+}
+
+static char const *
+zstd_name(void)
+{
+	return "zstd";
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+
+static void *
+zlib_create_compressor(int level)
+{
+	int			rc;
+	z_stream   *c_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(c_stream, 0, sizeof(*c_stream));
+	rc = deflateInit(c_stream, level);
+	if (rc != Z_OK)
+	{
+		free(c_stream);
+		return NULL;
+	}
+	return c_stream;
+}
+
+static void *
+zlib_create_decompressor()
+{
+	int			rc;
+	z_stream   *d_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(d_stream, 0, sizeof(*d_stream));
+	rc = inflateInit(d_stream);
+	if (rc != Z_OK)
+	{
+		free(d_stream);
+		return NULL;
+	}
+	return d_stream;
+}
+
+static ssize_t
+zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+	int			rc;
+
+	ds->next_in = (Bytef *) src;
+	ds->avail_in = src_size;
+	ds->next_out = (Bytef *) dst;
+	ds->avail_out = dst_size;
+
+	rc = inflate(ds, Z_SYNC_FLUSH);
+	*src_processed = src_size - ds->avail_in;
+	*dst_processed = dst_size - ds->avail_out;
+
+	if (rc == Z_STREAM_END)
+	{
+		return ZPQ_STREAM_END;
+	}
+	if (rc != Z_OK && rc != Z_BUF_ERROR)
+	{
+		return ZPQ_DECOMPRESS_ERROR;
+	}
+
+	return ZPQ_OK;
+}
+
+static ssize_t
+zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+	int			rc;
+	unsigned	deflate_pending = 0;
+
+
+	cs->next_out = (Bytef *) dst;
+	cs->avail_out = dst_size;
+	cs->next_in = (Bytef *) src;
+	cs->avail_in = src_size;
+
+	rc = deflate(cs, Z_SYNC_FLUSH);
+	Assert(rc == Z_OK);
+	*dst_processed = dst_size - cs->avail_out;
+	*src_processed = src_size - cs->avail_in;
+
+	deflatePending(cs, &deflate_pending, Z_NULL);	/* check if any data left
+													 * in deflate buffer */
+	if (deflate_pending > 0)
+	{
+		return ZPQ_DATA_PENDING;
+	}
+	return ZPQ_OK;
+}
+
+static void
+zlib_free_compressor(void *c_stream)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+
+	if (cs != NULL)
+	{
+		deflateEnd(cs);
+		free(cs);
+	}
+}
+
+static void
+zlib_free_decompressor(void *d_stream)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+
+	if (ds != NULL)
+	{
+		inflateEnd(ds);
+		free(ds);
+	}
+}
+
+static char const *
+zlib_error(void *stream)
+{
+	z_stream   *zs = (z_stream *) stream;
+
+	return zs->msg;
+}
+
+static char const *
+zlib_name(void)
+{
+	return "zlib";
+}
+
+#endif
+
+static char const *
+no_compression_name(void)
+{
+	return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error},
+#endif
+#if HAVE_LIBZ
+	{zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error},
+#endif
+	{no_compression_name}
+};
+
+/*
+ * Index of used compression algorithm in zpq_algorithms array.
+ */
+ZpqStream *
+zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size)
+{
+	ZpqStream  *zs = (ZpqStream *) malloc(sizeof(ZpqStream));
+
+	zs->tx_pos = 0;
+	zs->tx_size = 0;
+	zs->rx_pos = 0;
+	zs->rx_size = 0;
+	zs->tx_func = tx_func;
+	zs->rx_func = rx_func;
+	zs->arg = arg;
+	zs->tx_total = 0;
+	zs->tx_total_raw = 0;
+	zs->rx_total = 0;
+	zs->rx_total_raw = 0;
+	zs->tx_not_flushed = false;
+	zs->rx_not_flushed = false;
+
+	zs->rx_size = rx_data_size;
+	Assert(rx_data_size < ZPQ_BUFFER_SIZE);
+	memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+	zs->c_algorithm = &zpq_algorithms[c_alg_impl];
+	zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level);
+	if (zs->c_stream == NULL)
+	{
+		free(zs);
+		return NULL;
+	}
+	zs->d_algorithm = &zpq_algorithms[d_alg_impl];
+	zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor();
+	if (zs->d_stream == NULL)
+	{
+		free(zs);
+		return NULL;
+	}
+
+	return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream * zs, void *buf, size_t size)
+{
+	size_t		buf_pos = 0;
+	size_t		rx_processed;
+	size_t		buf_processed;
+	ssize_t		rc;
+
+	while (buf_pos == 0)
+	{							/* Read until some data fetched */
+		if (zs->rx_pos == zs->rx_size)
+		{
+			zs->rx_pos = zs->rx_size = 0;	/* Reset rx buffer */
+		}
+
+		if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed)
+		{
+			ssize_t		rc = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size);
+
+			if (rc > 0)			/* read fetches some data */
+			{
+				zs->rx_size += rc;
+				zs->rx_total += rc;
+			}
+			else				/* read failed */
+			{
+				return rc;
+			}
+		}
+
+		Assert(zs->rx_pos <= zs->rx_size);
+		rx_processed = 0;
+		buf_processed = 0;
+		rc = zs->d_algorithm->decompress(zs->d_stream,
+										 (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed,
+										 buf, size, &buf_processed);
+
+		zs->rx_pos += rx_processed;
+		zs->rx_total_raw += rx_processed;
+		buf_pos += buf_processed;
+		zs->rx_not_flushed = false;
+		if (rc == ZPQ_STREAM_END)
+		{
+			break;
+		}
+		if (rc == ZPQ_DATA_PENDING)
+		{
+			zs->rx_not_flushed = true;
+			continue;
+		}
+		if (rc != ZPQ_OK)
+		{
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+	}
+	return buf_pos;
+}
+
+ssize_t
+zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed)
+{
+	size_t		buf_pos = 0;
+
+	do
+	{
+		if (zs->tx_pos == zs->tx_size)	/* Have nothing to send */
+		{
+			size_t		tx_processed = 0;
+			size_t		buf_processed = 0;
+			ssize_t		rc;
+
+			zs->tx_pos = zs->tx_size = 0;	/* Reset pointer to the beginning of buffer */
+
+			rc = zs->c_algorithm->compress(zs->c_stream,
+										   (char *) buf + buf_pos, size - buf_pos, &buf_processed,
+										   (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed);
+
+			zs->tx_size += tx_processed;
+			buf_pos += buf_processed;
+			zs->tx_total_raw += buf_processed;
+			zs->tx_not_flushed = false;
+
+			if (rc == ZPQ_DATA_PENDING)
+			{
+				zs->tx_not_flushed = true;
+				continue;
+			}
+			if (rc != ZPQ_OK)
+			{
+				*processed = buf_pos;
+				return ZPQ_COMPRESS_ERROR;
+			}
+		}
+		while (zs->tx_pos < zs->tx_size)
+		{
+			ssize_t		rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos);
+
+			if (rc > 0)
+			{
+				zs->tx_pos += rc;
+				zs->tx_total += rc;
+			}
+			else
+			{
+				*processed = buf_pos;
+				return rc;
+			}
+		}
+
+		/*
+		 * repeat sending while there is some data in input or internal
+		 * compression buffer
+		 */
+	} while (buf_pos < size || zs->tx_not_flushed);
+
+	return buf_pos;
+}
+
+void
+zpq_free(ZpqStream * zs)
+{
+	if (zs)
+	{
+		if (zs->c_stream)
+		{
+			zs->c_algorithm->free_compressor(zs->c_stream);
+		}
+		if (zs->d_stream)
+		{
+			zs->d_algorithm->free_decompressor(zs->d_stream);
+		}
+		free(zs);
+	}
+}
+
+char const *
+zpq_compress_error(ZpqStream * zs)
+{
+	return zs->c_algorithm->compress_error(zs->c_stream);
+}
+
+char const *
+zpq_decompress_error(ZpqStream * zs)
+{
+	return zs->d_algorithm->decompress_error(zs->d_stream);
+}
+
+size_t
+zpq_buffered_rx(ZpqStream * zs)
+{
+	return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0;
+}
+
+size_t
+zpq_buffered_tx(ZpqStream * zs)
+{
+	return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0;
+}
+
+/*
+ * Return a NULL-terminated array of supported compression algorithm names.
+ */
+char	  **
+zpq_get_supported_algorithms(void)
+{
+	size_t		n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms);
+	char	  **algorithm_names = malloc(n_algorithms * sizeof(char *));
+
+	for (size_t i = 0; i < n_algorithms; i++)
+	{
+		algorithm_names[i] = (char *) zpq_algorithms[i].name();
+	}
+
+	return algorithm_names;
+}
+
+char const *
+zpq_compress_algorithm_name(ZpqStream * zs)
+{
+	return zs ? zs->c_algorithm->name() : NULL;
+}
+
+char const *
+zpq_decompress_algorithm_name(ZpqStream * zs)
+{
+	return zs ? zs->d_algorithm->name() : NULL;
+}
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 33dacfd..baa4d6d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5239,9 +5239,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
@@ -5507,6 +5507,14 @@
   proargnames => '{wal_buffers_full,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 
+{ oid => '1137', descr => 'statistics: information about network traffic',
+  proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+  proallargtypes => '{int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+  prosrc => 'pg_stat_get_network_traffic' },
+
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5638,6 +5646,10 @@
   proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text',
   proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
 
+{ oid => '4142', descr => 'connection compression algorithm',
+  proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text',
+  proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
 { oid => '1946',
   descr => 'convert bytea value into some ascii-only text string',
   proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..e48b5ac
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,94 @@
+/*
+ * zpq_stream.h
+ *     Streaming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_OK (0)
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+#define ZPQ_COMPRESS_ERROR (-3)
+#define ZPQ_STREAM_END (-4)
+#define ZPQ_DATA_PENDING (-5)
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size);
+typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size);
+
+/*
+ * Create compression stream with rx/tx function for reading/sending compressed data.
+ * c_alg_impl: index of chosen compression algorithm
+ * c_level: compression level
+ * d_alg_impl: index of chosen decompression algorithm
+ * tx_func: function for writing compressed data in underlying stream
+ * rx_func: function for reading compressed data from underlying stream
+ * arg: context passed to the function
+ * rx_data: received data (compressed data already fetched from input stream)
+ * rx_data_size: size of data fetched from input stream
+ */
+extern ZpqStream  *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size);
+
+/*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function.
+ */
+extern ssize_t		zpq_read(ZpqStream * zs, void *buf, size_t size);
+
+/*
+ * Write up to "size" raw (decompressed) bytes.
+ * Returns number of written raw bytes or error code.
+ * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function.
+ * In the latter case, the number of bytes written is stored in *processed.
+ */
+extern ssize_t		zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed);
+
+/*
+ * Get decompressor error message.
+ */
+extern char const *zpq_decompress_error(ZpqStream * zs);
+
+/*
+ * Get compressor error message.
+ */
+extern char const *zpq_compress_error(ZpqStream * zs);
+
+/*
+ * Return a non-zero value if the internal rx decompression buffer holds pending data.
+ */
+extern size_t		zpq_buffered_rx(ZpqStream * zs);
+
+/*
+ * Return a non-zero value if the internal tx compression buffer holds pending data.
+ */
+extern size_t		zpq_buffered_tx(ZpqStream * zs);
+
+/*
+ * Free stream created by zpq_create function.
+ */
+extern void		zpq_free(ZpqStream * zs);
+
+/*
+ * Get the name of chosen compression algorithm.
+ */
+extern char const *zpq_compress_algorithm_name(ZpqStream * zs);
+
+/*
+ * Get the name of chosen decompression algorithm.
+ */
+extern char const *zpq_decompress_algorithm_name(ZpqStream * zs);
+
+/*
+ * Return a NULL-terminated array of supported compression algorithm names.
+ */
+extern char	  **zpq_get_supported_algorithms(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281..a24bbca 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,9 @@ typedef struct Port
 	int			keepalives_count;
 	int			tcp_user_timeout;
 
+	char	   *compression_algorithms; /* Compression algorithms supported by
+										 * client */
+
 	/*
 	 * GSSAPI structures.
 	 */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b115247..de24a3f 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int	pq_configure(Port *port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index 781d86c..c0d15f1 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -150,6 +150,7 @@ typedef struct StartupPacket
 } StartupPacket;
 
 extern bool Db_user_namespace;
+extern bool libpq_compression;
 
 /*
  * In protocol 3.0 and later, the startup packet length is not fixed, but
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838..b336eeb 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 257e515..9b86ee0 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1190,6 +1190,12 @@ typedef struct PgBackendStatus
 	/* application name; MUST be null-terminated */
 	char	   *st_appname;
 
+	/* client-server traffic information */
+	uint64		st_rx_raw_bytes;
+	uint64		st_tx_raw_bytes;
+	uint64		st_rx_compressed_bytes;
+	uint64		st_tx_compressed_bytes;
+
 	/*
 	 * Current command string; MUST be null-terminated. Note that this string
 	 * possibly is truncated in the middle of a multi-byte character. As
@@ -1403,6 +1409,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
 extern void pgstat_report_tempfile(size_t filesize);
 extern void pgstat_report_appname(const char *appname);
 extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
 extern const char *pgstat_get_wait_event(uint32 wait_event_info);
 extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
 extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b..be8cb34 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
 OBJS = \
 	$(WIN32RES) \
 	fe-auth-scram.o \
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d0..3c704ba 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "common/string.h"
 #include "fe-auth.h"
 #include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", "PGCOMPRESSION", NULL, NULL,
+		"Libpq-compression", "", 16,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2148,6 +2157,12 @@ connectDBComplete(PGconn *conn)
 				return 1;		/* success! */
 
 			case PGRES_POLLING_READING:
+				/*
+				 * If there is some buffered RX data in the ZpqStream,
+				 * don't proceed to pqWaitTimed.
+				 */
+				if (zpq_buffered_rx(conn->zstream))
+					break;
 				ret = pqWaitTimed(1, 0, conn, finish_time);
 				if (ret == -1)
 				{
@@ -3216,11 +3231,78 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						int index;
+						char resp;
+						/* Read message length word */
+						if (pqGetInt(&msgLength, 4, conn))
+						{
+							/* We'll come back when there is more data */
+							return PGRES_POLLING_READING;
+						}
+						if (msgLength != 5)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "expected compression algorithm specification message with length 5, but received length %d\n"),
+											  msgLength);
+							goto error_return;
+						}
+						pqGetc(&resp, conn);
+						index = resp;
+						if (index == (char)-1)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server does not support any of the requested compression algorithms: %s\n"),
+											  conn->compression);
+							goto error_return;
+						}
+						if ((unsigned)index >= conn->n_compressors)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server returned an invalid compression algorithm index: %d\n"),
+											  index);
+							goto error_return;
+						}
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create(conn->compressors[index].impl,
+												   conn->compressors[index].level,
+												   conn->compressors[index].impl,
+												   (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+												   &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+						if (!conn->zstream)
+						{
+							char** supported_algorithms = zpq_get_supported_algorithms();
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "failed to initialize compressor %s\n"),
+											  supported_algorithms[conn->compressors[index].impl]);
+							free(supported_algorithms);
+							goto error_return;
+						}
+						/* reset buffer */
+						conn->inStart = conn->inCursor = conn->inEnd = 0;
+					}
+					else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol version */
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext(
+											  "server does not support libpq compression\n"));
+						goto error_return;
+					} else
+						break;
 				}
 
 				/*
@@ -4020,6 +4102,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237..00ccefb 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn)
 		 * EOF indication.  We expect therefore that this won't result in any
 		 * undue delay in reporting a previous write failure.)
 		 */
-		if (flushResult ||
-			pqWait(true, false, conn) ||
+		if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
+			pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			/*
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f3..c9bae65 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,6 +53,8 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+#include "common/zpq_stream.h"
+
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
 static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
@@ -60,6 +62,16 @@ static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 /*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn)												\
+	(conn->zstream														\
+	 ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,			\
+				conn->inBufSize - conn->inEnd)							\
+	 : pqsecure_read(conn, conn->inBuffer + conn->inEnd,				\
+					 conn->inBufSize - conn->inEnd))
+
+/*
  * PQlibVersion: return the libpq version number
  */
 int
@@ -664,10 +676,17 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("decompress error: %s\n"),
+                              zpq_decompress_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -759,10 +778,18 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("decompress error: %s\n"),
+                              zpq_decompress_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -875,12 +902,17 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered_tx(conn->zstream))
 	{
 		int			sent;
-
+		size_t		processed = 0;
+		/*
+		 * Use zpq_write if compression is switched on
+		 */
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -888,8 +920,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -943,7 +978,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
@@ -1034,6 +1069,8 @@ pqFlush(PGconn *conn)
 int
 pqWait(int forRead, int forWrite, PGconn *conn)
 {
+	if (forRead && conn->inCursor < conn->inEnd)
+		return 0;
 	return pqWaitTimed(forRead, forWrite, conn, (time_t) -1);
 }
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525..fe4eaca 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 			if (async)
 				return 0;
 			/* Need to load more data */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				return -2;
 			continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
 	while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
 	{
 		/* need to load more data */
-		if (pqWait(true, false, conn) ||
+		if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			*s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
 		if (needInput)
 		{
 			/* Wait for some data to arrive (or for the channel to close) */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				break;
 		}
@@ -2135,6 +2135,154 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
 }
 
 /*
+ * Build a comma-separated list of the compression algorithms requested by the client.
+ * The list is either taken from the connection string or, if the user merely enabled
+ * compression, includes all algorithms supported by the client library.
+ * The function returns true if the compression option is successfully parsed and
+ * stores the comma-separated algorithm list in *client_compressors.
+ * If compression is disabled, NULL is assigned to *client_compressors.
+ * When build_descriptors is true, it also creates an array of compressor descriptors,
+ * one per algorithm name in the *client_compressors list. This array is stored in PGconn
+ * and is used during the handshake when the server acknowledges compression.
+ */
+static bool
+build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors)
+{
+	char** supported_algorithms = zpq_get_supported_algorithms();
+	char* value = conn->compression;
+	int n_supported_algorithms;
+	int total_len = 0;
+	int i;
+
+	for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++)
+	{
+		total_len += strlen(supported_algorithms[n_supported_algorithms])+1;
+	}
+
+	if (pg_strcasecmp(value, "true") == 0 ||
+		pg_strcasecmp(value, "yes") == 0 ||
+		pg_strcasecmp(value, "on") == 0 ||
+		pg_strcasecmp(value, "any") == 0 ||
+		pg_strcasecmp(value, "1") == 0)
+	{
+		/* Compression is enabled: choose algorithm automatically */
+		char* p;
+
+		if (n_supported_algorithms == 0)
+		{
+			*client_compressors = NULL; /* no compressors are available */
+			conn->compressors = NULL;
+			conn->n_compressors = 0;
+			return true;
+		}
+		*client_compressors = p = malloc(total_len);
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+		for (i = 0; i < n_supported_algorithms; i++)
+		{
+			strcpy(p, supported_algorithms[i]);
+			p += strlen(p);
+			*p++ = ',';
+			if (build_descriptors)
+			{
+				conn->compressors[i].impl = i;
+				conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+			}
+		}
+		p[-1] = '\0';
+		return true;
+	}
+	else if (*value == 0 ||
+			 pg_strcasecmp(value, "false") == 0 ||
+			 pg_strcasecmp(value, "no") == 0 ||
+			 pg_strcasecmp(value, "off") == 0 ||
+			 pg_strcasecmp(value, "0") == 0)
+	{
+		/* Compression is disabled */
+		*client_compressors = NULL;
+		conn->compressors = NULL;
+		conn->n_compressors = 0;
+		return true;
+	}
+	else
+	{
+		/* List of compression algorithms separated by commas */
+		char *src, *dst;
+		int n_suggested_algorithms = 0;
+		char* suggested_algorithms = strdup(value);
+		src = suggested_algorithms;
+		*client_compressors = dst = strdup(value);
+
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+
+		while (*src != '\0')
+		{
+			char* sep = strchr(src, ',');
+			char* col;
+			int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			strcpy(dst, src);
+
+			col = strchr(src, ':');
+			if (col != NULL)
+			{
+				*col = '\0';
+				if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors)
+				{
+					fprintf(stderr,
+							libpq_gettext("WARNING: invalid compression level %s in compression option '%s'\n"),
+							col+1, value);
+					return false;
+				}
+			}
+			for (i = 0; supported_algorithms[i] != NULL; i++)
+			{
+				if (pg_strcasecmp(src, supported_algorithms[i]) == 0)
+				{
+					if (build_descriptors)
+					{
+						conn->compressors[n_suggested_algorithms].impl = i;
+						conn->compressors[n_suggested_algorithms].level = compression_level;
+					}
+					n_suggested_algorithms += 1;
+					dst += strlen(dst);
+					*dst++ = ',';
+					break;
+				}
+			}
+			if (sep)
+				src = sep+1;
+			else
+				break;
+		}
+		free(suggested_algorithms);
+		conn->n_compressors = n_suggested_algorithms;
+		if (n_suggested_algorithms == 0)
+		{
+			if (!build_descriptors)
+				fprintf(stderr,
+						libpq_gettext("WARNING: none of the specified algorithms are supported by the client: %s\n"),
+						value);
+			else
+			{
+				free(conn->compressors);
+				conn->compressors = NULL;
+				conn->n_compressors = 0;
+			}
+			free(*client_compressors);
+			*client_compressors = NULL;
+			return false;
+		}
+		dst[-1] = '\0';
+		return true;
+	}
+}
+
+/*
  * Build a startup packet given a filled-in PGconn structure.
  *
  * We need to figure out how much space is needed, then fill it in.
@@ -2180,6 +2328,17 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("replication", conn->replication);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
+	if (conn->compression && conn->compression[0])
+	{
+		char* client_compression_algorithms;
+		if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL))
+		{
+			if (client_compression_algorithms)
+			{
+				ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms);
+			}
+		}
+	}
 	if (conn->send_appname)
 	{
 		/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae..7406857 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -317,6 +318,16 @@ typedef struct pg_conn_host
 								 * found in password file. */
 } pg_conn_host;
 
+
+/*
+ * Descriptors of compression algorithms chosen by client
+ */
+typedef struct pg_conn_compressor
+{
+	int			impl;			/* compression implementation index */
+	int			level;			/* compression level */
+}			pg_conn_compressor;
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -370,6 +381,13 @@ struct pg_conn
 	char	   *ssl_min_protocol_version;	/* minimum TLS protocol version */
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
 
+	char	   *compression;	/* stream compression (boolean value, "any" or
+								 * list of compression algorithms separated by
+								 * comma) */
+	pg_conn_compressor *compressors;	/* descriptors of compression
+										 * algorithms chosen by client */
+	unsigned			n_compressors;	/* size of the compressors array */
+
 	/* Type of connection to make.  Possible values: any, read-write. */
 	char	   *target_session_attrs;
 
@@ -527,6 +545,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream  *zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 097ff5d..aa9359c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid,
     s.backend_xmin,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+    s.rx_raw_bytes,
+    s.tx_raw_bytes,
+    s.rx_compressed_bytes,
+    s.tx_compressed_bytes
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
@@ -2015,7 +2021,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
@@ -2046,7 +2052,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd..daac58c 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -123,7 +123,7 @@ sub mkvcbuild
 	  config_info.c controldata_utils.c d2s.c encnames.c exec.c
 	  f2s.c file_perm.c file_utils.c hashfn.c ip.c jsonapi.c
 	  keywords.c kwlookup.c link-canary.c md5.c
-	  pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
+	  pg_get_line.c zpq_stream.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
 	  saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
 	  wait_error.c wchar.c);
 
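For anyone who wants to poke at the zpq layer in isolation: below is a minimal, hypothetical sketch of how the zpq_stream API declared in zpq_stream.h could be driven over a plain socket (both peers would have to use the same zpq framing). The sock_tx/sock_rx/zpq_demo names are made up for illustration and are not part of the patch; algorithm index 0 simply refers to the first entry of zpq_algorithms, which depends on how the library was built.

/* Sketch only: wrap an already-connected socket with a ZpqStream. */
#include <sys/socket.h>
#include <stdio.h>
#include "common/zpq_stream.h"

static ssize_t
sock_tx(void *arg, void const *data, size_t size)
{
	/* send compressed bytes produced by zpq_write */
	return send(*(int *) arg, data, size, 0);
}

static ssize_t
sock_rx(void *arg, void *data, size_t size)
{
	/* fetch compressed bytes for zpq_read to decompress */
	return recv(*(int *) arg, data, size, 0);
}

static int
zpq_demo(int sock)
{
	char		prefetched[1];	/* nothing was read before the stream was created */
	char		reply[256];
	size_t		processed = 0;
	ssize_t		rc;
	ZpqStream  *zs = zpq_create(0, ZPQ_DEFAULT_COMPRESSION_LEVEL, 0,
								sock_tx, sock_rx, &sock, prefetched, 0);

	if (zs == NULL)
		return -1;

	rc = zpq_write(zs, "hello", 5, &processed);
	if (rc == ZPQ_COMPRESS_ERROR)
		fprintf(stderr, "compress error: %s\n", zpq_compress_error(zs));

	rc = zpq_read(zs, reply, sizeof(reply));
	if (rc == ZPQ_DECOMPRESS_ERROR)
		fprintf(stderr, "decompress error: %s\n", zpq_decompress_error(zs));

	zpq_free(zs);
	return rc < 0 ? -1 : 0;
}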

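On the client side, the new "compression" connection option would be used roughly as below; host/dbname are placeholders, and "zstd:1,zlib" only makes sense if libpq and the server were built with the corresponding libraries. The query calls the pg_compression_algorithm() function added by the patch to confirm what the server ended up using.

#include <stdio.h>
#include "libpq-fe.h"

int
main(void)
{
	/* Request zstd at level 1, with zlib as a fallback */
	PGconn	   *conn = PQconnectdb("host=localhost dbname=postgres "
								   "compression=zstd:1,zlib");
	PGresult   *res;

	if (PQstatus(conn) != CONNECTION_OK)
	{
		fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
		PQfinish(conn);
		return 1;
	}

	/* Ask the server which compression algorithm is in effect, if any */
	res = PQexec(conn, "SELECT pg_compression_algorithm()");
	if (PQresultStatus(res) == PGRES_TUPLES_OK)
		printf("compression algorithm: %s\n", PQgetvalue(res, 0, 0));

	PQclear(res);
	PQfinish(conn);
	return 0;
}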