On 12.01.2021 4:20, Justin Pryzby wrote:
On Mon, Jan 11, 2021 at 04:53:51PM +0300, Konstantin Knizhnik wrote:
On 09.01.2021 23:31, Justin Pryzby wrote:
I suggest that there should be an enum of algorithms, which is constant across
all servers.  They would be unconditionally included and not #ifdef depending
on compilation options.
I do not think that it is possible (even right now, it is possible to build
Postgres without zlib support).
Also, if new compression algorithms are added, then in any case we have to
somehow handle the situation when an
old client is connected to a new server and vice versa.
I mean an enum of all compression supported in the master branch, starting with
ZLIB = 1.  I think this applies to libpq compression (which requires client and
server to handle a common compression algorithm), and pg_dump (output of which
needs to be read by pg_restore), but maybe not TOAST, the patch for which
supports extensions, with dynamically allocated OIDs.

Sorry, I do not understand the goal of introducing this enum.
Algorithms are in any case specified by name. And internally there is no need for such an enum,
at least in libpq compression.

In config.sgml, it says that libpq_compression defaults to on (on the server
side), but in libpq.sgml it says that it defaults to off (on the client side).
Is that what's intended ?  I would've thought the defaults would match, or that
the server would enforce a default more conservative than the client's (the DBA
should probably have to explicitly enable compression, and need to "opt-in"
rather than "opt-out").
Yes, it is the intended behavior: the libpq_compression GUC allows prohibiting
compression requests from clients if compression is not desired for some
reason (security, CPU consumption).
But by default the server should support compression if it is requested by
the client.
It's not clear to me if that's true..   It may be what's convenient for you,
especially during development, but that doesn't mean it's safe or efficient or
what's generally desirable to everyone.

Definitely it is only my point of view; maybe DBAs will have different opinions. I do not think that compression is a dangerous or resource-consuming feature which should be disabled by default. At least there is no GUC prohibiting SSL connections, and they are even more expensive. In any case, I will be glad to collect votes on whether compression requests should be rejected by the server by default or not.

But the client should not request compression by default: it makes sense only for
queries returning large result sets or transferring a lot of data (like
COPY).
I think you're making assumptions about everyone's use of the tools, and it's
better if the DBA makes that determination.  The clients aren't generally under
the admin's control, and if they don't request compression, then it's not used.
If they request compression, then the DBA still has control over whether it's
allowed.

We agree that it should be disabled by default, but I suggest that it's most
flexible if the client makes the request and the server is allowed to decide.  By
default the server should deny/ignore the request, with the client gracefully
falling back to no compression.

I think that compression will be most efficient for internal connections (i.e. replication, bulk data loading, ...),
which are mostly controlled by the DBA.
Compression would have little effect on most queries, especially at default
level=1.
I mostly agree with you here, except for the claim that compression level 1 has little effect. All my experiments, both with page-level compression and with libpq compression, show that the default compression level
provides the optimal balance between compression ratio and compression speed.

In Daniil's benchmark the difference in compression ratio between zstd levels 1 and 19 is only 30%.

Right now the "compression" parameter accepts not only boolean values but also
a list of suggested algorithms with an optional compression level, like
"zstd:1,zlib"
You're talking about the client compression param.  I'm suggesting that the
server should allow fine-grained control of what compression algs are permitted
at *runtime*.  This would allow distributions to compile with all compression
libraries enabled at configure time, and still allow a DBA to disable one
without recompiling.

Frankly speaking I do not see much sense in it.
My point of view is the following: there are several well-known and widely used compression algorithms now: zlib, zstd, lz4. There are a few others, but the results of my various benchmarks (mostly for page-level compression) show that zstd provides the best compression ratio/speed balance and lz4 the best speed.

zlib is available almost everywhere, and Postgres is configured with zlib by default. So it can be considered the default compression algorithm, available everywhere. If Postgres was built with zstd or lz4 (the latter is not currently supported for libpq compression), then it should be used instead of zlib
because it is faster and provides better compression quality.

So I do not see any other arguments for letting the DBA enable or disable particular compression algorithms, or for letting the client suggest them.

That's possible with environment variable or connection string.

I'm proposing to simplify change of its default when recompiling, just like 
this one:
src/interfaces/libpq/fe-connect.c:#define DefaultHost           "localhost"

Think this would be a 2 line change, and makes the default less hardcoded.

Sorry, my intention was the following: there is a list of supported algorithms in order of decreasing efficiency (yes, I realize that in the general case it may not always be possible to provide a partial order on the set of supported compression algorithms,
but right now with zstd and zlib it is trivial).
The client sends to the server its list of supported algorithms (or the list explicitly specified by the user), and the server chooses the most efficient supported one among them. So there is no need to specify some default in this scheme. We can change the order of algorithms in the array to affect the choice of the default algorithm.
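
The selection step described above can be sketched roughly as follows. This is only an illustration, not the code from the patch: the helper name choose_algorithm is hypothetical, the client list is assumed to be already ordered by preference, names are compared case-sensitively, and any ":level" suffix is simply skipped.

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not the patch's code): pick the first algorithm
 * from the client's comma-separated list that the server also supports.
 * Each item may carry an optional ":level" suffix, which is ignored here.
 * Returns the zero-based index within the client's list, or -1 if no
 * item matches.
 */
static int
choose_algorithm(const char *client_list, const char *const *server_algs)
{
	const char *item = client_list;

	for (int i = 0; *item != '\0'; i++)
	{
		/* The name ends at ':' (level), ',' (next item) or end of string */
		size_t		name_len = strcspn(item, ":,");

		for (int j = 0; server_algs[j] != NULL; j++)
		{
			if (strlen(server_algs[j]) == name_len &&
				strncmp(item, server_algs[j], name_len) == 0)
				return i;
		}

		/* Skip the rest of this item, including any ":level" part */
		item += strcspn(item, ",");
		if (*item == ',')
			item++;
	}
	return -1;
}
```

With a server list containing only "zlib", the request "zstd:1,zlib" yields index 1, and a request naming only unsupported algorithms yields -1, which corresponds to the "no compression" reply.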

Your patch warns if *none* of the specified algorithms are supported, but I
wonder if it should warn if *any* of them are unsupported (like if someone
writes libz instead of zlib, or zst/libzstd instead of zstd, which I guess
about half of us will do).
Ugh... Handling typos in the connection string does not seem to be such a good idea
(from my point of view).
And the situation when some of the algorithms are not supported by the server seems to
be normal (if a new client connects to an old server).
So I do not want to produce a warning in this case.
I'm not talking about warning if an alg is "not supported by server", but
rather if it's "not supported by client".  I think there should be a warning if
a connection string specifies two libraries, but one is misspelled.
Makes sense. I  add this check.


I think it's good if it *can* be automatic.  But it's also good if the DBA can
implement a simple "policy" about what's (not) supported.  If the user requests
a compression and it's not known on the *client* side I think it should warn.
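
A client-side check of this kind might look like the sketch below. It is an assumption-laden illustration rather than libpq code: the function warn_unknown_algorithms and the client_algs table are hypothetical, the warning goes to stderr instead of libpq's own reporting, and boolean values such as "on" or "any" are assumed to have been handled before this point.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical list of algorithms this client build knows about */
static const char *const client_algs[] = {"zstd", "zlib", NULL};

/*
 * Warn about every name in a comma-separated "compression" value that the
 * client itself does not recognize (for example "libz" instead of "zlib").
 * An optional ":level" suffix is ignored.  Returns the number of unknown
 * names found.
 */
static int
warn_unknown_algorithms(const char *value)
{
	int			unknown = 0;
	const char *item = value;

	while (*item != '\0')
	{
		size_t		name_len = strcspn(item, ":,");
		int			known = 0;

		for (int j = 0; client_algs[j] != NULL; j++)
		{
			if (strlen(client_algs[j]) == name_len &&
				strncmp(item, client_algs[j], name_len) == 0)
				known = 1;
		}
		if (!known)
		{
			fprintf(stderr,
					"WARNING: unknown compression algorithm \"%.*s\"\n",
					(int) name_len, item);
			unknown++;
		}

		/* Advance to the next item, if any */
		item += strcspn(item, ",");
		if (*item == ',')
			item++;
	}
	return unknown;
}
```

Here a misspelled name is reported even when another name in the same list is valid, which is the "warn if any are unsupported" behavior rather than "warn if none are".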

BTW I think the compression should be shown in psql \conninfo

Good point: done.



+                  if ((unsigned)index >= conn->n_compressors)
+                  {
+                          appendPQExpBuffer(&conn->errorMessage,
+                                                            libpq_gettext(
+                                                                    "server returns incorrect compression algorithm index: %d\n"),

Now, you're checking that the index is not too large, but not that it's not too
small.

Sorry, it is my "fad" to write the most optimal code, which is completely irrelevant in this place. I deliberately use an unsigned comparison to perform the check with just one comparison instead of two.
I have added a comment here.
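
For readers unfamiliar with the idiom, a minimal standalone sketch is shown below. The function name index_in_range is hypothetical, and the count is assumed to be an unsigned value no larger than INT_MAX. Converting a negative int to unsigned int yields a very large value, so a single comparison rejects both negative and too-large indexes.

```c
#include <stdbool.h>

/*
 * Hypothetical illustration of the single-comparison range check.
 * A negative index converts to a value above INT_MAX when cast to
 * unsigned int, so it fails the "< n" test just like an index that is
 * too large.  Equivalent to (index >= 0 && index < n) for n <= INT_MAX.
 */
static bool
index_in_range(int index, unsigned int n)
{
	return (unsigned int) index < n;
}
```

The check in the patch is the negation of this test, taken to decide whether to report an error.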
Right now, when I try to connect to an unpatched server, I get:
psql: error: expected authentication request from server, but received v
Thank you for pointing it out: fixed.
I tested that I can connect to an unpatched server, thanks.

I have to confess that I don't know how this works, or what it has to do with
what the commit message claims it does ?

Are you saying I can use zlib in one direction and zstd in the other ?  How
would I do that ?

Sorry, this feature was suggested by Andres and partly implemented by Daniil.
I have always thought that it is a useless and overkill feature.


New version of the patch with the suggested changes is attached.
diff --git a/configure b/configure
index dd64692..9f70773 100755
--- a/configure
+++ b/configure
@@ -700,6 +700,7 @@ LD
 LDFLAGS_SL
 LDFLAGS_EX
 with_zlib
+with_zstd
 with_system_tzdata
 with_libxslt
 XML2_LIBS
@@ -867,6 +868,7 @@ with_libxml
 with_libxslt
 with_system_tzdata
 with_zlib
+with_zstd
 with_gnu_ld
 enable_largefile
 '
@@ -8571,6 +8573,85 @@ fi
 
 
 
+#
+# ZStd
+#
+
+
+
+# Check whether --with-zstd was given.
+if test "${with_zstd+set}" = set; then :
+  withval=$with_zstd;
+  case $withval in
+    yes)
+      ;;
+    no)
+      :
+      ;;
+    *)
+      as_fn_error $? "no argument expected for --with-zstd option" "$LINENO" 5
+      ;;
+  esac
+
+else
+  with_zstd=no
+
+fi
+
+
+
+
+if test "$with_zstd" = yes ; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for ZSTD_compress in -lzstd" >&5
+$as_echo_n "checking for ZSTD_compress in -lzstd... " >&6; }
+if ${ac_cv_lib_zstd_ZSTD_compress+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzstd  $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char ZSTD_compress ();
+int
+main ()
+{
+return ZSTD_compress ();
+  ;
+  return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_lib_zstd_ZSTD_compress=yes
+else
+  ac_cv_lib_zstd_ZSTD_compress=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zstd_ZSTD_compress" >&5
+$as_echo "$ac_cv_lib_zstd_ZSTD_compress" >&6; }
+if test "x$ac_cv_lib_zstd_ZSTD_compress" = xyes; then :
+  cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZSTD 1
+_ACEOF
+
+  LIBS="-lzstd $LIBS"
+
+else
+  as_fn_error $? "library 'zstd' is required for ZSTD support" "$LINENO" 5
+fi
+
+fi
+
+
 
 #
 # Zlib
diff --git a/configure.ac b/configure.ac
index 748fb50..4e2435c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1000,6 +1000,13 @@ PGAC_ARG_BOOL(with, zlib, yes,
 AC_SUBST(with_zlib)
 
 #
+# Zstd
+#
+PGAC_ARG_BOOL(with, zstd, no,
+              [use zstd])
+AC_SUBST(with_zstd)
+
+#
 # Assignments
 #
 
@@ -1186,6 +1193,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_LIB(zstd, ZSTD_decompressStream, [],
+               [AC_MSG_ERROR([zstd library not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.])])
+fi
+
 if test "$enable_spinlocks" = yes; then
   AC_DEFINE(HAVE_SPINLOCKS, 1, [Define to 1 if you have spinlocks.])
 else
@@ -1400,6 +1414,13 @@ failure.  It is possible the compiler isn't looking in the proper directory.
 Use --without-zlib to disable zlib support.])])
 fi
 
+if test "$with_zstd" = yes; then
+  AC_CHECK_HEADER(zstd.h, [], [AC_MSG_ERROR([zstd header not found
+If you have zstd already installed, see config.log for details on the
+failure.  It is possible the compiler isn't looking in the proper directory.
+Use --without-zstd to disable zstd support.])])
+fi
+
 if test "$with_gssapi" = yes ; then
   AC_CHECK_HEADERS(gssapi/gssapi.h, [],
 	[AC_CHECK_HEADERS(gssapi.h, [], [AC_MSG_ERROR([gssapi.h header file is required for GSSAPI])])])
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d88d06..0fc11c1 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8911,7 +8911,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, compression, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3795c57..28935d8 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -974,6 +974,22 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-libpq-compression" xreflabel="libpq_compression">
+      <term><varname>libpq_compression</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>libpq_compression</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        This parameter enables compression of libpq traffic between client and server.
+        The default is <literal>on</literal>.
+        This option allows rejecting compression requests even if it is supported by server
+        (for example, due to security, or CPU consumption).
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
      </sect2>
 
diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 9d4b6ab..bcc84d4 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1225,6 +1225,39 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       </listitem>
      </varlistentry>
 
+     <varlistentry id="libpq-connect-compression" xreflabel="compression">
+      <term><literal>compression</literal></term>
+      <listitem>
+      <para>
+        Request compression of libpq traffic. The client sends a request with
+        a list of compression algorithms. Compression can be requested by a client
+        by including the <literal>"compression"</literal> option in its connection string.
+        This can either be a boolean value to enable or disable compression
+        (<literal>"true"</literal>/<literal>"false"</literal>,
+        <literal>"on"</literal>/<literal>"off"</literal>,
+        <literal>"yes"</literal>/<literal>"no"</literal>,
+        <literal>"1"</literal>/<literal>"0"</literal>),
+        <literal>"any"</literal>,
+        or an explicit list of comma-separated compression algorithms
+        which can optionally include compression level (<literal>"zlib,zstd:5"</literal>).
+        If compression is enabled but an algorithm is not explicitly specified,
+        the client library sends its full list of
+        supported algorithms and the server chooses a preferred algorithm.
+
+        If the server accepts one of the algorithms, it replies with an acknowledgment
+        and all future libpq messages between client and server will be compressed.
+        If the server rejects the compression request, it is up to the client whether
+        to continue without compression or to report an error.
+        Support for compression algorithms must be enabled when the server is compiled.
+        Currently, two libraries are supported: zlib (default) and zstd (if Postgres was
+        configured with --with-zstd option). In both cases, streaming mode is used.
+        By default, compression is not requested by the client.
+        Please note that using compression together with SSL may expose extra vulnerabilities:
+        <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>
+      </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="libpq-connect-client-encoding" xreflabel="client_encoding">
       <term><literal>client_encoding</literal></term>
       <listitem>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index cee2888..530a657 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -92,6 +92,15 @@
    such as <command>COPY</command>.
   </para>
 
+  <para>
+    It is possible to compress protocol data to reduce traffic and speed up client-server interaction.
+    Compression is especially useful for importing/exporting data to/from the database using the <literal>COPY</literal> command
+    and for replication (both physical and logical). Compression can also reduce the server's response time
+    for queries returning a large amount of data (for example, JSON, BLOBs, text, ...).
+    Currently, two libraries are supported: zlib (default) and zstd (if Postgres was
+    configured with --with-zstd option).
+  </para>
+
  <sect2 id="protocol-message-concepts">
   <title>Messaging Overview</title>
 
@@ -263,6 +272,22 @@
      </varlistentry>
 
      <varlistentry>
+      <term>CompressionAck</term>
+      <listitem>
+       <para>
+         The server accepts the client's compression request.
+         Compression is requested when a client connection includes the "compression" option, which includes
+         a list of requested compression algorithms.
+         If the server accepts one of these algorithms, it acknowledges use of compression and
+         all subsequent libpq messages between the client and server will be compressed.
+         The server chooses an algorithm from the list specified by client and responds with the index of the chosen algorithm from the client-supplied list.
+         If the server does not accept any of the requested algorithms, then it replies with an index of -1
+         and it is up to the client whether to continue without compression or to report an error.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term>AuthenticationOk</term>
       <listitem>
        <para>
@@ -3398,6 +3423,59 @@ following:
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+CompressionAck (B)
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('z')
+</term>
+<listitem>
+<para>
+  Acknowledge use of compression for protocol data. The client sends to the server a list of requested compression algorithms.
+  If the server supports any of these algorithms, it acknowledges use of this algorithm and all subsequent libpq messages between client and server
+  will be compressed.
+  The server selects the preferred algorithm from the list specified by client and responds with the
+  index of the chosen algorithm in this list.
+  If the server does not support any of the requested algorithms, it replies with -1
+  and it is up to the client whether to continue without compression or to report an error.
+  After receiving this message with algorithm index other than -1, both server and client switch to compressed mode
+  and exchange compressed messages.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Length of message contents in bytes, including self.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte1
+</term>
+<listitem>
+<para>
+        Index of algorithm in the list of supported algorithms specified by client or -1 if none of them are supported.
+</para>
+</listitem>
+</varlistentry>
+</variablelist>
+
+</para>
+</listitem>
+</varlistentry>
+
+
 
 <varlistentry>
 <term>
@@ -5964,6 +6042,21 @@ StartupMessage (F)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+                <literal>_pq_.compression</literal>
+</term>
+<listitem>
+<para>
+                        Request compression of libpq traffic. The value is a list of compression algorithms requested by the client with an optional
+                        specification of compression level: <literal>"zlib,zstd:5"</literal>.
+                        If the server does not accept compression, the backend will ignore the _pq_.compression
+                        parameter and will not send the CompressionAck message to the frontend.
+                        By default, compression is disabled. Please note that using compression together with SSL may expose extra vulnerabilities:
+                        <ulink url="https://en.wikipedia.org/wiki/CRIME">CRIME</ulink>.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
                 In addition to the above, other parameters may be listed.
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 7ca1e9a..9e11599 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -196,6 +196,7 @@ with_llvm	= @with_llvm@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
+with_zstd   = @with_zstd@
 enable_rpath	= @enable_rpath@
 enable_nls	= @enable_nls@
 enable_debug	= @enable_debug@
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 9706a95..f32a780 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -54,6 +54,14 @@ ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
 endif
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+endif
+
 ##########################################################################
 
 all: submake-libpgport submake-catalog-headers submake-utils-headers postgres $(POSTGRES_IMP)
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 2e4aa1c..9d05ec2 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -770,6 +770,15 @@ CREATE VIEW pg_stat_activity AS
         LEFT JOIN pg_database AS D ON (S.datid = D.oid)
         LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
 
+CREATE VIEW pg_stat_network_traffic AS
+    SELECT
+	    S.pid,
+        S.rx_raw_bytes,
+        S.tx_raw_bytes,
+        S.rx_compressed_bytes,
+        S.tx_compressed_bytes
+    FROM pg_stat_get_activity(NULL) AS S;
+
 CREATE VIEW pg_stat_replication AS
     SELECT
             S.pid,
diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index d7de962..7e25757 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -71,6 +71,7 @@
 #include <signal.h>
 #include <fcntl.h>
 #include <grp.h>
+#include <pgstat.h>
 #include <unistd.h>
 #include <sys/file.h>
 #include <sys/socket.h>
@@ -88,11 +89,14 @@
 
 #include "common/ip.h"
 #include "libpq/libpq.h"
+#include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "port/pg_bswap.h"
 #include "storage/ipc.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
+#include "utils/builtins.h"
+#include "common/zpq_stream.h"
 
 /*
  * Cope with the various platform-specific ways to spell TCP keepalive socket
@@ -118,6 +122,7 @@
  */
 int			Unix_socket_permissions;
 char	   *Unix_socket_group;
+bool		libpq_compression;
 
 /* Where the Unix socket files are (list of palloc'd strings) */
 static List *sock_paths = NIL;
@@ -141,6 +146,9 @@ static char PqRecvBuffer[PQ_RECV_BUFFER_SIZE];
 static int	PqRecvPointer;		/* Next index to read a byte from PqRecvBuffer */
 static int	PqRecvLength;		/* End of data available in PqRecvBuffer */
 
+static ZpqStream * PqStream;
+
+
 /*
  * Message status
  */
@@ -183,6 +191,115 @@ const PQcommMethods *PqCommMethods = &PqCommSocketMethods;
 
 WaitEventSet *FeBeWaitSet;
 
+static ssize_t
+write_compressed(void *arg, void const *data, size_t size)
+{
+	ssize_t		rc = secure_write((Port *) arg, (void *) data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, 0, rc);
+	return rc;
+}
+
+static ssize_t
+read_compressed(void *arg, void *data, size_t size)
+{
+	ssize_t		rc = secure_read((Port *) arg, data, size);
+
+	if (rc > 0)
+		pgstat_report_network_traffic(0, 0, rc, 0);
+	return rc;
+}
+
+/*
+ * Send the index of the chosen compression algorithm to the client
+ */
+static void
+SendCompressionACK(int algorithm)
+{
+	StringInfoData buf;
+
+	pq_beginmessage(&buf, 'z');
+	pq_sendbyte(&buf, (uint8) algorithm);
+	pq_endmessage(&buf);
+	pq_flush();
+}
+
+/* --------------------------------
+ *		pq_configure - configure connection using port settings
+ *
+ * Right now only compression is configured here.
+ * Function returns 0 in case of success, non-zero in case of error
+ * --------------------------------
+ */
+int
+pq_configure(Port *port)
+{
+	char	   *client_compression_algorithms = port->compression_algorithms;
+
+	/*
+	 * If the client requests compression, it sends a comma-separated list
+	 * of supported compression algorithms.
+	 */
+	if (client_compression_algorithms && libpq_compression)
+	{
+		int			compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+		int			impl = -1;
+		char	  **server_compression_algorithms = zpq_get_supported_algorithms();
+		int			index = -1;
+		char	   *protocol_extension = strchr(client_compression_algorithms, ';');
+
+		/* No protocol extensions are currently supported */
+		if (protocol_extension)
+			*protocol_extension = '\0';
+
+		for (int i = 0; *client_compression_algorithms; i++)
+		{
+			char	   *sep = strchr(client_compression_algorithms, ',');
+			char	   *level;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			level = strchr(client_compression_algorithms, ':');
+			if (level != NULL)
+			{
+				*level = '\0';	/* terminate the name before the level */
+				if (sscanf(level + 1, "%d", &compression_level) != 1)
+					ereport(LOG,
+							(errmsg("Invalid compression level: %s", level + 1)));
+			}
+			for (impl = 0; server_compression_algorithms[impl] != NULL; impl++)
+			{
+				if (pg_strcasecmp(client_compression_algorithms, server_compression_algorithms[impl]) == 0)
+				{
+					index = i;
+					goto SendCompressionAck;
+				}
+			}
+
+			if (sep != NULL)
+				client_compression_algorithms = sep + 1;
+			else
+				break;
+		}
+SendCompressionAck:
+		free(server_compression_algorithms);
+		SendCompressionACK(index);
+
+		if (index >= 0)			/* Use compression */
+		{
+			PqStream = zpq_create(impl, compression_level, impl, write_compressed, read_compressed, MyProcPort, NULL, 0);
+			if (!PqStream)
+			{
+				ereport(LOG,
+						(errmsg("Failed to initialize compressor %s", client_compression_algorithms)));
+				return -1;
+			}
+		}
+	}
+	return 0;
+}
 
 /* --------------------------------
  *		pq_init - initialize libpq at backend startup
@@ -280,6 +397,9 @@ socket_close(int code, Datum arg)
 		free(MyProcPort->gss);
 #endif							/* ENABLE_GSS || ENABLE_SSPI */
 
+		/* Release compression streams */
+		zpq_free(PqStream);
+
 		/*
 		 * Cleanly shut down SSL layer.  Nowhere else does a postmaster child
 		 * call this, so this is safe when interrupting BackendInitialize().
@@ -919,12 +1039,15 @@ socket_set_nonblocking(bool nonblocking)
 /* --------------------------------
  *		pq_recvbuf - load some bytes into the input buffer
  *
- *		returns 0 if OK, EOF if trouble
+ *      nowait parameter toggles non-blocking mode.
+ *		returns number of read bytes, EOF if trouble
  * --------------------------------
  */
 static int
-pq_recvbuf(void)
+pq_recvbuf(bool nowait)
 {
+	int			r;
+
 	if (PqRecvPointer > 0)
 	{
 		if (PqRecvLength > PqRecvPointer)
@@ -940,21 +1063,40 @@ pq_recvbuf(void)
 	}
 
 	/* Ensure that we're in blocking mode */
-	socket_set_nonblocking(false);
+	socket_set_nonblocking(nowait);
 
 	/* Can fill buffer from PqRecvLength and upwards */
 	for (;;)
 	{
-		int			r;
-
-		r = secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
-						PQ_RECV_BUFFER_SIZE - PqRecvLength);
+		/*
+		 * If streaming compression is enabled then use the corresponding
+		 * compression read function.
+		 */
+		r = PqStream
+			? zpq_read(PqStream, PqRecvBuffer + PqRecvLength,
+					   PQ_RECV_BUFFER_SIZE - PqRecvLength)
+			: secure_read(MyProcPort, PqRecvBuffer + PqRecvLength,
+						  PQ_RECV_BUFFER_SIZE - PqRecvLength);
 
 		if (r < 0)
 		{
+			if (r == ZPQ_DECOMPRESS_ERROR)
+			{
+				char const *msg = zpq_decompress_error(PqStream);
+
+				if (msg == NULL)
+					msg = "end of stream";
+				ereport(COMMERROR,
+						(errcode_for_socket_access(),
+						 errmsg("failed to decompress data: %s", msg)));
+				return EOF;
+			}
 			if (errno == EINTR)
 				continue;		/* Ok if interrupted */
 
+			if (nowait && (errno == EAGAIN || errno == EWOULDBLOCK))
+				return 0;
+
 			/*
 			 * Careful: an ereport() that tries to write to the client would
 			 * cause recursion to here, leading to stack overflow and core
@@ -975,7 +1117,8 @@ pq_recvbuf(void)
 		}
 		/* r contains number of bytes read, so just incr length */
 		PqRecvLength += r;
-		return 0;
+		pgstat_report_network_traffic(r, 0, 0, 0);
+		return r;
 	}
 }
 
@@ -990,7 +1133,8 @@ pq_getbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer++];
@@ -1009,7 +1153,8 @@ pq_peekbyte(void)
 
 	while (PqRecvPointer >= PqRecvLength)
 	{
-		if (pq_recvbuf())		/* If nothing in buffer, then recv some */
+		if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+										 * some */
 			return EOF;			/* Failed to recv data */
 	}
 	return (unsigned char) PqRecvBuffer[PqRecvPointer];
@@ -1026,48 +1171,15 @@ pq_peekbyte(void)
 int
 pq_getbyte_if_available(unsigned char *c)
 {
-	int			r;
+	int			r = 0;
 
 	Assert(PqCommReadingMsg);
 
-	if (PqRecvPointer < PqRecvLength)
+	if (PqRecvPointer < PqRecvLength || (r = pq_recvbuf(true)) > 0)
 	{
 		*c = PqRecvBuffer[PqRecvPointer++];
 		return 1;
 	}
-
-	/* Put the socket into non-blocking mode */
-	socket_set_nonblocking(true);
-
-	r = secure_read(MyProcPort, c, 1);
-	if (r < 0)
-	{
-		/*
-		 * Ok if no data available without blocking or interrupted (though
-		 * EINTR really shouldn't happen with a non-blocking socket). Report
-		 * other errors.
-		 */
-		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
-			r = 0;
-		else
-		{
-			/*
-			 * Careful: an ereport() that tries to write to the client would
-			 * cause recursion to here, leading to stack overflow and core
-			 * dump!  This message must go *only* to the postmaster log.
-			 */
-			ereport(COMMERROR,
-					(errcode_for_socket_access(),
-					 errmsg("could not receive data from client: %m")));
-			r = EOF;
-		}
-	}
-	else if (r == 0)
-	{
-		/* EOF detected */
-		r = EOF;
-	}
-
 	return r;
 }
 
@@ -1088,7 +1200,8 @@ pq_getbytes(char *s, size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1122,7 +1235,8 @@ pq_discardbytes(size_t len)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 		amount = PqRecvLength - PqRecvPointer;
@@ -1163,7 +1277,8 @@ pq_getstring(StringInfo s)
 	{
 		while (PqRecvPointer >= PqRecvLength)
 		{
-			if (pq_recvbuf())	/* If nothing in buffer, then recv some */
+			if (pq_recvbuf(false) == EOF)	/* If nothing in buffer, then recv
+											 * some */
 				return EOF;		/* Failed to recv data */
 		}
 
@@ -1413,13 +1528,24 @@ internal_flush(void)
 	char	   *bufptr = PqSendBuffer + PqSendStart;
 	char	   *bufend = PqSendBuffer + PqSendPointer;
 
-	while (bufptr < bufend)
+	while (bufptr < bufend || zpq_buffered_tx(PqStream) != 0)
+
+		/*
+		 * has more data to flush or unsent data in internal compression
+		 * buffer
+		 */
 	{
 		int			r;
+		size_t		processed = 0;
+		size_t		available = bufend - bufptr;
 
-		r = secure_write(MyProcPort, bufptr, bufend - bufptr);
+		r = PqStream
+			? zpq_write(PqStream, bufptr, available, &processed)
+			: secure_write(MyProcPort, bufptr, available);
+		bufptr += processed;
+		PqSendStart += processed;
 
-		if (r <= 0)
+		if (r < 0 || (r == 0 && available))
 		{
 			if (errno == EINTR)
 				continue;		/* Ok if we were interrupted */
@@ -1462,12 +1588,12 @@ internal_flush(void)
 			InterruptPending = 1;
 			return EOF;
 		}
+		pgstat_report_network_traffic(0, r, 0, 0);
 
 		last_reported_send_errno = 0;	/* reset after any successful send */
 		bufptr += r;
 		PqSendStart += r;
 	}
-
 	PqSendStart = PqSendPointer = 0;
 	return 0;
 }
@@ -1484,7 +1610,7 @@ socket_flush_if_writable(void)
 	int			res;
 
 	/* Quick exit if nothing to do */
-	if (PqSendPointer == PqSendStart)
+	if ((PqSendPointer == PqSendStart) && (zpq_buffered_tx(PqStream) == 0))
 		return 0;
 
 	/* No-op if reentrant call */
@@ -1507,7 +1633,7 @@ socket_flush_if_writable(void)
 static bool
 socket_is_send_pending(void)
 {
-	return (PqSendStart < PqSendPointer);
+	return (PqSendStart < PqSendPointer || (zpq_buffered_tx(PqStream) != 0));
 }
 
 /* --------------------------------
@@ -1985,3 +2111,16 @@ pq_settcpusertimeout(int timeout, Port *port)
 
 	return STATUS_OK;
 }
+
+PG_FUNCTION_INFO_V1(pg_compression_algorithm);
+
+Datum
+pg_compression_algorithm(PG_FUNCTION_ARGS)
+{
+	char const *algorithm_name = PqStream ? zpq_compress_algorithm_name(PqStream) : NULL;
+
+	if (algorithm_name)
+		PG_RETURN_TEXT_P(cstring_to_text(algorithm_name));
+	else
+		PG_RETURN_NULL();
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index e76e627..4714b51 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3115,6 +3115,9 @@ pgstat_bestart(void)
 	lbeentry.st_xact_start_timestamp = 0;
 	lbeentry.st_databaseid = MyDatabaseId;
 
+	lbeentry.st_tx_raw_bytes = lbeentry.st_rx_raw_bytes =
+		lbeentry.st_tx_compressed_bytes = lbeentry.st_rx_compressed_bytes = 0;
+
 	/* We have userid for client-backends, wal-sender and bgworker processes */
 	if (lbeentry.st_backendType == B_BACKEND
 		|| lbeentry.st_backendType == B_WAL_SENDER
@@ -3496,6 +3499,33 @@ pgstat_report_xact_timestamp(TimestampTz tstamp)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*
+ * Report network traffic: add the given raw and compressed byte counts,
+ * received (rx) and sent (tx), to this backend's status entry.
+ */
+void
+pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes)
+{
+	volatile PgBackendStatus *beentry = MyBEEntry;
+
+	if (!pgstat_track_activities || !beentry)
+		return;
+
+	/*
+	 * Update my status entry, following the protocol of bumping
+	 * st_changecount before and after.  We use a volatile pointer here to
+	 * ensure the compiler doesn't try to get cute.
+	 */
+	PGSTAT_BEGIN_WRITE_ACTIVITY(beentry);
+
+	beentry->st_rx_raw_bytes += rx_raw_bytes;
+	beentry->st_tx_raw_bytes += tx_raw_bytes;
+	beentry->st_rx_compressed_bytes += rx_compressed_bytes;
+	beentry->st_tx_compressed_bytes += tx_compressed_bytes;
+
+	PGSTAT_END_WRITE_ACTIVITY(beentry);
+}
+
 /* ----------
  * pgstat_read_current_status() -
  *
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index b7799ed..330ddaf 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -2136,6 +2136,8 @@ retry1:
 				port->database_name = pstrdup(valptr);
 			else if (strcmp(nameptr, "user") == 0)
 				port->user_name = pstrdup(valptr);
+			else if (strcmp(nameptr, "_pq_.compression") == 0)
+				port->compression_algorithms = pstrdup(valptr);
 			else if (strcmp(nameptr, "options") == 0)
 				port->cmdline_options = pstrdup(valptr);
 			else if (strcmp(nameptr, "replication") == 0)
@@ -4443,6 +4445,14 @@ BackendInitialize(Port *port)
 	if (status != STATUS_OK)
 		proc_exit(0);
 
+	if (pq_configure(port))
+	{
+		ereport(COMMERROR,
+				(errcode_for_socket_access(),
+				 errmsg("failed to send compression message: %m")));
+		proc_exit(0);
+	}
+
 	/*
 	 * Now that we have the user and database name, we can set the process
 	 * title for ps.  It's good to do this as early as possible in startup.
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index a210fc9..b93ac11 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -567,7 +567,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 Datum
 pg_stat_get_activity(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_ACTIVITY_COLS	30
+#define PG_STAT_GET_ACTIVITY_COLS	34
 	int			num_backends = pgstat_fetch_stat_numbackends();
 	int			curr_backend;
 	int			pid = PG_ARGISNULL(0) ? -1 : PG_GETARG_INT32(0);
@@ -913,6 +913,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 				values[28] = BoolGetDatum(false);	/* GSS Encryption not in
 													 * use */
 			}
+			values[30] = Int64GetDatum((int64) beentry->st_rx_raw_bytes);
+			values[31] = Int64GetDatum((int64) beentry->st_tx_raw_bytes);
+			values[32] = Int64GetDatum((int64) beentry->st_rx_compressed_bytes);
+			values[33] = Int64GetDatum((int64) beentry->st_tx_compressed_bytes);
 		}
 		else
 		{
@@ -941,6 +945,10 @@ pg_stat_get_activity(PG_FUNCTION_ARGS)
 			nulls[27] = true;
 			nulls[28] = true;
 			nulls[29] = true;
+			nulls[30] = true;
+			nulls[31] = true;
+			nulls[32] = true;
+			nulls[33] = true;
 		}
 
 		tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -1140,6 +1148,46 @@ pg_stat_get_backend_start(PG_FUNCTION_ARGS)
 	PG_RETURN_TIMESTAMPTZ(result);
 }
 
+Datum
+pg_stat_get_network_traffic(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_NETWORK_TRAFFIC_COLS	4
+	TupleDesc	tupdesc;
+	Datum		values[PG_STAT_NETWORK_TRAFFIC_COLS];
+	bool		nulls[PG_STAT_NETWORK_TRAFFIC_COLS];
+	int32		beid = PG_GETARG_INT32(0);
+	PgBackendStatus *beentry;
+
+	if ((beentry = pgstat_fetch_stat_beentry(beid)) == NULL)
+		PG_RETURN_NULL();
+	else if (!HAS_PGSTAT_PERMISSIONS(beentry->st_userid))
+		PG_RETURN_NULL();
+
+	/* Initialise values and NULL flags arrays */
+	MemSet(values, 0, sizeof(values));
+	MemSet(nulls, 0, sizeof(nulls));
+
+	/* Initialise attributes information in the tuple descriptor */
+	tupdesc = CreateTemplateTupleDesc(PG_STAT_NETWORK_TRAFFIC_COLS);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "rx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "tx_raw_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "rx_compressed_bytes",
+					   INT8OID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 4, "tx_compressed_bytes",
+					   INT8OID, -1, 0);
+	BlessTupleDesc(tupdesc);
+
+	/* Fill values and NULLs */
+	values[0] = Int64GetDatum((int64) beentry->st_rx_raw_bytes);
+	values[1] = Int64GetDatum((int64) beentry->st_tx_raw_bytes);
+	values[2] = Int64GetDatum((int64) beentry->st_rx_compressed_bytes);
+	values[3] = Int64GetDatum((int64) beentry->st_tx_compressed_bytes);
+
+	/* Returns the record as Datum */
+	PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
+}
 
 Datum
 pg_stat_get_backend_client_addr(PG_FUNCTION_ARGS)
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bb34630..83a3c1f 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -1290,6 +1290,16 @@ static struct config_bool ConfigureNamesBool[] =
 	},
 
 	{
+		{"libpq_compression", PGC_SIGHUP, CLIENT_CONN_OTHER,
+			gettext_noop("Compress client-server traffic."),
+			NULL
+		},
+		&libpq_compression,
+		true,
+		NULL, NULL, NULL
+	},
+
+	{
 		{"log_checkpoints", PGC_SIGHUP, LOGGING_WHAT,
 			gettext_noop("Logs each checkpoint."),
 			NULL
diff --git a/src/bin/psql/command.c b/src/bin/psql/command.c
index c7a83d5..b916eb4 100644
--- a/src/bin/psql/command.c
+++ b/src/bin/psql/command.c
@@ -163,6 +163,7 @@ static void print_with_linenumbers(FILE *output, char *lines,
 								   const char *header_keyword);
 static void minimal_error_message(PGresult *res);
 
+static void printCompressionInfo(void);
 static void printSSLInfo(void);
 static void printGSSInfo(void);
 static bool printPsetInfo(const char *param, printQueryOpt *popt);
@@ -626,6 +627,7 @@ exec_command_conninfo(PsqlScanState scan_state, bool active_branch)
 					printf(_("You are connected to database \"%s\" as user \"%s\" on host \"%s\" at port \"%s\".\n"),
 						   db, PQuser(pset.db), host, PQport(pset.db));
 			}
+			printCompressionInfo();
 			printSSLInfo();
 			printGSSInfo();
 		}
@@ -3496,6 +3498,27 @@ connection_warnings(bool in_startup)
 	}
 }
 
+/*
+ * printCompressionInfo
+ *
+ * Print the compressor/decompressor used by the current connection, if any
+ */
+static void
+printCompressionInfo(void)
+{
+	char *compressor = PQcompressor(pset.db);
+	char *decompressor = PQdecompressor(pset.db);
+
+	if (compressor != NULL)
+	{
+		printf(_("Compressor %s\n"), compressor);
+	}
+
+	if (decompressor != NULL)
+	{
+		printf(_("Decompressor %s\n"), decompressor);
+	}
+}
 
 /*
  * printSSLInfo
diff --git a/src/common/Makefile b/src/common/Makefile
index 25c55bd..bc6cba8 100644
--- a/src/common/Makefile
+++ b/src/common/Makefile
@@ -77,7 +77,8 @@ OBJS_COMMON = \
 	unicode_norm.o \
 	username.o \
 	wait_error.o \
-	wchar.o
+	wchar.o \
+	zpq_stream.o
 
 ifeq ($(with_openssl),yes)
 OBJS_COMMON += \
diff --git a/src/common/zpq_stream.c b/src/common/zpq_stream.c
new file mode 100644
index 0000000..0451d2d
--- /dev/null
+++ b/src/common/zpq_stream.c
@@ -0,0 +1,684 @@
+#include "postgres_fe.h"
+#include "common/zpq_stream.h"
+#include "c.h"
+#include "pg_config.h"
+
+/*
+ * Functions implementing streaming compression algorithm
+ */
+typedef struct
+{
+	/*
+	 * Name of compression algorithm.
+	 */
+	char const *(*name) (void);
+
+	/*
+	 * Create new compression stream.
+	 * level: compression level
+	 */
+	void	   *(*create_compressor) (int level);
+
+	/*
+	 * Create new decompression stream.
+	 */
+	void	   *(*create_decompressor) ();
+
+	/*
+	 * Decompress up to "src_size" compressed bytes from *src and write up to
+	 * "dst_size" raw (decompressed) bytes to *dst. Number of decompressed
+	 * bytes written to *dst is stored in *dst_processed. Number of compressed
+	 * bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZPQ_OK if no errors were encountered during decompression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZPQ_DATA_PENDING means that there might be some data left within
+	 * decompressor internal buffers.
+	 *
+	 * ZPQ_STREAM_END if encountered end of compressed data stream.
+	 *
+	 * ZPQ_DECOMPRESS_ERROR if encountered an error during decompression
+	 * attempt.
+	 */
+	ssize_t		(*decompress) (void *ds, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Compress up to "src_size" raw (non-compressed) bytes from *src and
+	 * write up to "dst_size" compressed bytes to *dst. Number of compressed
+	 * bytes written to *dst is stored in *dst_processed. Number of
+	 * non-compressed bytes read from *src is stored in *src_processed.
+	 *
+	 * Return codes: ZPQ_OK if no errors were encountered during compression
+	 * attempt. This return code does not guarantee that *src_processed > 0 or
+	 * *dst_processed > 0.
+	 *
+	 * ZPQ_DATA_PENDING means that there might be some data left within
+	 * compressor internal buffers.
+	 *
+	 * ZPQ_COMPRESS_ERROR if encountered an error during compression attempt.
+	 */
+	ssize_t		(*compress) (void *cs, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed);
+
+	/*
+	 * Free compression stream created by create_compressor function.
+	 */
+	void		(*free_compressor) (void *cs);
+
+	/*
+	 * Free decompression stream created by create_decompressor function.
+	 */
+	void		(*free_decompressor) (void *ds);
+
+	/*
+	 * Get compressor error message.
+	 */
+	char const *(*compress_error) (void *cs);
+
+	/*
+	 * Get decompressor error message.
+	 */
+	char const *(*decompress_error) (void *ds);
+}			ZpqAlgorithm;
+
+
+#define ZPQ_BUFFER_SIZE       8192	/* We have to flush stream after each
+									 * protocol command and command is mostly
+									 * limited by record length, which in turn
+									 * is usually less than page size (except
+									 * TOAST)
+									 */
+
+struct ZpqStream
+{
+	ZpqAlgorithm const *c_algorithm;
+	void	   *c_stream;
+
+	ZpqAlgorithm const *d_algorithm;
+	void	   *d_stream;
+
+	char		tx_buf[ZPQ_BUFFER_SIZE];
+	size_t		tx_pos;
+	size_t		tx_size;
+
+	char		rx_buf[ZPQ_BUFFER_SIZE];
+	size_t		rx_pos;
+	size_t		rx_size;
+
+	zpq_tx_func tx_func;
+	zpq_rx_func rx_func;
+	void	   *arg;
+
+	size_t		tx_total;
+	size_t		tx_total_raw;
+	size_t		rx_total;
+	size_t		rx_total_raw;
+
+	bool		rx_not_flushed;
+	bool		tx_not_flushed;
+};
+
+#if HAVE_LIBZSTD
+
+#include <stdlib.h>
+#include <zstd.h>
+
+/*
+ * Maximum allowed back-reference distance, expressed as power of 2.
+ * This setting controls max compressor/decompressor window size.
+ * More details https://github.com/facebook/zstd/blob/v1.4.7/lib/zstd.h#L536
+ */
+#define ZSTD_WINDOWLOG_LIMIT 23 /* set max window size to 8MB */
+
+typedef struct ZPQ_ZSTD_CStream
+{
+	ZSTD_CStream *stream;
+	char const *error;			/* error message */
+}			ZPQ_ZSTD_CStream;
+
+typedef struct ZPQ_ZSTD_DStream
+{
+	ZSTD_DStream *stream;
+	char const *error;			/* error message */
+}			ZPQ_ZSTD_DStream;
+
+static void *
+zstd_create_compressor(int level)
+{
+	ZPQ_ZSTD_CStream *c_stream = (ZPQ_ZSTD_CStream *) malloc(sizeof(ZPQ_ZSTD_CStream));
+
+	c_stream->stream = ZSTD_createCStream();
+	ZSTD_initCStream(c_stream->stream, level);
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_CCtx_setParameter(c_stream->stream, ZSTD_c_windowLog, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	c_stream->error = NULL;
+	return c_stream;
+}
+
+static void *
+zstd_create_decompressor()
+{
+	ZPQ_ZSTD_DStream *d_stream = (ZPQ_ZSTD_DStream *) malloc(sizeof(ZPQ_ZSTD_DStream));
+
+	d_stream->stream = ZSTD_createDStream();
+	ZSTD_initDStream(d_stream->stream);
+#if ZSTD_VERSION_MAJOR > 1 || ZSTD_VERSION_MINOR > 3
+	ZSTD_DCtx_setParameter(d_stream->stream, ZSTD_d_windowLogMax, ZSTD_WINDOWLOG_LIMIT);
+#endif
+	d_stream->error = NULL;
+	return d_stream;
+}
+
+static ssize_t
+zstd_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+	size_t		rc;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	rc = ZSTD_decompressStream(ds->stream, &out, &in);
+
+	*src_processed = in.pos;
+	*dst_processed = out.pos;
+	if (ZSTD_isError(rc))
+	{
+		ds->error = ZSTD_getErrorName(rc);
+		return ZPQ_DECOMPRESS_ERROR;
+	}
+
+	if (out.pos == out.size)
+	{
+		/*
+		 * if out.pos == out.size, there might be some data left within
+		 * internal buffers
+		 */
+		return ZPQ_DATA_PENDING;
+	}
+	return ZPQ_OK;
+}
+
+static ssize_t
+zstd_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+	ZSTD_inBuffer in;
+	ZSTD_outBuffer out;
+
+	in.src = src;
+	in.pos = 0;
+	in.size = src_size;
+
+	out.dst = dst;
+	out.pos = 0;
+	out.size = dst_size;
+
+	if (in.pos < src_size)		/* Has something to compress in input buffer */
+	{
+		size_t		rc = ZSTD_compressStream(cs->stream, &out, &in);
+
+		*dst_processed = out.pos;
+		*src_processed = in.pos;
+		if (ZSTD_isError(rc))
+		{
+			cs->error = ZSTD_getErrorName(rc);
+			return ZPQ_COMPRESS_ERROR;
+		}
+	}
+
+	if (in.pos == src_size)		/* All data is compressed: flush internal zstd
+								 * buffer */
+	{
+		size_t		tx_not_flushed = ZSTD_flushStream(cs->stream, &out);
+
+		*dst_processed = out.pos;
+		if (tx_not_flushed > 0)
+		{
+			return ZPQ_DATA_PENDING;
+		}
+	}
+
+	return ZPQ_OK;
+}
+
+static void
+zstd_free_compressor(void *c_stream)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+
+	if (cs != NULL)
+	{
+		ZSTD_freeCStream(cs->stream);
+		free(cs);
+	}
+}
+
+static void
+zstd_free_decompressor(void *d_stream)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+
+	if (ds != NULL)
+	{
+		ZSTD_freeDStream(ds->stream);
+		free(ds);
+	}
+}
+
+static char const *
+zstd_compress_error(void *c_stream)
+{
+	ZPQ_ZSTD_CStream *cs = (ZPQ_ZSTD_CStream *) c_stream;
+
+	return cs->error;
+}
+
+static char const *
+zstd_decompress_error(void *d_stream)
+{
+	ZPQ_ZSTD_DStream *ds = (ZPQ_ZSTD_DStream *) d_stream;
+
+	return ds->error;
+}
+
+static char const *
+zstd_name(void)
+{
+	return "zstd";
+}
+
+#endif
+
+#if HAVE_LIBZ
+
+#include <stdlib.h>
+#include <zlib.h>
+
+
+static void *
+zlib_create_compressor(int level)
+{
+	int			rc;
+	z_stream   *c_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(c_stream, 0, sizeof(*c_stream));
+	rc = deflateInit(c_stream, level);
+	if (rc != Z_OK)
+	{
+		free(c_stream);
+		return NULL;
+	}
+	return c_stream;
+}
+
+static void *
+zlib_create_decompressor()
+{
+	int			rc;
+	z_stream   *d_stream = (z_stream *) malloc(sizeof(z_stream));
+
+	memset(d_stream, 0, sizeof(*d_stream));
+	rc = inflateInit(d_stream);
+	if (rc != Z_OK)
+	{
+		free(d_stream);
+		return NULL;
+	}
+	return d_stream;
+}
+
+static ssize_t
+zlib_decompress(void *d_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+	int			rc;
+
+	ds->next_in = (Bytef *) src;
+	ds->avail_in = src_size;
+	ds->next_out = (Bytef *) dst;
+	ds->avail_out = dst_size;
+
+	rc = inflate(ds, Z_SYNC_FLUSH);
+	*src_processed = src_size - ds->avail_in;
+	*dst_processed = dst_size - ds->avail_out;
+
+	if (rc == Z_STREAM_END)
+	{
+		return ZPQ_STREAM_END;
+	}
+	if (rc != Z_OK && rc != Z_BUF_ERROR)
+	{
+		return ZPQ_DECOMPRESS_ERROR;
+	}
+
+	return ZPQ_OK;
+}
+
+static ssize_t
+zlib_compress(void *c_stream, void const *src, size_t src_size, size_t *src_processed, void *dst, size_t dst_size, size_t *dst_processed)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+	int			rc;
+	unsigned	deflate_pending = 0;
+
+
+	cs->next_out = (Bytef *) dst;
+	cs->avail_out = dst_size;
+	cs->next_in = (Bytef *) src;
+	cs->avail_in = src_size;
+
+	rc = deflate(cs, Z_SYNC_FLUSH);
+	Assert(rc == Z_OK);
+	*dst_processed = dst_size - cs->avail_out;
+	*src_processed = src_size - cs->avail_in;
+
+	deflatePending(cs, &deflate_pending, Z_NULL);	/* check if any data left
+													 * in deflate buffer */
+	if (deflate_pending > 0)
+	{
+		return ZPQ_DATA_PENDING;
+	}
+	return ZPQ_OK;
+}
+
+static void
+zlib_free_compressor(void *c_stream)
+{
+	z_stream   *cs = (z_stream *) c_stream;
+
+	if (cs != NULL)
+	{
+		deflateEnd(cs);
+		free(cs);
+	}
+}
+
+static void
+zlib_free_decompressor(void *d_stream)
+{
+	z_stream   *ds = (z_stream *) d_stream;
+
+	if (ds != NULL)
+	{
+		inflateEnd(ds);
+		free(ds);
+	}
+}
+
+static char const *
+zlib_error(void *stream)
+{
+	z_stream   *zs = (z_stream *) stream;
+
+	return zs->msg;
+}
+
+static char const *
+zlib_name(void)
+{
+	return "zlib";
+}
+
+#endif
+
+static char const *
+no_compression_name(void)
+{
+	return NULL;
+}
+
+/*
+ * Array with all supported compression algorithms.
+ */
+static ZpqAlgorithm const zpq_algorithms[] =
+{
+#if HAVE_LIBZSTD
+	{zstd_name, zstd_create_compressor, zstd_create_decompressor, zstd_decompress, zstd_compress, zstd_free_compressor, zstd_free_decompressor, zstd_compress_error, zstd_decompress_error},
+#endif
+#if HAVE_LIBZ
+	{zlib_name, zlib_create_compressor, zlib_create_decompressor, zlib_decompress, zlib_compress, zlib_free_compressor, zlib_free_decompressor, zlib_error, zlib_error},
+#endif
+	{no_compression_name}
+};
+
+/*
+ * Create stream; c_alg_impl/d_alg_impl index the zpq_algorithms array.
+ */
+ZpqStream *
+zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size)
+{
+	ZpqStream  *zs = (ZpqStream *) malloc(sizeof(ZpqStream));
+
+	zs->tx_pos = 0;
+	zs->tx_size = 0;
+	zs->rx_pos = 0;
+	zs->rx_size = 0;
+	zs->tx_func = tx_func;
+	zs->rx_func = rx_func;
+	zs->arg = arg;
+	zs->tx_total = 0;
+	zs->tx_total_raw = 0;
+	zs->rx_total = 0;
+	zs->rx_total_raw = 0;
+	zs->tx_not_flushed = false;
+	zs->rx_not_flushed = false;
+
+	zs->rx_size = rx_data_size;
+	Assert(rx_data_size < ZPQ_BUFFER_SIZE);
+	memcpy(zs->rx_buf, rx_data, rx_data_size);
+
+	zs->c_algorithm = &zpq_algorithms[c_alg_impl];
+	zs->c_stream = zpq_algorithms[c_alg_impl].create_compressor(c_level);
+	if (zs->c_stream == NULL)
+	{
+		free(zs);
+		return NULL;
+	}
+	zs->d_algorithm = &zpq_algorithms[d_alg_impl];
+	zs->d_stream = zpq_algorithms[d_alg_impl].create_decompressor();
+	if (zs->d_stream == NULL)
+	{
+		zpq_free(zs);			/* also frees the compressor created above */
+		return NULL;
+	}
+
+	return zs;
+}
+
+ssize_t
+zpq_read(ZpqStream * zs, void *buf, size_t size)
+{
+	size_t		buf_pos = 0;
+	size_t		rx_processed;
+	size_t		buf_processed;
+	ssize_t		rc;
+
+	while (buf_pos == 0)
+	{							/* Read until some data fetched */
+		if (zs->rx_pos == zs->rx_size)
+		{
+			zs->rx_pos = zs->rx_size = 0;	/* Reset rx buffer */
+		}
+
+		if (zs->rx_pos == zs->rx_size && !zs->rx_not_flushed)
+		{
+			ssize_t		rc = zs->rx_func(zs->arg, (char *) zs->rx_buf + zs->rx_size, ZPQ_BUFFER_SIZE - zs->rx_size);
+
+			if (rc > 0)			/* read fetches some data */
+			{
+				zs->rx_size += rc;
+				zs->rx_total += rc;
+			}
+			else				/* read failed */
+			{
+				return rc;
+			}
+		}
+
+		Assert(zs->rx_pos <= zs->rx_size);
+		rx_processed = 0;
+		buf_processed = 0;
+		rc = zs->d_algorithm->decompress(zs->d_stream,
+										 (char *) zs->rx_buf + zs->rx_pos, zs->rx_size - zs->rx_pos, &rx_processed,
+										 buf, size, &buf_processed);
+
+		zs->rx_pos += rx_processed;
+		zs->rx_total_raw += buf_processed;
+		buf_pos += buf_processed;
+		zs->rx_not_flushed = false;
+		if (rc == ZPQ_STREAM_END)
+		{
+			break;
+		}
+		if (rc == ZPQ_DATA_PENDING)
+		{
+			zs->rx_not_flushed = true;
+			continue;
+		}
+		if (rc != ZPQ_OK)
+		{
+			return ZPQ_DECOMPRESS_ERROR;
+		}
+	}
+	return buf_pos;
+}
+
+ssize_t
+zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed)
+{
+	size_t		buf_pos = 0;
+
+	do
+	{
+		if (zs->tx_pos == zs->tx_size)	/* Have nothing to send */
+		{
+			size_t		tx_processed = 0;
+			size_t		buf_processed = 0;
+			ssize_t		rc;
+
+			zs->tx_pos = zs->tx_size = 0;	/* Reset pointer to the beginning of buffer */
+
+			rc = zs->c_algorithm->compress(zs->c_stream,
+										   (char *) buf + buf_pos, size - buf_pos, &buf_processed,
+										   (char *) zs->tx_buf + zs->tx_size, ZPQ_BUFFER_SIZE - zs->tx_size, &tx_processed);
+
+			zs->tx_size += tx_processed;
+			buf_pos += buf_processed;
+			zs->tx_total_raw += buf_processed;
+			zs->tx_not_flushed = false;
+
+			if (rc == ZPQ_DATA_PENDING)
+			{
+				zs->tx_not_flushed = true;
+				continue;
+			}
+			if (rc != ZPQ_OK)
+			{
+				*processed = buf_pos;
+				return ZPQ_COMPRESS_ERROR;
+			}
+		}
+		while (zs->tx_pos < zs->tx_size)
+		{
+			ssize_t		rc = zs->tx_func(zs->arg, (char *) zs->tx_buf + zs->tx_pos, zs->tx_size - zs->tx_pos);
+
+			if (rc > 0)
+			{
+				zs->tx_pos += rc;
+				zs->tx_total += rc;
+			}
+			else
+			{
+				*processed = buf_pos;
+				return rc;
+			}
+		}
+
+		/*
+		 * repeat sending while there is some data in input or internal
+		 * compression buffer
+		 */
+	} while (buf_pos < size || zs->tx_not_flushed);
+
+	return buf_pos;
+}
+
+void
+zpq_free(ZpqStream * zs)
+{
+	if (zs)
+	{
+		if (zs->c_stream)
+		{
+			zs->c_algorithm->free_compressor(zs->c_stream);
+		}
+		if (zs->d_stream)
+		{
+			zs->d_algorithm->free_decompressor(zs->d_stream);
+		}
+		free(zs);
+	}
+}
+
+char const *
+zpq_compress_error(ZpqStream * zs)
+{
+	return zs->c_algorithm->compress_error(zs->c_stream);
+}
+
+char const *
+zpq_decompress_error(ZpqStream * zs)
+{
+	return zs->d_algorithm->decompress_error(zs->d_stream);
+}
+
+size_t
+zpq_buffered_rx(ZpqStream * zs)
+{
+	return zs ? zs->rx_not_flushed || (zs->rx_size - zs->rx_pos) : 0;
+}
+
+size_t
+zpq_buffered_tx(ZpqStream * zs)
+{
+	return zs ? zs->tx_not_flushed || (zs->tx_size - zs->tx_pos) : 0;
+}
+
+/*
+ * Get list of the supported algorithms.
+ */
+char	  **
+zpq_get_supported_algorithms(void)
+{
+	size_t		n_algorithms = sizeof(zpq_algorithms) / sizeof(*zpq_algorithms);
+	char	  **algorithm_names = malloc(n_algorithms * sizeof(char *));
+
+	for (size_t i = 0; i < n_algorithms; i++)
+	{
+		algorithm_names[i] = (char *) zpq_algorithms[i].name();
+	}
+
+	return algorithm_names;
+}
+
+char const *
+zpq_compress_algorithm_name(ZpqStream * zs)
+{
+	return zs ? zs->c_algorithm->name() : NULL;
+}
+
+char const *
+zpq_decompress_algorithm_name(ZpqStream * zs)
+{
+	return zs ? zs->d_algorithm->name() : NULL;
+}
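A note on zpq_get_supported_algorithms(): the array it returns is terminated
by the NULL that no_compression_name() yields for the last table entry, so a
caller is expected to walk it until it sees NULL. Below is a minimal sketch of
how a caller might match that array against the list of algorithms a client
sends. The helper name pick_algorithm, the comma-separated list format, and
the "first client preference wins" policy are assumptions made for
illustration; none of them is taken from the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the patch): return the index in the
 * NULL-terminated array "supported" of the first algorithm named in the
 * comma-separated list "requested", or -1 if nothing matches.
 */
static int
pick_algorithm(const char *requested, const char *const *supported)
{
	const char *p = requested;

	assert(requested != NULL && supported != NULL);

	while (*p != '\0')
	{
		size_t		len = strcspn(p, ",");	/* length of the current name */

		/* Walk the array until its NULL terminator */
		for (int i = 0; supported[i] != NULL; i++)
		{
			if (strlen(supported[i]) == len &&
				strncmp(supported[i], p, len) == 0)
				return i;
		}

		p += len;
		if (*p == ',')
			p++;				/* skip the separator */
	}
	return -1;
}
```

Comparing the lengths before strncmp() keeps a short name such as "zl" from
matching "zlib" as a prefix.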
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 33dacfd..0bc4d0d 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -5239,9 +5239,9 @@
   proname => 'pg_stat_get_activity', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
   prorettype => 'record', proargtypes => 'int4',
-  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid}',
+  proallargtypes => '{int4,oid,int4,oid,text,text,text,text,text,timestamptz,timestamptz,timestamptz,timestamptz,inet,text,int4,xid,xid,text,bool,text,text,int4,bool,text,numeric,text,bool,text,bool,int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{pid,datid,pid,usesysid,application_name,state,query,wait_event_type,wait_event,xact_start,query_start,backend_start,state_change,client_addr,client_hostname,client_port,backend_xid,backend_xmin,backend_type,ssl,sslversion,sslcipher,sslbits,sslcompression,ssl_client_dn,ssl_client_serial,ssl_issuer_dn,gss_auth,gss_princ,gss_enc,leader_pid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
   prosrc => 'pg_stat_get_activity' },
 { oid => '3318',
   descr => 'statistics: information about progress of backends running maintenance command',
@@ -5507,6 +5507,14 @@
   proargnames => '{wal_buffers_full,stats_reset}',
   prosrc => 'pg_stat_get_wal' },
 
+{ oid => '1137', descr => 'statistics: information about network traffic',
+  proname => 'pg_stat_get_network_traffic', proisstrict => 'f', provolatile => 's',
+  proparallel => 'r', prorettype => 'record', proargtypes => 'int4',
+  proallargtypes => '{int4,int8,int8,int8,int8}',
+  proargmodes => '{i,o,o,o,o}',
+  proargnames => '{_beid,rx_raw_bytes,tx_raw_bytes,rx_compressed_bytes,tx_compressed_bytes}',
+  prosrc => 'pg_stat_get_network_traffic' },
+
 { oid => '2306', descr => 'statistics: information about SLRU caches',
   proname => 'pg_stat_get_slru', prorows => '100', proisstrict => 'f',
   proretset => 't', provolatile => 's', proparallel => 'r',
@@ -5638,6 +5646,10 @@
   proname => 'pg_tablespace_location', provolatile => 's', prorettype => 'text',
   proargtypes => 'oid', prosrc => 'pg_tablespace_location' },
 
+{ oid => '9257', descr => 'connection compression algorithm',
+  proname => 'pg_compression_algorithm', provolatile => 's', prorettype => 'text',
+  proargtypes => '', prosrc => 'pg_compression_algorithm' },
+
 { oid => '1946',
   descr => 'convert bytea value into some ascii-only text string',
   proname => 'encode', prorettype => 'text', proargtypes => 'bytea text',
diff --git a/src/include/common/zpq_stream.h b/src/include/common/zpq_stream.h
new file mode 100644
index 0000000..e48b5ac
--- /dev/null
+++ b/src/include/common/zpq_stream.h
@@ -0,0 +1,94 @@
+/*
+ * zpq_stream.h
+ *     Streaming compression for libpq
+ */
+
+#ifndef ZPQ_STREAM_H
+#define ZPQ_STREAM_H
+
+#include <stdlib.h>
+
+#define ZPQ_OK (0)
+#define ZPQ_IO_ERROR (-1)
+#define ZPQ_DECOMPRESS_ERROR (-2)
+#define ZPQ_COMPRESS_ERROR (-3)
+#define ZPQ_STREAM_END (-4)
+#define ZPQ_DATA_PENDING (-5)
+
+#define ZPQ_DEFAULT_COMPRESSION_LEVEL (1)
+
+struct ZpqStream;
+typedef struct ZpqStream ZpqStream;
+
+typedef ssize_t (*zpq_tx_func) (void *arg, void const *data, size_t size);
+typedef ssize_t (*zpq_rx_func) (void *arg, void *data, size_t size);
+
+/*
+ * Create compression stream with rx/tx function for reading/sending compressed data.
+ * c_alg_impl: index of chosen compression algorithm
+ * c_level: compression c_level
+ * d_alg_impl: index of chosen decompression algorithm
+ * tx_func: function for writing compressed data in underlying stream
+ * rx_func: function for reading compressed data from underlying stream
+ * arg: context passed to the function
+ * rx_data: received data (compressed data already fetched from input stream)
+ * rx_data_size: size of data fetched from input stream
+ */
+extern ZpqStream  *zpq_create(int c_alg_impl, int c_level, int d_alg_impl, zpq_tx_func tx_func, zpq_rx_func rx_func, void *arg, char *rx_data, size_t rx_data_size);
+
+/*
+ * Read up to "size" raw (decompressed) bytes.
+ * Returns number of decompressed bytes or error code.
+ * Error code is either ZPQ_DECOMPRESS_ERROR or error code returned by the rx function.
+ */
+extern ssize_t		zpq_read(ZpqStream * zs, void *buf, size_t size);
+
+/*
+ * Write up to "size" raw (uncompressed) bytes.
+ * Returns number of written raw bytes or error code.
+ * Error code is either ZPQ_COMPRESS_ERROR or error code returned by the tx function.
+ * In either error case the number of raw bytes consumed is stored in *processed.
+ */
+extern ssize_t		zpq_write(ZpqStream * zs, void const *buf, size_t size, size_t *processed);
+
+/*
+ * Get decompressor error message.
+ */
+extern char const *zpq_decompress_error(ZpqStream * zs);
+
+/*
+ * Get compressor error message.
+ */
+extern char const *zpq_compress_error(ZpqStream * zs);
+
+/*
+ * Return nonzero if data is still buffered on the rx side (rx buffer or decompressor).
+ */
+extern size_t		zpq_buffered_rx(ZpqStream * zs);
+
+/*
+ * Return nonzero if data is still buffered on the tx side (tx buffer or compressor).
+ */
+extern size_t		zpq_buffered_tx(ZpqStream * zs);
+
+/*
+ * Free stream created by zpq_create function.
+ */
+extern void		zpq_free(ZpqStream * zs);
+
+/*
+ * Get the name of chosen compression algorithm.
+ */
+extern char const *zpq_compress_algorithm_name(ZpqStream * zs);
+
+/*
+ * Get the name of chosen decompression algorithm.
+ */
+extern char const *zpq_decompress_algorithm_name(ZpqStream * zs);
+
+/*
+ * Returns a NULL-terminated array of supported compression algorithm names.
+ */
+extern char	  **zpq_get_supported_algorithms(void);
+
+#endif
diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h
index 0a23281..a24bbca 100644
--- a/src/include/libpq/libpq-be.h
+++ b/src/include/libpq/libpq-be.h
@@ -170,6 +170,9 @@ typedef struct Port
 	int			keepalives_count;
 	int			tcp_user_timeout;
 
+	char	   *compression_algorithms; /* Compression algorithms supported by
+										 * client */
+
 	/*
 	 * GSSAPI structures.
 	 */
diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h
index b115247..de24a3f 100644
--- a/src/include/libpq/libpq.h
+++ b/src/include/libpq/libpq.h
@@ -63,6 +63,7 @@ extern void StreamClose(pgsocket sock);
 extern void TouchSocketFiles(void);
 extern void RemoveSocketFiles(void);
 extern void pq_init(void);
+extern int	pq_configure(Port *port);
 extern int	pq_getbytes(char *s, size_t len);
 extern int	pq_getstring(StringInfo s);
 extern void pq_startmsgread(void);
diff --git a/src/include/libpq/pqcomm.h b/src/include/libpq/pqcomm.h
index 781d86c..c0d15f1 100644
--- a/src/include/libpq/pqcomm.h
+++ b/src/include/libpq/pqcomm.h
@@ -150,6 +150,7 @@ typedef struct StartupPacket
 } StartupPacket;
 
 extern bool Db_user_namespace;
+extern bool libpq_compression;
 
 /*
  * In protocol 3.0 and later, the startup packet length is not fixed, but
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index de8f838..b336eeb 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -349,6 +349,9 @@
 /* Define to 1 if you have the `link' function. */
 #undef HAVE_LINK
 
+/* Define to 1 if you have the `zstd' library (-lzstd). */
+#undef HAVE_LIBZSTD
+
 /* Define to 1 if the system has the type `locale_t'. */
 #undef HAVE_LOCALE_T
 
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 257e515..9b86ee0 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -1190,6 +1190,12 @@ typedef struct PgBackendStatus
 	/* application name; MUST be null-terminated */
 	char	   *st_appname;
 
+	/* client-server traffic information */
+	uint64		st_rx_raw_bytes;
+	uint64		st_tx_raw_bytes;
+	uint64		st_rx_compressed_bytes;
+	uint64		st_tx_compressed_bytes;
+
 	/*
 	 * Current command string; MUST be null-terminated. Note that this string
 	 * possibly is truncated in the middle of a multi-byte character. As
@@ -1403,6 +1409,7 @@ extern void pgstat_report_activity(BackendState state, const char *cmd_str);
 extern void pgstat_report_tempfile(size_t filesize);
 extern void pgstat_report_appname(const char *appname);
 extern void pgstat_report_xact_timestamp(TimestampTz tstamp);
+extern void pgstat_report_network_traffic(uint64 rx_raw_bytes, uint64 tx_raw_bytes, uint64 rx_compressed_bytes, uint64 tx_compressed_bytes);
 extern const char *pgstat_get_wait_event(uint32 wait_event_info);
 extern const char *pgstat_get_wait_event_type(uint32 wait_event_info);
 extern const char *pgstat_get_backend_current_activity(int pid, bool checkUser);
diff --git a/src/interfaces/libpq/Makefile b/src/interfaces/libpq/Makefile
index 4ac5f4b..be8cb34 100644
--- a/src/interfaces/libpq/Makefile
+++ b/src/interfaces/libpq/Makefile
@@ -29,6 +29,20 @@ endif
 # The MSVC build system scrapes OBJS from this file.  If you change any of
 # the conditional additions of files to OBJS, update Mkvcbuild.pm to match.
 
+ifeq ($(with_zstd),yes)
+LIBS += -lzstd
+SHLIB_LINK += -lzstd
+endif
+
+ifeq ($(with_zlib),yes)
+LIBS += -lz
+SHLIB_LINK += -lz
+endif
+
+# We can't use Makefile variables here because the MSVC build system scrapes
+# OBJS from this file.
+
+
 OBJS = \
 	$(WIN32RES) \
 	fe-auth-scram.o \
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index bbc1f90..294d8a7 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -179,3 +179,5 @@ PQgetgssctx               176
 PQsetSSLKeyPassHook_OpenSSL         177
 PQgetSSLKeyPassHook_OpenSSL         178
 PQdefaultSSLKeyPassHook_OpenSSL     179
+PQcompressor              180
+PQdecompressor            181
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index e7781d0..50ba0d9 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -24,6 +24,7 @@
 #include "common/ip.h"
 #include "common/link-canary.h"
 #include "common/scram-common.h"
+#include "common/zpq_stream.h"
 #include "common/string.h"
 #include "fe-auth.h"
 #include "libpq-fe.h"
@@ -350,6 +351,10 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 		"Replication", "D", 5,
 	offsetof(struct pg_conn, replication)},
 
+	{"compression", "PGCOMPRESSION", NULL, NULL,
+		"Libpq-compression", "", 16,
+	offsetof(struct pg_conn, compression)},
+
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
 		"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
@@ -458,6 +463,10 @@ pgthreadlock_t pg_g_threadlock = default_threadlock;
 void
 pqDropConnection(PGconn *conn, bool flushInput)
 {
+	/* Release compression streams */
+	zpq_free(conn->zstream);
+	conn->zstream = NULL;
+
 	/* Drop any SSL state */
 	pqsecure_close(conn);
 
@@ -2148,6 +2157,12 @@ connectDBComplete(PGconn *conn)
 				return 1;		/* success! */
 
 			case PGRES_POLLING_READING:
+				/*
+				 * If ZpqStream already holds buffered RX data, skip pqWaitTimed.
+				 */
+				if (zpq_buffered_rx(conn->zstream))
+					break;
+
 				ret = pqWaitTimed(1, 0, conn, finish_time);
 				if (ret == -1)
 				{
@@ -3216,11 +3231,79 @@ keep_going:						/* We will come back to here until there is
 				 */
 				conn->inCursor = conn->inStart;
 
-				/* Read type byte */
-				if (pqGetc(&beresp, conn))
+				while (1)
 				{
-					/* We'll come back when there is more data */
-					return PGRES_POLLING_READING;
+					/* Read type byte */
+					if (pqGetc(&beresp, conn))
+					{
+						/* We'll come back when there is more data */
+						return PGRES_POLLING_READING;
+					}
+
+					if (beresp == 'z') /* Switch on compression */
+					{
+						int index;
+						char resp;
+						/* Read message length word */
+						if (pqGetInt(&msgLength, 4, conn))
+						{
+							/* We'll come back when there is more data */
+							return PGRES_POLLING_READING;
+						}
+						if (msgLength != 5)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "expected compression algorithm specification message of length 5 bytes, but received %d\n"),
+											  msgLength);
+							goto error_return;
+						}
+						pqGetc(&resp, conn);
+						index = resp;
+						if (index == (char)-1)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server does not support requested compression algorithms %s\n"),
+											  conn->compression);
+							goto error_return;
+						}
+						/* Use unsigned comparison to handle negative values */
+						if ((unsigned)index >= conn->n_compressors)
+						{
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "server returned incorrect compression algorithm index: %d\n"),
+											  index);
+							goto error_return;
+						}
+						Assert(!conn->zstream);
+						conn->zstream = zpq_create(conn->compressors[index].impl,
+												   conn->compressors[index].level,
+                                                   conn->compressors[index].impl,
+												   (zpq_tx_func)pqsecure_write, (zpq_rx_func)pqsecure_read, conn,
+												   &conn->inBuffer[conn->inCursor], conn->inEnd-conn->inCursor);
+						if (!conn->zstream)
+						{
+							char** supported_algorithms = zpq_get_supported_algorithms();
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext(
+												  "failed to initialize compressor %s\n"),
+											  supported_algorithms[conn->compressors[index].impl]);
+							free(supported_algorithms);
+							goto error_return;
+						}
+						/* reset buffer */
+						conn->inStart = conn->inCursor = conn->inEnd = 0;
+					}
+					else if (conn->n_compressors != 0 && beresp == 'v') /* negotiate protocol version */
+					{
+						appendPQExpBuffer(&conn->errorMessage,
+										  libpq_gettext(
+											  "server does not support libpq compression\n"));
+						goto error_return;
+					} else
+						break;
 				}
 
 				/*
@@ -4020,6 +4103,8 @@ freePGconn(PGconn *conn)
 		free(conn->dbName);
 	if (conn->replication)
 		free(conn->replication);
+	if (conn->compression)
+		free(conn->compression);
 	if (conn->pguser)
 		free(conn->pguser);
 	if (conn->pgpass)
@@ -6532,6 +6617,22 @@ PQuser(const PGconn *conn)
 }
 
 char *
+PQcompressor(const PGconn *conn)
+{
+	if (!conn || !conn->zstream)
+		return NULL;
+	return (char*)zpq_compress_algorithm_name(conn->zstream);
+}
+
+char *
+PQdecompressor(const PGconn *conn)
+{
+	if (!conn || !conn->zstream)
+		return NULL;
+	return (char*)zpq_decompress_algorithm_name(conn->zstream);
+}
+
+char *
 PQpass(const PGconn *conn)
 {
 	char	   *password = NULL;
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index eea0237..00ccefb 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -1810,8 +1810,8 @@ PQgetResult(PGconn *conn)
 		 * EOF indication.  We expect therefore that this won't result in any
 		 * undue delay in reporting a previous write failure.)
 		 */
-		if (flushResult ||
-			pqWait(true, false, conn) ||
+		if (flushResult || (zpq_buffered_rx(conn->zstream) == 0 &&
+			pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			/*
diff --git a/src/interfaces/libpq/fe-misc.c b/src/interfaces/libpq/fe-misc.c
index 4ffc7f3..c9bae65 100644
--- a/src/interfaces/libpq/fe-misc.c
+++ b/src/interfaces/libpq/fe-misc.c
@@ -53,6 +53,8 @@
 #include "pg_config_paths.h"
 #include "port/pg_bswap.h"
 
+#include "common/zpq_stream.h"
+
 static int	pqPutMsgBytes(const void *buf, size_t len, PGconn *conn);
 static int	pqSendSome(PGconn *conn, int len);
 static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
@@ -60,6 +62,16 @@ static int	pqSocketCheck(PGconn *conn, int forRead, int forWrite,
 static int	pqSocketPoll(int sock, int forRead, int forWrite, time_t end_time);
 
 /*
+ * Use zpq_read if compression is switched on
+ */
+#define pq_read_conn(conn)												\
+	(conn->zstream														\
+	 ? zpq_read(conn->zstream, conn->inBuffer + conn->inEnd,			\
+				conn->inBufSize - conn->inEnd)							\
+	 : pqsecure_read(conn, conn->inBuffer + conn->inEnd,				\
+					 conn->inBufSize - conn->inEnd))
+
+/*
  * PQlibVersion: return the libpq version number
  */
 int
@@ -664,10 +676,17 @@ pqReadData(PGconn *conn)
 
 	/* OK, try to read some data */
 retry3:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("decompress error: %s\n"),
+                              zpq_decompress_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -759,10 +778,18 @@ retry3:
 	 * arrived.
 	 */
 retry4:
-	nread = pqsecure_read(conn, conn->inBuffer + conn->inEnd,
-						  conn->inBufSize - conn->inEnd);
+	nread = pq_read_conn(conn);
+
 	if (nread < 0)
 	{
+		if (nread == ZPQ_DECOMPRESS_ERROR)
+		{
+			printfPQExpBuffer(&conn->errorMessage,
+                              libpq_gettext("decompress error: %s\n"),
+                              zpq_decompress_error(conn->zstream));
+			return -1;
+		}
+
 		switch (SOCK_ERRNO)
 		{
 			case EINTR:
@@ -875,12 +902,17 @@ pqSendSome(PGconn *conn, int len)
 	}
 
 	/* while there's still data to send */
-	while (len > 0)
+	while (len > 0 || zpq_buffered_tx(conn->zstream))
 	{
 		int			sent;
-
+		size_t		processed = 0;
+		/*
+		 * Use zpq_write if compression is switched on
+		 */
+		sent = conn->zstream
+			? zpq_write(conn->zstream, ptr, len, &processed)
 #ifndef WIN32
-		sent = pqsecure_write(conn, ptr, len);
+			: pqsecure_write(conn, ptr, len);
 #else
 
 		/*
@@ -888,8 +920,11 @@ pqSendSome(PGconn *conn, int len)
 		 * failure-point appears to be different in different versions of
 		 * Windows, but 64k should always be safe.
 		 */
-		sent = pqsecure_write(conn, ptr, Min(len, 65536));
+			: pqsecure_write(conn, ptr, Min(len, 65536));
 #endif
+		ptr += processed;
+		len -= processed;
+		remaining -= processed;
 
 		if (sent < 0)
 		{
@@ -943,7 +978,7 @@ pqSendSome(PGconn *conn, int len)
 			remaining -= sent;
 		}
 
-		if (len > 0)
+		if (len > 0 || sent < 0 || zpq_buffered_tx(conn->zstream))
 		{
 			/*
 			 * We didn't send it all, wait till we can send more.
@@ -1034,6 +1069,8 @@ pqFlush(PGconn *conn)
 int
 pqWait(int forRead, int forWrite, PGconn *conn)
 {
+	if (forRead && conn->inCursor < conn->inEnd)
+		return 0;
 	return pqWaitTimed(forRead, forWrite, conn, (time_t) -1);
 }
 
diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c
index 1696525..1b76700 100644
--- a/src/interfaces/libpq/fe-protocol3.c
+++ b/src/interfaces/libpq/fe-protocol3.c
@@ -1679,7 +1679,7 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
 			if (async)
 				return 0;
 			/* Need to load more data */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				return -2;
 			continue;
@@ -1737,7 +1737,7 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
 	while ((status = PQgetlineAsync(conn, s, maxlen - 1)) == 0)
 	{
 		/* need to load more data */
-		if (pqWait(true, false, conn) ||
+		if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 			pqReadData(conn) < 0)
 		{
 			*s = '\0';
@@ -1975,7 +1975,7 @@ pqFunctionCall3(PGconn *conn, Oid fnid,
 		if (needInput)
 		{
 			/* Wait for some data to arrive (or for the channel to close) */
-			if (pqWait(true, false, conn) ||
+			if ((zpq_buffered_rx(conn->zstream) == 0 && pqWait(true, false, conn)) ||
 				pqReadData(conn) < 0)
 				break;
 		}
@@ -2135,6 +2135,163 @@ pqBuildStartupPacket3(PGconn *conn, int *packetlen,
 }
 
 /*
+ * Build comma-separated list of compression algorithms requested by client.
+ * It can be either explicitly specified by user in connection string, or
+ * include all algorithms supported by client library.
+ * This function returns true if the compression string is successfully parsed and
+ * stores a comma-separated list of algorithms in *client_compressors.
+ * If compression is disabled, then NULL is assigned to *client_compressors.
+ * Also it creates an array of compressor descriptors, each element of which corresponds to
+ * the corresponding algorithm name in *client_compressors list. This array is stored in PGconn
+ * and is used during handshake when a compression acknowledgment response is received from the server.
+ */
+static bool
+build_compressors_list(PGconn *conn, char** client_compressors, bool build_descriptors)
+{
+	char** supported_algorithms = zpq_get_supported_algorithms();
+	char* value = conn->compression;
+	int n_supported_algorithms;
+	int total_len = 0;
+	int i;
+
+	for (n_supported_algorithms = 0; supported_algorithms[n_supported_algorithms] != NULL; n_supported_algorithms++)
+	{
+		total_len += strlen(supported_algorithms[n_supported_algorithms])+1;
+	}
+
+	if (pg_strcasecmp(value, "true") == 0 ||
+		pg_strcasecmp(value, "yes") == 0 ||
+		pg_strcasecmp(value, "on") == 0 ||
+		pg_strcasecmp(value, "any") == 0 ||
+		pg_strcasecmp(value, "1") == 0)
+	{
+		/* Compression is enabled: choose algorithm automatically */
+		char* p;
+
+		if (n_supported_algorithms == 0)
+		{
+			*client_compressors = NULL; /* no compressors are available */
+			conn->compressors = NULL;
+			conn->n_compressors = 0;
+			return true;
+		}
+		*client_compressors = p = malloc(total_len);
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+		for (i = 0; i < n_supported_algorithms; i++)
+		{
+			strcpy(p, supported_algorithms[i]);
+			p += strlen(p);
+			*p++ = ',';
+			if (build_descriptors)
+			{
+				conn->compressors[i].impl = i;
+				conn->compressors[i].level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+			}
+		}
+		p[-1] = '\0';
+		return true;
+	}
+	else if (*value == 0 ||
+			 pg_strcasecmp(value, "false") == 0 ||
+			 pg_strcasecmp(value, "no") == 0 ||
+			 pg_strcasecmp(value, "off") == 0 ||
+			 pg_strcasecmp(value, "0") == 0)
+	{
+		/* Compression is disabled */
+		*client_compressors = NULL;
+		conn->compressors = NULL;
+		conn->n_compressors = 0;
+		return true;
+	}
+	else
+	{
+		/* List of compression algorithms separated by commas */
+		char *src, *dst;
+		int n_suggested_algorithms = 0;
+		char* suggested_algorithms = strdup(value);
+		src = suggested_algorithms;
+		*client_compressors = dst = strdup(value);
+
+		if (build_descriptors)
+			conn->compressors = malloc(n_supported_algorithms*sizeof(pg_conn_compressor));
+
+		while (*src != '\0')
+		{
+			char* sep = strchr(src, ',');
+			char* col;
+			int compression_level = ZPQ_DEFAULT_COMPRESSION_LEVEL;
+			bool found;
+
+			if (sep != NULL)
+				*sep = '\0';
+
+			strcpy(dst, src);
+
+			col = strchr(src, ':');
+			if (col != NULL)
+			{
+				*col = '\0';
+				if (sscanf(col+1, "%d", &compression_level) != 1 && !build_descriptors)
+				{
+					fprintf(stderr,
+							libpq_gettext("WARNING: invalid compression level %s in compression option '%s'\n"),
+							col+1, value);
+					return false;
+				}
+			}
+			found = false;
+			for (i = 0; supported_algorithms[i] != NULL; i++)
+			{
+				if (pg_strcasecmp(src, supported_algorithms[i]) == 0)
+				{
+					if (build_descriptors)
+					{
+						conn->compressors[n_suggested_algorithms].impl = i;
+						conn->compressors[n_suggested_algorithms].level = compression_level;
+					}
+					n_suggested_algorithms += 1;
+					dst += strlen(dst);
+					*dst++ = ',';
+					found = true;
+					break;
+				}
+			}
+			if (!found)
+			{
+				fprintf(stderr,
+						libpq_gettext("WARNING: algorithm %s is not supported by client\n"),
+						src);
+			}
+			if (sep)
+				src = sep+1;
+			else
+				break;
+		}
+		free(suggested_algorithms);
+		conn->n_compressors = n_suggested_algorithms;
+		if (n_suggested_algorithms == 0)
+		{
+			if (!build_descriptors)
+				fprintf(stderr,
+						libpq_gettext("WARNING: none of the specified algorithms are supported by client: %s\n"),
+						value);
+			else
+			{
+				free(conn->compressors);
+				conn->compressors = NULL;
+				conn->n_compressors = 0;
+			}
+			free(*client_compressors);
+			*client_compressors = NULL;
+			return false;
+		}
+		dst[-1] = '\0';
+		return true;
+	}
+}
+
+/*
  * Build a startup packet given a filled-in PGconn structure.
  *
  * We need to figure out how much space is needed, then fill it in.
@@ -2180,6 +2337,17 @@ build_startup_packet(const PGconn *conn, char *packet,
 		ADD_STARTUP_OPTION("replication", conn->replication);
 	if (conn->pgoptions && conn->pgoptions[0])
 		ADD_STARTUP_OPTION("options", conn->pgoptions);
+	if (conn->compression && conn->compression[0])
+	{
+		char* client_compression_algorithms;
+		if (build_compressors_list((PGconn*)conn, &client_compression_algorithms, packet == NULL))
+		{
+			if (client_compression_algorithms)
+			{
+				ADD_STARTUP_OPTION("_pq_.compression", client_compression_algorithms);
+			}
+		}
+	}
 	if (conn->send_appname)
 	{
 		/* Use appname if present, otherwise use fallback */
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 3b6a9fb..76c2c6e 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -319,6 +319,8 @@ extern char *PQhostaddr(const PGconn *conn);
 extern char *PQport(const PGconn *conn);
 extern char *PQtty(const PGconn *conn);
 extern char *PQoptions(const PGconn *conn);
+extern char *PQcompressor(const PGconn *conn);
+extern char *PQdecompressor(const PGconn *conn);
 extern ConnStatusType PQstatus(const PGconn *conn);
 extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn);
 extern const char *PQparameterStatus(const PGconn *conn,
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index 1de91ae..7406857 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -40,6 +40,7 @@
 /* include stuff common to fe and be */
 #include "getaddrinfo.h"
 #include "libpq/pqcomm.h"
+#include "common/zpq_stream.h"
 /* include stuff found in fe only */
 #include "pqexpbuffer.h"
 
@@ -317,6 +318,16 @@ typedef struct pg_conn_host
 								 * found in password file. */
 } pg_conn_host;
 
+
+/*
+ * Descriptors of compression algorithms chosen by client
+ */
+typedef struct pg_conn_compressor
+{
+	int			impl;			/* compression implementation index */
+	int			level;			/* compression level */
+}			pg_conn_compressor;
+
 /*
  * PGconn stores all the state data associated with a single connection
  * to a backend.
@@ -370,6 +381,13 @@ struct pg_conn
 	char	   *ssl_min_protocol_version;	/* minimum TLS protocol version */
 	char	   *ssl_max_protocol_version;	/* maximum TLS protocol version */
 
+	char	   *compression;	/* stream compression (boolean value, "any" or
+								 * list of compression algorithms separated by
+								 * comma) */
+	pg_conn_compressor *compressors;	/* descriptors of compression
+										 * algorithms chosen by client */
+	unsigned			n_compressors;  /* size of compressors array  */
+
 	/* Type of connection to make.  Possible values: any, read-write. */
 	char	   *target_session_attrs;
 
@@ -527,6 +545,9 @@ struct pg_conn
 
 	/* Buffer for receiving various parts of messages */
 	PQExpBufferData workBuffer; /* expansible string */
+
+	/* Compression stream */
+	ZpqStream  *zstream;
 };
 
 /* PGcancel stores all data necessary to cancel a connection. A copy of this
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 097ff5d..aa9359c 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1762,7 +1762,7 @@ pg_stat_activity| SELECT s.datid,
     s.backend_xmin,
     s.query,
     s.backend_type
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_all_indexes| SELECT c.oid AS relid,
@@ -1867,8 +1867,14 @@ pg_stat_gssapi| SELECT s.pid,
     s.gss_auth AS gss_authenticated,
     s.gss_princ AS principal,
     s.gss_enc AS encrypted
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
+pg_stat_network_traffic| SELECT s.pid,
+    s.rx_raw_bytes,
+    s.tx_raw_bytes,
+    s.rx_compressed_bytes,
+    s.tx_compressed_bytes
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes);
 pg_stat_progress_analyze| SELECT s.pid,
     s.datid,
     d.datname,
@@ -2015,7 +2021,7 @@ pg_stat_replication| SELECT s.pid,
     w.sync_priority,
     w.sync_state,
     w.reply_time
-   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
      JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
      LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
 pg_stat_replication_slots| SELECT s.slot_name,
@@ -2046,7 +2052,7 @@ pg_stat_ssl| SELECT s.pid,
     s.ssl_client_dn AS client_dn,
     s.ssl_client_serial AS client_serial,
     s.ssl_issuer_dn AS issuer_dn
-   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
+   FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, rx_raw_bytes, tx_raw_bytes, rx_compressed_bytes, tx_compressed_bytes)
   WHERE (s.client_port IS NOT NULL);
 pg_stat_subscription| SELECT su.oid AS subid,
     su.subname,
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index 90594bd..41722a6 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -125,7 +125,7 @@ sub mkvcbuild
 	  keywords.c kwlookup.c link-canary.c md5.c
 	  pg_get_line.c pg_lzcompress.c pgfnames.c psprintf.c relpath.c rmtree.c
 	  saslprep.c scram-common.c string.c stringinfo.c unicode_norm.c username.c
-	  wait_error.c wchar.c);
+	  wait_error.c wchar.c zpq_stream.c);
 
 	if ($solution->{options}->{openssl})
 	{
